Skip to main content

trillium_channels/
channel_broadcaster.rs

1use crate::ChannelEvent;
2use async_broadcast::{InactiveReceiver, Receiver as ActiveReceiver, Sender};
3use futures_lite::Stream;
4use std::{
5    mem,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10/**
11Channel-wide event broadcaster and subscriber
12
13This can be cloned and stored elsewhere in an application in order to
14send events to connected channel clients. Retrieve a [`ChannelBroadcaster`] from a
15[`Channel`](crate::Channel) by calling
16[`Channel::broadcaster`](crate::Channel::broadcaster)
17
18ChannelBroadcaster also implements [`Stream`] so that your application
19can listen in on ChannelEvents happening elsewhere. This might be used
20for spawning a task to log events, or synchronizing events between
21servers.
22
23*/
24#[derive(Clone, Debug)]
25pub struct ChannelBroadcaster {
26    sender: Sender<ChannelEvent>,
27    receiver: Receiver<ChannelEvent>,
28}
29
30#[derive(Debug)]
31enum Receiver<C> {
32    Active(ActiveReceiver<C>),
33    Inactive(InactiveReceiver<C>),
34    Activating,
35}
36
37impl<C> Clone for Receiver<C> {
38    fn clone(&self) -> Self {
39        match self {
40            Self::Active(active) => Self::Inactive(active.clone().deactivate()),
41            Self::Inactive(inactive) => Self::Inactive(inactive.clone()),
42            Self::Activating => Self::Activating, // should not be reachable
43        }
44    }
45}
46
47impl<C> Stream for Receiver<C>
48where
49    C: Clone + std::fmt::Debug,
50{
51    type Item = C;
52
53    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        self.activate();
55
56        match &mut *self {
57            Receiver::Active(a) => Pin::new(a).poll_next(cx),
58            _ => Poll::Ready(None), // unreachable, but why panic when we can just end the stream?
59        }
60    }
61}
62
63impl<C> Receiver<C>
64where
65    C: Clone,
66{
67    fn activate(&mut self) {
68        if let Receiver::Inactive(_) = self {
69            if let Receiver::Inactive(inactive) = mem::replace(self, Self::Activating) {
70                *self = Receiver::Active(inactive.activate());
71            };
72        }
73    }
74}
75
76impl ChannelBroadcaster {
77    pub(crate) fn new(
78        sender: Sender<ChannelEvent>,
79        receiver: InactiveReceiver<ChannelEvent>,
80    ) -> Self {
81        Self {
82            sender,
83            receiver: Receiver::Inactive(receiver),
84        }
85    }
86
87    /**
88    Send this ChannelEvent to all subscribed channel clients
89    */
90    pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
91        // we don't care about whether there are any connected clients
92        // here, so we ignore error results.
93        self.sender.try_broadcast(event.into()).ok();
94    }
95
96    /**
97    Returns the number of connected clients. Note that the number of
98    clients listening on any given channel will likely be smaller than
99    this, and currently that number is not available.
100    */
101    pub fn connected_clients(&self) -> usize {
102        self.sender.receiver_count()
103    }
104}
105
106impl Stream for ChannelBroadcaster {
107    type Item = ChannelEvent;
108
109    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
110        Pin::new(&mut self.receiver).poll_next(cx)
111    }
112}