trillium_channels/
client_receiver.rs1use crate::{subscriptions::Subscriptions, ChannelEvent, Version};
2use async_broadcast::Receiver as BroadcastReceiver;
3use async_channel::Receiver;
4use futures_lite::{stream::Race, Stream, StreamExt};
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9use trillium_websockets::Message;
10
11#[derive(Debug)]
12pub struct ClientReceiver {
13 subscriptions: Subscriptions,
14 race: Pin<Box<Race<BroadcastReceiver<ChannelEvent>, Receiver<ChannelEvent>>>>,
15 version: Version,
16}
17
18impl ClientReceiver {
19 pub fn new(
20 individual: Receiver<ChannelEvent>,
21 broadcast: BroadcastReceiver<ChannelEvent>,
22 subscriptions: Subscriptions,
23 version: Version,
24 ) -> Self {
25 Self {
26 race: Box::pin(broadcast.race(individual)),
27 subscriptions,
28 version,
29 }
30 }
31}
32
33impl Stream for ClientReceiver {
34 type Item = Message;
35
36 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 loop {
38 match self.race.poll_next(cx) {
39 Poll::Ready(Some(event)) if !self.subscriptions.subscribes(&event) => continue,
40 Poll::Ready(Some(event)) => {
41 if let Ok(text) = event.serialize(self.version) {
42 log::trace!(
43 "serialized {:?} with {:?} as {:?}",
44 event,
45 &self.version,
46 &text
47 );
48 break Poll::Ready(Some(Message::Text(text)));
49 }
50 }
51 Poll::Pending => break Poll::Pending,
52 Poll::Ready(None) => break Poll::Ready(None),
53 }
54 }
55 }
56}