1use crate::{
24 Codec, Metadata, Status,
25 client::conn::{CancelHandle, GrpcClientConn},
26 timeout::parse_grpc_timeout,
27};
28use futures_lite::{Stream, StreamExt};
29use std::{
30 future::{Future, IntoFuture},
31 pin::Pin,
32 task::{Context, Poll},
33 time::Duration,
34};
35use trillium::Headers;
36use trillium_client::Client;
37
/// A heap-allocated, pinned, type-erased [`Stream`] that is `Send`.
type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
/// A heap-allocated, pinned, type-erased [`Future`] that is `Send`.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
40
41fn default_timeout(client: &Client) -> Option<Duration> {
43 client
44 .default_headers()
45 .get_str("grpc-timeout")
46 .and_then(parse_grpc_timeout)
47}
48
/// A call that yields at most one response message: built by
/// `UnaryConn::unary` or `UnaryConn::client_streaming`, configured with the
/// `with_*` builders, and executed by awaiting it.
pub struct UnaryConn<Req, Resp> {
    /// The underlying connection state machine all operations are forwarded to.
    engine: GrpcClientConn<Req, Resp>,
    /// Outbound request stream; `Some` only for calls built by
    /// `client_streaming`, and taken when the call is awaited.
    requests: Option<BoxStream<Req>>,
    /// `(message, status)` recorded once the call has been awaited; `None`
    /// before that.
    outcome: Option<(Option<Resp>, Result<(), Status>)>,
}
66
67impl<Req, Resp> UnaryConn<Req, Resp>
68where
69 Req: Send + 'static,
70 Resp: Send + 'static,
71{
72 pub fn unary<C>(client: &Client, path: &str, request: Req) -> Self
74 where
75 C: Codec<Req> + Codec<Resp>,
76 {
77 let mut engine = GrpcClientConn::new::<C>(
78 client,
79 path,
80 Metadata::new(),
81 default_timeout(client),
82 false,
83 );
84 engine.buffer_request(request);
85 Self {
86 engine,
87 requests: None,
88 outcome: None,
89 }
90 }
91
92 pub fn client_streaming<C>(
94 client: &Client,
95 path: &str,
96 requests: impl Stream<Item = Req> + Send + 'static,
97 ) -> Self
98 where
99 C: Codec<Req> + Codec<Resp>,
100 {
101 let engine = GrpcClientConn::new::<C>(
102 client,
103 path,
104 Metadata::new(),
105 default_timeout(client),
106 false,
107 );
108 Self {
109 engine,
110 requests: Some(Box::pin(requests)),
111 outcome: None,
112 }
113 }
114
115 pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
118 self.engine.add_ascii_metadata(key, value);
119 self
120 }
121
122 pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
124 self.engine.add_binary_metadata(key, value.into());
125 self
126 }
127
128 pub fn with_timeout(mut self, timeout: Duration) -> Self {
130 self.engine.set_deadline_from_now(timeout);
131 self
132 }
133
134 pub fn cancel_handle(&self) -> CancelHandle {
137 self.engine.cancel_handle()
138 }
139
140 pub fn metadata(&self) -> Option<&Headers> {
142 self.engine.headers()
143 }
144
145 pub fn trailers(&self) -> Option<&Headers> {
147 self.engine.trailers()
148 }
149
150 pub fn message(&self) -> Option<&Resp> {
153 self.outcome.as_ref().and_then(|(m, _)| m.as_ref())
154 }
155
156 pub fn status(&self) -> Result<(), Status> {
160 self.outcome
161 .as_ref()
162 .map(|(_, s)| s.clone())
163 .unwrap_or(Ok(()))
164 }
165
166 pub fn into_message(self) -> Result<Resp, Status> {
171 match self.outcome {
172 Some((message, status)) => {
173 status?;
174 message.ok_or_else(|| Status::internal("unary response had no message"))
175 }
176 None => Err(Status::internal("call has not been awaited")),
177 }
178 }
179}
180
181impl<Req, Resp> IntoFuture for UnaryConn<Req, Resp>
182where
183 Req: Send + 'static,
184 Resp: Send + 'static,
185{
186 type Output = Result<UnaryConn<Req, Resp>, Status>;
187 type IntoFuture = BoxFuture<Self::Output>;
188
189 fn into_future(mut self) -> Self::IntoFuture {
190 Box::pin(async move {
191 let mut engine = self.engine;
192 if let Some(mut requests) = self.requests.take() {
193 while let Some(message) = requests.next().await {
194 engine.send(message).await?;
195 }
196 }
197 engine.close_send().await?;
200 let outcome = read_unary(&mut engine).await;
201 Ok(UnaryConn {
202 engine,
203 requests: None,
204 outcome: Some(outcome),
205 })
206 })
207 }
208}
209
210async fn read_unary<Req, Resp>(
215 engine: &mut GrpcClientConn<Req, Resp>,
216) -> (Option<Resp>, Result<(), Status>)
217where
218 Req: Send + 'static,
219 Resp: Send + 'static,
220{
221 match engine.recv().await {
222 Ok(Some(message)) => match engine.recv().await {
223 Ok(None) => (Some(message), Ok(())),
224 Ok(Some(_)) => (
225 None,
226 Err(Status::internal("unary response had multiple messages")),
227 ),
228 Err(status) => (Some(message), Err(status)),
230 },
231 Ok(None) => (None, Err(Status::internal("unary response had no message"))),
233 Err(status) => (None, Err(status)),
235 }
236}
237
/// A call with a single buffered request and a stream of responses, readable
/// either through `recv` or through the [`Stream`] implementation.
pub struct StreamingConn<Req, Resp> {
    /// The engine. It is `None` only while a `Stream` read owns it inside
    /// `in_flight`.
    engine: Option<Box<GrpcClientConn<Req, Resp>>>,
    /// A `Stream` read that has started but not completed. The future owns
    /// the engine and returns it together with the read result.
    #[allow(clippy::type_complexity)]
    in_flight: Option<BoxFuture<(Box<GrpcClientConn<Req, Resp>>, Result<Option<Resp>, Status>)>>,
}
257
258impl<Req, Resp> StreamingConn<Req, Resp>
259where
260 Req: Send + 'static,
261 Resp: Send + 'static,
262{
263 pub fn server_streaming<C>(client: &Client, path: &str, request: Req) -> Self
265 where
266 C: Codec<Req> + Codec<Resp>,
267 {
268 let mut engine = GrpcClientConn::new::<C>(
269 client,
270 path,
271 Metadata::new(),
272 default_timeout(client),
273 false,
274 );
275 engine.buffer_request(request);
277 Self {
278 engine: Some(Box::new(engine)),
279 in_flight: None,
280 }
281 }
282
283 fn engine(&self) -> &GrpcClientConn<Req, Resp> {
286 self.engine
287 .as_deref()
288 .expect("engine present between polls")
289 }
290
291 fn engine_mut(&mut self) -> &mut GrpcClientConn<Req, Resp> {
292 self.engine
293 .as_deref_mut()
294 .expect("engine present between polls")
295 }
296
297 pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
299 self.engine_mut().add_ascii_metadata(key, value);
300 self
301 }
302
303 pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
305 self.engine_mut().add_binary_metadata(key, value.into());
306 self
307 }
308
309 pub fn with_timeout(mut self, timeout: Duration) -> Self {
311 self.engine_mut().set_deadline_from_now(timeout);
312 self
313 }
314
315 pub fn cancel_handle(&self) -> CancelHandle {
317 self.engine().cancel_handle()
318 }
319
320 pub fn metadata(&self) -> Option<&Headers> {
322 self.engine().headers()
323 }
324
325 pub fn trailers(&self) -> Option<&Headers> {
327 self.engine().trailers()
328 }
329
330 pub async fn recv(&mut self) -> Result<Option<Resp>, Status> {
333 self.engine_mut().recv().await
334 }
335}
336
337impl<Req, Resp> Stream for StreamingConn<Req, Resp>
338where
339 Req: Send + 'static,
340 Resp: Send + 'static,
341{
342 type Item = Result<Resp, Status>;
343
344 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
345 let this = self.get_mut();
346 if this.in_flight.is_none() {
347 let mut engine = this.engine.take().expect("engine present between polls");
350 this.in_flight = Some(Box::pin(async move {
351 let result = engine.recv().await;
352 (engine, result)
353 }));
354 }
355 let fut = this.in_flight.as_mut().unwrap();
356 match fut.as_mut().poll(cx) {
357 Poll::Pending => Poll::Pending,
358 Poll::Ready((engine, result)) => {
359 this.engine = Some(engine);
360 this.in_flight = None;
361 match result {
362 Ok(None) => Poll::Ready(None),
363 Ok(Some(msg)) => Poll::Ready(Some(Ok(msg))),
364 Err(status) => Poll::Ready(Some(Err(status))),
365 }
366 }
367 }
368 }
369}
370
371impl<Req, Resp> IntoFuture for StreamingConn<Req, Resp>
372where
373 Req: Send + 'static,
374 Resp: Send + 'static,
375{
376 type Output = Result<StreamingConn<Req, Resp>, Status>;
377 type IntoFuture = BoxFuture<Self::Output>;
378
379 fn into_future(mut self) -> Self::IntoFuture {
382 Box::pin(async move {
383 self.engine_mut().open_head().await?;
384 Ok(self)
385 })
386 }
387}
388
/// A call whose request and response sides are both driven by the caller:
/// `send` / `close_send` for requests, `recv` or the [`Stream`]
/// implementation for responses.
pub struct BidiConn<Req, Resp> {
    /// The engine. It is `None` only while a `Stream` read owns it inside
    /// `in_flight`.
    engine: Option<Box<GrpcClientConn<Req, Resp>>>,
    /// A `Stream` read that has started but not completed. The future owns
    /// the engine and returns it together with the read result.
    #[allow(clippy::type_complexity)]
    in_flight: Option<BoxFuture<(Box<GrpcClientConn<Req, Resp>>, Result<Option<Resp>, Status>)>>,
}
405
406impl<Req, Resp> BidiConn<Req, Resp>
407where
408 Req: Send + 'static,
409 Resp: Send + 'static,
410{
411 pub fn bidi<C>(client: &Client, path: &str) -> Self
413 where
414 C: Codec<Req> + Codec<Resp>,
415 {
416 let engine =
417 GrpcClientConn::new::<C>(client, path, Metadata::new(), default_timeout(client), true);
418 Self {
419 engine: Some(Box::new(engine)),
420 in_flight: None,
421 }
422 }
423
424 fn engine(&self) -> &GrpcClientConn<Req, Resp> {
425 self.engine
426 .as_deref()
427 .expect("engine present between polls")
428 }
429
430 fn engine_mut(&mut self) -> &mut GrpcClientConn<Req, Resp> {
431 self.engine
432 .as_deref_mut()
433 .expect("engine present between polls")
434 }
435
436 pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
438 self.engine_mut().add_ascii_metadata(key, value);
439 self
440 }
441
442 pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
444 self.engine_mut().add_binary_metadata(key, value.into());
445 self
446 }
447
448 pub fn with_timeout(mut self, timeout: Duration) -> Self {
450 self.engine_mut().set_deadline_from_now(timeout);
451 self
452 }
453
454 pub fn cancel_handle(&self) -> CancelHandle {
456 self.engine().cancel_handle()
457 }
458
459 pub fn metadata(&self) -> Option<&Headers> {
461 self.engine().headers()
462 }
463
464 pub fn trailers(&self) -> Option<&Headers> {
466 self.engine().trailers()
467 }
468
469 pub async fn send(&mut self, message: Req) -> Result<(), Status> {
472 self.engine_mut().send(message).await
473 }
474
475 pub async fn close_send(&mut self) -> Result<(), Status> {
478 self.engine_mut().close_send().await
479 }
480
481 pub async fn recv(&mut self) -> Result<Option<Resp>, Status> {
484 self.engine_mut().recv().await
485 }
486}
487
488impl<Req, Resp> Stream for BidiConn<Req, Resp>
489where
490 Req: Send + 'static,
491 Resp: Send + 'static,
492{
493 type Item = Result<Resp, Status>;
494
495 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
496 let this = self.get_mut();
497 if this.in_flight.is_none() {
498 let mut engine = this.engine.take().expect("engine present between polls");
499 this.in_flight = Some(Box::pin(async move {
500 let result = engine.recv().await;
501 (engine, result)
502 }));
503 }
504 let fut = this.in_flight.as_mut().unwrap();
505 match fut.as_mut().poll(cx) {
506 Poll::Pending => Poll::Pending,
507 Poll::Ready((engine, result)) => {
508 this.engine = Some(engine);
509 this.in_flight = None;
510 match result {
511 Ok(None) => Poll::Ready(None),
512 Ok(Some(msg)) => Poll::Ready(Some(Ok(msg))),
513 Err(status) => Poll::Ready(Some(Err(status))),
514 }
515 }
516 }
517 }
518}