1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
use crate::{Result, Role, WebSocketConfig};
use async_tungstenite::{
    tungstenite::{self, Message},
    WebSocketStream,
};
use futures_util::{
    stream::{SplitSink, SplitStream, Stream},
    SinkExt, StreamExt,
};
use std::{
    net::IpAddr,
    pin::Pin,
    task::{Context, Poll},
};
use stopper::{Stopper, StreamStopper};
use trillium::{Headers, Method, StateSet, Upgrade};
use trillium_http::transport::BoxedTransport;

/**
A struct that represents a specific websocket connection.

This can be thought of as a combination of a [`async_tungstenite::WebSocketStream`] and a
[`trillium::Conn`], as it contains a combination of their fields and
associated functions.

`WebSocketConn` implements `Stream<Item = Result<Message, tungstenite::Error>>`
and can be polled with `StreamExt::next`. Once the inbound stream has been
removed with `take_inbound_stream`, polling the conn itself yields `None`.
 */

#[derive(Debug)]
pub struct WebSocketConn {
    // headers of the request that was upgraded, copied from the `Upgrade`
    request_headers: Headers,
    // request path as received from the `Upgrade`; may still contain a
    // `?query` suffix, which `path()` and `querystring()` split apart
    path: String,
    // method of the request that was upgraded
    method: Method,
    // state set carried over from the `Upgrade`
    state: StateSet,
    // not populated by `new` (starts as `None`); assigned through `set_peer_ip`
    peer_ip: Option<IpAddr>,
    // stopper received from the `Upgrade`; also used to wrap the inbound stream
    stopper: Stopper,
    // outbound half of the split websocket stream
    sink: SplitSink<Wss, Message>,
    // inbound half of the split websocket stream; `None` after it has been
    // taken with `take_inbound_stream`
    stream: Option<WStream>,
}

/// The concrete websocket stream type used by [`WebSocketConn`]: an
/// `async_tungstenite` stream over a boxed trillium transport.
type Wss = WebSocketStream<BoxedTransport>;

impl WebSocketConn {
    /// Sends a [`Message::Text`] variant containing `string`.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`WebSocketConn::send`] returns.
    pub async fn send_string(&mut self, string: String) -> Result<()> {
        // `send` already yields this crate's `Result`, so no error conversion is needed
        self.send(Message::Text(string)).await
    }

    /// Sends a [`Message::Binary`] variant containing `bin`.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`WebSocketConn::send`] returns.
    pub async fn send_bytes(&mut self, bin: Vec<u8>) -> Result<()> {
        // `send` already yields this crate's `Result`, so no error conversion is needed
        self.send(Message::Binary(bin)).await
    }

    /// Sends a [`Message::Text`] that contains `json` serialized as a json string.
    ///
    /// Note that json messages are not actually part of the websocket specification.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails, or whatever error
    /// [`WebSocketConn::send_string`] returns.
    #[cfg(feature = "json")]
    pub async fn send_json(&mut self, json: &impl serde::Serialize) -> Result<()> {
        self.send_string(serde_json::to_string(json)?).await
    }

    /// Sends a [`Message`] to the other end of the websocket connection.
    ///
    /// # Errors
    ///
    /// Returns an error if the outbound sink reports one; the sink's error is
    /// converted into this crate's error type.
    pub async fn send(&mut self, message: Message) -> Result<()> {
        self.sink.send(message).await.map_err(Into::into)
    }

    /// Create a `WebSocketConn` from an HTTP upgrade, with optional config and the specified role
    ///
    /// You should not typically need to call this; the trillium client and server both provide
    /// your code with a `WebSocketConn`.
    ///
    /// The request headers, path, method, state and stopper are moved out of
    /// `upgrade`. The peer ip starts out as `None`; use
    /// [`WebSocketConn::set_peer_ip`] to provide one.
    #[doc(hidden)]
    pub async fn new(upgrade: Upgrade, config: Option<WebSocketConfig>, role: Role) -> Self {
        let Upgrade {
            request_headers,
            path,
            method,
            state,
            buffer,
            transport,
            stopper,
        } = upgrade;

        // When the upgrade carries a buffer, hand it to the websocket stream
        // along with the transport so those bytes are not lost.
        let wss = match buffer {
            Some(vec) => WebSocketStream::from_partially_read(transport, vec, role, config).await,
            None => WebSocketStream::from_raw_socket(transport, role, config).await,
        };

        let (sink, stream) = wss.split();

        // The inbound half is wrapped by the stopper before being stored.
        let stream = Some(WStream {
            stream: stopper.stop_stream(stream),
        });

        Self {
            request_headers,
            path,
            method,
            state,
            peer_ip: None,
            sink,
            stream,
            stopper,
        }
    }

    /// Retrieves a clone of the [`Stopper`] that this conn received from the upgrade.
    pub fn stopper(&self) -> Stopper {
        self.stopper.clone()
    }

