Skip to main content

trillium_grpc/server/
grpc_conn.rs

1//! [`GrpcServerConn`]: the per-call control surface handed to every service method.
2//!
3//! A `GrpcServerConn` *owns* the [`Conn`] for one RPC — value in, value out, exactly
4//! like a trillium [`Handler`](trillium::Handler) owns its `Conn` (and like
5//! `WebSocketConn` owns its `Upgrade`). On top of that `Conn` it exposes the
6//! gRPC control interface: the request's initial metadata, mutable response
7//! initial- and trailing-metadata bags, the request deadline, and — for the
8//! shapes that read a request stream — a typed [`RequestStream`] via
9//! [`requests`](GrpcServerConn::requests). A half-duplex gRPC method is, structurally,
10//! a trillium `run()` handler: read the request body, set the response, hand
11//! the `Conn` back to be flushed. That is why these shapes never touch
12//! `Upgrade`.
13//!
14//! Response headers write straight through to the owned `Conn`'s response
15//! headers, committed to the wire when the handler returns and the head is
16//! flushed. Response trailers are different: trillium has no trailers field on
17//! `Conn` — they're emitted dynamically by [`BodySource::trailers`] at
18//! end-of-body — so they accumulate in a bag here that the framework hands to
19//! the response body source. The handler never holds the `Conn` directly, so
20//! there is no way to mutate headers after the head is flushed — the "exactly
21//! one bite at headers" guarantee is structural.
22//!
23//! [`BodySource::trailers`]: trillium_http::BodySource::trailers
24//!
25//! `GrpcServerConn` is generic over the codec, defaulted to [`Prost`], so the common
26//! signature is just `&mut GrpcServerConn`. The codec is what lets
27//! [`requests`](GrpcServerConn::requests) decode the wire bytes into your message
28//! type.
29
30use crate::{
31    Codec, Encoding, Prost, server::streaming::RequestStream, timeout::parse_grpc_timeout,
32};
33use std::{marker::PhantomData, time::Instant};
34use trillium::{Conn, Headers};
35
36/// The control surface for a single gRPC call. Owns the `Conn` for the
37/// duration of one RPC — value in, value out.
38pub struct GrpcServerConn<C = Prost> {
39    conn: Conn,
40    response_trailers: Headers,
41    request_encoding: Encoding,
42    deadline: Option<Instant>,
43    codec: PhantomData<fn() -> C>,
44}
45
46impl<C> GrpcServerConn<C> {
47    /// Take ownership of a `Conn` for the duration of one RPC.
48    /// `request_encoding` is the already-validated inbound `grpc-encoding`,
49    /// used to decompress request frames.
50    pub(crate) fn new(conn: Conn, request_encoding: Encoding) -> Self {
51        let deadline = conn
52            .request_headers()
53            .get_str("grpc-timeout")
54            .and_then(parse_grpc_timeout)
55            .map(|d| Instant::now() + d);
56        Self {
57            conn,
58            response_trailers: Headers::new(),
59            request_encoding,
60            deadline,
61            codec: PhantomData,
62        }
63    }
64
65    /// The request's initial metadata (received headers), including any custom
66    /// metadata the client attached.
67    pub fn received_headers(&self) -> &Headers {
68        self.conn.request_headers()
69    }
70
71    /// The response's initial metadata, written straight through to the
72    /// `Conn`. Committed to the wire when the handler returns, before the first
73    /// response byte.
74    pub fn response_headers_mut(&mut self) -> &mut Headers {
75        self.conn.response_headers_mut()
76    }
77
78    /// The response's trailing metadata, emitted alongside `grpc-status` after
79    /// the response body.
80    pub fn response_trailers_mut(&mut self) -> &mut Headers {
81        &mut self.response_trailers
82    }
83
84    /// The deadline derived from the request's `grpc-timeout` header, if any.
85    pub fn deadline(&self) -> Option<Instant> {
86        self.deadline
87    }
88
89    /// A typed stream of decoded request messages.
90    ///
91    /// Borrows the connection for the lifetime of the returned stream; read it
92    /// to EOF (or drop it) before touching response headers again. The codec's
93    /// `decode` is selected by the `GrpcServerConn`'s codec parameter, so the only
94    /// thing you supply is the message type:
95    ///
96    /// ```ignore
97    /// let mut reqs = conn.requests::<HelloRequest>();
98    /// while let Some(req) = reqs.recv().await? { /* … */ }
99    /// ```
100    pub fn requests<Req>(&mut self) -> RequestStream<'_, Req>
101    where
102        C: Codec<Req>,
103        Req: 'static,
104    {
105        RequestStream::new(
106            Box::pin(self.conn.request_body()),
107            <C as Codec<Req>>::decode,
108            self.request_encoding,
109        )
110    }
111
112    /// Consume the wrapper, returning the owned `Conn` (with response headers
113    /// already set on it) plus the accumulated response trailers for the
114    /// framework to hand to the body source.
115    pub(crate) fn into_parts(self) -> (Conn, Headers) {
116        (self.conn, self.response_trailers)
117    }
118}