trillium_http/h3/connection.rs
1use super::{
2 H3Error,
3 frame::{Frame, FrameDecodeError, UniStreamType},
4 quic_varint::{self, QuicVarIntError},
5 settings::H3Settings,
6};
7use crate::{Buffer, Conn, HttpContext, h3::H3ErrorCode};
8use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
9use std::{
10 future::Future,
11 io::{self, ErrorKind},
12 sync::{
13 Arc, OnceLock,
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 },
16};
17use swansong::{ShutdownCompletion, Swansong};
18
19/// The result of processing an HTTP/3 bidirectional stream.
20#[derive(Debug)]
21#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
22pub enum H3StreamResult<Transport> {
23 /// The stream carried a normal HTTP/3 request.
24 Request(Conn<Transport>),
25
26 /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
27 /// the associated WebTransport session.
28 WebTransport {
29 /// The WebTransport session ID (stream ID of the CONNECT request).
30 session_id: u64,
31 /// The underlying transport, ready for application data.
32 transport: Transport,
33 /// Any bytes buffered after the session ID during stream negotiation.
34 buffer: Buffer,
35 },
36}
37
38/// The result of processing an HTTP/3 unidirectional stream.
39#[derive(Debug)]
40pub enum UniStreamResult<T> {
41 /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
42 /// automatically.
43 Handled,
44
45 /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
46 /// WebTransport session.
47 WebTransport {
48 /// The WebTransport session ID.
49 session_id: u64,
50 /// The receive stream, ready for application data.
51 stream: T,
52 /// Any bytes buffered after the session ID during stream negotiation.
53 buffer: Buffer,
54 },
55
56 /// An unknown or unsupported stream type (e.g. Push). The caller should close or reset
57 /// this stream without processing it.
58 Unknown {
59 /// The raw stream type value.
60 stream_type: u64,
61 /// The stream.
62 stream: T,
63 },
64}
65
66/// Shared state for a single HTTP/3 QUIC connection.
67///
68/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
69#[derive(Debug)]
70pub struct H3Connection {
71 /// Shared configuration for the entire server, including tcp-based listeners
72 context: Arc<HttpContext>,
73
74 /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
75 /// the server-level Swansong shuts down. Request stream tasks use this to interrupt
76 /// in-progress work.
77 swansong: Swansong,
78
79 /// The peer's H3 settings, received on their control stream. Request streams may need to
80 /// consult these (e.g. max field section size).
81 peer_settings: OnceLock<H3Settings>,
82
83 /// The highest bidirectional stream ID we have accepted. Used to compute the GOAWAY value
84 /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
85 /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
86 max_accepted_stream_id: AtomicU64,
87
88 /// Whether we have accepted any streams yet.
89 has_accepted_stream: AtomicBool,
90}
91
92impl H3Connection {
93 /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
94 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
95 let swansong = context.swansong.child();
96 Arc::new(Self {
97 context,
98 swansong,
99 peer_settings: OnceLock::new(),
100 max_accepted_stream_id: AtomicU64::new(0),
101 has_accepted_stream: AtomicBool::new(false),
102 })
103 }
104
105 /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
106 /// [`H3Connection::shut_down`]
107 pub fn swansong(&self) -> &Swansong {
108 &self.swansong
109 }
110
111 /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
112 ///
113 /// The returned [`ShutdownCompletion`] type can
114 /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
115 /// blocking context
116 ///
117 /// Note that this will NOT shut down the server. To shut down the whole server, use
118 /// [`HttpContext::shut_down`]
119 pub fn shut_down(&self) -> ShutdownCompletion {
120 self.swansong.shut_down()
121 }
122
123 /// Retrieve the [`HttpContext`] for this server.
124 pub fn context(&self) -> Arc<HttpContext> {
125 self.context.clone()
126 }
127
128 /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
129 /// processed.
130 pub fn peer_settings(&self) -> Option<&H3Settings> {
131 self.peer_settings.get()
132 }
133
134 /// Record that we accepted a bidirectional stream with this ID.
135 fn record_accepted_stream(&self, stream_id: u64) {
136 self.max_accepted_stream_id
137 .fetch_max(stream_id, Ordering::Relaxed);
138 self.has_accepted_stream.store(true, Ordering::Relaxed);
139 }
140
141 /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
142 /// haven't accepted any.
143 fn goaway_id(&self) -> u64 {
144 if self.has_accepted_stream.load(Ordering::Relaxed) {
145 self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
146 } else {
147 0
148 }
149 }
150
151 /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
152 ///
153 /// Call this once per accepted bidirectional stream. Returns
154 /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
155 /// a standard HTTP/3 request.
156 ///
157 /// # Errors
158 ///
159 /// Returns an `H3Error` in case of io error or http/3 semantic error.
160 pub async fn process_inbound_bidi<Transport, Handler, Fut>(
161 self: Arc<Self>,
162 transport: Transport,
163 handler: Handler,
164 stream_id: u64,
165 ) -> Result<H3StreamResult<Transport>, H3Error>
166 where
167 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
168 Handler: FnOnce(Conn<Transport>) -> Fut,
169 Fut: Future<Output = Conn<Transport>>,
170 {
171 self.record_accepted_stream(stream_id);
172 let _guard = self.swansong.guard();
173 let buffer = Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
174 match Conn::new_h3(self, transport, buffer).await? {
175 H3StreamResult::Request(conn) => Ok(H3StreamResult::Request(
176 handler(conn).await.send_h3().await?,
177 )),
178 wt @ H3StreamResult::WebTransport { .. } => Ok(wt),
179 }
180 }
181
182 /// Run this server's HTTP/3 outbound control stream.
183 ///
184 /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
185 /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
186 /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
187 ///
188 /// # Errors
189 ///
190 /// Returns an `H3Error` in case of io error or http/3 semantic error.
191 pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
192 where
193 T: AsyncWrite + Unpin + Send,
194 {
195 let mut buf = vec![0; 128];
196
197 // Stream type + SETTINGS frame
198 let settings = Frame::Settings(H3Settings::from(&self.context.config));
199
200 write(&mut buf, &mut stream, |buf| {
201 let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
202 written += settings.encode(&mut buf[written..])?;
203 Some(written)
204 })
205 .await?;
206
207 // Wait for shutdown
208 self.swansong.clone().await;
209
210 // Send GOAWAY
211 write(&mut buf, &mut stream, |buf| {
212 Frame::Goaway(self.goaway_id()).encode(buf)
213 })
214 .await?;
215
216 Ok(())
217 }
218
219 /// Initialize and hold open the outbound QPACK encoder stream for the duration of the
220 /// connection.
221 ///
222 /// # Errors
223 ///
224 /// Returns an `H3Error` in case of io error or http/3 semantic error.
225 // Currently idle (static table only); will carry encoder instructions when dynamic table is
226 // added.
227 pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
228 where
229 T: AsyncWrite + Unpin + Send,
230 {
231 let mut buf = vec![0; 8];
232 write(&mut buf, &mut stream, |buf| {
233 quic_varint::encode(UniStreamType::QpackEncoder, buf)
234 })
235 .await?;
236
237 self.swansong.clone().await;
238 Ok(())
239 }
240
241 /// Initialize and hold open the outbound QPACK decoder stream for the duration of the
242 /// connection.
243 ///
244 /// # Errors
245 ///
246 /// Returns an `H3Error` in case of io error or http/3 semantic error.
247 // Currently idle (static table only); will carry decoder instructions when dynamic table is
248 // added.
249 pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
250 where
251 T: AsyncWrite + Unpin + Send,
252 {
253 let mut buf = vec![0; 8];
254 write(&mut buf, &mut stream, |buf| {
255 quic_varint::encode(UniStreamType::QpackDecoder, buf)
256 })
257 .await?;
258
259 self.swansong.clone().await;
260 Ok(())
261 }
262
263 /// Handle an inbound unidirectional HTTP/3 stream from the peer.
264 ///
265 /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
266 /// application streams are returned via [`UniStreamResult`] for the caller to process.
267 ///
268 /// # Errors
269 ///
270 /// Returns a `H3Error` in case of io error or http/3 semantic error.
271 pub async fn process_inbound_uni<T>(&self, mut stream: T) -> Result<UniStreamResult<T>, H3Error>
272 where
273 T: AsyncRead + Unpin + Send,
274 {
275 let mut buf = vec![0; 128];
276 let mut filled = 0;
277
278 // Read stream type varint (decode as raw u64 to handle unknown types)
279 let stream_type = read(
280 &mut buf,
281 &mut filled,
282 &mut stream,
283 |data| match quic_varint::decode::<u64>(data) {
284 Ok(ok) => Ok(Some(ok)),
285 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
286 // this branch is unreachable because u64 is always From<u64>
287 Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
288 },
289 )
290 .await?;
291
292 match UniStreamType::try_from(stream_type) {
293 Ok(UniStreamType::Control) => {
294 self.run_inbound_control(&mut buf, &mut filled, &mut stream)
295 .await?;
296 Ok(UniStreamResult::Handled)
297 }
298
299 Ok(UniStreamType::QpackEncoder | UniStreamType::QpackDecoder) => {
300 // Static table only — hold stream open until shutdown
301 self.swansong.clone().await;
302 Ok(UniStreamResult::Handled)
303 }
304
305 Ok(UniStreamType::WebTransport) => {
306 let session_id =
307 read(
308 &mut buf,
309 &mut filled,
310 &mut stream,
311 |data| match quic_varint::decode::<u64>(data) {
312 Ok(ok) => Ok(Some(ok)),
313 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
314 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
315 Ok(Some((value, bytes)))
316 }
317 },
318 )
319 .await?;
320 buf.truncate(filled);
321 Ok(UniStreamResult::WebTransport {
322 session_id,
323 stream,
324 buffer: buf.into(),
325 })
326 }
327
328 Ok(UniStreamType::Push) | Err(_) => Ok(UniStreamResult::Unknown {
329 stream_type,
330 stream,
331 }),
332 }
333 }
334
335 /// Handle the http/3 peer's inbound control stream.
336 ///
337 /// # Errors
338 ///
339 /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
340 // The first frame must be SETTINGS. After that, watches for
341 // GOAWAY to initiate connection shutdown.
342 async fn run_inbound_control<T>(
343 &self,
344 buf: &mut Vec<u8>,
345 filled: &mut usize,
346 stream: &mut T,
347 ) -> Result<(), H3Error>
348 where
349 T: AsyncRead + Unpin + Send,
350 {
351 // First frame must be SETTINGS (§6.2.1)
352 let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
353 Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
354 Ok(_) => Err(H3ErrorCode::FrameUnexpected),
355 Err(FrameDecodeError::Incomplete) => Ok(None),
356 Err(FrameDecodeError::Error(code)) => Err(code),
357 })
358 .await?;
359
360 self.peer_settings
361 .set(settings)
362 .map_err(|_| H3ErrorCode::FrameUnexpected)?;
363
364 // Read subsequent frames, watching for GOAWAY
365 loop {
366 let frame = read(buf, filled, stream, |data| match Frame::decode(data) {
367 Ok((frame, consumed)) => Ok(Some((frame, consumed))),
368 Err(FrameDecodeError::Incomplete) => Ok(None),
369 Err(FrameDecodeError::Error(code)) => Err(code),
370 })
371 .await?;
372
373 match frame {
374 Frame::Goaway(_) => {
375 self.swansong.shut_down();
376 return Ok(());
377 }
378 Frame::Settings(_) => {
379 return Err(H3ErrorCode::FrameUnexpected.into());
380 }
381
382 _ => { /* MAX_PUSH_ID, CANCEL_PUSH, unknown — ignored for now */ }
383 }
384 }
385 }
386}
387
/// Growth threshold (10 KiB) for the scratch buffers used by `write` and `read`.
///
/// A buffer whose length has reached this value is never doubled again; the operation fails
/// with a "runaway allocation" error instead. Because the check happens before doubling, a
/// buffer may end up somewhat larger than this value.
const MAX_BUFFER_SIZE: usize = 10 * 1024;
389
390async fn write(
391 buf: &mut Vec<u8>,
392 mut stream: impl AsyncWrite + Unpin + Send,
393 mut f: impl FnMut(&mut [u8]) -> Option<usize>,
394) -> io::Result<usize> {
395 let written = loop {
396 if let Some(w) = f(buf) {
397 break w;
398 }
399 if buf.len() >= MAX_BUFFER_SIZE {
400 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
401 }
402 buf.resize(buf.len() * 2, 0);
403 };
404
405 stream.write_all(&buf[..written]).await?;
406 stream.flush().await?;
407 Ok(written)
408}
409
410/// Read from `stream` into `buf` until `f` can decode a value.
411///
412/// `f` receives the filled portion of the buffer and returns:
413/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
414/// - `Ok(None)` — need more data; reads more bytes and retries
415/// - `Err(e)` — unrecoverable error; propagated to caller
416async fn read<R>(
417 buf: &mut Vec<u8>,
418 filled: &mut usize,
419 stream: &mut (impl AsyncRead + Unpin + Send),
420 f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
421) -> Result<R, H3Error> {
422 loop {
423 if let Some((result, consumed)) = f(&buf[..*filled])? {
424 buf.copy_within(consumed..*filled, 0);
425 *filled -= consumed;
426 return Ok(result);
427 }
428
429 if *filled >= buf.len() {
430 if buf.len() >= MAX_BUFFER_SIZE {
431 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
432 }
433 buf.resize(buf.len() * 2, 0);
434 }
435
436 let n = stream.read(&mut buf[*filled..]).await?;
437 if n == 0 {
438 return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
439 }
440 *filled += n;
441 }
442}