use crate::{Result, Role, WebSocketConfig};
use async_tungstenite::{
tungstenite::{self, Message},
use futures_util::{
stream::{SplitSink, SplitStream, Stream},
SinkExt, StreamExt,
use std::{
task::{Context, Poll},
use stopper::{Stopper, StreamStopper};
use trillium::{Headers, Method, StateSet, Upgrade};
use trillium_http::transport::BoxedTransport;
A struct that represents an specific websocket connection.
This can be thought of as a combination of a [`async_tungstenite::WebSocketStream`] and a
[`trillium::Conn`], as it contains a combination of their fields and
associated functions.
The WebSocketConn implements `Stream<Item=Result<Message, Error>>`,
and can be polled with `StreamExt::next`
pub struct WebSocketConn {
request_headers: Headers,
path: String,
method: Method,
state: StateSet,
peer_ip: Option<IpAddr>,
stopper: Stopper,
sink: SplitSink<Wss, Message>,
stream: Option<WStream>,
type Wss = WebSocketStream<BoxedTransport>;
impl WebSocketConn {
/// send a [`Message::Text`] variant
pub async fn send_string(&mut self, string: String) -> Result<()> {
/// send a [`Message::Binary`] variant
pub async fn send_bytes(&mut self, bin: Vec<u8>) -> Result<()> {
#[cfg(feature = "json")]
/// send a [`Message::Text`] that contains json
/// note that json messages are not actually part of the websocket specification
pub async fn send_json(&mut self, json: &impl serde::Serialize) -> Result<()> {
/// Sends a [`Message`] to the client
pub async fn send(&mut self, message: Message) -> Result<()> {
/// Create a `WebSocketConn` from an HTTP upgrade, with optional config and the specified role
/// You should not typically need to call this; the trillium client and server both provide
/// your code with a `WebSocketConn`.
pub async fn new(upgrade: Upgrade, config: Option<WebSocketConfig>, role: Role) -> Self {
let Upgrade {
} = upgrade;
let wss = if let Some(vec) = buffer {
WebSocketStream::from_partially_read(transport, vec, role, config).await
} else {
WebSocketStream::from_raw_socket(transport, role, config).await
let (sink, stream) = wss.split();
let stream = Some(WStream {
stream: stopper.stop_stream(stream),
Self {
peer_ip: None,
/// retrieve a clone of the server's [`Stopper`]
pub fn stopper(&self) -> Stopper {
/// close the websocket connection gracefully
pub async fn close(&mut self) -> Result<()> {
/// retrieve the request headers for this conn
pub fn headers(&self) -> &Headers {
/// retrieves the peer ip for this conn, if available
pub fn peer_ip(&self) -> Option<IpAddr> {
/// Sets the peer ip for this conn
pub fn set_peer_ip(&mut self, peer_ip: Option<IpAddr>) {
self.peer_ip = peer_ip
retrieves the path part of the request url, up to and excluding
any query component
pub fn path(&self) -> &str {
Retrieves the query component of the path, excluding `?`. Returns
an empty string if there is no query component.
pub fn querystring(&self) -> &str {
.map(|(_, q)| q)
/// retrieve the request method for this conn
pub fn method(&self) -> Method {
retrieve state from the state set that has been accumulated by
trillium handlers run on the [`trillium::Conn`] before it
became a websocket. see [`trillium::Conn::state`] for more
pub fn state<T: 'static>(&self) -> Option<&T> {
retrieve a mutable borrow of the state from the state set
pub fn state_mut<T: 'static>(&mut self) -> Option<&mut T> {
/// see [`insert_state`]
#[deprecated = "use WebsocketConn::insert_state"]
pub fn set_state<T: Send + Sync + 'static>(&mut self, state: T) {
/// inserts new state
/// returns the previously set state of the same type, if any existed
pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
take some type T out of the state set that has been
accumulated by trillium handlers run on the [`trillium::Conn`]
before it became a websocket. see [`trillium::Conn::take_state`]
for more information
pub fn take_state<T: 'static>(&mut self) -> Option<T> {
/// take the inbound Message stream from this conn
pub fn take_inbound_stream(&mut self) -> Option<impl Stream<Item = MessageResult>> {
/// borrow the inbound Message stream from this conn
pub fn inbound_stream(&mut self) -> Option<impl Stream<Item = MessageResult> + '_> {
type MessageResult = std::result::Result<Message, tungstenite::Error>;
pub struct WStream {
stream: StreamStopper<SplitStream<Wss>>,
impl Stream for WStream {
type Item = MessageResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
impl AsMut<StateSet> for WebSocketConn {
fn as_mut(&mut self) -> &mut StateSet {
&mut self.state
impl AsRef<StateSet> for WebSocketConn {
fn as_ref(&self) -> &StateSet {
impl Stream for WebSocketConn {
type Item = MessageResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match {
Some(stream) => stream.poll_next_unpin(cx),
None => Poll::Ready(None),