Skip to main content

trillium_grpc/server/
bidi.rs

1//! Bidirectional-streaming: the prologue + responder split that straddles the
2//! run→upgrade seam.
3//!
4//! Bidi is the one shape that can't run entirely in `Handler::run`: read-while-
5//! write needs the response head already flushed, which only happens once `run`
6//! returns and `Handler::upgrade` is called. But the response's *initial*
7//! metadata (and the content-type) must be committed in `run`, before the
8//! flush — and that decision may depend on reading the first request message
9//! (the conformance suite, for instance, carries the response-header definition
10//! in request 1). So bidi is two functions joined by an **object**, never a
11//! suspended future:
12//!
13//! 1. **The prologue** — the service trait's bidi method. It runs to completion
14//!    in `run`, holding a [`GrpcServerConn`]: it may read request messages (via
15//!    [`GrpcServerConn::requests`]) to decide response headers, sets that initial
16//!    metadata straight through to the `Conn`, and returns a
17//!    [`BidiResponder`]. Returning `Err(Status)` rejects before the flush
18//!    (trailers-only, no upgrade).
19//! 2. **The responder** — [`BidiResponder::respond`], a *fresh* future built and
20//!    driven in `upgrade` over a [`Channel`]. It owns the read-while-write loop.
21//!    Trailing metadata is written through the channel
22//!    ([`Channel::response_trailers_mut`], seeded with whatever the prologue
23//!    set) and emitted with `grpc-status` when it returns `Ok(())` or
24//!    `Err(Status)`.
25//!
26//! What actually crosses the seam is a `BidiUpgrade` in the `Conn`'s state: a
27//! type-erased `BidiDriver` (the boxed responder plus the codec fn-pointers,
28//! encodings, and prologue-set trailers it needs). The user's loop future is
29//! *not* suspended across the seam — it's created fresh in `upgrade` — so it
30//! never has to be moved while holding borrows into the channel. The request
31//! body's receive state is retained from `Conn` onto `Upgrade` by trillium-http,
32//! so the responder's `Channel` continues reading at the next frame after
33//! whatever the prologue consumed.
34//!
35//! [`GrpcServerConn`]: crate::server::GrpcServerConn
36//! [`GrpcServerConn::requests`]: crate::server::GrpcServerConn::requests
37
38use crate::{
39    Encoding, Status,
40    server::{dispatch::Cancellation, streaming::Channel},
41};
42use bytes::Bytes;
43use std::{future::Future, pin::Pin, time::Instant};
44use sync_wrapper::SyncWrapper;
45use trillium::{Headers, Upgrade};
46
/// The user-driven half of a bidirectional-streaming RPC: the read-while-write
/// loop, created by the service method (the prologue) and run after the response
/// head is flushed.
///
/// Implement this on a type that carries whatever the prologue computed (any
/// requests it already read, the chosen response shape, …). [`respond`] is given
/// a [`Channel`] to interleave `recv` and `send`; trailing metadata (including
/// `grpc-status-details-bin` error details) is written through
/// [`Channel::response_trailers_mut`] and emitted with `grpc-status` from the
/// returned `Ok(())` / `Err(Status)`.
///
/// The `Send + 'static` supertraits are what allow a responder to be boxed and
/// type-erased, then carried from the run phase into the upgrade phase.
///
/// [`respond`]: BidiResponder::respond
pub trait BidiResponder<Req, Resp>: Send + 'static {
    /// Drive the bidirectional loop to completion. `Ok(())` ends the RPC with
    /// `grpc-status: 0`; `Err` ends it with that status.
    ///
    /// Takes `self` by value, so a responder drives exactly one RPC. The
    /// returned future must be `Send`; it may hold `channel` (and the borrows
    /// the channel carries) for as long as it runs.
    fn respond(
        self,
        channel: Channel<'_, Req, Resp>,
    ) -> impl Future<Output = Result<(), Status>> + Send;
}
67
/// Object-safe bridge over [`BidiResponder`]: boxes the (lifetime-carrying)
/// response future so the responder can be type-erased into a [`BidiDriver`].
///
/// [`BidiResponder::respond`] returns an unnameable `impl Future`, which a
/// trait object cannot expose; this trait returns a boxed `dyn Future` instead.
trait ErasedResponder<Req, Resp>: Send {
    /// Consume the boxed responder and return its loop as a pinned, boxed
    /// future.
    ///
    /// The future is only required to live for `'a`, the lifetime of the
    /// borrows held by `channel`; the `Req: 'a` / `Resp: 'a` bounds let the
    /// boxed future be typed with that lifetime.
    fn respond_boxed<'a>(
        self: Box<Self>,
        channel: Channel<'a, Req, Resp>,
    ) -> Pin<Box<dyn Future<Output = Result<(), Status>> + Send + 'a>>
    where
        Req: 'a,
        Resp: 'a;
}
79
80impl<Req, Resp, R> ErasedResponder<Req, Resp> for R
81where
82    R: BidiResponder<Req, Resp>,
83{
84    fn respond_boxed<'a>(
85        self: Box<Self>,
86        channel: Channel<'a, Req, Resp>,
87    ) -> Pin<Box<dyn Future<Output = Result<(), Status>> + Send + 'a>>
88    where
89        Req: 'a,
90        Resp: 'a,
91    {
92        Box::pin(async move { (*self).respond(channel).await })
93    }
94}
95
/// A fully type-erased bidi loop, ready to drive over an [`Upgrade`]. Erasing
/// `Req`/`Resp` lets a single [`BidiUpgrade`] state key serve every bidi method
/// in a service.
trait BidiDriver: Send {
    /// Consume the boxed driver together with the `Upgrade` and return a
    /// pinned, boxed `Send` future that runs the RPC. The future yields `()`,
    /// so nothing is reported back to the caller through its output.
    fn drive(self: Box<Self>, upgrade: Upgrade) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
102
/// Everything the upgrade phase needs to run one bidi RPC: the boxed responder,
/// the trailing metadata the prologue accumulated, the codec fn-pointers, the
/// negotiated encodings, and the deadline carried from the request.
struct BidiState<Req, Resp> {
    /// The user's loop, boxed behind the object-safe [`ErasedResponder`] bridge.
    responder: Box<dyn ErasedResponder<Req, Resp>>,
    /// Trailers seeded by the prologue; handed to the [`Channel`] by mutable
    /// reference and finally sent once `grpc-status` has been written into them.
    base_trailers: Headers,
    /// Turns one request message's bytes into a `Req`, or a `Status` on failure.
    decode: fn(&[u8]) -> Result<Req, Status>,
    /// Turns one `Resp` into response message bytes, or a `Status` on failure.
    encode: fn(&Resp) -> Result<Bytes, Status>,
    /// Encoding passed to the [`Channel`] for the request direction.
    request_encoding: Encoding,
    /// Encoding passed to the [`Channel`] for the response direction.
    response_encoding: Encoding,
    /// Optional deadline handed to `Cancellation::for_upgrade`; `None` means no
    /// deadline is supplied.
    deadline: Option<Instant>,
}
115
116impl<Req, Resp> BidiDriver for BidiState<Req, Resp>
117where
118    Req: Send + 'static,
119    Resp: Send + 'static,
120{
121    fn drive(self: Box<Self>, mut upgrade: Upgrade) -> Pin<Box<dyn Future<Output = ()> + Send>> {
122        Box::pin(async move {
123            let BidiState {
124                responder,
125                mut base_trailers,
126                decode,
127                encode,
128                request_encoding,
129                response_encoding,
130                deadline,
131            } = *self;
132
133            // The responder writes trailing metadata through the channel into
134            // `base_trailers` (seeded by the prologue); we add `grpc-status`
135            // after it returns.
136            let cancellation = Cancellation::for_upgrade(&upgrade, deadline);
137            let result = cancellation
138                .race(async {
139                    let channel = Channel::new(
140                        &mut upgrade,
141                        &mut base_trailers,
142                        decode,
143                        encode,
144                        request_encoding,
145                        response_encoding,
146                    );
147                    responder.respond_boxed(channel).await
148                })
149                .await;
150
151            match result {
152                Ok(()) => Status::ok().write_into(&mut base_trailers),
153                Err(status) => status.write_into(&mut base_trailers),
154            }
155
156            if let Err(e) = upgrade.send_trailers(base_trailers).await {
157                log::warn!("trillium-grpc: send_trailers failed: {e}");
158            }
159        })
160    }
161}
162
/// The bidi handoff stashed in the `Conn`'s state at the run→upgrade seam.
/// [`SyncWrapper`] satisfies the state set's `Sync` bound without requiring the
/// responder itself to be `Sync` (it never is concurrently shared).
///
/// The single field is the type-erased driver; it is taken back out of the
/// state and consumed when the upgrade is driven.
pub(crate) struct BidiUpgrade(SyncWrapper<Box<dyn BidiDriver>>);
167
168impl BidiUpgrade {
169    /// Box the responder together with its codec/encoding context into a
170    /// type-erased driver, ready to stash in `Conn` state. Called from the
171    /// run-phase bidi dispatch once the prologue returns a responder.
172    #[allow(clippy::too_many_arguments)]
173    pub(crate) fn new<Req, Resp, R>(
174        responder: R,
175        base_trailers: Headers,
176        decode: fn(&[u8]) -> Result<Req, Status>,
177        encode: fn(&Resp) -> Result<Bytes, Status>,
178        request_encoding: Encoding,
179        response_encoding: Encoding,
180        deadline: Option<Instant>,
181    ) -> Self
182    where
183        R: BidiResponder<Req, Resp>,
184        Req: Send + 'static,
185        Resp: Send + 'static,
186    {
187        let state = BidiState {
188            responder: Box::new(responder),
189            base_trailers,
190            decode,
191            encode,
192            request_encoding,
193            response_encoding,
194            deadline,
195        };
196        Self(SyncWrapper::new(Box::new(state)))
197    }
198}
199
200/// Whether this upgrade was marked for bidi by the run-phase dispatch. Generated
201/// `Handler::has_upgrade` delegates here.
202pub fn has_bidi_upgrade(upgrade: &Upgrade) -> bool {
203    upgrade.state().get::<BidiUpgrade>().is_some()
204}
205
206/// Drive a bidi RPC to completion over its upgraded transport: build the
207/// [`Channel`], run the responder loop, and write the terminating `grpc-status`
208/// trailers. Generated `Handler::upgrade` delegates here. A no-op if the upgrade
209/// wasn't ours (shouldn't happen given [`has_bidi_upgrade`] gates it).
210pub async fn drive_bidi_upgrade(mut upgrade: Upgrade) {
211    let Some(state) = upgrade.state_mut().take::<BidiUpgrade>() else {
212        return;
213    };
214    state.0.into_inner().drive(upgrade).await;
215}