trillium_channels/
channel_client.rs1use crate::{client_receiver::ClientReceiver, subscriptions::Subscriptions, ChannelEvent, Version};
2use async_broadcast::{Receiver, Sender as BroadcastSender};
3use async_channel::Sender;
4use serde::Serialize;
5use trillium::log_error;
6use trillium_websockets::Message;
7
8#[derive(Debug, Clone)]
17pub struct ChannelClient {
18 subscriptions: Subscriptions,
19 sender: Sender<ChannelEvent>,
20 broadcast_sender: BroadcastSender<ChannelEvent>,
21 version: Version,
22}
23
24impl ChannelClient {
25 pub(crate) fn new(
26 broadcast_sender: BroadcastSender<ChannelEvent>,
27 broadcast_receiver: Receiver<ChannelEvent>,
28 version: Version,
29 ) -> (Self, ClientReceiver) {
30 let (sender, individual) = async_channel::unbounded();
31 let subscriptions = Subscriptions::default();
32 (
33 Self {
34 subscriptions: subscriptions.clone(),
35 sender,
36 broadcast_sender,
37 version,
38 },
39 ClientReceiver::new(individual, broadcast_receiver, subscriptions, version),
40 )
41 }
42
43 pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
49 let mut event = event.into();
50 event.reference = None;
51 log_error!(self.broadcast_sender.try_broadcast(event));
52 }
53
54 pub async fn send_event(&self, event: impl Into<ChannelEvent>) {
60 log_error!(self.sender.send(event.into()).await);
61 }
62
63 pub async fn reply_ok(&self, event: &ChannelEvent, payload: &impl Serialize) {
72 #[derive(serde::Serialize)]
73 struct Reply<'a, S> {
74 status: &'static str,
75 response: &'a S,
76 }
77
78 self.send_event(event.build_reply(
79 "phx_reply",
80 &Reply {
81 status: "ok",
82 response: payload,
83 },
84 ))
85 .await
86 }
87
88 pub async fn reply_error(&self, event: &ChannelEvent, error: &impl Serialize) {
96 self.send_event(event.build_reply("phx_error", &error))
97 .await
98 }
99
100 pub async fn allow_join(&self, event: &ChannelEvent, payload: &impl Serialize) {
109 if event.event() != "phx_join" {
110 log::error!(
111 "allow_join called with an event other than phx_join: {:?}",
112 event
113 );
114 return;
115 }
116 self.subscriptions.join(event.topic.to_string());
117 self.reply_ok(event, payload).await;
118 }
119
120 pub async fn allow_leave(&self, event: &ChannelEvent, payload: &impl Serialize) {
129 if event.event() != "phx_leave" {
130 log::error!(
131 "allow_leave called with an event other than phx_leave: {:?}",
132 event
133 );
134 return;
135 }
136 self.subscriptions.leave(&event.topic);
137 self.reply_ok(event, payload).await;
138 }
139
140 pub fn subscriptions(&self) -> &Subscriptions {
144 &self.subscriptions
145 }
146
147 pub(crate) fn deserialize(&self, message: Message) -> Option<ChannelEvent> {
148 let string = message.to_text().ok()?;
149 ChannelEvent::deserialize(string, self.version).ok()
150 }
151}