Skip to main content

trillium_opentelemetry/
trace.rs

1use crate::conventions::{normalize_method, redact_query};
2use opentelemetry::{
3    Array, Context, KeyValue, Value,
4    trace::{SpanBuilder, SpanKind, TraceContextExt, Tracer},
5};
6use opentelemetry_semantic_conventions as semconv;
7use std::{
8    borrow::Cow,
9    fmt::{self, Debug, Formatter},
10    net::SocketAddr,
11    sync::Arc,
12    time::{Instant, SystemTime},
13};
14use trillium::{Conn, Handler, HeaderName, KnownHeaderName, Status, Transport};
15
16type StringExtractionFn = dyn Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static;
17
18/// Trillium handler that instruments per-request spans as per [semantic conventions for http][http-spans].
19///
20/// [http-spans]: https://opentelemetry.io/docs/specs/semconv/http/http-spans
21#[derive(Clone)]
22pub struct Trace<T> {
23    pub(crate) route: Option<Arc<StringExtractionFn>>,
24    pub(crate) error_type: Option<Arc<StringExtractionFn>>,
25    pub(crate) headers: Vec<HeaderName<'static>>,
26    pub(crate) enable_local_address_and_port: bool,
27    tracer: T,
28    socket_addr: Option<SocketAddr>,
29}
30
31impl<Span> Debug for Trace<Span> {
32    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
33        f.debug_struct("Trace")
34            .field(
35                "route",
36                &match self.route {
37                    Some(_) => "Some(..)",
38                    _ => "None",
39                },
40            )
41            .field(
42                "error_type",
43                &match self.error_type {
44                    Some(_) => "Some(..)",
45                    _ => "None",
46                },
47            )
48            .field("tracer", &"..")
49            .finish()
50    }
51}
52
53/// Alias for [`Trace::new`]
54pub fn trace<T: Tracer>(tracer: T) -> Trace<T> {
55    Trace::new(tracer)
56}
57
58impl<T: Tracer> Trace<T> {
59    /// Constructs a new [`Trace`] handler from a Tracer
60    pub fn new(tracer: T) -> Self {
61        Trace {
62            route: None,
63            error_type: None,
64            enable_local_address_and_port: false,
65            tracer,
66            headers: vec![],
67            socket_addr: None,
68        }
69    }
70
71    /// provides a route specification to include in the trace spans.
72    ///
73    /// in order to avoid forcing anyone to use a particular router, this is provided as a
74    /// configuration hook.
75    ///
76    /// for use with [`trillium-router`](https://docs.trillium.rs/trillium_router/index.html),
77    /// ```
78    /// use trillium_router::RouterConnExt;
79    /// trillium_opentelemetry::Metrics::new(&opentelemetry::global::meter("example"))
80    ///     .with_route(|conn| conn.route().map(|r| r.to_string().into()));
81    /// ```
82    pub fn with_route<F>(mut self, route: F) -> Self
83    where
84        F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
85    {
86        self.route = Some(Arc::new(route));
87        self
88    }
89
90    /// Provides an optional low-cardinality error type specification to include in the trace spans.
91    ///
92    /// The implementation of this is application specific, but will often look like checking the
93    /// [`Conn::state`] for an error enum and mapping that to a low-cardinality `&'static str`.
94    pub fn with_error_type<F>(mut self, error_type: F) -> Self
95    where
96        F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
97    {
98        self.error_type = Some(Arc::new(error_type));
99        self
100    }
101
102    /// Specify a list of request headers to include in the trace spans
103    pub fn with_headers(
104        mut self,
105        headers: impl IntoIterator<Item = impl Into<HeaderName<'static>>>,
106    ) -> Self {
107        self.headers = headers.into_iter().map(Into::into).collect();
108        self
109    }
110
111    /// Enable population of the local socket address and port in the trace spans.
112    ///
113    /// This populates the `network.local.address` and `network.local.port` attributes.
114    pub fn with_local_address_and_port(mut self) -> Self {
115        self.enable_local_address_and_port = true;
116        self
117    }
118}
119
120#[derive(Clone, Debug)]
121pub(crate) struct TraceContext {
122    pub(crate) context: Context,
123}
124
125struct RouteWasAvailable;
126
127impl<T> Handler for Trace<T>
128where
129    T: Tracer + Send + Sync + 'static,
130    T::Span: Send + Sync + 'static,
131{
132    async fn init(&mut self, info: &mut trillium::Info) {
133        if self.enable_local_address_and_port {
134            self.socket_addr = info.tcp_socket_addr().cloned();
135        }
136    }
137    async fn run(&self, mut conn: Conn) -> Conn {
138        let start_time = Some(SystemTime::now() - conn.start_time().duration_since(Instant::now()));
139
140        let scheme = if conn.is_secure() { "https" } else { "http" };
141        let (method, method_original) = normalize_method(conn.method().as_str());
142
143        let version = conn.http_version().as_str().strip_prefix("HTTP/").unwrap();
144
145        let path_and_query = conn.path_and_query();
146        let (path, query) = match path_and_query.find('?') {
147            Some(x) => (&path_and_query[0..x], &path_and_query[x + 1..]),
148            None => (path_and_query, ""),
149        };
150
151        let mut attributes = vec![
152            KeyValue::new(semconv::attribute::HTTP_REQUEST_METHOD, method),
153            KeyValue::new(semconv::attribute::URL_PATH, path.to_string()),
154            KeyValue::new(semconv::attribute::URL_SCHEME, scheme),
155            KeyValue::new(semconv::attribute::NETWORK_PROTOCOL_VERSION, version),
156        ];
157
158        if !query.is_empty() {
159            attributes.push(KeyValue::new(
160                semconv::attribute::URL_QUERY,
161                redact_query(query).into_owned(),
162            ));
163        }
164
165        if let Some(method_original) = method_original {
166            attributes.push(KeyValue::new(
167                semconv::attribute::HTTP_REQUEST_METHOD_ORIGINAL,
168                method_original,
169            ));
170        }
171
172        if let Some(socket_addr) = &self.socket_addr {
173            attributes.push(KeyValue::new(
174                semconv::attribute::NETWORK_LOCAL_ADDRESS,
175                socket_addr.ip().to_string(),
176            ));
177
178            attributes.push(KeyValue::new(
179                semconv::attribute::NETWORK_LOCAL_PORT,
180                i64::from(socket_addr.port()),
181            ));
182        }
183
184        if let Some(peer_ip) = conn.peer_ip() {
185            attributes.push(KeyValue::new(
186                semconv::attribute::CLIENT_ADDRESS,
187                peer_ip.to_string(),
188            ));
189        }
190
191        for (header_name, header_values) in self.headers.iter().filter_map(|hn| {
192            conn.request_headers()
193                .get_values(hn.clone())
194                .map(|v| (hn, v))
195        }) {
196            attributes.push(KeyValue::new(
197                format!(
198                    "http.request.header.{}",
199                    header_name.as_ref().to_lowercase()
200                ),
201                Value::Array(Array::String(
202                    header_values.iter().map(|x| x.to_string().into()).collect(),
203                )),
204            ));
205        }
206
207        let address_and_port = conn.host().map(|host| {
208            host.split_once(':')
209                .and_then(|(host, port)| Some((String::from(host), port.parse().ok()?)))
210                .unwrap_or_else(|| (String::from(host), if conn.is_secure() { 443 } else { 80 }))
211        });
212
213        if let Some((address, port)) = address_and_port {
214            attributes.push(KeyValue::new(semconv::attribute::SERVER_ADDRESS, address));
215            attributes.push(KeyValue::new(semconv::attribute::SERVER_PORT, port));
216        }
217
218        if let Some(user_agent) = conn.request_headers().get_str(KnownHeaderName::UserAgent) {
219            attributes.push(KeyValue::new(
220                semconv::attribute::USER_AGENT_ORIGINAL,
221                user_agent.to_string(),
222            ));
223        }
224
225        let name = if let Some(route) = self.route.as_ref().and_then(|route| route(&conn)) {
226            conn.insert_state(RouteWasAvailable);
227            attributes.push(KeyValue::new(semconv::attribute::HTTP_ROUTE, route.clone()));
228            format!("{method} {route}").into()
229        } else {
230            method.into()
231        };
232
233        let span = self.tracer.build(SpanBuilder {
234            name,
235            start_time,
236            span_kind: Some(SpanKind::Server),
237            attributes: Some(attributes),
238            ..SpanBuilder::default()
239        });
240        let context = Context::current_with_span(span);
241
242        conn.with_state(TraceContext { context })
243    }
244
245    async fn before_send(&self, mut conn: Conn) -> Conn {
246        let Some(TraceContext { context }) = conn.state().cloned() else {
247            return conn;
248        };
249
250        let span = context.span();
251
252        let error_type = self
253            .error_type
254            .as_ref()
255            .and_then(|et| et(&conn))
256            .or_else(|| {
257                let status = conn.status().unwrap_or(Status::NotFound);
258                if status.is_server_error() {
259                    Some((status as u16).to_string().into())
260                } else {
261                    None
262                }
263            });
264
265        if conn.status().is_some_and(|s| s.is_server_error()) {
266            span.set_status(opentelemetry::trace::Status::Error {
267                description: "".into(), // see error.type
268            });
269        }
270
271        let status: i64 = (conn.status().unwrap_or(Status::NotFound) as u16).into();
272
273        let mut attributes = vec![KeyValue::new(
274            semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
275            status,
276        )];
277
278        if conn.take_state::<RouteWasAvailable>().is_none() {
279            let route = self.route.as_ref().and_then(|route| route(&conn));
280            if let Some(route) = &route {
281                let (method, _) = normalize_method(conn.method().as_str());
282                attributes.push(KeyValue::new(semconv::attribute::HTTP_ROUTE, route.clone()));
283                span.update_name(format!("{method} {route}"));
284            }
285        }
286
287        if let Some(error_type) = error_type {
288            attributes.push(KeyValue::new(semconv::attribute::ERROR_TYPE, error_type));
289        }
290
291        span.set_attributes(attributes);
292
293        {
294            let context = context.clone();
295            let inner: &mut trillium_http::Conn<Box<dyn Transport>> = conn.as_mut();
296            inner.after_send(move |send_status| {
297                let span = context.span();
298                if !send_status.is_success() {
299                    span.set_status(opentelemetry::trace::Status::Error {
300                        description: "http send error".into(),
301                    });
302                    span.set_attribute(KeyValue::new(
303                        semconv::attribute::ERROR_TYPE,
304                        "http send error",
305                    ));
306                }
307                span.end();
308            });
309        }
310
311        conn
312    }
313}