    /// Closes the websocket connection gracefully by sending a
    /// [`Message::Close`] with no close frame.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`WebSocketConn::send`] returns.
    pub async fn close(&mut self) -> Result<()> {
        self.send(Message::Close(None)).await
    }

    /// Retrieves the request headers for this conn.
    pub fn headers(&self) -> &Headers {
        &self.request_headers
    }

    /// Retrieves the peer ip for this conn, if one has been set.
    pub fn peer_ip(&self) -> Option<IpAddr> {
        self.peer_ip
    }

    /// Sets the peer ip for this conn, replacing any previous value.
    pub fn set_peer_ip(&mut self, peer_ip: Option<IpAddr>) {
        self.peer_ip = peer_ip;
    }

    /// Retrieves the path part of the request url, up to and excluding
    /// any query component.
    pub fn path(&self) -> &str {
        // everything before the first `?`; the whole string if there is none
        self.path.split('?').next().unwrap_or_default()
    }

    /// Retrieves the query component of the path, excluding `?`. Returns
    /// an empty string if there is no query component.
    pub fn querystring(&self) -> &str {
        self.path
            .split_once('?')
            .map(|(_, q)| q)
            .unwrap_or_default()
    }

    /// Retrieves the request method for this conn.
    pub fn method(&self) -> Method {
        self.method
    }

    /// Retrieves state from the state set that has been accumulated by
    /// trillium handlers run on the [`trillium::Conn`] before it
    /// became a websocket. See [`trillium::Conn::state`] for more
    /// information.
    pub fn state<T: 'static>(&self) -> Option<&T> {
        self.state.get()
    }

    /// Retrieves a mutable borrow of the state of type `T` from the state
    /// set, if any.
    pub fn state_mut<T: 'static>(&mut self) -> Option<&mut T> {
        self.state.get_mut()
    }

    /// Inserts new state, discarding whatever
    /// [`WebSocketConn::insert_state`] returns.
    ///
    /// See [`WebSocketConn::insert_state`].
    #[deprecated = "use WebSocketConn::insert_state"]
    pub fn set_state<T: Send + Sync + 'static>(&mut self, state: T) {
        self.insert_state(state);
    }

    /// Inserts new state.
    ///
    /// Returns the previously set state of the same type, if any existed.
    pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
        self.state.insert(state)
    }

    /// Takes some type `T` out of the state set that has been
    /// accumulated by trillium handlers run on the [`trillium::Conn`]
    /// before it became a websocket. See [`trillium::Conn::take_state`]
    /// for more information.
    pub fn take_state<T: 'static>(&mut self) -> Option<T> {
        self.state.take()
    }

    /// Takes the inbound [`Message`] stream from this conn.
    ///
    /// Returns `None` if the stream has already been taken. After the stream
    /// has been taken, [`WebSocketConn::inbound_stream`] also returns `None`.
    pub fn take_inbound_stream(&mut self) -> Option<impl Stream<Item = MessageResult>> {
        self.stream.take()
    }

    /// Borrows the inbound [`Message`] stream from this conn.
    ///
    /// Returns `None` if the stream has previously been removed with
    /// [`WebSocketConn::take_inbound_stream`].
    pub fn inbound_stream(&mut self) -> Option<impl Stream<Item = MessageResult> + '_> {
        self.stream.as_mut()
    }
}

/// The item type yielded by the inbound message streams in this file: either
/// a websocket [`Message`] or a [`tungstenite::Error`].
type MessageResult = std::result::Result<Message, tungstenite::Error>;

/// The inbound half of a websocket connection.
///
/// This wraps the read half of the split websocket stream in a
/// [`StreamStopper`], and implements `Stream` by delegating to it.
#[derive(Debug)]
pub struct WStream {
    // read half of the websocket stream, wrapped by `Stopper::stop_stream`
    stream: StreamStopper<SplitStream<Wss>>,
}

/// Yields inbound messages by delegating to the wrapped stream.
impl Stream for WStream {
    type Item = MessageResult;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Re-pin the inner stream and forward the poll to it unchanged.
        let inner = Pin::new(&mut self.stream);
        inner.poll_next(cx)
    }
}

/// Exposes the conn's state set for mutation.
impl AsMut<StateSet> for WebSocketConn {
    fn as_mut(&mut self) -> &mut StateSet {
        let Self { state, .. } = self;
        state
    }
}

/// Exposes a shared borrow of the conn's state set.
impl AsRef<StateSet> for WebSocketConn {
    fn as_ref(&self) -> &StateSet {
        let Self { state, .. } = self;
        state
    }
}

/// Polls the inbound message stream held by this conn.
///
/// Once the inbound stream has been taken out of the conn, this stream is
/// immediately finished and yields `None`.
impl Stream for WebSocketConn {
    type Item = MessageResult;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream
            .as_mut()
            .map_or(Poll::Ready(None), |inbound| inbound.poll_next_unpin(cx))
    }
}