Skip to main content

trillium_opentelemetry/
metrics.rs

1use crate::conventions::normalize_method;
2use opentelemetry::{
3    KeyValue, global,
4    metrics::{Histogram, Meter},
5};
6use opentelemetry_semantic_conventions as semconv;
7use std::{
8    borrow::Cow,
9    fmt::{self, Debug, Formatter},
10    sync::Arc,
11    time::Instant,
12};
13use trillium::{Conn, Handler, Info, KnownHeaderName, Status, Transport, log};
14
/// Default bucket boundaries, in seconds, for the `http.server.request.duration` histogram.
///
/// These are the boundaries recommended by the
/// [OpenTelemetry HTTP metrics semantic conventions][duration]. They are used when no explicit
/// duration boundaries have been configured.
///
/// [duration]: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestduration
const DEFAULT_DURATION_BOUNDARIES: &[f64] = &[
    // below a tenth of a second
    0.005, 0.01, 0.025, 0.05, 0.075,
    // below one second
    0.1, 0.25, 0.5, 0.75,
    // one second and above
    1.0, 2.5, 5.0, 7.5, 10.0,
];
22
/// Signature of the callbacks that derive an optional string attribute from a [`Conn`].
///
/// Used for both the route hook and the error-type hook stored on [`Metrics`].
type StringExtractionFn = dyn Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static;
/// Signature of the callback that derives an optional `(address, port)` pair from a [`Conn`].
type StringAndPortExtractionFn =
    dyn Fn(&Conn) -> Option<(Cow<'static, str>, u16)> + Send + Sync + 'static;
26
/// Trillium handler that instruments `http.server.request.duration`,
/// `http.server.request.body.size`, and `http.server.response.body.size` as per
/// [semantic conventions for http][http-metrics].
///
/// [http-metrics]: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
pub struct Metrics {
    /// Optional hook whose result is recorded as the `HTTP_ROUTE` attribute.
    pub(crate) route: Option<Arc<StringExtractionFn>>,
    /// Optional hook whose result is recorded as the `ERROR_TYPE` attribute.
    pub(crate) error_type: Option<Arc<StringExtractionFn>>,
    /// Optional hook whose result is recorded as the `SERVER_ADDRESS` and `SERVER_PORT`
    /// attributes.
    pub(crate) server_address_and_port: Option<Arc<StringAndPortExtractionFn>>,
    /// The histogram instruments, or the configuration they will be built from.
    pub(crate) histograms: Histograms,
}
37
/// State of the histogram instruments owned by a [`Metrics`] handler.
///
/// A value converted from a [`Meter`] starts out as [`Histograms::Uninitialized`], which only
/// carries configuration, and is replaced by [`Histograms::Initialized`] when `Histograms::init`
/// runs.
#[derive(Clone, Debug)]
pub(crate) enum Histograms {
    /// Configuration gathered before the instruments are built.
    Uninitialized {
        /// Meter the instruments will be created from.
        meter: Meter,
        /// Custom bucket boundaries for the request duration histogram, if any were set.
        duration_histogram_boundaries: Option<Vec<f64>>,
        /// Custom bucket boundaries for the request body size histogram, if any were set.
        request_size_histogram_boundaries: Option<Vec<f64>>,
        /// Custom bucket boundaries for the response body size histogram, if any were set.
        response_size_histogram_boundaries: Option<Vec<f64>>,
    },
    /// Instruments that have been built and can record measurements.
    Initialized {
        /// Histogram of request durations, recorded as `f64` seconds.
        duration_histogram: Histogram<f64>,
        /// Histogram of request body sizes, recorded as `u64` byte counts.
        request_size_histogram: Histogram<u64>,
        /// Histogram of response body sizes, recorded as `u64` byte counts.
        response_size_histogram: Histogram<u64>,
    },
}
52
53impl Histograms {
54    fn init(&mut self) {
55        match self {
56            Self::Uninitialized {
57                meter,
58                duration_histogram_boundaries,
59                request_size_histogram_boundaries,
60                response_size_histogram_boundaries,
61            } => {
62                let mut duration_histogram_builder = meter
63                    .f64_histogram(semconv::metric::HTTP_SERVER_REQUEST_DURATION)
64                    .with_description("Measures the duration of inbound HTTP requests.")
65                    .with_unit("s");
66                duration_histogram_builder.boundaries = Some(
67                    duration_histogram_boundaries
68                        .take()
69                        .unwrap_or_else(|| DEFAULT_DURATION_BOUNDARIES.to_vec()),
70                );
71
72                let mut request_size_histogram_builder = meter
73                    .u64_histogram(semconv::metric::HTTP_SERVER_REQUEST_BODY_SIZE)
74                    .with_description("Measures the size of HTTP request messages (compressed).")
75                    .with_unit("By");
76                request_size_histogram_builder.boundaries =
77                    request_size_histogram_boundaries.take();
78
79                let mut response_size_histogram_builder = meter
80                    .u64_histogram(semconv::metric::HTTP_SERVER_RESPONSE_BODY_SIZE)
81                    .with_description("Measures the size of HTTP response messages (compressed).")
82                    .with_unit("By");
83                response_size_histogram_builder.boundaries =
84                    response_size_histogram_boundaries.take();
85
86                *self = Self::Initialized {
87                    duration_histogram: duration_histogram_builder.build(),
88                    request_size_histogram: request_size_histogram_builder.build(),
89                    response_size_histogram: response_size_histogram_builder.build(),
90                }
91            }
92
93            Self::Initialized { .. } => {
94                log::warn!("Attempted to initialize the Metrics handler twice");
95            }
96        }
97    }
98
99    fn set_request_size_boundaries(&mut self, boundaries: Vec<f64>) {
100        match self {
101            Self::Uninitialized {
102                request_size_histogram_boundaries,
103                ..
104            } => {
105                *request_size_histogram_boundaries = Some(boundaries);
106            }
107
108            Self::Initialized { .. } => {
109                log::warn!(
110                    "Attempted to set histogram boundaries on a Metrics handler that was already initialized"
111                );
112            }
113        }
114    }
115
116    fn set_response_size_boundaries(&mut self, boundaries: Vec<f64>) {
117        match self {
118            Self::Uninitialized {
119                response_size_histogram_boundaries,
120                ..
121            } => {
122                *response_size_histogram_boundaries = Some(boundaries);
123            }
124
125            Self::Initialized { .. } => {
126                log::warn!(
127                    "Attempted to set histogram boundaries on a Metrics handler that was already initialized"
128                );
129            }
130        }
131    }
132
133    fn set_duration_boundaries(&mut self, boundaries: Vec<f64>) {
134        match self {
135            Self::Uninitialized {
136                duration_histogram_boundaries,
137                ..
138            } => {
139                *duration_histogram_boundaries = Some(boundaries);
140            }
141            Self::Initialized { .. } => {
142                log::warn!(
143                    "Attempted to set histogram boundaries on a Metrics handler that was already initialized"
144                );
145            }
146        }
147    }
148
149    fn record_duration(&self, duration_s: f64, attributes: &[KeyValue]) {
150        match self {
151            Self::Initialized {
152                duration_histogram, ..
153            } => {
154                duration_histogram.record(duration_s, attributes);
155            }
156            Self::Uninitialized { .. } => {
157                log::error!("Attempted to record a duration on an uninitialized Metrics handler");
158            }
159        }
160    }
161    fn record_response_len(&self, response_len: u64, attributes: &[KeyValue]) {
162        match self {
163            Self::Initialized {
164                response_size_histogram,
165                ..
166            } => {
167                response_size_histogram.record(response_len, attributes);
168            }
169
170            Self::Uninitialized { .. } => {
171                log::error!(
172                    "Attempted to record a response length on an uninitialized Metrics handler"
173                );
174            }
175        }
176    }
177
178    fn record_request_len(&self, request_len: u64, attributes: &[KeyValue]) {
179        match self {
180            Self::Initialized {
181                request_size_histogram,
182                ..
183            } => {
184                request_size_histogram.record(request_len, attributes);
185            }
186
187            Self::Uninitialized { .. } => {
188                log::error!(
189                    "Attempted to record a request length on an uninitialized Metrics handler"
190                );
191            }
192        }
193    }
194}
195
196impl From<Histograms> for Metrics {
197    fn from(value: Histograms) -> Self {
198        Metrics {
199            route: None,
200            error_type: None,
201            server_address_and_port: None,
202            histograms: value,
203        }
204    }
205}
206
207impl From<Meter> for Histograms {
208    fn from(meter: Meter) -> Self {
209        Histograms::Uninitialized {
210            meter,
211            duration_histogram_boundaries: None,
212            request_size_histogram_boundaries: None,
213            response_size_histogram_boundaries: None,
214        }
215    }
216}
217
218impl Debug for Metrics {
219    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
220        f.debug_struct("Metrics")
221            .field(
222                "route",
223                &match self.route {
224                    Some(_) => "Some(..)",
225                    _ => "None",
226                },
227            )
228            .field(
229                "error_type",
230                &match self.error_type {
231                    Some(_) => "Some(..)",
232                    _ => "None",
233                },
234            )
235            .field("histograms", &self.histograms)
236            .finish()
237    }
238}
239
240/// Constructs a [`Metrics`] handler from a `&'static str`, [`Meter`], or [`&Meter`][Meter].
241///
242/// Alias for [`Metrics::new`] and [`Metrics::from`]
243pub fn metrics(meter: impl Into<Metrics>) -> Metrics {
244    meter.into()
245}
246
247impl From<&'static str> for Metrics {
248    fn from(value: &'static str) -> Self {
249        global::meter(value).into()
250    }
251}
252
253impl From<Meter> for Metrics {
254    fn from(value: Meter) -> Self {
255        Histograms::from(value).into()
256    }
257}
258
259impl From<&Meter> for Metrics {
260    fn from(meter: &Meter) -> Self {
261        meter.clone().into()
262    }
263}
264
265impl Metrics {
266    /// Constructs a new [`Metrics`] handler from a `&'static str`, [`&Meter`][Meter] or [`Meter`]
267    pub fn new(meter: impl Into<Metrics>) -> Self {
268        meter.into()
269    }
270
271    /// provides a route specification to the metrics collector.
272    ///
273    /// in order to avoid forcing anyone to use a particular router, this is provided as a
274    /// configuration hook.
275    ///
276    /// for use with [`trillium-router`](https://docs.trillium.rs/trillium_router/index.html),
277    /// ```
278    /// use trillium_router::RouterConnExt;
279    /// trillium_opentelemetry::Metrics::new(&opentelemetry::global::meter("example"))
280    ///     .with_route(|conn| conn.route().map(|r| r.to_string().into()));
281    /// ```
282    pub fn with_route<F>(mut self, route: F) -> Self
283    where
284        F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
285    {
286        self.route = Some(Arc::new(route));
287        self
288    }
289
290    /// Provides an optional low-cardinality error type specification to the metrics collector.
291    ///
292    /// The implementation of this is application specific, but will often look like checking the
293    /// [`Conn::state`] for an error enum and mapping that to a low-cardinality `&'static str`.
294    pub fn with_error_type<F>(mut self, error_type: F) -> Self
295    where
296        F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
297    {
298        self.error_type = Some(Arc::new(error_type));
299        self
300    }
301
302    /// Provides a callback for `server.address` and `server.port` attributes to the metrics
303    /// collector.
304    ///
305    /// These should be set based on request headers according to the [OpenTelemetry HTTP semantic
306    /// conventions][semconv-server-address-port].
307    ///
308    /// It is not recommended to enable this when the server is exposed to clients outside of your
309    /// control, as request headers could arbitrarily increase the cardinality of these attributes.
310    ///
311    /// [semconv-server-address-port]:
312    ///     https://opentelemetry.io/docs/specs/semconv/http/http-spans/#setting-serveraddress-and-serverport-attributes
313    pub fn with_server_address_and_port<F>(mut self, server_address_and_port: F) -> Self
314    where
315        F: Fn(&Conn) -> Option<(Cow<'static, str>, u16)> + Send + Sync + 'static,
316    {
317        self.server_address_and_port = Some(Arc::new(server_address_and_port));
318        self
319    }
320
321    /// Sets histogram boundaries for request durations (in seconds).
322    ///
323    /// This sets the histogram bucket boundaries for the [`http.server.request.duration`][semconv]
324    /// metric.
325    ///
326    /// [semconv]: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestduration
327    pub fn with_duration_histogram_boundaries(mut self, boundaries: Vec<f64>) -> Self {
328        self.histograms.set_duration_boundaries(boundaries);
329        self
330    }
331
332    /// Sets histogram boundaries for request sizes (in bytes).
333    ///
334    /// This sets the histogram bucket boundaries for the [`http.server.request.body.size`][semconv]
335    /// metric.
336    ///
337    /// [semconv]: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestbodysize
338    pub fn with_request_size_histogram_boundaries(mut self, boundaries: Vec<f64>) -> Self {
339        self.histograms.set_request_size_boundaries(boundaries);
340        self
341    }
342
343    /// Sets histogram boundaries for response sizes (in bytes).
344    ///
345    /// This sets the histogram bucket boundaries for the [`http.server.response.body.size`][semconv]
346    /// metric.
347    ///
348    /// [semconv]: https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverresponsebodysize
349    pub fn with_response_size_histogram_boundaries(mut self, boundaries: Vec<f64>) -> Self {
350        self.histograms.set_response_size_boundaries(boundaries);
351        self
352    }
353}
354
/// Marker stored in the conn state by `Metrics::run`.
///
/// `Metrics::before_send` checks for it and records nothing for conns that do not carry it.
struct MetricsWasRun;
356
357impl Handler for Metrics {
358    async fn run(&self, conn: Conn) -> Conn {
359        conn.with_state(MetricsWasRun)
360    }
361
362    async fn init(&mut self, _: &mut Info) {
363        self.histograms.init();
364    }
365
366    async fn before_send(&self, mut conn: Conn) -> Conn {
367        if conn.state::<MetricsWasRun>().is_none() {
368            return conn;
369        }
370
371        let Metrics {
372            route,
373            error_type,
374            server_address_and_port,
375            histograms,
376        } = self;
377        let error_type = error_type.as_ref().and_then(|et| et(&conn)).or_else(|| {
378            let status = conn.status().unwrap_or(Status::NotFound);
379            if status.is_server_error() {
380                Some((status as u16).to_string().into())
381            } else {
382                None
383            }
384        });
385        let status: i64 = (conn.status().unwrap_or(Status::NotFound) as u16).into();
386        let route = route.as_ref().and_then(|r| r(&conn));
387        let start_time = conn.start_time();
388        let (method, method_original) = normalize_method(conn.method().as_str());
389        let request_len = conn
390            .request_headers()
391            .get_str(KnownHeaderName::ContentLength)
392            .and_then(|src| src.parse::<u64>().ok());
393        let response_len = conn.response_len();
394        let scheme = if conn.is_secure() { "https" } else { "http" };
395        let version = conn.http_version().as_str().strip_prefix("HTTP/").unwrap();
396        let server_address_and_port = server_address_and_port.as_ref().and_then(|f| f(&conn));
397
398        let mut attributes = vec![
399            KeyValue::new(semconv::attribute::HTTP_REQUEST_METHOD, method),
400            KeyValue::new(semconv::attribute::HTTP_RESPONSE_STATUS_CODE, status),
401            KeyValue::new(semconv::attribute::URL_SCHEME, scheme),
402            KeyValue::new(semconv::attribute::NETWORK_PROTOCOL_VERSION, version),
403        ];
404
405        if let Some(method_original) = method_original {
406            attributes.push(KeyValue::new(
407                semconv::attribute::HTTP_REQUEST_METHOD_ORIGINAL,
408                method_original,
409            ));
410        }
411
412        if let Some(error_type) = error_type {
413            attributes.push(KeyValue::new(semconv::attribute::ERROR_TYPE, error_type));
414        }
415
416        if let Some(route) = route {
417            attributes.push(KeyValue::new(semconv::attribute::HTTP_ROUTE, route))
418        };
419
420        if let Some((address, port)) = server_address_and_port {
421            attributes.push(KeyValue::new(semconv::attribute::SERVER_ADDRESS, address));
422            attributes.push(KeyValue::new(
423                semconv::attribute::SERVER_PORT,
424                i64::from(port),
425            ));
426        }
427
428        let histograms = histograms.clone();
429        let inner: &mut trillium_http::Conn<Box<dyn Transport>> = conn.as_mut();
430        inner.after_send(move |_| {
431            let duration_s = (Instant::now() - start_time).as_secs_f64();
432
433            histograms.record_duration(duration_s, &attributes);
434
435            if let Some(response_len) = response_len {
436                histograms.record_response_len(response_len, &attributes);
437            }
438
439            if let Some(request_len) = request_len {
440                histograms.record_request_len(request_len, &attributes);
441            }
442        });
443
444        conn
445    }
446}