Skip to main content

trillium_channels/
channel_client.rs

1use crate::{client_receiver::ClientReceiver, subscriptions::Subscriptions, ChannelEvent, Version};
2use async_broadcast::{Receiver, Sender as BroadcastSender};
3use async_channel::Sender;
4use serde::Serialize;
5use trillium::log_error;
6use trillium_websockets::Message;
7
8/**
9# Communicate with the connected client.
10
11Note that although each client is unique and represents a specific
12websocket connection, the ChannelClient can be cloned and moved
13elsewhere if needed and any updates to the topic subscriptions
14will be kept synchronized across clones.
15*/
16#[derive(Debug, Clone)]
17pub struct ChannelClient {
18    subscriptions: Subscriptions,
19    sender: Sender<ChannelEvent>,
20    broadcast_sender: BroadcastSender<ChannelEvent>,
21    version: Version,
22}
23
24impl ChannelClient {
25    pub(crate) fn new(
26        broadcast_sender: BroadcastSender<ChannelEvent>,
27        broadcast_receiver: Receiver<ChannelEvent>,
28        version: Version,
29    ) -> (Self, ClientReceiver) {
30        let (sender, individual) = async_channel::unbounded();
31        let subscriptions = Subscriptions::default();
32        (
33            Self {
34                subscriptions: subscriptions.clone(),
35                sender,
36                broadcast_sender,
37                version,
38            },
39            ClientReceiver::new(individual, broadcast_receiver, subscriptions, version),
40        )
41    }
42
43    /**
44    Send a [`ChannelEvent`] to all connected clients. Note that
45    these messages will only reach clients that subscribe to the
46    event's topic.
47    */
48    pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
49        let mut event = event.into();
50        event.reference = None;
51        log_error!(self.broadcast_sender.try_broadcast(event));
52    }
53
54    /**
55    Send a [`ChannelEvent`] to this specific client. Note that
56    this message will only be received if the client subscribes to
57    the event's topic.
58    */
59    pub async fn send_event(&self, event: impl Into<ChannelEvent>) {
60        log_error!(self.sender.send(event.into()).await);
61    }
62
63    /**
64    Send an ok reply in reference to the provided ChannelEvent
65    with the provided response payload.
66
67    Note that this sets the event as `"phx_reply"` and the payload as
68    `{"status": "ok", "response": response }`, as well as setting the
69    reference field.
70    */
71    pub async fn reply_ok(&self, event: &ChannelEvent, payload: &impl Serialize) {
72        #[derive(serde::Serialize)]
73        struct Reply<'a, S> {
74            status: &'static str,
75            response: &'a S,
76        }
77
78        self.send_event(event.build_reply(
79            "phx_reply",
80            &Reply {
81                status: "ok",
82                response: payload,
83            },
84        ))
85        .await
86    }
87
88    /**
89    Send an error reply in reference to the provided ChannelEvent
90    with the provided response payload.
91
92    Note that this sets the event as `"phx_error"` as well as setting
93    the reference field.
94    */
95    pub async fn reply_error(&self, event: &ChannelEvent, error: &impl Serialize) {
96        self.send_event(event.build_reply("phx_error", &error))
97            .await
98    }
99
100    /**
101    Join a topic, sending an ok reply with the provided optional
102    value. This sends an ok reply to the client as well as adding the
103    topic to the client's subscriptions.
104
105    Use `&()` as the payload if no payload is needed.
106
107    */
108    pub async fn allow_join(&self, event: &ChannelEvent, payload: &impl Serialize) {
109        if event.event() != "phx_join" {
110            log::error!(
111                "allow_join called with an event other than phx_join: {:?}",
112                event
113            );
114            return;
115        }
116        self.subscriptions.join(event.topic.to_string());
117        self.reply_ok(event, payload).await;
118    }
119
120    /**
121    Leave a topic as requested by the provided channel event,
122    including the optional payload. This sends an ok reply to the
123    client as well as removing the channel from the client's
124    subscriptions.
125
126    Use `&()` as the payload if no payload is needed.
127    */
128    pub async fn allow_leave(&self, event: &ChannelEvent, payload: &impl Serialize) {
129        if event.event() != "phx_leave" {
130            log::error!(
131                "allow_leave called with an event other than phx_leave: {:?}",
132                event
133            );
134            return;
135        }
136        self.subscriptions.leave(&event.topic);
137        self.reply_ok(event, payload).await;
138    }
139
140    /**
141    Borrow this client's subscriptions
142     */
143    pub fn subscriptions(&self) -> &Subscriptions {
144        &self.subscriptions
145    }
146
147    pub(crate) fn deserialize(&self, message: Message) -> Option<ChannelEvent> {
148        let string = message.to_text().ok()?;
149        ChannelEvent::deserialize(string, self.version).ok()
150    }
151}