1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
use crate::{Acceptor, Config, ConfigExt, Stopper, Transport};
use std::{
future::{ready, Future},
io::Result,
pin::Pin,
sync::Arc,
};
use trillium::{Handler, Info};
/**
The server trait, for standard network-based server implementations.

The implementing type is itself the listener: it is constructed by
[`Server::build_listener`], hands out one [`Server::Transport`] per
call to [`Server::accept`], and is consumed by [`Server::clean_up`]
once the accept loop in [`Server::run_async`] has finished.

Runtime adapters must provide [`Server::accept`], [`Server::info`],
[`Server::spawn`] and [`Server::block_on`], and will normally also
provide [`Server::listener_from_tcp`] (and, on unix,
[`Server::listener_from_unix`]).
*/
pub trait Server: Sized + Send + Sync + 'static {
    /// The individual byte stream that http will be communicated
    /// over. This is often an async "stream" like TcpStream or
    /// UnixStream. See [`Transport`]
    type Transport: Transport;

    /// The description of this server, to be appended to the
    /// [`Info`] server description and potentially logged.
    const DESCRIPTION: &'static str;

    /// Asynchronously return a single `Self::Transport` accepted by
    /// this listener. Must be implemented.
    fn accept(&mut self) -> Pin<Box<dyn Future<Output = Result<Self::Transport>> + Send + '_>>;

    /// Build an [`Info`] from this listener. See [`Info`] for more
    /// details.
    fn info(&self) -> Info;

    /// After the server has shut down, perform any housekeeping, eg
    /// unlinking a unix socket. The default implementation does
    /// nothing.
    fn clean_up(self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        Box::pin(ready(()))
    }

    /// Build a listener from the config.
    ///
    /// The default logic is, in order:
    ///
    /// 1. if the config holds a prebound listener, take and return it;
    /// 2. if the configured host starts with `/`, `.` or `~`, bind a
    ///    unix domain socket at that path (the host is passed to
    ///    `UnixListener::bind` unchanged) and hand it to
    ///    [`Server::listener_from_unix`];
    /// 3. otherwise, if the `LISTEN_FD` environment variable parses as
    ///    a file descriptor, adopt that descriptor as a tcp listener;
    /// 4. otherwise bind a tcp listener to the configured host and port.
    ///
    /// Tcp listeners are switched to nonblocking mode before being
    /// handed to [`Server::listener_from_tcp`].
    ///
    /// To override the default logic, server implementations could
    /// potentially implement this directly. To use this default
    /// logic, implement [`Server::listener_from_tcp`] and
    /// [`Server::listener_from_unix`].
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be bound or cannot be switched to
    /// nonblocking mode, or if the config's binding lock is poisoned.
    #[cfg(unix)]
    fn build_listener<A>(config: &Config<Self, A>) -> Self
    where
        A: Acceptor<Self::Transport>,
    {
        use std::os::unix::prelude::FromRawFd;

        if let Some(listener) = config.binding.write().unwrap().take() {
            log::debug!("taking prebound listener");
            return listener;
        }

        let host = config.host();

        if host.starts_with(|c: char| matches!(c, '/' | '.' | '~')) {
            let unix_listener = std::os::unix::net::UnixListener::bind(&*host)
                .unwrap_or_else(|e| panic!("unable to bind unix listener at {}: {}", host, e));
            return Self::listener_from_unix(unix_listener);
        }

        let inherited_fd = std::env::var("LISTEN_FD")
            .ok()
            .and_then(|fd| fd.parse().ok());

        let tcp_listener = if let Some(fd) = inherited_fd {
            log::debug!("using fd {} from LISTEN_FD", fd);
            // SAFETY: `from_raw_fd` requires `fd` to be an open
            // descriptor that nothing else in this process owns. That
            // cannot be checked here; we rely on whoever set LISTEN_FD
            // (presumably a supervisor handing over a listening
            // socket) to uphold it.
            unsafe { std::net::TcpListener::from_raw_fd(fd) }
        } else {
            // The port is only resolved when we actually bind.
            let port = config.port();
            std::net::TcpListener::bind((&*host, port)).unwrap_or_else(|e| {
                panic!("unable to bind tcp listener at {}:{}: {}", host, port, e)
            })
        };

        tcp_listener
            .set_nonblocking(true)
            .unwrap_or_else(|e| panic!("unable to set tcp listener to nonblocking: {}", e));
        Self::listener_from_tcp(tcp_listener)
    }

    /// Build a listener from the config.
    ///
    /// The default logic takes a prebound listener from the config if
    /// one is present, and otherwise binds a nonblocking tcp listener
    /// to the configured host and port and hands it to
    /// [`Server::listener_from_tcp`].
    ///
    /// To override the default logic, server implementations could
    /// potentially implement this directly. To use this default
    /// logic, implement [`Server::listener_from_tcp`]
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be bound or cannot be switched to
    /// nonblocking mode, or if the config's binding lock is poisoned.
    #[cfg(not(unix))]
    fn build_listener<A>(config: &Config<Self, A>) -> Self
    where
        A: Acceptor<Self::Transport>,
    {
        if let Some(listener) = config.binding.write().unwrap().take() {
            log::debug!("taking prebound listener");
            return listener;
        }

        let host = config.host();
        let port = config.port();
        let tcp_listener = std::net::TcpListener::bind((&*host, port))
            .unwrap_or_else(|e| panic!("unable to bind tcp listener at {}:{}: {}", host, port, e));
        tcp_listener
            .set_nonblocking(true)
            .unwrap_or_else(|e| panic!("unable to set tcp listener to nonblocking: {}", e));
        Self::listener_from_tcp(tcp_listener)
    }

    /// Build a listener from a std tcp listener. This is called by
    /// the [`Server::build_listener`] default implementation, and
    /// is mandatory if the default implementation is used.
    ///
    /// # Panics
    ///
    /// The default implementation always panics with
    /// `unimplemented!()`.
    fn listener_from_tcp(_tcp: std::net::TcpListener) -> Self {
        unimplemented!()
    }

    /// Build a listener from a std unix domain socket listener. This
    /// is called by the [`Server::build_listener`] default
    /// implementation. You will want to tag an implementation of
    /// this with #[cfg(unix)].
    ///
    /// # Panics
    ///
    /// The default implementation always panics with
    /// `unimplemented!()`.
    #[cfg(unix)]
    fn listener_from_unix(_unix: std::os::unix::net::UnixListener) -> Self {
        unimplemented!()
    }

    /// Implementation hook for listening for any os signals and
    /// stopping the provided [`Stopper`]. The returned future will be
    /// spawned using [`Server::spawn`]. The default implementation
    /// returns a future that completes immediately without touching
    /// the stopper.
    fn handle_signals(_stopper: Stopper) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        Box::pin(ready(()))
    }

    /// Runtime implementation hook for spawning a task.
    fn spawn(fut: impl Future<Output = ()> + Send + 'static);

    /// Runtime implementation hook for blocking on a top level future.
    fn block_on(fut: impl Future<Output = ()> + 'static);

    /// Run a trillium application from a sync context. This blocks
    /// on [`Server::run_async`] using [`Server::block_on`].
    fn run<A, H>(config: Config<Self, A>, handler: H)
    where
        A: Acceptor<Self::Transport>,
        H: Handler,
    {
        Self::block_on(Self::run_async(config, handler))
    }

    /// Run a trillium application from an async context. The default
    /// implementation of this method contains the core logic of this
    /// Trait:
    ///
    /// 1. if the config asks for signal registration, spawn
    ///    [`Server::handle_signals`] (unix only; elsewhere an error
    ///    is logged instead);
    /// 2. build the listener with [`Server::build_listener`];
    /// 3. build the [`Info`], append [`Server::DESCRIPTION`] to its
    ///    server description, initialize the handler with it, and
    ///    store it on the config;
    /// 4. accept transports until the config's stopper ends the loop,
    ///    spawning one task per accepted transport and logging accept
    ///    errors without leaving the loop;
    /// 5. await the config's graceful shutdown, then
    ///    [`Server::clean_up`].
    fn run_async<A, H>(
        config: Config<Self, A>,
        mut handler: H,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
    where
        A: Acceptor<Self::Transport>,
        H: Handler,
    {
        Box::pin(async move {
            if config.should_register_signals() {
                #[cfg(unix)]
                Self::spawn(Self::handle_signals(config.stopper()));
                #[cfg(not(unix))]
                log::error!("signals handling not supported on windows yet");
            }

            let mut listener = Self::build_listener(&config);

            let mut info = Self::info(&listener);
            info.server_description_mut().push_str(Self::DESCRIPTION);
            handler.init(&mut info).await;
            config.info.set(info);

            // Config and handler are shared with every connection task.
            let config = Arc::new(config);
            let handler = Arc::new(handler);

            // Presumably `stop_future` yields `None` once the stopper
            // has been stopped, which is what ends this loop.
            while let Some(stream) = config
                .stopper
                .stop_future(Self::accept(&mut listener))
                .await
            {
                match stream {
                    Ok(stream) => {
                        let config = Arc::clone(&config);
                        let handler = Arc::clone(&handler);
                        Self::spawn(async move { config.handle_stream(stream, handler).await })
                    }
                    Err(e) => log::error!("tcp error: {}", e),
                }
            }

            config.graceful_shutdown().await;
            Self::clean_up(listener).await;
        })
    }
}