Skip to main content

trillium_grpc/client/
typed.rs

1//! Typed, shape-specific call handles built on the [`GrpcClientConn`] engine.
2//!
3//! A generated `<Service>Client` method returns one of these rather than the
4//! engine directly, so the handle you hold exposes only the operations that make
5//! sense for that RPC's shape — you never need to know the proto to use it
6//! correctly. Each mirrors `trillium_client::Conn`'s lifecycle: build it with
7//! chainable `with_*` setters, drive it (`.await` and/or `.next()`), then read
8//! the response metadata off the same value. Like a trillium `Conn`, the readers
9//! behave like an `Option` that execution populates — they return `None` until
10//! the call has run.
11//!
12//! - [`UnaryConn`] — unary and client-streaming (one response). `.await` runs the
13//!   whole call; read it with [`UnaryConn::into_message`] / [`message`].
14//! - [`StreamingConn`] — server-streaming. A [`Stream`] of responses; `.await` is
15//!   an optional opt-in to read the head early for initial metadata.
16//! - [`BidiConn`] — full-duplex bidi. Live [`send`] / [`close_send`] / [`next`].
17//!
18//! [`message`]: UnaryConn::message
19//! [`send`]: BidiConn::send
20//! [`close_send`]: BidiConn::close_send
21//! [`next`]: futures_lite::StreamExt::next
22
23use crate::{
24    Codec, Metadata, Status,
25    client::conn::{CancelHandle, GrpcClientConn},
26    timeout::parse_grpc_timeout,
27};
28use futures_lite::{Stream, StreamExt};
29use std::{
30    future::{Future, IntoFuture},
31    pin::Pin,
32    task::{Context, Poll},
33    time::Duration,
34};
35use trillium::Headers;
36use trillium_client::Client;
37
38type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
39type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
40
41/// Read the client's configured default `grpc-timeout` into a per-call deadline.
42fn default_timeout(client: &Client) -> Option<Duration> {
43    client
44        .default_headers()
45        .get_str("grpc-timeout")
46        .and_then(parse_grpc_timeout)
47}
48
49/// A unary or client-streaming call: one request (or a stream of requests) in,
50/// exactly one response out.
51///
52/// Build it with the chainable `with_*` setters, then `.await` it — awaiting
53/// sends the request(s), reads the single response, drains the stream to its
54/// trailers, and enforces the one-response cardinality. Transport-level failures
55/// (connection, non-200 head) surface as the `await`'s `Err`; the RPC's logical
56/// `grpc-status` is read afterward via [`status`](Self::status) /
57/// [`into_message`](Self::into_message), so response metadata stays readable even
58/// on a logical error.
59pub struct UnaryConn<Req, Resp> {
60    engine: GrpcClientConn<Req, Resp>,
61    /// The request source, drained when the call fires. `None` once awaited.
62    requests: Option<BoxStream<Req>>,
63    /// `(message, verdict)`, populated by `.await`. `None` until then.
64    outcome: Option<(Option<Resp>, Result<(), Status>)>,
65}
66
67impl<Req, Resp> UnaryConn<Req, Resp>
68where
69    Req: Send + 'static,
70    Resp: Send + 'static,
71{
72    /// A unary call sending the single `request`.
73    pub fn unary<C>(client: &Client, path: &str, request: Req) -> Self
74    where
75        C: Codec<Req> + Codec<Resp>,
76    {
77        let mut engine = GrpcClientConn::new::<C>(
78            client,
79            path,
80            Metadata::new(),
81            default_timeout(client),
82            false,
83        );
84        engine.buffer_request(request);
85        Self {
86            engine,
87            requests: None,
88            outcome: None,
89        }
90    }
91
92    /// A client-streaming call sending each message from `requests`.
93    pub fn client_streaming<C>(
94        client: &Client,
95        path: &str,
96        requests: impl Stream<Item = Req> + Send + 'static,
97    ) -> Self
98    where
99        C: Codec<Req> + Codec<Resp>,
100    {
101        let engine = GrpcClientConn::new::<C>(
102            client,
103            path,
104            Metadata::new(),
105            default_timeout(client),
106            false,
107        );
108        Self {
109            engine,
110            requests: Some(Box::pin(requests)),
111            outcome: None,
112        }
113    }
114
115    /// Attach an ASCII request-metadata entry. Chainable; takes effect when the
116    /// call fires.
117    pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
118        self.engine.add_ascii_metadata(key, value);
119        self
120    }
121
122    /// Attach a binary (`-bin`) request-metadata entry. Chainable.
123    pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
124        self.engine.add_binary_metadata(key, value.into());
125        self
126    }
127
128    /// Set this call's deadline, `timeout` from now. Chainable.
129    pub fn with_timeout(mut self, timeout: Duration) -> Self {
130        self.engine.set_deadline_from_now(timeout);
131        self
132    }
133
134    /// A handle that cancels this call from elsewhere (including before/while it
135    /// is awaited).
136    pub fn cancel_handle(&self) -> CancelHandle {
137        self.engine.cancel_handle()
138    }
139
140    /// The response's initial metadata, once the call has run. `None` before.
141    pub fn metadata(&self) -> Option<&Headers> {
142        self.engine.headers()
143    }
144
145    /// The response's trailing metadata, once the call has run. `None` before.
146    pub fn trailers(&self) -> Option<&Headers> {
147        self.engine.trailers()
148    }
149
150    /// The response message, if the call ran and yielded one. `None` before the
151    /// call, and on the error path.
152    pub fn message(&self) -> Option<&Resp> {
153        self.outcome.as_ref().and_then(|(m, _)| m.as_ref())
154    }
155
156    /// The RPC's logical verdict, once the call has run: `Ok(())` on success, the
157    /// server's `grpc-status` (or a cardinality violation) as `Err`. `Ok(())`
158    /// before the call has run.
159    pub fn status(&self) -> Result<(), Status> {
160        self.outcome
161            .as_ref()
162            .map(|(_, s)| s.clone())
163            .unwrap_or(Ok(()))
164    }
165
166    /// Consume the conn and yield the response message, folding the logical
167    /// verdict: `Err` on a `grpc-status` error (or cardinality violation), else
168    /// the single message. Read [`metadata`](Self::metadata) /
169    /// [`trailers`](Self::trailers) first if you need them on the error path.
170    pub fn into_message(self) -> Result<Resp, Status> {
171        match self.outcome {
172            Some((message, status)) => {
173                status?;
174                message.ok_or_else(|| Status::internal("unary response had no message"))
175            }
176            None => Err(Status::internal("call has not been awaited")),
177        }
178    }
179}
180
181impl<Req, Resp> IntoFuture for UnaryConn<Req, Resp>
182where
183    Req: Send + 'static,
184    Resp: Send + 'static,
185{
186    type Output = Result<UnaryConn<Req, Resp>, Status>;
187    type IntoFuture = BoxFuture<Self::Output>;
188
189    fn into_future(mut self) -> Self::IntoFuture {
190        Box::pin(async move {
191            let mut engine = self.engine;
192            if let Some(mut requests) = self.requests.take() {
193                while let Some(message) = requests.next().await {
194                    engine.send(message).await?;
195                }
196            }
197            // Fires the request (body + END_STREAM) and awaits the head; a
198            // transport / non-200 / non-gRPC head fails here, before any verdict.
199            engine.close_send().await?;
200            let outcome = read_unary(&mut engine).await;
201            Ok(UnaryConn {
202                engine,
203                requests: None,
204                outcome: Some(outcome),
205            })
206        })
207    }
208}
209
210/// Read exactly one response message and confirm the stream ends cleanly. The
211/// second `recv` is load-bearing: `grpc-status` rides the trailing frame *after*
212/// the message, so it drives the body to EOF (populating the trailers), reads the
213/// verdict, and detects a cardinality-violating second message.
214async fn read_unary<Req, Resp>(
215    engine: &mut GrpcClientConn<Req, Resp>,
216) -> (Option<Resp>, Result<(), Status>)
217where
218    Req: Send + 'static,
219    Resp: Send + 'static,
220{
221    match engine.recv().await {
222        Ok(Some(message)) => match engine.recv().await {
223            Ok(None) => (Some(message), Ok(())),
224            Ok(Some(_)) => (
225                None,
226                Err(Status::internal("unary response had multiple messages")),
227            ),
228            // A trailing error after a message: keep the message, report the error.
229            Err(status) => (Some(message), Err(status)),
230        },
231        // Clean EOF with no message is malformed for a unary response.
232        Ok(None) => (None, Err(Status::internal("unary response had no message"))),
233        // A trailers-only / logical error (no message). Metadata stays readable.
234        Err(status) => (None, Err(status)),
235    }
236}
237
238/// A server-streaming call: one request in, a stream of responses out.
239///
240/// It is a [`Stream`] of decoded responses — iterating it lazily fires the
241/// request and reads the head on the first poll, so no explicit `.await` is
242/// required. Awaiting it *is* available as an opt-in: `.await` reads just the
243/// response head, so [`metadata`](Self::metadata) is populated before you consume
244/// the first message. [`trailers`](Self::trailers) populate once the stream ends.
245pub struct StreamingConn<Req, Resp> {
246    /// Held between polls; moved into [`in_flight`](Self::in_flight) for the
247    /// duration of a single `recv` and restored when it resolves. `None` only
248    /// while a poll is mid-flight (not observable through `&self` accessors).
249    /// Boxed so the conn is `Unpin` despite the engine holding a `!Unpin`
250    /// cancellation receiver.
251    engine: Option<Box<GrpcClientConn<Req, Resp>>>,
252    /// The in-progress `recv`, owning the engine while it runs so the future
253    /// survives `Pending` polls. Yields `(engine, result)` so the engine returns.
254    #[allow(clippy::type_complexity)]
255    in_flight: Option<BoxFuture<(Box<GrpcClientConn<Req, Resp>>, Result<Option<Resp>, Status>)>>,
256}
257
258impl<Req, Resp> StreamingConn<Req, Resp>
259where
260    Req: Send + 'static,
261    Resp: Send + 'static,
262{
263    /// A server-streaming call sending the single `request`.
264    pub fn server_streaming<C>(client: &Client, path: &str, request: Req) -> Self
265    where
266        C: Codec<Req> + Codec<Resp>,
267    {
268        let mut engine = GrpcClientConn::new::<C>(
269            client,
270            path,
271            Metadata::new(),
272            default_timeout(client),
273            false,
274        );
275        // The single request is buffered now; the body fires on the first recv.
276        engine.buffer_request(request);
277        Self {
278            engine: Some(Box::new(engine)),
279            in_flight: None,
280        }
281    }
282
283    /// The engine between polls. Panics only if called while a poll is in
284    /// flight, which `&self`/`&mut self` borrow rules make unreachable.
285    fn engine(&self) -> &GrpcClientConn<Req, Resp> {
286        self.engine
287            .as_deref()
288            .expect("engine present between polls")
289    }
290
291    fn engine_mut(&mut self) -> &mut GrpcClientConn<Req, Resp> {
292        self.engine
293            .as_deref_mut()
294            .expect("engine present between polls")
295    }
296
297    /// Attach an ASCII request-metadata entry. Chainable.
298    pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
299        self.engine_mut().add_ascii_metadata(key, value);
300        self
301    }
302
303    /// Attach a binary (`-bin`) request-metadata entry. Chainable.
304    pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
305        self.engine_mut().add_binary_metadata(key, value.into());
306        self
307    }
308
309    /// Set this call's deadline, `timeout` from now. Chainable.
310    pub fn with_timeout(mut self, timeout: Duration) -> Self {
311        self.engine_mut().set_deadline_from_now(timeout);
312        self
313    }
314
315    /// A handle that cancels this call from elsewhere.
316    pub fn cancel_handle(&self) -> CancelHandle {
317        self.engine().cancel_handle()
318    }
319
320    /// The response's initial metadata, once the head has arrived. `None` before.
321    pub fn metadata(&self) -> Option<&Headers> {
322        self.engine().headers()
323    }
324
325    /// The response's trailing metadata, once the stream has ended. `None` before.
326    pub fn trailers(&self) -> Option<&Headers> {
327        self.engine().trailers()
328    }
329
330    /// Read the next response message. `Ok(None)` at end-of-stream, `Err` on RPC
331    /// error. Equivalent to using the [`Stream`] impl.
332    pub async fn recv(&mut self) -> Result<Option<Resp>, Status> {
333        self.engine_mut().recv().await
334    }
335}
336
337impl<Req, Resp> Stream for StreamingConn<Req, Resp>
338where
339    Req: Send + 'static,
340    Resp: Send + 'static,
341{
342    type Item = Result<Resp, Status>;
343
344    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
345        let this = self.get_mut();
346        if this.in_flight.is_none() {
347            // Move the engine into a fresh recv future so it persists across
348            // Pending polls; the future hands the engine back when it resolves.
349            let mut engine = this.engine.take().expect("engine present between polls");
350            this.in_flight = Some(Box::pin(async move {
351                let result = engine.recv().await;
352                (engine, result)
353            }));
354        }
355        let fut = this.in_flight.as_mut().unwrap();
356        match fut.as_mut().poll(cx) {
357            Poll::Pending => Poll::Pending,
358            Poll::Ready((engine, result)) => {
359                this.engine = Some(engine);
360                this.in_flight = None;
361                match result {
362                    Ok(None) => Poll::Ready(None),
363                    Ok(Some(msg)) => Poll::Ready(Some(Ok(msg))),
364                    Err(status) => Poll::Ready(Some(Err(status))),
365                }
366            }
367        }
368    }
369}
370
371impl<Req, Resp> IntoFuture for StreamingConn<Req, Resp>
372where
373    Req: Send + 'static,
374    Resp: Send + 'static,
375{
376    type Output = Result<StreamingConn<Req, Resp>, Status>;
377    type IntoFuture = BoxFuture<Self::Output>;
378
379    /// Fire the request and read the head only, so initial metadata is available
380    /// before the first message is consumed.
381    fn into_future(mut self) -> Self::IntoFuture {
382        Box::pin(async move {
383            self.engine_mut().open_head().await?;
384            Ok(self)
385        })
386    }
387}
388
389/// A full-duplex bidirectional call: send and receive messages interleaved over
390/// one live stream.
391///
392/// Drive it by turns on `&mut self` (the [`WebSocketConn`]-style single-stream
393/// model): [`send`](Self::send) a request, [`next`](futures_lite::StreamExt::next)
394/// (or [`recv`](Self::recv)) a response, repeat, and [`close_send`](Self::close_send)
395/// the request half when done. The first send/recv flushes the request prelude
396/// and reads the response head, after which sends write live on the wire.
397///
398/// [`WebSocketConn`]: https://docs.rs/trillium-websockets
399pub struct BidiConn<Req, Resp> {
400    /// See [`StreamingConn`]'s fields — same move-into-`recv`-and-back scheme.
401    engine: Option<Box<GrpcClientConn<Req, Resp>>>,
402    #[allow(clippy::type_complexity)]
403    in_flight: Option<BoxFuture<(Box<GrpcClientConn<Req, Resp>>, Result<Option<Resp>, Status>)>>,
404}
405
406impl<Req, Resp> BidiConn<Req, Resp>
407where
408    Req: Send + 'static,
409    Resp: Send + 'static,
410{
411    /// A full-duplex bidi call. Requests are sent live via [`send`](Self::send).
412    pub fn bidi<C>(client: &Client, path: &str) -> Self
413    where
414        C: Codec<Req> + Codec<Resp>,
415    {
416        let engine =
417            GrpcClientConn::new::<C>(client, path, Metadata::new(), default_timeout(client), true);
418        Self {
419            engine: Some(Box::new(engine)),
420            in_flight: None,
421        }
422    }
423
424    fn engine(&self) -> &GrpcClientConn<Req, Resp> {
425        self.engine
426            .as_deref()
427            .expect("engine present between polls")
428    }
429
430    fn engine_mut(&mut self) -> &mut GrpcClientConn<Req, Resp> {
431        self.engine
432            .as_deref_mut()
433            .expect("engine present between polls")
434    }
435
436    /// Attach an ASCII request-metadata entry, before the first send. Chainable.
437    pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
438        self.engine_mut().add_ascii_metadata(key, value);
439        self
440    }
441
442    /// Attach a binary (`-bin`) request-metadata entry, before the first send.
443    pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
444        self.engine_mut().add_binary_metadata(key, value.into());
445        self
446    }
447
448    /// Set this call's deadline, `timeout` from now. Chainable.
449    pub fn with_timeout(mut self, timeout: Duration) -> Self {
450        self.engine_mut().set_deadline_from_now(timeout);
451        self
452    }
453
454    /// A handle that cancels this call from elsewhere.
455    pub fn cancel_handle(&self) -> CancelHandle {
456        self.engine().cancel_handle()
457    }
458
459    /// The response's initial metadata, once the head has arrived. `None` before.
460    pub fn metadata(&self) -> Option<&Headers> {
461        self.engine().headers()
462    }
463
464    /// The response's trailing metadata, once the stream has ended. `None` before.
465    pub fn trailers(&self) -> Option<&Headers> {
466        self.engine().trailers()
467    }
468
469    /// Send one request message. The first send flushes the prelude and reads the
470    /// head; later sends write live on the wire.
471    pub async fn send(&mut self, message: Req) -> Result<(), Status> {
472        self.engine_mut().send(message).await
473    }
474
475    /// Half-close the request side (sends END_STREAM), leaving the response side
476    /// open to read.
477    pub async fn close_send(&mut self) -> Result<(), Status> {
478        self.engine_mut().close_send().await
479    }
480
481    /// Read the next response message. `Ok(None)` at end-of-stream, `Err` on RPC
482    /// error. Equivalent to using the [`Stream`] impl.
483    pub async fn recv(&mut self) -> Result<Option<Resp>, Status> {
484        self.engine_mut().recv().await
485    }
486}
487
488impl<Req, Resp> Stream for BidiConn<Req, Resp>
489where
490    Req: Send + 'static,
491    Resp: Send + 'static,
492{
493    type Item = Result<Resp, Status>;
494
495    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
496        let this = self.get_mut();
497        if this.in_flight.is_none() {
498            let mut engine = this.engine.take().expect("engine present between polls");
499            this.in_flight = Some(Box::pin(async move {
500                let result = engine.recv().await;
501                (engine, result)
502            }));
503        }
504        let fut = this.in_flight.as_mut().unwrap();
505        match fut.as_mut().poll(cx) {
506            Poll::Pending => Poll::Pending,
507            Poll::Ready((engine, result)) => {
508                this.engine = Some(engine);
509                this.in_flight = None;
510                match result {
511                    Ok(None) => Poll::Ready(None),
512                    Ok(Some(msg)) => Poll::Ready(Some(Ok(msg))),
513                    Err(status) => Poll::Ready(Some(Err(status))),
514                }
515            }
516        }
517    }
518}