1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
use crate::{client_receiver::ClientReceiver, subscriptions::Subscriptions, ChannelEvent, Version};
use async_broadcast::{Receiver, Sender as BroadcastSender};
use async_channel::Sender;
use serde::Serialize;
use trillium::log_error;
use trillium_websockets::Message;
/**
# Communicate with the connected client.
Note that although each client is unique and represents a specific
websocket connection, the ChannelClient can be cloned and moved
elsewhere if needed and any updates to the topic subscriptions
will be kept synchronized across clones.
*/
#[derive(Debug, Clone)]
pub struct ChannelClient {
subscriptions: Subscriptions,
sender: Sender<ChannelEvent>,
broadcast_sender: BroadcastSender<ChannelEvent>,
version: Version,
}
impl ChannelClient {
pub(crate) fn new(
broadcast_sender: BroadcastSender<ChannelEvent>,
broadcast_receiver: Receiver<ChannelEvent>,
version: Version,
) -> (Self, ClientReceiver) {
let (sender, individual) = async_channel::unbounded();
let subscriptions = Subscriptions::default();
(
Self {
subscriptions: subscriptions.clone(),
sender,
broadcast_sender,
version,
},
ClientReceiver::new(individual, broadcast_receiver, subscriptions, version),
)
}
/**
Send a [`ChannelEvent`] to all connected clients. Note that
these messages will only reach clients that subscribe to the
event's topic.
*/
pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
let mut event = event.into();
event.reference = None;
log_error!(self.broadcast_sender.try_broadcast(event));
}
/**
Send a [`ChannelEvent`] to this specific client. Note that
this message will only be received if the client subscribes to
the event's topic.
*/
pub async fn send_event(&self, event: impl Into<ChannelEvent>) {
log_error!(self.sender.send(event.into()).await);
}
/**
Send an ok reply in reference to the provided ChannelEvent
with the provided response payload.
Note that this sets the event as `"phx_reply"` and the payload as
`{"status": "ok", "response": response }`, as well as setting the
reference field.
*/
pub async fn reply_ok(&self, event: &ChannelEvent, payload: &impl Serialize) {
#[derive(serde::Serialize)]
struct Reply<'a, S> {
status: &'static str,
response: &'a S,
}
self.send_event(event.build_reply(
"phx_reply",
&Reply {
status: "ok",
response: payload,
},
))
.await
}
/**
Send an error reply in reference to the provided ChannelEvent
with the provided response payload.
Note that this sets the event as `"phx_error"` as well as setting
the reference field.
*/
pub async fn reply_error(&self, event: &ChannelEvent, error: &impl Serialize) {
self.send_event(event.build_reply("phx_error", &error))
.await
}
/**
Join a topic, sending an ok reply with the provided optional
value. This sends an ok reply to the client as well as adding the
topic to the client's subscriptions.
Use `&()` as the payload if no payload is needed.
*/
pub async fn allow_join(&self, event: &ChannelEvent, payload: &impl Serialize) {
if event.event() != "phx_join" {
log::error!(
"allow_join called with an event other than phx_join: {:?}",
event
);
return;
}
self.subscriptions.join(event.topic.to_string());
self.reply_ok(event, payload).await;
}
/**
Leave a topic as requested by the provided channel event,
including the optional payload. This sends an ok reply to the
client as well as removing the channel from the client's
subscriptions.
Use `&()` as the payload if no payload is needed.
*/
pub async fn allow_leave(&self, event: &ChannelEvent, payload: &impl Serialize) {
if event.event() != "phx_leave" {
log::error!(
"allow_leave called with an event other than phx_leave: {:?}",
event
);
return;
}
self.subscriptions.leave(&event.topic);
self.reply_ok(event, payload).await;
}
/**
Borrow this client's subscriptions
*/
pub fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}
pub(crate) fn deserialize(&self, message: Message) -> Option<ChannelEvent> {
let string = message.to_text().ok()?;
ChannelEvent::deserialize(string, self.version).ok()
}
}