Skip to main content

trillium_http/h3/
connection.rs

1use super::{
2    H3Error,
3    frame::{Frame, FrameDecodeError, UniStreamType},
4    quic_varint::{self, QuicVarIntError},
5    settings::H3Settings,
6};
7use crate::{Buffer, Conn, HttpContext, h3::H3ErrorCode};
8use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
9use std::{
10    future::Future,
11    io::{self, ErrorKind},
12    sync::{
13        Arc, OnceLock,
14        atomic::{AtomicBool, AtomicU64, Ordering},
15    },
16};
17use swansong::{ShutdownCompletion, Swansong};
18
19/// The result of processing an HTTP/3 bidirectional stream.
20#[derive(Debug)]
21#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
22pub enum H3StreamResult<Transport> {
23    /// The stream carried a normal HTTP/3 request.
24    Request(Conn<Transport>),
25
26    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
27    /// the associated WebTransport session.
28    WebTransport {
29        /// The WebTransport session ID (stream ID of the CONNECT request).
30        session_id: u64,
31        /// The underlying transport, ready for application data.
32        transport: Transport,
33        /// Any bytes buffered after the session ID during stream negotiation.
34        buffer: Buffer,
35    },
36}
37
38/// The result of processing an HTTP/3 unidirectional stream.
39#[derive(Debug)]
40pub enum UniStreamResult<T> {
41    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
42    /// automatically.
43    Handled,
44
45    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
46    /// WebTransport session.
47    WebTransport {
48        /// The WebTransport session ID.
49        session_id: u64,
50        /// The receive stream, ready for application data.
51        stream: T,
52        /// Any bytes buffered after the session ID during stream negotiation.
53        buffer: Buffer,
54    },
55
56    /// An unknown or unsupported stream type (e.g. Push). The caller should close or reset
57    /// this stream without processing it.
58    Unknown {
59        /// The raw stream type value.
60        stream_type: u64,
61        /// The stream.
62        stream: T,
63    },
64}
65
66/// Shared state for a single HTTP/3 QUIC connection.
67///
68/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
69#[derive(Debug)]
70pub struct H3Connection {
71    /// Shared configuration for the entire server, including tcp-based listeners
72    context: Arc<HttpContext>,
73
74    /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
75    /// the server-level Swansong shuts down.  Request stream tasks use this to interrupt
76    /// in-progress work.
77    swansong: Swansong,
78
79    /// The peer's H3 settings, received on their control stream.  Request streams may need to
80    /// consult these (e.g. max field section size).
81    peer_settings: OnceLock<H3Settings>,
82
83    /// The highest bidirectional stream ID we have accepted.  Used to compute the GOAWAY value
84    /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
85    /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
86    max_accepted_stream_id: AtomicU64,
87
88    /// Whether we have accepted any streams yet.
89    has_accepted_stream: AtomicBool,
90}
91
92impl H3Connection {
93    /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
94    pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
95        let swansong = context.swansong.child();
96        Arc::new(Self {
97            context,
98            swansong,
99            peer_settings: OnceLock::new(),
100            max_accepted_stream_id: AtomicU64::new(0),
101            has_accepted_stream: AtomicBool::new(false),
102        })
103    }
104
105    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
106    /// [`H3Connection::shut_down`]
107    pub fn swansong(&self) -> &Swansong {
108        &self.swansong
109    }
110
111    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
112    ///
113    /// The returned [`ShutdownCompletion`] type can
114    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
115    /// blocking context
116    ///
117    /// Note that this will NOT shut down the server. To shut down the whole server, use
118    /// [`HttpContext::shut_down`]
119    pub fn shut_down(&self) -> ShutdownCompletion {
120        self.swansong.shut_down()
121    }
122
123    /// Retrieve the [`HttpContext`] for this server.
124    pub fn context(&self) -> Arc<HttpContext> {
125        self.context.clone()
126    }
127
128    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
129    /// processed.
130    pub fn peer_settings(&self) -> Option<&H3Settings> {
131        self.peer_settings.get()
132    }
133
134    /// Record that we accepted a bidirectional stream with this ID.
135    fn record_accepted_stream(&self, stream_id: u64) {
136        self.max_accepted_stream_id
137            .fetch_max(stream_id, Ordering::Relaxed);
138        self.has_accepted_stream.store(true, Ordering::Relaxed);
139    }
140
141    /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
142    /// haven't accepted any.
143    fn goaway_id(&self) -> u64 {
144        if self.has_accepted_stream.load(Ordering::Relaxed) {
145            self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
146        } else {
147            0
148        }
149    }
150
151    /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
152    ///
153    /// Call this once per accepted bidirectional stream. Returns
154    /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
155    /// a standard HTTP/3 request.
156    ///
157    /// # Errors
158    ///
159    /// Returns an `H3Error` in case of io error or http/3 semantic error.
160    pub async fn process_inbound_bidi<Transport, Handler, Fut>(
161        self: Arc<Self>,
162        transport: Transport,
163        handler: Handler,
164        stream_id: u64,
165    ) -> Result<H3StreamResult<Transport>, H3Error>
166    where
167        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
168        Handler: FnOnce(Conn<Transport>) -> Fut,
169        Fut: Future<Output = Conn<Transport>>,
170    {
171        self.record_accepted_stream(stream_id);
172        let _guard = self.swansong.guard();
173        let buffer = Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
174        match Conn::new_h3(self, transport, buffer).await? {
175            H3StreamResult::Request(conn) => Ok(H3StreamResult::Request(
176                handler(conn).await.send_h3().await?,
177            )),
178            wt @ H3StreamResult::WebTransport { .. } => Ok(wt),
179        }
180    }
181
182    /// Run this server's HTTP/3 outbound control stream.
183    ///
184    /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
185    /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
186    /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
187    ///
188    /// # Errors
189    ///
190    /// Returns an `H3Error` in case of io error or http/3 semantic error.
191    pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
192    where
193        T: AsyncWrite + Unpin + Send,
194    {
195        let mut buf = vec![0; 128];
196
197        // Stream type + SETTINGS frame
198        let settings = Frame::Settings(H3Settings::from(&self.context.config));
199
200        write(&mut buf, &mut stream, |buf| {
201            let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
202            written += settings.encode(&mut buf[written..])?;
203            Some(written)
204        })
205        .await?;
206
207        // Wait for shutdown
208        self.swansong.clone().await;
209
210        // Send GOAWAY
211        write(&mut buf, &mut stream, |buf| {
212            Frame::Goaway(self.goaway_id()).encode(buf)
213        })
214        .await?;
215
216        Ok(())
217    }
218
219    /// Initialize and hold open the outbound QPACK encoder stream for the duration of the
220    /// connection.
221    ///
222    /// # Errors
223    ///
224    /// Returns an `H3Error` in case of io error or http/3 semantic error.
225    // Currently idle (static table only); will carry encoder instructions when dynamic table is
226    // added.
227    pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
228    where
229        T: AsyncWrite + Unpin + Send,
230    {
231        let mut buf = vec![0; 8];
232        write(&mut buf, &mut stream, |buf| {
233            quic_varint::encode(UniStreamType::QpackEncoder, buf)
234        })
235        .await?;
236
237        self.swansong.clone().await;
238        Ok(())
239    }
240
241    /// Initialize and hold open the outbound QPACK decoder stream for the duration of the
242    /// connection.
243    ///
244    /// # Errors
245    ///
246    /// Returns an `H3Error` in case of io error or http/3 semantic error.
247    // Currently idle (static table only); will carry decoder instructions when dynamic table is
248    // added.
249    pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
250    where
251        T: AsyncWrite + Unpin + Send,
252    {
253        let mut buf = vec![0; 8];
254        write(&mut buf, &mut stream, |buf| {
255            quic_varint::encode(UniStreamType::QpackDecoder, buf)
256        })
257        .await?;
258
259        self.swansong.clone().await;
260        Ok(())
261    }
262
263    /// Handle an inbound unidirectional HTTP/3 stream from the peer.
264    ///
265    /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
266    /// application streams are returned via [`UniStreamResult`] for the caller to process.
267    ///
268    /// # Errors
269    ///
270    /// Returns a `H3Error` in case of io error or http/3 semantic error.
271    pub async fn process_inbound_uni<T>(&self, mut stream: T) -> Result<UniStreamResult<T>, H3Error>
272    where
273        T: AsyncRead + Unpin + Send,
274    {
275        let mut buf = vec![0; 128];
276        let mut filled = 0;
277
278        // Read stream type varint (decode as raw u64 to handle unknown types)
279        let stream_type = read(
280            &mut buf,
281            &mut filled,
282            &mut stream,
283            |data| match quic_varint::decode::<u64>(data) {
284                Ok(ok) => Ok(Some(ok)),
285                Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
286                // this branch is unreachable because u64 is always From<u64>
287                Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
288            },
289        )
290        .await?;
291
292        match UniStreamType::try_from(stream_type) {
293            Ok(UniStreamType::Control) => {
294                self.run_inbound_control(&mut buf, &mut filled, &mut stream)
295                    .await?;
296                Ok(UniStreamResult::Handled)
297            }
298
299            Ok(UniStreamType::QpackEncoder | UniStreamType::QpackDecoder) => {
300                // Static table only — hold stream open until shutdown
301                self.swansong.clone().await;
302                Ok(UniStreamResult::Handled)
303            }
304
305            Ok(UniStreamType::WebTransport) => {
306                let session_id =
307                    read(
308                        &mut buf,
309                        &mut filled,
310                        &mut stream,
311                        |data| match quic_varint::decode::<u64>(data) {
312                            Ok(ok) => Ok(Some(ok)),
313                            Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
314                            Err(QuicVarIntError::UnknownValue { bytes, value }) => {
315                                Ok(Some((value, bytes)))
316                            }
317                        },
318                    )
319                    .await?;
320                buf.truncate(filled);
321                Ok(UniStreamResult::WebTransport {
322                    session_id,
323                    stream,
324                    buffer: buf.into(),
325                })
326            }
327
328            Ok(UniStreamType::Push) | Err(_) => Ok(UniStreamResult::Unknown {
329                stream_type,
330                stream,
331            }),
332        }
333    }
334
335    /// Handle the http/3 peer's inbound control stream.
336    ///
337    /// # Errors
338    ///
339    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
340    // The first frame must be SETTINGS. After that, watches for
341    // GOAWAY to initiate connection shutdown.
342    async fn run_inbound_control<T>(
343        &self,
344        buf: &mut Vec<u8>,
345        filled: &mut usize,
346        stream: &mut T,
347    ) -> Result<(), H3Error>
348    where
349        T: AsyncRead + Unpin + Send,
350    {
351        // First frame must be SETTINGS (§6.2.1)
352        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
353            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
354            Ok(_) => Err(H3ErrorCode::FrameUnexpected),
355            Err(FrameDecodeError::Incomplete) => Ok(None),
356            Err(FrameDecodeError::Error(code)) => Err(code),
357        })
358        .await?;
359
360        self.peer_settings
361            .set(settings)
362            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
363
364        // Read subsequent frames, watching for GOAWAY
365        loop {
366            let frame = read(buf, filled, stream, |data| match Frame::decode(data) {
367                Ok((frame, consumed)) => Ok(Some((frame, consumed))),
368                Err(FrameDecodeError::Incomplete) => Ok(None),
369                Err(FrameDecodeError::Error(code)) => Err(code),
370            })
371            .await?;
372
373            match frame {
374                Frame::Goaway(_) => {
375                    self.swansong.shut_down();
376                    return Ok(());
377                }
378                Frame::Settings(_) => {
379                    return Err(H3ErrorCode::FrameUnexpected.into());
380                }
381
382                _ => { /* MAX_PUSH_ID, CANCEL_PUSH, unknown — ignored for now */ }
383            }
384        }
385    }
386}
387
388const MAX_BUFFER_SIZE: usize = 1024 * 10;
389
390async fn write(
391    buf: &mut Vec<u8>,
392    mut stream: impl AsyncWrite + Unpin + Send,
393    mut f: impl FnMut(&mut [u8]) -> Option<usize>,
394) -> io::Result<usize> {
395    let written = loop {
396        if let Some(w) = f(buf) {
397            break w;
398        }
399        if buf.len() >= MAX_BUFFER_SIZE {
400            return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
401        }
402        buf.resize(buf.len() * 2, 0);
403    };
404
405    stream.write_all(&buf[..written]).await?;
406    stream.flush().await?;
407    Ok(written)
408}
409
410/// Read from `stream` into `buf` until `f` can decode a value.
411///
412/// `f` receives the filled portion of the buffer and returns:
413/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
414/// - `Ok(None)` — need more data; reads more bytes and retries
415/// - `Err(e)` — unrecoverable error; propagated to caller
416async fn read<R>(
417    buf: &mut Vec<u8>,
418    filled: &mut usize,
419    stream: &mut (impl AsyncRead + Unpin + Send),
420    f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
421) -> Result<R, H3Error> {
422    loop {
423        if let Some((result, consumed)) = f(&buf[..*filled])? {
424            buf.copy_within(consumed..*filled, 0);
425            *filled -= consumed;
426            return Ok(result);
427        }
428
429        if *filled >= buf.len() {
430            if buf.len() >= MAX_BUFFER_SIZE {
431                return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
432            }
433            buf.resize(buf.len() * 2, 0);
434        }
435
436        let n = stream.read(&mut buf[*filled..]).await?;
437        if n == 0 {
438            return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
439        }
440        *filled += n;
441    }
442}