1#![forbid(unsafe_code)]
56#![deny(
57 missing_copy_implementations,
58 rustdoc::missing_crate_level_docs,
59 missing_debug_implementations,
60 nonstandard_style,
61 unused_qualifications
62)]
63#![warn(missing_docs)]
64
// Attaches the text of the README to an empty module as its documentation; the
// module only exists in `cfg(test)` builds.
// NOTE(review): presumably meant to run the README's code samples as doctests.
// rustdoc collects doctests under `cfg(doctest)` rather than `cfg(test)` — confirm
// that this module is actually picked up.
#[cfg(test)]
#[doc = include_str!("../README.md")]
mod readme {}
68
69use futures_lite::{AsyncRead, stream::Stream};
70use std::{
71 borrow::Cow,
72 fmt::Write,
73 io,
74 marker::PhantomData,
75 pin::Pin,
76 task::{Context, Poll},
77};
78use trillium::{Body, Conn, KnownHeaderName, Status};
79
80struct SseBody<S, E> {
81 stream: S,
82 buffer: Vec<u8>,
83 event: PhantomData<E>,
84}
85
86impl<S, E> SseBody<S, E>
87where
88 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
89 E: Eventable,
90{
91 pub fn new(stream: S) -> Self {
92 Self {
93 stream,
94 buffer: Vec::new(),
95 event: PhantomData,
96 }
97 }
98}
99
100fn encode(event: impl Eventable) -> String {
101 let mut output = String::new();
102 if let Some(event_type) = event.event_type() {
103 writeln!(&mut output, "event: {event_type}").unwrap();
104 }
105
106 if let Some(id) = event.id() {
107 writeln!(&mut output, "id: {id}").unwrap();
108 }
109
110 for part in event.data().lines() {
111 writeln!(&mut output, "data: {part}").unwrap();
112 }
113
114 writeln!(output).unwrap();
115
116 output
117}
118
119impl<S, E> AsyncRead for SseBody<S, E>
120where
121 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
122 E: Eventable,
123{
124 fn poll_read(
125 self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 buf: &mut [u8],
128 ) -> Poll<io::Result<usize>> {
129 let Self { buffer, stream, .. } = self.get_mut();
130
131 let buffer_read = buffer.len().min(buf.len());
132 if buffer_read > 0 {
133 buf[0..buffer_read].copy_from_slice(&buffer[0..buffer_read]);
134 buffer.drain(0..buffer_read);
135 return Poll::Ready(Ok(buffer_read));
136 }
137
138 match Pin::new(stream).poll_next(cx) {
139 Poll::Pending => Poll::Pending,
140 Poll::Ready(Some(item)) => {
141 let data = encode(item).into_bytes();
142 let writable_len = data.len().min(buf.len());
143 buf[0..writable_len].copy_from_slice(&data[0..writable_len]);
144 if writable_len < data.len() {
145 buffer.extend_from_slice(&data[writable_len..]);
146 }
147 Poll::Ready(Ok(writable_len))
148 }
149
150 Poll::Ready(None) => Poll::Ready(Ok(0)),
151 }
152 }
153}
154
155impl<S, E> From<SseBody<S, E>> for Body
156where
157 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
158 E: Eventable,
159{
160 fn from(sse_body: SseBody<S, E>) -> Self {
161 Body::new_streaming(sse_body, None)
162 }
163}
164
/// Extension trait that adds server-sent-events support to a connection type.
pub trait SseConnExt {
    /// Consumes `self` and returns it with `sse_stream` attached as a
    /// server-sent-events response.
    ///
    /// Every item yielded by `sse_stream` must implement [`Eventable`].
    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
    where
        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
        E: Eventable;
}
178
179impl SseConnExt for Conn {
180 fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
181 where
182 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
183 E: Eventable,
184 {
185 let body = SseBody::new(self.swansong().interrupt(sse_stream));
186 self.with_response_header(KnownHeaderName::ContentType, "text/event-stream")
187 .with_response_header(KnownHeaderName::CacheControl, "no-cache")
188 .with_body(body)
189 .with_status(Status::Ok)
190 .halt()
191 }
192}
193
/// A value that can be sent as a single server-sent event.
///
/// Only [`Eventable::data`] has to be implemented; the event type and the id both
/// default to `None`.
pub trait Eventable: Unpin + Send + Sync + 'static {
    /// The payload of the event. It is written out as one `data:` line per line of
    /// text.
    fn data(&self) -> &str;

    /// The type of the event, written as an `event:` line when present.
    ///
    /// Defaults to `None`.
    fn event_type(&self) -> Option<&str> {
        None
    }

    /// The id of the event, written as an `id:` line when present.
    ///
    /// Defaults to `None`.
    fn id(&self) -> Option<&str> {
        None
    }
}
212
213impl Eventable for Event {
214 fn data(&self) -> &str {
215 Event::data(self)
216 }
217
218 fn event_type(&self) -> Option<&str> {
219 Event::event_type(self)
220 }
221}
222
223impl Eventable for &'static str {
224 fn data(&self) -> &str {
225 self
226 }
227}
228
229impl Eventable for String {
230 fn data(&self) -> &str {
231 self
232 }
233}
234
/// A server-sent event made of a data payload and an optional event type.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Event {
    /// The payload of the event.
    data: Cow<'static, str>,
    /// The type of the event, if one has been set.
    event_type: Option<Cow<'static, str>>,
}

impl From<&'static str> for Event {
    /// Builds an untyped event that borrows the static string as its data.
    fn from(s: &'static str) -> Self {
        Self::new(s)
    }
}

impl From<String> for Event {
    /// Builds an untyped event that owns the string as its data.
    fn from(s: String) -> Self {
        Self::new(s)
    }
}

impl From<Cow<'static, str>> for Event {
    /// Builds an untyped event from borrowed or owned data.
    fn from(data: Cow<'static, str>) -> Self {
        Self::new(data)
    }
}

impl Event {
    /// Builds an event with the given data and no event type.
    pub fn new(data: impl Into<Cow<'static, str>>) -> Self {
        Self {
            data: data.into(),
            event_type: None,
        }
    }

    /// Chainable variant of [`Event::set_type`]: returns this event with its type
    /// set to `event_type`, replacing any earlier type.
    pub fn with_type(self, event_type: impl Into<Cow<'static, str>>) -> Self {
        Self {
            event_type: Some(event_type.into()),
            ..self
        }
    }

    /// Sets the type of this event in place, replacing any earlier type.
    pub fn set_type(&mut self, event_type: impl Into<Cow<'static, str>>) {
        let event_type = event_type.into();
        self.event_type = Some(event_type);
    }

    /// The data payload of this event.
    pub fn data(&self) -> &str {
        self.data.as_ref()
    }

    /// The type of this event, or `None` when no type has been set.
    pub fn event_type(&self) -> Option<&str> {
        self.event_type.as_deref()
    }
}