Skip to main content

trillium_proxy/upstream/
connection_counting.rs

1//! Upstream selectors
2use super::{IntoUpstreamSelector, UpstreamSelector};
3use std::cmp::Ordering::*;
4use std::{
5    fmt::Debug,
6    ops::{Deref, DerefMut},
7};
8use trillium::Conn;
9use trillium_server_common::{CloneCounter, CloneCounterObserver};
10use url::Url;
11
#[derive(Debug)]
/// An upstream selector that attempts to send each request to the upstream with the fewest
/// open connections.
///
/// If several upstreams share the same lowest number of connections, a random upstream is
/// chosen from among them.
///
/// Internally, every upstream is stored alongside its own [`CloneCounterObserver`], whose
/// current value is read at selection time to compare upstreams.
pub struct ConnectionCounting<T>(Vec<(T, CloneCounterObserver)>);
17impl<T> ConnectionCounting<T>
18where
19    T: UpstreamSelector,
20{
21    ///
22    pub fn new<I, U>(urls: I) -> Self
23    where
24        I: IntoIterator<Item = U>,
25        U: IntoUpstreamSelector<UpstreamSelector = T>,
26    {
27        Self(
28            urls.into_iter()
29                .map(|u| (u.into_upstream(), CloneCounterObserver::new()))
30                .collect(),
31        )
32    }
33}
34
// Conn-state wrapper around the `CloneCounter` obtained for the selected upstream.
//
// The wrapped counter is never read back in this file (hence `dead_code`); it is only
// inserted into the conn's state by `ConnectionCounting::determine_upstream`.
// NOTE(review): presumably keeping the `CloneCounter` alive is what keeps the connection
// counted by the matching `CloneCounterObserver` until the conn state is dropped -- confirm
// against trillium_server_common.
#[allow(dead_code)]
struct ConnectionCount(CloneCounter);
37
38impl<T> UpstreamSelector for ConnectionCounting<T>
39where
40    T: UpstreamSelector,
41{
42    fn determine_upstream(&self, conn: &mut Conn) -> Option<Url> {
43        let mut current_lowest = usize::MAX;
44        let mut current_selection = vec![];
45        for (u, c) in &self.0 {
46            let current = c.current();
47            match current.cmp(&current_lowest) {
48                Less => {
49                    current_lowest = current;
50                    current_selection = vec![(u, c)];
51                }
52
53                Equal => {
54                    current_selection.push((u, c));
55                }
56
57                Greater => {}
58            }
59        }
60
61        fastrand::choice(current_selection).and_then(|(u, cc)| {
62            conn.insert_state(ConnectionCount(cc.counter()));
63            u.determine_upstream(conn)
64        })
65    }
66}
67
68impl<T> Deref for ConnectionCounting<T>
69where
70    T: UpstreamSelector,
71{
72    type Target = [(T, CloneCounterObserver)];
73    fn deref(&self) -> &Self::Target {
74        &self.0
75    }
76}
77impl<T> DerefMut for ConnectionCounting<T>
78where
79    T: UpstreamSelector,
80{
81    fn deref_mut(&mut self) -> &mut Self::Target {
82        &mut self.0
83    }
84}
85impl<U, T> Extend<U> for ConnectionCounting<T>
86where
87    T: UpstreamSelector,
88    U: IntoUpstreamSelector<UpstreamSelector = T>,
89{
90    fn extend<I: IntoIterator<Item = U>>(&mut self, iter: I) {
91        self.0.extend(
92            iter.into_iter()
93                .map(|i| (i.into_upstream(), CloneCounterObserver::new())),
94        );
95    }
96}
97
98impl<U, V> FromIterator<U> for ConnectionCounting<V>
99where
100    U: IntoUpstreamSelector<UpstreamSelector = V>,
101    V: UpstreamSelector,
102{
103    fn from_iter<T>(urls: T) -> Self
104    where
105        T: IntoIterator<Item = U>,
106    {
107        Self::new(urls)
108    }
109}