1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use futures_lite::Stream;
use std::{
    fmt::{Debug, Formatter, Result},
    pin::Pin,
    task::{Context, Poll},
};
pin_project_lite::pin_project! {
/// Pairs an optional inbound stream with an outbound stream.
///
/// The `Stream` implementation for this type polls both sides and tags every
/// item it yields with the [`Direction`] it came from.
pub(crate) struct BidirectionalStream<I, O> {
    // Inbound side. When this is `None`, only `outbound` is polled. It is not
    // structurally pinned, so the `Stream` impl requires `I: Unpin` to poll it.
    pub(crate) inbound: Option<I>,
    // Outbound side. Structurally pinned (`#[pin]`), so it is polled through
    // the pin projection and `O` does not need to be `Unpin`.
    #[pin]
    pub(crate) outbound: O,
}
}
/// Opaque `Debug` output: neither `I` nor `O` is required to implement
/// `Debug`, so only the presence of the inbound side is reported and the
/// outbound side is elided.
impl<I, O> Debug for BidirectionalStream<I, O> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        // Report only whether an inbound stream is present, never its contents.
        let inbound_label = if self.inbound.is_some() {
            "Some(_)"
        } else {
            "None"
        };

        let mut builder = f.debug_struct("BidirectionalStream");
        builder.field("inbound", &inbound_label);
        builder.field("outbound", &"..");
        builder.finish()
    }
}

/// An item yielded by a `BidirectionalStream`, tagged with the side that
/// produced it.
#[derive(Debug)]
pub(crate) enum Direction<I, O> {
    /// An item produced by the inbound stream.
    Inbound(I),
    /// An item produced by the outbound stream.
    Outbound(O),
}

/// Merges the inbound and outbound streams into a single stream of
/// [`Direction`]-tagged items.
impl<I, O> Stream for BidirectionalStream<I, O>
where
    I: Stream + Unpin + Send + Sync + 'static,
    O: Stream + Send + Sync + 'static,
{
    type Item = Direction<I::Item, O::Item>;

    /// Polls both sides in a randomly chosen order and returns the first
    /// result that is ready.
    ///
    /// * A ready item is wrapped in the matching [`Direction`] variant.
    /// * If the side that is ready reports end-of-stream, the merged stream
    ///   ends as well (`Poll::Ready(None)`), even if the other side could
    ///   still produce items.
    /// * If `inbound` is `None`, only the outbound side is polled.
    /// * `Poll::Pending` is returned only after every available side has been
    ///   polled with `cx` and reported pending.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // One coin flip per poll decides which side goes first, so that
        // neither side is systematically favoured over the other.
        let inbound_first = fastrand::bool();

        // Two passes: the chosen side first, then the other one.
        for poll_inbound in [inbound_first, !inbound_first] {
            let polled: Poll<Option<Self::Item>> = if poll_inbound {
                match this.inbound.as_mut() {
                    Some(inbound) => Pin::new(inbound)
                        .poll_next(cx)
                        .map(|item| item.map(Direction::Inbound)),
                    // No inbound side: nothing to poll, fall through to the
                    // other side exactly as a pending result would.
                    None => Poll::Pending,
                }
            } else {
                this.outbound
                    .as_mut()
                    .poll_next(cx)
                    .map(|item| item.map(Direction::Outbound))
            };

            // Short-circuit on the first ready result (item or end-of-stream);
            // the remaining side is not polled in that case.
            if polled.is_ready() {
                return polled;
            }
        }

        Poll::Pending
    }
}