1use crate::conventions::{normalize_method, redact_query};
2use opentelemetry::{
3 Array, Context, KeyValue, Value,
4 trace::{SpanBuilder, SpanKind, TraceContextExt, Tracer},
5};
6use opentelemetry_semantic_conventions as semconv;
7use std::{
8 borrow::Cow,
9 fmt::{self, Debug, Formatter},
10 net::SocketAddr,
11 sync::Arc,
12 time::{Instant, SystemTime},
13};
14use trillium::{Conn, Handler, HeaderName, KnownHeaderName, Status, Transport};
15
/// Type-erased, thread-safe hook that inspects a [`Conn`] and optionally produces a string.
///
/// Both the route hook and the error-type hook of [`Trace`] are stored as this type.
type StringExtractionFn = dyn Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static;
17
/// Trillium handler that records one OpenTelemetry server span per conn.
///
/// The span is built in `run`, updated with response attributes in `before_send`, and ended
/// from a callback registered to fire after the response has been sent.
#[derive(Clone)]
pub struct Trace<T> {
    /// Optional hook that yields the route for a conn. It is consulted in `run` and, if it
    /// returned nothing there, once more in `before_send`.
    pub(crate) route: Option<Arc<StringExtractionFn>>,
    /// Optional hook that yields the value recorded as the span's error type attribute.
    pub(crate) error_type: Option<Arc<StringExtractionFn>>,
    /// Request headers whose values are recorded as `http.request.header.<name>` attributes.
    pub(crate) headers: Vec<HeaderName<'static>>,
    /// When true, `init` captures the socket address reported by the server `Info`.
    pub(crate) enable_local_address_and_port: bool,
    /// Tracer used to build a span for every conn.
    tracer: T,
    /// Socket address captured in `init`; stays `None` unless local address reporting is
    /// enabled.
    socket_addr: Option<SocketAddr>,
}
30
31impl<Span> Debug for Trace<Span> {
32 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
33 f.debug_struct("Trace")
34 .field(
35 "route",
36 &match self.route {
37 Some(_) => "Some(..)",
38 _ => "None",
39 },
40 )
41 .field(
42 "error_type",
43 &match self.error_type {
44 Some(_) => "Some(..)",
45 _ => "None",
46 },
47 )
48 .field("tracer", &"..")
49 .finish()
50 }
51}
52
53pub fn trace<T: Tracer>(tracer: T) -> Trace<T> {
55 Trace::new(tracer)
56}
57
58impl<T: Tracer> Trace<T> {
59 pub fn new(tracer: T) -> Self {
61 Trace {
62 route: None,
63 error_type: None,
64 enable_local_address_and_port: false,
65 tracer,
66 headers: vec![],
67 socket_addr: None,
68 }
69 }
70
71 pub fn with_route<F>(mut self, route: F) -> Self
83 where
84 F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
85 {
86 self.route = Some(Arc::new(route));
87 self
88 }
89
90 pub fn with_error_type<F>(mut self, error_type: F) -> Self
95 where
96 F: Fn(&Conn) -> Option<Cow<'static, str>> + Send + Sync + 'static,
97 {
98 self.error_type = Some(Arc::new(error_type));
99 self
100 }
101
102 pub fn with_headers(
104 mut self,
105 headers: impl IntoIterator<Item = impl Into<HeaderName<'static>>>,
106 ) -> Self {
107 self.headers = headers.into_iter().map(Into::into).collect();
108 self
109 }
110
111 pub fn with_local_address_and_port(mut self) -> Self {
115 self.enable_local_address_and_port = true;
116 self
117 }
118}
119
/// Conn state holding the OpenTelemetry [`Context`] that carries the span built for this conn.
///
/// Inserted by `Trace::run` and read back in `Trace::before_send`.
#[derive(Clone, Debug)]
pub(crate) struct TraceContext {
    /// Context wrapping the span created for this conn.
    pub(crate) context: Context,
}
124
/// Marker state set in `run` when the route hook already produced a route, so that
/// `before_send` does not resolve the route or rename the span a second time.
struct RouteWasAvailable;
126
127impl<T> Handler for Trace<T>
128where
129 T: Tracer + Send + Sync + 'static,
130 T::Span: Send + Sync + 'static,
131{
132 async fn init(&mut self, info: &mut trillium::Info) {
133 if self.enable_local_address_and_port {
134 self.socket_addr = info.tcp_socket_addr().cloned();
135 }
136 }
137 async fn run(&self, mut conn: Conn) -> Conn {
138 let start_time = Some(SystemTime::now() - conn.start_time().duration_since(Instant::now()));
139
140 let scheme = if conn.is_secure() { "https" } else { "http" };
141 let (method, method_original) = normalize_method(conn.method().as_str());
142
143 let version = conn.http_version().as_str().strip_prefix("HTTP/").unwrap();
144
145 let path_and_query = conn.path_and_query();
146 let (path, query) = match path_and_query.find('?') {
147 Some(x) => (&path_and_query[0..x], &path_and_query[x + 1..]),
148 None => (path_and_query, ""),
149 };
150
151 let mut attributes = vec![
152 KeyValue::new(semconv::attribute::HTTP_REQUEST_METHOD, method),
153 KeyValue::new(semconv::attribute::URL_PATH, path.to_string()),
154 KeyValue::new(semconv::attribute::URL_SCHEME, scheme),
155 KeyValue::new(semconv::attribute::NETWORK_PROTOCOL_VERSION, version),
156 ];
157
158 if !query.is_empty() {
159 attributes.push(KeyValue::new(
160 semconv::attribute::URL_QUERY,
161 redact_query(query).into_owned(),
162 ));
163 }
164
165 if let Some(method_original) = method_original {
166 attributes.push(KeyValue::new(
167 semconv::attribute::HTTP_REQUEST_METHOD_ORIGINAL,
168 method_original,
169 ));
170 }
171
172 if let Some(socket_addr) = &self.socket_addr {
173 attributes.push(KeyValue::new(
174 semconv::attribute::NETWORK_LOCAL_ADDRESS,
175 socket_addr.ip().to_string(),
176 ));
177
178 attributes.push(KeyValue::new(
179 semconv::attribute::NETWORK_LOCAL_PORT,
180 i64::from(socket_addr.port()),
181 ));
182 }
183
184 if let Some(peer_ip) = conn.peer_ip() {
185 attributes.push(KeyValue::new(
186 semconv::attribute::CLIENT_ADDRESS,
187 peer_ip.to_string(),
188 ));
189 }
190
191 for (header_name, header_values) in self.headers.iter().filter_map(|hn| {
192 conn.request_headers()
193 .get_values(hn.clone())
194 .map(|v| (hn, v))
195 }) {
196 attributes.push(KeyValue::new(
197 format!(
198 "http.request.header.{}",
199 header_name.as_ref().to_lowercase()
200 ),
201 Value::Array(Array::String(
202 header_values.iter().map(|x| x.to_string().into()).collect(),
203 )),
204 ));
205 }
206
207 let address_and_port = conn.host().map(|host| {
208 host.split_once(':')
209 .and_then(|(host, port)| Some((String::from(host), port.parse().ok()?)))
210 .unwrap_or_else(|| (String::from(host), if conn.is_secure() { 443 } else { 80 }))
211 });
212
213 if let Some((address, port)) = address_and_port {
214 attributes.push(KeyValue::new(semconv::attribute::SERVER_ADDRESS, address));
215 attributes.push(KeyValue::new(semconv::attribute::SERVER_PORT, port));
216 }
217
218 if let Some(user_agent) = conn.request_headers().get_str(KnownHeaderName::UserAgent) {
219 attributes.push(KeyValue::new(
220 semconv::attribute::USER_AGENT_ORIGINAL,
221 user_agent.to_string(),
222 ));
223 }
224
225 let name = if let Some(route) = self.route.as_ref().and_then(|route| route(&conn)) {
226 conn.insert_state(RouteWasAvailable);
227 attributes.push(KeyValue::new(semconv::attribute::HTTP_ROUTE, route.clone()));
228 format!("{method} {route}").into()
229 } else {
230 method.into()
231 };
232
233 let span = self.tracer.build(SpanBuilder {
234 name,
235 start_time,
236 span_kind: Some(SpanKind::Server),
237 attributes: Some(attributes),
238 ..SpanBuilder::default()
239 });
240 let context = Context::current_with_span(span);
241
242 conn.with_state(TraceContext { context })
243 }
244
245 async fn before_send(&self, mut conn: Conn) -> Conn {
246 let Some(TraceContext { context }) = conn.state().cloned() else {
247 return conn;
248 };
249
250 let span = context.span();
251
252 let error_type = self
253 .error_type
254 .as_ref()
255 .and_then(|et| et(&conn))
256 .or_else(|| {
257 let status = conn.status().unwrap_or(Status::NotFound);
258 if status.is_server_error() {
259 Some((status as u16).to_string().into())
260 } else {
261 None
262 }
263 });
264
265 if conn.status().is_some_and(|s| s.is_server_error()) {
266 span.set_status(opentelemetry::trace::Status::Error {
267 description: "".into(), });
269 }
270
271 let status: i64 = (conn.status().unwrap_or(Status::NotFound) as u16).into();
272
273 let mut attributes = vec![KeyValue::new(
274 semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
275 status,
276 )];
277
278 if conn.take_state::<RouteWasAvailable>().is_none() {
279 let route = self.route.as_ref().and_then(|route| route(&conn));
280 if let Some(route) = &route {
281 let (method, _) = normalize_method(conn.method().as_str());
282 attributes.push(KeyValue::new(semconv::attribute::HTTP_ROUTE, route.clone()));
283 span.update_name(format!("{method} {route}"));
284 }
285 }
286
287 if let Some(error_type) = error_type {
288 attributes.push(KeyValue::new(semconv::attribute::ERROR_TYPE, error_type));
289 }
290
291 span.set_attributes(attributes);
292
293 {
294 let context = context.clone();
295 let inner: &mut trillium_http::Conn<Box<dyn Transport>> = conn.as_mut();
296 inner.after_send(move |send_status| {
297 let span = context.span();
298 if !send_status.is_success() {
299 span.set_status(opentelemetry::trace::Status::Error {
300 description: "http send error".into(),
301 });
302 span.set_attribute(KeyValue::new(
303 semconv::attribute::ERROR_TYPE,
304 "http send error",
305 ));
306 }
307 span.end();
308 });
309 }
310
311 conn
312 }
313}