Skip to main content

trillium_server_common/
runtime.rs

1use futures_lite::Stream;
2use std::{
3    fmt::{self, Debug, Formatter},
4    future::Future,
5    pin::Pin,
6    sync::Arc,
7    time::Duration,
8};
9
10mod droppable_future;
11pub use droppable_future::DroppableFuture;
12
13mod runtime_trait;
14pub use runtime_trait::RuntimeTrait;
15
16mod object_safe_runtime;
17use object_safe_runtime::ObjectSafeRuntime;
18
19/// A type-erased [`RuntimeTrait`] implementation. Think of this as an `Arc<dyn RuntimeTrait>`
20#[derive(Clone)]
21pub struct Runtime(Arc<dyn ObjectSafeRuntime>);
22
23impl Debug for Runtime {
24    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
25        f.debug_tuple("Runtime").field(&format_args!("..")).finish()
26    }
27}
28
29impl<R: RuntimeTrait> From<Arc<R>> for Runtime {
30    fn from(value: Arc<R>) -> Self {
31        Self(value)
32    }
33}
34
35impl Runtime {
36    /// Construct a new type-erased runtime object from any [`RuntimeTrait`] implementation.
37    pub fn new(runtime: impl RuntimeTrait) -> Self {
38        runtime.into() // we avoid re-arcing a Runtime by using Into::into
39    }
40
41    /// Spawn a future on the runtime, returning a future that has detach-on-drop semantics
42    ///
43    /// Spawned tasks conform to the following behavior:
44    ///
45    /// * detach on drop: If the returned [`DroppableFuture`] is dropped immediately, the task will
46    ///   continue to execute until completion.
47    ///
48    /// * unwinding: If the spawned future panics, this must not propagate to the join handle.
49    ///   Instead, the awaiting the join handle returns None in case of panic.
50    pub fn spawn<Output: Send + 'static>(
51        &self,
52        fut: impl Future<Output = Output> + Send + 'static,
53    ) -> DroppableFuture<Pin<Box<dyn Future<Output = Option<Output>> + Send + 'static>>> {
54        let fut = RuntimeTrait::spawn(self, fut).into_inner();
55        DroppableFuture::new(Box::pin(fut))
56    }
57
58    /// Wake in this amount of wall time
59    pub async fn delay(&self, duration: Duration) {
60        RuntimeTrait::delay(self, duration).await
61    }
62
63    /// Returns a [`Stream`] that yields a `()` on the provided period
64    pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + '_ {
65        RuntimeTrait::interval(self, period)
66    }
67
68    /// Runtime implementation hook for blocking on a top level future.
69    pub fn block_on<Fut>(&self, fut: Fut) -> Fut::Output
70    where
71        Fut: Future,
72    {
73        RuntimeTrait::block_on(self, fut)
74    }
75
76    /// Race a future against the provided duration, returning None in case of timeout.
77    pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
78    where
79        Fut: Future + Send,
80        Fut::Output: Send + 'static,
81    {
82        RuntimeTrait::timeout(self, duration, fut).await
83    }
84}
85
86impl RuntimeTrait for Runtime {
87    async fn delay(&self, duration: Duration) {
88        self.0.delay(duration).await
89    }
90
91    fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
92        self.0.interval(period)
93    }
94
95    fn spawn<Fut>(
96        &self,
97        fut: Fut,
98    ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
99    where
100        Fut: Future + Send + 'static,
101        Fut::Output: Send + 'static,
102    {
103        let (send, receive) = async_channel::bounded(1);
104        let spawn_fut = self.0.spawn(Box::pin(async move {
105            let _ = send.try_send(fut.await);
106        }));
107        DroppableFuture::new(Box::pin(async move {
108            spawn_fut.await;
109            receive.try_recv().ok()
110        }))
111    }
112
113    fn block_on<Fut>(&self, fut: Fut) -> Fut::Output
114    where
115        Fut: Future,
116    {
117        let (send, receive) = std::sync::mpsc::channel();
118        self.0.block_on(Box::pin(async move {
119            let _ = send.send(fut.await);
120        }));
121        receive.recv().unwrap()
122    }
123
124    fn hook_signals(
125        &self,
126        signals: impl IntoIterator<Item = i32>,
127    ) -> impl Stream<Item = i32> + Send + 'static {
128        self.0.hook_signals(signals.into_iter().collect())
129    }
130}