trillium_grpc/server/grpc_conn.rs
1//! [`GrpcServerConn`]: the per-call control surface handed to every service method.
2//!
3//! A `GrpcServerConn` *owns* the [`Conn`] for one RPC — value in, value out, exactly
4//! like a trillium [`Handler`](trillium::Handler) owns its `Conn` (and like
5//! `WebSocketConn` owns its `Upgrade`). On top of that `Conn` it exposes the
6//! gRPC control interface: the request's initial metadata, mutable response
7//! initial- and trailing-metadata bags, the request deadline, and — for the
8//! shapes that read a request stream — a typed [`RequestStream`] via
9//! [`requests`](GrpcServerConn::requests). A half-duplex gRPC method is, structurally,
10//! a trillium `run()` handler: read the request body, set the response, hand
11//! the `Conn` back to be flushed. That is why these shapes never touch
12//! `Upgrade`.
13//!
14//! Response headers write straight through to the owned `Conn`'s response
15//! headers, committed to the wire when the handler returns and the head is
16//! flushed. Response trailers are different: trillium has no trailers field on
17//! `Conn` — they're emitted dynamically by [`BodySource::trailers`] at
18//! end-of-body — so they accumulate in a bag here that the framework hands to
19//! the response body source. The handler never holds the `Conn` directly, so
20//! there is no way to mutate headers after the head is flushed — the "exactly
21//! one bite at headers" guarantee is structural.
22//!
23//! [`BodySource::trailers`]: trillium_http::BodySource::trailers
24//!
25//! `GrpcServerConn` is generic over the codec, defaulted to [`Prost`], so the common
26//! signature is just `&mut GrpcServerConn`. The codec is what lets
27//! [`requests`](GrpcServerConn::requests) decode the wire bytes into your message
28//! type.
29
30use crate::{
31 Codec, Encoding, Prost, server::streaming::RequestStream, timeout::parse_grpc_timeout,
32};
33use std::{marker::PhantomData, time::Instant};
34use trillium::{Conn, Headers};
35
36/// The control surface for a single gRPC call. Owns the `Conn` for the
37/// duration of one RPC — value in, value out.
38pub struct GrpcServerConn<C = Prost> {
39 conn: Conn,
40 response_trailers: Headers,
41 request_encoding: Encoding,
42 deadline: Option<Instant>,
43 codec: PhantomData<fn() -> C>,
44}
45
46impl<C> GrpcServerConn<C> {
47 /// Take ownership of a `Conn` for the duration of one RPC.
48 /// `request_encoding` is the already-validated inbound `grpc-encoding`,
49 /// used to decompress request frames.
50 pub(crate) fn new(conn: Conn, request_encoding: Encoding) -> Self {
51 let deadline = conn
52 .request_headers()
53 .get_str("grpc-timeout")
54 .and_then(parse_grpc_timeout)
55 .map(|d| Instant::now() + d);
56 Self {
57 conn,
58 response_trailers: Headers::new(),
59 request_encoding,
60 deadline,
61 codec: PhantomData,
62 }
63 }
64
65 /// The request's initial metadata (received headers), including any custom
66 /// metadata the client attached.
67 pub fn received_headers(&self) -> &Headers {
68 self.conn.request_headers()
69 }
70
71 /// The response's initial metadata, written straight through to the
72 /// `Conn`. Committed to the wire when the handler returns, before the first
73 /// response byte.
74 pub fn response_headers_mut(&mut self) -> &mut Headers {
75 self.conn.response_headers_mut()
76 }
77
78 /// The response's trailing metadata, emitted alongside `grpc-status` after
79 /// the response body.
80 pub fn response_trailers_mut(&mut self) -> &mut Headers {
81 &mut self.response_trailers
82 }
83
84 /// The deadline derived from the request's `grpc-timeout` header, if any.
85 pub fn deadline(&self) -> Option<Instant> {
86 self.deadline
87 }
88
89 /// A typed stream of decoded request messages.
90 ///
91 /// Borrows the connection for the lifetime of the returned stream; read it
92 /// to EOF (or drop it) before touching response headers again. The codec's
93 /// `decode` is selected by the `GrpcServerConn`'s codec parameter, so the only
94 /// thing you supply is the message type:
95 ///
96 /// ```ignore
97 /// let mut reqs = conn.requests::<HelloRequest>();
98 /// while let Some(req) = reqs.recv().await? { /* … */ }
99 /// ```
100 pub fn requests<Req>(&mut self) -> RequestStream<'_, Req>
101 where
102 C: Codec<Req>,
103 Req: 'static,
104 {
105 RequestStream::new(
106 Box::pin(self.conn.request_body()),
107 <C as Codec<Req>>::decode,
108 self.request_encoding,
109 )
110 }
111
112 /// Consume the wrapper, returning the owned `Conn` (with response headers
113 /// already set on it) plus the accumulated response trailers for the
114 /// framework to hand to the body source.
115 pub(crate) fn into_parts(self) -> (Conn, Headers) {
116 (self.conn, self.response_trailers)
117 }
118}