trillium_proxy/upstream/
connection_counting.rs1use super::{IntoUpstreamSelector, UpstreamSelector};
3use std::cmp::Ordering::*;
4use std::{
5 fmt::Debug,
6 ops::{Deref, DerefMut},
7};
8use trillium::Conn;
9use trillium_server_common::{CloneCounter, CloneCounterObserver};
10use url::Url;
11
12#[derive(Debug)]
13pub struct ConnectionCounting<T>(Vec<(T, CloneCounterObserver)>);
17impl<T> ConnectionCounting<T>
18where
19 T: UpstreamSelector,
20{
21 pub fn new<I, U>(urls: I) -> Self
23 where
24 I: IntoIterator<Item = U>,
25 U: IntoUpstreamSelector<UpstreamSelector = T>,
26 {
27 Self(
28 urls.into_iter()
29 .map(|u| (u.into_upstream(), CloneCounterObserver::new()))
30 .collect(),
31 )
32 }
33}
34
35#[allow(dead_code)]
36struct ConnectionCount(CloneCounter);
37
38impl<T> UpstreamSelector for ConnectionCounting<T>
39where
40 T: UpstreamSelector,
41{
42 fn determine_upstream(&self, conn: &mut Conn) -> Option<Url> {
43 let mut current_lowest = usize::MAX;
44 let mut current_selection = vec![];
45 for (u, c) in &self.0 {
46 let current = c.current();
47 match current.cmp(¤t_lowest) {
48 Less => {
49 current_lowest = current;
50 current_selection = vec![(u, c)];
51 }
52
53 Equal => {
54 current_selection.push((u, c));
55 }
56
57 Greater => {}
58 }
59 }
60
61 fastrand::choice(current_selection).and_then(|(u, cc)| {
62 conn.insert_state(ConnectionCount(cc.counter()));
63 u.determine_upstream(conn)
64 })
65 }
66}
67
68impl<T> Deref for ConnectionCounting<T>
69where
70 T: UpstreamSelector,
71{
72 type Target = [(T, CloneCounterObserver)];
73 fn deref(&self) -> &Self::Target {
74 &self.0
75 }
76}
77impl<T> DerefMut for ConnectionCounting<T>
78where
79 T: UpstreamSelector,
80{
81 fn deref_mut(&mut self) -> &mut Self::Target {
82 &mut self.0
83 }
84}
85impl<U, T> Extend<U> for ConnectionCounting<T>
86where
87 T: UpstreamSelector,
88 U: IntoUpstreamSelector<UpstreamSelector = T>,
89{
90 fn extend<I: IntoIterator<Item = U>>(&mut self, iter: I) {
91 self.0.extend(
92 iter.into_iter()
93 .map(|i| (i.into_upstream(), CloneCounterObserver::new())),
94 );
95 }
96}
97
98impl<U, V> FromIterator<U> for ConnectionCounting<V>
99where
100 U: IntoUpstreamSelector<UpstreamSelector = V>,
101 V: UpstreamSelector,
102{
103 fn from_iter<T>(urls: T) -> Self
104 where
105 T: IntoIterator<Item = U>,
106 {
107 Self::new(urls)
108 }
109}