trillium_opentelemetry/
instrument_handler.rs1use crate::{instrumentation_scope, trace::TraceContext};
2use opentelemetry::{
3 Context,
4 global::BoxedTracer,
5 trace::{FutureExt, TraceContextExt, Tracer},
6};
7use trillium::{Conn, Handler, Info, Upgrade};
8
9#[derive(Debug, Clone)]
14pub struct InstrumentHandler<H, T> {
15 handler: H,
16 tracer: T,
17}
18
19impl<H, T> Handler for InstrumentHandler<H, T>
20where
21 H: Handler,
22 T: Tracer + Send + Sync + 'static,
23 T::Span: Send + Sync + 'static,
24{
25 async fn init(&mut self, info: &mut Info) {
26 let name = self.handler.name();
27 self.handler
28 .init(info)
29 .with_context(Context::current_with_span(
30 self.tracer.start(format!("{name}::init")),
31 ))
32 .await
33 }
34
35 async fn run(&self, mut conn: Conn) -> Conn {
36 let name = self.handler.name();
37 match conn.take_state() {
38 Some(TraceContext { context }) => {
39 let child = self
40 .tracer
41 .start_with_context(format!("{name}::run"), &context);
42 let child_context = Context::current_with_span(child);
43 self.handler
44 .run(conn.with_state(TraceContext {
45 context: child_context.clone(),
46 }))
47 .with_context(child_context)
48 .await
49 .with_state(TraceContext { context })
50 }
51
52 None => self.handler.run(conn).await,
53 }
54 }
55
56 async fn before_send(&self, mut conn: Conn) -> Conn {
57 let name = self.handler.name();
58 match conn.take_state() {
59 Some(TraceContext { context }) => {
60 let child = self
61 .tracer
62 .start_with_context(format!("{name}::before_send"), &context);
63
64 let child_context = Context::current_with_span(child);
65 self.handler
66 .before_send(conn.with_state(TraceContext {
67 context: child_context.clone(),
68 }))
69 .with_context(child_context)
70 .await
71 .with_state(TraceContext { context })
72 }
73
74 None => self.handler.before_send(conn).await,
75 }
76 }
77
78 fn has_upgrade(&self, upgrade: &Upgrade) -> bool {
79 self.handler.has_upgrade(upgrade)
80 }
81
82 async fn upgrade(&self, upgrade: Upgrade) {
83 let name = self.handler.name();
84 match upgrade.state().get() {
85 Some(TraceContext { context }) => {
86 let child = self
87 .tracer
88 .start_with_context(format!("{name}::upgrade"), context);
89
90 self.handler
91 .upgrade(upgrade)
92 .with_context(Context::current_with_span(child))
93 .await
94 }
95
96 None => self.handler.upgrade(upgrade).await,
97 }
98 }
99}
100
101pub fn instrument_handler<H, T>(handler: H, tracer: T) -> InstrumentHandler<H, T>
106where
107 H: Handler,
108 T: Tracer + Send + Sync + 'static,
109 T::Span: Send + Sync + 'static,
110{
111 InstrumentHandler::new(handler, tracer)
112}
113
114impl<H, T> InstrumentHandler<H, T>
115where
116 H: Handler,
117 T: Tracer + Send + Sync + 'static,
118 T::Span: Send + Sync + 'static,
119{
120 pub fn new(handler: H, tracer: T) -> Self {
125 Self { handler, tracer }
126 }
127}
128
129pub fn instrument_handler_global<H>(handler: H) -> InstrumentHandler<H, BoxedTracer>
136where
137 H: Handler,
138{
139 InstrumentHandler::new(
140 handler,
141 opentelemetry::global::tracer_with_scope(instrumentation_scope()),
142 )
143}