1use crate::conventions::normalize_method;
2use opentelemetry::{
3 KeyValue, global,
4 metrics::{Histogram, Meter},
5};
6use opentelemetry_semantic_conventions as semconv;
7use std::{
8 borrow::Cow,
9 fmt::{self, Debug, Formatter},
10 sync::Arc,
11 time::Instant,
12};
13use trillium::{Conn, Handler, Info, KnownHeaderName, Status, Transport, log};
14
/// Histogram bucket boundaries, in seconds, applied to the request duration
/// histogram when no custom duration boundaries have been configured.
///
/// NOTE(review): these look like the bucket boundaries suggested by the
/// OpenTelemetry HTTP semantic conventions for the server request duration
/// metric — confirm against the spec before changing them.
const DEFAULT_DURATION_BOUNDARIES: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
];
22
/// Signature of a user-supplied closure that derives an optional string
/// attribute value (such as a route or an error type) from a [`Conn`].
type StringExtractionFn = dyn Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static;
/// Signature of a user-supplied closure that derives an optional
/// `(server address, server port)` pair from a [`Conn`].
type StringAndPortExtractionFn =
    dyn Fn(&Conn) -> Option<(Cow<'static, str>, u16)> + Send + Sync + 'static;
26
/// Trillium handler that records HTTP server metrics into OpenTelemetry
/// histograms.
///
/// For every conn that passed through this handler's `run`, the request
/// duration and, when known, the request and response body sizes are recorded
/// from an `after_send` callback registered in `before_send`.
pub struct Metrics {
    /// Optional closure producing the route attribute for a conn.
    pub(crate) route: Option<Arc<StringExtractionFn>>,
    /// Optional closure producing the error type attribute for a conn.
    pub(crate) error_type: Option<Arc<StringExtractionFn>>,
    /// Optional closure producing the server address and port attributes for a conn.
    pub(crate) server_address_and_port: Option<Arc<StringAndPortExtractionFn>>,
    /// The histograms, or the configuration needed to build them at init time.
    pub(crate) histograms: Histograms,
}
37
/// State of the metric instruments: either the configuration needed to build
/// them, or the built histograms themselves.
#[derive(Clone, Debug)]
pub(crate) enum Histograms {
    /// Instruments have not been built yet; boundaries may still be changed.
    Uninitialized {
        /// Meter used to build the histograms during initialization.
        meter: Meter,
        /// Custom boundaries for the duration histogram; when `None`,
        /// `DEFAULT_DURATION_BOUNDARIES` is used at initialization.
        duration_histogram_boundaries: Option<Vec<f64>>,
        /// Custom boundaries for the request size histogram, if any.
        request_size_histogram_boundaries: Option<Vec<f64>>,
        /// Custom boundaries for the response size histogram, if any.
        response_size_histogram_boundaries: Option<Vec<f64>>,
    },
    /// Instruments have been built and can record measurements.
    Initialized {
        /// Request duration histogram (unit `s`).
        duration_histogram: Histogram<f64>,
        /// Request body size histogram (unit `By`).
        request_size_histogram: Histogram<u64>,
        /// Response body size histogram (unit `By`).
        response_size_histogram: Histogram<u64>,
    },
}
52
53impl Histograms {
54 fn init(&mut self) {
55 match self {
56 Self::Uninitialized {
57 meter,
58 duration_histogram_boundaries,
59 request_size_histogram_boundaries,
60 response_size_histogram_boundaries,
61 } => {
62 let mut duration_histogram_builder = meter
63 .f64_histogram(semconv::metric::HTTP_SERVER_REQUEST_DURATION)
64 .with_description("Measures the duration of inbound HTTP requests.")
65 .with_unit("s");
66 duration_histogram_builder.boundaries = Some(
67 duration_histogram_boundaries
68 .take()
69 .unwrap_or_else(|| DEFAULT_DURATION_BOUNDARIES.to_vec()),
70 );
71
72 let mut request_size_histogram_builder = meter
73 .u64_histogram(semconv::metric::HTTP_SERVER_REQUEST_BODY_SIZE)
74 .with_description("Measures the size of HTTP request messages (compressed).")
75 .with_unit("By");
76 request_size_histogram_builder.boundaries =
77 request_size_histogram_boundaries.take();
78
79 let mut response_size_histogram_builder = meter
80 .u64_histogram(semconv::metric::HTTP_SERVER_RESPONSE_BODY_SIZE)
81 .with_description("Measures the size of HTTP response messages (compressed).")
82 .with_unit("By");
83 response_size_histogram_builder.boundaries =
84 response_size_histogram_boundaries.take();
85
86 *self = Self::Initialized {
87 duration_histogram: duration_histogram_builder.build(),
88 request_size_histogram: request_size_histogram_builder.build(),
89 response_size_histogram: response_size_histogram_builder.build(),
90 }
91 }
92
93 Self::Initialized { .. } => {
94 log::warn!("Attempted to initialize the Metrics handler twice");
95 }
96 }
97 }
98
99 fn set_request_size_boundaries(&mut self, boundaries: Vec<f64>) {
100 match self {
101 Self::Uninitialized {
102 request_size_histogram_boundaries,
103 ..
104 } => {
105 *request_size_histogram_boundaries = Some(boundaries);
106 }
107
108 Self::Initialized { .. } => {
109 log::warn!(
110 "Attempted to set histogram boundaries on a Metrics handler that was already initialized"
111 );
112 }
113 }
114 }
115
116 fn set_response_size_boundaries(&mut self, boundaries: Vec<f64>) {
117 match self {
118 Self::Uninitialized {
119 response_size_histogram_boundaries,
120 ..
121 } => {
122 *response_size_histogram_boundaries = Some(boundaries);
123 }
124
125 Self::Initialized { .. } => {
126 log::warn!(
127 "Attempted to set histogram boundaries on a Metrics handler that was already initialized"
128 );
129 }
130 }
131 }
132
133 fn set_duration_boundaries(&mut self, boundaries: Vec<f64>) {
134 match self {
135 Self::Uninitialized {
136 duration_histogram_boundaries,
137 ..
138 } => {
139 *duration_histogram_boundaries = Some(boundaries);
140 }
141 Self::Initialized { .. } => {
142 log::warn!(
143 "Attempted to set histogram boundaries on a Metrics handler that was already initialized"
144 );
145 }
146 }
147 }
148
149 fn record_duration(&self, duration_s: f64, attributes: &[KeyValue]) {
150 match self {
151 Self::Initialized {
152 duration_histogram, ..
153 } => {
154 duration_histogram.record(duration_s, attributes);
155 }
156 Self::Uninitialized { .. } => {
157 log::error!("Attempted to record a duration on an uninitialized Metrics handler");
158 }
159 }
160 }
161 fn record_response_len(&self, response_len: u64, attributes: &[KeyValue]) {
162 match self {
163 Self::Initialized {
164 response_size_histogram,
165 ..
166 } => {
167 response_size_histogram.record(response_len, attributes);
168 }
169
170 Self::Uninitialized { .. } => {
171 log::error!(
172 "Attempted to record a response length on an uninitialized Metrics handler"
173 );
174 }
175 }
176 }
177
178 fn record_request_len(&self, request_len: u64, attributes: &[KeyValue]) {
179 match self {
180 Self::Initialized {
181 request_size_histogram,
182 ..
183 } => {
184 request_size_histogram.record(request_len, attributes);
185 }
186
187 Self::Uninitialized { .. } => {
188 log::error!(
189 "Attempted to record a request length on an uninitialized Metrics handler"
190 );
191 }
192 }
193 }
194}
195
196impl From<Histograms> for Metrics {
197 fn from(value: Histograms) -> Self {
198 Metrics {
199 route: None,
200 error_type: None,
201 server_address_and_port: None,
202 histograms: value,
203 }
204 }
205}
206
207impl From<Meter> for Histograms {
208 fn from(meter: Meter) -> Self {
209 Histograms::Uninitialized {
210 meter,
211 duration_histogram_boundaries: None,
212 request_size_histogram_boundaries: None,
213 response_size_histogram_boundaries: None,
214 }
215 }
216}
217
218impl Debug for Metrics {
219 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
220 f.debug_struct("Metrics")
221 .field(
222 "route",
223 &match self.route {
224 Some(_) => "Some(..)",
225 _ => "None",
226 },
227 )
228 .field(
229 "error_type",
230 &match self.error_type {
231 Some(_) => "Some(..)",
232 _ => "None",
233 },
234 )
235 .field("histograms", &self.histograms)
236 .finish()
237 }
238}
239
240pub fn metrics(meter: impl Into<Metrics>) -> Metrics {
244 meter.into()
245}
246
247impl From<&'static str> for Metrics {
248 fn from(value: &'static str) -> Self {
249 global::meter(value).into()
250 }
251}
252
253impl From<Meter> for Metrics {
254 fn from(value: Meter) -> Self {
255 Histograms::from(value).into()
256 }
257}
258
259impl From<&Meter> for Metrics {
260 fn from(meter: &Meter) -> Self {
261 meter.clone().into()
262 }
263}
264
265impl Metrics {
266 pub fn new(meter: impl Into<Metrics>) -> Self {
268 meter.into()
269 }
270
271 pub fn with_route<F>(mut self, route: F) -> Self
283 where
284 F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
285 {
286 self.route = Some(Arc::new(route));
287 self
288 }
289
290 pub fn with_error_type<F>(mut self, error_type: F) -> Self
295 where
296 F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
297 {
298 self.error_type = Some(Arc::new(error_type));
299 self
300 }
301
302 pub fn with_server_address_and_port<F>(mut self, server_address_and_port: F) -> Self
314 where
315 F: Fn(&Conn) -> Option<(Cow<'static, str>, u16)> + Send + Sync + 'static,
316 {
317 self.server_address_and_port = Some(Arc::new(server_address_and_port));
318 self
319 }
320
321 pub fn with_duration_histogram_boundaries(mut self, boundaries: Vec<f64>) -> Self {
328 self.histograms.set_duration_boundaries(boundaries);
329 self
330 }
331
332 pub fn with_request_size_histogram_boundaries(mut self, boundaries: Vec<f64>) -> Self {
339 self.histograms.set_request_size_boundaries(boundaries);
340 self
341 }
342
343 pub fn with_response_size_histogram_boundaries(mut self, boundaries: Vec<f64>) -> Self {
350 self.histograms.set_response_size_boundaries(boundaries);
351 self
352 }
353}
354
/// Marker placed in conn state by the `Metrics` handler's `run`; its
/// `before_send` only records metrics for conns that carry this marker.
struct MetricsWasRun;
356
357impl Handler for Metrics {
358 async fn run(&self, conn: Conn) -> Conn {
359 conn.with_state(MetricsWasRun)
360 }
361
362 async fn init(&mut self, _: &mut Info) {
363 self.histograms.init();
364 }
365
366 async fn before_send(&self, mut conn: Conn) -> Conn {
367 if conn.state::<MetricsWasRun>().is_none() {
368 return conn;
369 }
370
371 let Metrics {
372 route,
373 error_type,
374 server_address_and_port,
375 histograms,
376 } = self;
377 let error_type = error_type.as_ref().and_then(|et| et(&conn)).or_else(|| {
378 let status = conn.status().unwrap_or(Status::NotFound);
379 if status.is_server_error() {
380 Some((status as u16).to_string().into())
381 } else {
382 None
383 }
384 });
385 let status: i64 = (conn.status().unwrap_or(Status::NotFound) as u16).into();
386 let route = route.as_ref().and_then(|r| r(&conn));
387 let start_time = conn.start_time();
388 let (method, method_original) = normalize_method(conn.method().as_str());
389 let request_len = conn
390 .request_headers()
391 .get_str(KnownHeaderName::ContentLength)
392 .and_then(|src| src.parse::<u64>().ok());
393 let response_len = conn.response_len();
394 let scheme = if conn.is_secure() { "https" } else { "http" };
395 let version = conn.http_version().as_str().strip_prefix("HTTP/").unwrap();
396 let server_address_and_port = server_address_and_port.as_ref().and_then(|f| f(&conn));
397
398 let mut attributes = vec![
399 KeyValue::new(semconv::attribute::HTTP_REQUEST_METHOD, method),
400 KeyValue::new(semconv::attribute::HTTP_RESPONSE_STATUS_CODE, status),
401 KeyValue::new(semconv::attribute::URL_SCHEME, scheme),
402 KeyValue::new(semconv::attribute::NETWORK_PROTOCOL_VERSION, version),
403 ];
404
405 if let Some(method_original) = method_original {
406 attributes.push(KeyValue::new(
407 semconv::attribute::HTTP_REQUEST_METHOD_ORIGINAL,
408 method_original,
409 ));
410 }
411
412 if let Some(error_type) = error_type {
413 attributes.push(KeyValue::new(semconv::attribute::ERROR_TYPE, error_type));
414 }
415
416 if let Some(route) = route {
417 attributes.push(KeyValue::new(semconv::attribute::HTTP_ROUTE, route))
418 };
419
420 if let Some((address, port)) = server_address_and_port {
421 attributes.push(KeyValue::new(semconv::attribute::SERVER_ADDRESS, address));
422 attributes.push(KeyValue::new(
423 semconv::attribute::SERVER_PORT,
424 i64::from(port),
425 ));
426 }
427
428 let histograms = histograms.clone();
429 let inner: &mut trillium_http::Conn<Box<dyn Transport>> = conn.as_mut();
430 inner.after_send(move |_| {
431 let duration_s = (Instant::now() - start_time).as_secs_f64();
432
433 histograms.record_duration(duration_s, &attributes);
434
435 if let Some(response_len) = response_len {
436 histograms.record_response_len(response_len, &attributes);
437 }
438
439 if let Some(request_len) = request_len {
440 histograms.record_request_len(request_len, &attributes);
441 }
442 });
443
444 conn
445 }
446}