trillium_channels/
channel_broadcaster.rs1use crate::ChannelEvent;
2use async_broadcast::{InactiveReceiver, Receiver as ActiveReceiver, Sender};
3use futures_lite::Stream;
4use std::{
5 mem,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10#[derive(Clone, Debug)]
25pub struct ChannelBroadcaster {
26 sender: Sender<ChannelEvent>,
27 receiver: Receiver<ChannelEvent>,
28}
29
30#[derive(Debug)]
31enum Receiver<C> {
32 Active(ActiveReceiver<C>),
33 Inactive(InactiveReceiver<C>),
34 Activating,
35}
36
37impl<C> Clone for Receiver<C> {
38 fn clone(&self) -> Self {
39 match self {
40 Self::Active(active) => Self::Inactive(active.clone().deactivate()),
41 Self::Inactive(inactive) => Self::Inactive(inactive.clone()),
42 Self::Activating => Self::Activating, }
44 }
45}
46
47impl<C> Stream for Receiver<C>
48where
49 C: Clone + std::fmt::Debug,
50{
51 type Item = C;
52
53 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54 self.activate();
55
56 match &mut *self {
57 Receiver::Active(a) => Pin::new(a).poll_next(cx),
58 _ => Poll::Ready(None), }
60 }
61}
62
63impl<C> Receiver<C>
64where
65 C: Clone,
66{
67 fn activate(&mut self) {
68 if let Receiver::Inactive(_) = self {
69 if let Receiver::Inactive(inactive) = mem::replace(self, Self::Activating) {
70 *self = Receiver::Active(inactive.activate());
71 };
72 }
73 }
74}
75
76impl ChannelBroadcaster {
77 pub(crate) fn new(
78 sender: Sender<ChannelEvent>,
79 receiver: InactiveReceiver<ChannelEvent>,
80 ) -> Self {
81 Self {
82 sender,
83 receiver: Receiver::Inactive(receiver),
84 }
85 }
86
87 pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
91 self.sender.try_broadcast(event.into()).ok();
94 }
95
96 pub fn connected_clients(&self) -> usize {
102 self.sender.receiver_count()
103 }
104}
105
106impl Stream for ChannelBroadcaster {
107 type Item = ChannelEvent;
108
109 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
110 Pin::new(&mut self.receiver).poll_next(cx)
111 }
112}