Skip to main content

trillium_channels/
client_receiver.rs

1use crate::{subscriptions::Subscriptions, ChannelEvent, Version};
2use async_broadcast::Receiver as BroadcastReceiver;
3use async_channel::Receiver;
4use futures_lite::{stream::Race, Stream, StreamExt};
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9use trillium_websockets::Message;
10
11#[derive(Debug)]
12pub struct ClientReceiver {
13    subscriptions: Subscriptions,
14    race: Pin<Box<Race<BroadcastReceiver<ChannelEvent>, Receiver<ChannelEvent>>>>,
15    version: Version,
16}
17
18impl ClientReceiver {
19    pub fn new(
20        individual: Receiver<ChannelEvent>,
21        broadcast: BroadcastReceiver<ChannelEvent>,
22        subscriptions: Subscriptions,
23        version: Version,
24    ) -> Self {
25        Self {
26            race: Box::pin(broadcast.race(individual)),
27            subscriptions,
28            version,
29        }
30    }
31}
32
33impl Stream for ClientReceiver {
34    type Item = Message;
35
36    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        loop {
38            match self.race.poll_next(cx) {
39                Poll::Ready(Some(event)) if !self.subscriptions.subscribes(&event) => continue,
40                Poll::Ready(Some(event)) => {
41                    if let Ok(text) = event.serialize(self.version) {
42                        log::trace!(
43                            "serialized {:?} with {:?} as {:?}",
44                            event,
45                            &self.version,
46                            &text
47                        );
48                        break Poll::Ready(Some(Message::Text(text)));
49                    }
50                }
51                Poll::Pending => break Poll::Pending,
52                Poll::Ready(None) => break Poll::Ready(None),
53            }
54        }
55    }
56}