Skip to main content

trillium_sse/
lib.rs

1//! # Trillium tools for server sent events
2//!
3//! This primarily provides [`SseConnExt`], an
4//! extension trait for [`trillium::Conn`] that has a
5//! [`with_sse_stream`](crate::SseConnExt::with_sse_stream) chainable
6//! method that takes a [`Stream`] where the `Item`
7//! implements [`Eventable`].
8//!
9//! Often, you will want this stream to be something like a channel, but
10//! the specifics of that are dependent on the event fanout
11//! characteristics of your application.
12//!
13//! This crate implements [`Eventable`] for an [`Event`] type that you can
14//! use in your application, for `String`, and for `&'static str`. You can
15//! also implement [`Eventable`] for any type in your application.
16//!
17//! ## Example usage
18//!
19//! ```
20//! use broadcaster::BroadcastChannel;
21//! use trillium::{Conn, Method, State, conn_try, conn_unwrap, log_error};
22//! use trillium_sse::SseConnExt;
23//! use trillium_static_compiled::static_compiled;
24//!
25//! type Channel = BroadcastChannel<String>;
26//!
27//! fn get_sse(mut conn: Conn) -> Conn {
28//!     let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
29//!     conn.with_sse_stream(broadcaster)
30//! }
31//!
32//! async fn post_broadcast(mut conn: Conn) -> Conn {
33//!     let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
34//!     let body = conn_try!(conn.request_body_string().await, conn);
35//!     log_error!(broadcaster.send(&body).await);
36//!     conn.ok("sent")
37//! }
38//!
39//! fn main() {
40//!     let handler = (
41//!         static_compiled!("examples/static").with_index_file("index.html"),
42//!         State::new(Channel::new()),
43//!         |conn: Conn| async move {
44//!             match (conn.method(), conn.path()) {
45//!                 (Method::Get, "/sse") => get_sse(conn),
46//!                 (Method::Post, "/broadcast") => post_broadcast(conn).await,
47//!                 _ => conn,
48//!             }
49//!         },
50//!     );
51//!
52//!     // trillium_smol::run(handler);
53//! }
54//! ```
55#![forbid(unsafe_code)]
56#![deny(
57    missing_copy_implementations,
58    rustdoc::missing_crate_level_docs,
59    missing_debug_implementations,
60    nonstandard_style,
61    unused_qualifications
62)]
63#![warn(missing_docs)]
64
65#[cfg(test)]
66#[doc = include_str!("../README.md")]
67mod readme {}
68
69use futures_lite::{AsyncRead, stream::Stream};
70use std::{
71    borrow::Cow,
72    fmt::Write,
73    io,
74    marker::PhantomData,
75    pin::Pin,
76    task::{Context, Poll},
77};
78use trillium::{Body, Conn, KnownHeaderName, Status};
79
80struct SseBody<S, E> {
81    stream: S,
82    buffer: Vec<u8>,
83    event: PhantomData<E>,
84}
85
86impl<S, E> SseBody<S, E>
87where
88    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
89    E: Eventable,
90{
91    pub fn new(stream: S) -> Self {
92        Self {
93            stream,
94            buffer: Vec::new(),
95            event: PhantomData,
96        }
97    }
98}
99
100fn encode(event: impl Eventable) -> String {
101    let mut output = String::new();
102    if let Some(event_type) = event.event_type() {
103        writeln!(&mut output, "event: {event_type}").unwrap();
104    }
105
106    if let Some(id) = event.id() {
107        writeln!(&mut output, "id: {id}").unwrap();
108    }
109
110    for part in event.data().lines() {
111        writeln!(&mut output, "data: {part}").unwrap();
112    }
113
114    writeln!(output).unwrap();
115
116    output
117}
118
119impl<S, E> AsyncRead for SseBody<S, E>
120where
121    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
122    E: Eventable,
123{
124    fn poll_read(
125        self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127        buf: &mut [u8],
128    ) -> Poll<io::Result<usize>> {
129        let Self { buffer, stream, .. } = self.get_mut();
130
131        let buffer_read = buffer.len().min(buf.len());
132        if buffer_read > 0 {
133            buf[0..buffer_read].copy_from_slice(&buffer[0..buffer_read]);
134            buffer.drain(0..buffer_read);
135            return Poll::Ready(Ok(buffer_read));
136        }
137
138        match Pin::new(stream).poll_next(cx) {
139            Poll::Pending => Poll::Pending,
140            Poll::Ready(Some(item)) => {
141                let data = encode(item).into_bytes();
142                let writable_len = data.len().min(buf.len());
143                buf[0..writable_len].copy_from_slice(&data[0..writable_len]);
144                if writable_len < data.len() {
145                    buffer.extend_from_slice(&data[writable_len..]);
146                }
147                Poll::Ready(Ok(writable_len))
148            }
149
150            Poll::Ready(None) => Poll::Ready(Ok(0)),
151        }
152    }
153}
154
155impl<S, E> From<SseBody<S, E>> for Body
156where
157    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
158    E: Eventable,
159{
160    fn from(sse_body: SseBody<S, E>) -> Self {
161        Body::new_streaming(sse_body, None)
162    }
163}
164
165/// Extension trait for server sent events
166pub trait SseConnExt {
167    /// builds and sets a streaming response body that conforms to the
168    /// [server-sent-events
169    /// spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events)
170    /// from a Stream of any [`Eventable`] type (such as
171    /// [`Event`], as well as setting appropiate headers for
172    /// this response.
173    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
174    where
175        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
176        E: Eventable;
177}
178
179impl SseConnExt for Conn {
180    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
181    where
182        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
183        E: Eventable,
184    {
185        let body = SseBody::new(self.swansong().interrupt(sse_stream));
186        self.with_response_header(KnownHeaderName::ContentType, "text/event-stream")
187            .with_response_header(KnownHeaderName::CacheControl, "no-cache")
188            .with_body(body)
189            .with_status(Status::Ok)
190            .halt()
191    }
192}
193
194/// A trait that allows any Unpin + Send + Sync type to act as an event.
195///
196/// For a concrete implementation of this trait, you can use [`Event`],
197/// but it is also implemented for [`String`] and [`&'static str`].
198pub trait Eventable: Unpin + Send + Sync + 'static {
199    /// return the data for this event. non-optional.
200    fn data(&self) -> &str;
201
202    /// return the event type, optionally
203    fn event_type(&self) -> Option<&str> {
204        None
205    }
206
207    /// return a unique event id, optionally
208    fn id(&self) -> Option<&str> {
209        None
210    }
211}
212
213impl Eventable for Event {
214    fn data(&self) -> &str {
215        Event::data(self)
216    }
217
218    fn event_type(&self) -> Option<&str> {
219        Event::event_type(self)
220    }
221}
222
223impl Eventable for &'static str {
224    fn data(&self) -> &str {
225        self
226    }
227}
228
229impl Eventable for String {
230    fn data(&self) -> &str {
231        self
232    }
233}
234
235/// Events are a concrete implementation of the [`Eventable`] trait.
236#[derive(Debug, Clone, Eq, PartialEq)]
237pub struct Event {
238    data: Cow<'static, str>,
239    event_type: Option<Cow<'static, str>>,
240}
241
242impl From<&'static str> for Event {
243    fn from(s: &'static str) -> Self {
244        Self::from(Cow::Borrowed(s))
245    }
246}
247
248impl From<String> for Event {
249    fn from(s: String) -> Self {
250        Self::from(Cow::Owned(s))
251    }
252}
253
254impl From<Cow<'static, str>> for Event {
255    fn from(data: Cow<'static, str>) -> Self {
256        Event {
257            data,
258            event_type: None,
259        }
260    }
261}
262
263impl Event {
264    /// builds a new [`Event`]
265    ///
266    /// by default, this event has no event type. to set an event type,
267    /// use [`Event::with_type`] or [`Event::set_type`]
268    pub fn new(data: impl Into<Cow<'static, str>>) -> Self {
269        Self::from(data.into())
270    }
271
272    /// chainable constructor to set the type on an event
273    ///
274    /// ```
275    /// let event = trillium_sse::Event::new("event data").with_type("userdata");
276    /// assert_eq!(event.event_type(), Some("userdata"));
277    /// assert_eq!(event.data(), "event data");
278    /// ```
279    pub fn with_type(mut self, event_type: impl Into<Cow<'static, str>>) -> Self {
280        self.set_type(event_type);
281        self
282    }
283
284    /// set the event type for this Event. The default is None.
285    ///
286    /// ```
287    /// let mut event = trillium_sse::Event::new("event data");
288    /// assert_eq!(event.event_type(), None);
289    /// event.set_type("userdata");
290    /// assert_eq!(event.event_type(), Some("userdata"));
291    /// ```
292    pub fn set_type(&mut self, event_type: impl Into<Cow<'static, str>>) {
293        self.event_type = Some(event_type.into());
294    }
295
296    /// returns this Event's data as a &str
297    pub fn data(&self) -> &str {
298        &self.data
299    }
300
301    /// returns this Event's type as a str, if set
302    pub fn event_type(&self) -> Option<&str> {
303        self.event_type.as_deref()
304    }
305}