From 30958a1cddb86c2abd705b2d4fd36b51037f6879 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Mon, 16 Oct 2023 15:03:08 +0200 Subject: [PATCH] chore(deps): Start work upgrading to opentelemetry 0.20 (#292) --- Cargo.lock | 42 ++++-- server/Cargo.toml | 7 +- server/src/metrics/actix_web_metrics.rs | 157 ++++++++++++++++------- server/src/middleware/request_tracing.rs | 7 +- server/src/prom_metrics.rs | 30 ++--- 5 files changed, 153 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef1f6c2b..0de1a1ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1757,9 +1757,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" dependencies = [ "opentelemetry_api", "opentelemetry_sdk", @@ -1767,34 +1767,36 @@ dependencies = [ [[package]] name = "opentelemetry-prometheus" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9f186f6293ebb693caddd0595e66b74a6068fa51048e26e0bf9c95478c639c" +checksum = "c7d81bc254e2d572120363a2b16cdb0d715d301b5789be0cfc26ad87e4e10e53" dependencies = [ - "opentelemetry", + "once_cell", + "opentelemetry_api", + "opentelemetry_sdk", "prometheus", "protobuf", ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5" +checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" dependencies = [ "opentelemetry", ] [[package]] name = "opentelemetry_api" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" dependencies = [ - "fnv", "futures-channel", "futures-util", "indexmap 1.9.3", + "js-sys", "once_cell", "pin-project-lite", "thiserror", @@ -1803,26 +1805,37 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" dependencies = [ "async-trait", "crossbeam-channel", - "dashmap", - "fnv", "futures-channel", "futures-executor", "futures-util", "once_cell", "opentelemetry_api", + "ordered-float", "percent-encoding", "rand", + "regex", + "serde", + "serde_json", "thiserror", "tokio", "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -3129,6 +3142,7 @@ dependencies = [ "opentelemetry", "opentelemetry-prometheus", "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "prometheus", "prometheus-static-metric", "rand", diff --git a/server/Cargo.toml b/server/Cargo.toml index 293e7408..2a5901ca 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -35,9 +35,10 @@ iter_tools = "0.1.4" itertools = "0.11.0" lazy_static = "1.4.0" num_cpus = "1.16.0" -opentelemetry = {version = "0.19.0", features = ["trace", "rt-tokio", "metrics"]} -opentelemetry-prometheus = "0.12.0" -opentelemetry-semantic-conventions = "0.11.0" +opentelemetry = {version = "0.20.0", features = ["trace", "rt-tokio", "metrics"]} +opentelemetry-prometheus = "0.13.0" +opentelemetry-semantic-conventions = "0.12.0" +opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "serde", "serde_json", "rt-tokio-current-thread", "logs"] } prometheus = {version = "0.13.3", features = ["process"]} prometheus-static-metric = "0.5.1" redis = {version = "0.23.3", features = ["tokio-comp", "tokio-rustls-comp"]} diff --git a/server/src/metrics/actix_web_metrics.rs b/server/src/metrics/actix_web_metrics.rs index 6b551ce1..384e6362 100644 --- a/server/src/metrics/actix_web_metrics.rs +++ b/server/src/metrics/actix_web_metrics.rs @@ -1,15 +1,16 @@ +use actix_http::header::CONTENT_LENGTH; use actix_web::dev; use actix_web::dev::ServiceRequest; use actix_web::http::{header, Method, StatusCode, Version}; use futures::{future, FutureExt}; use futures_core::future::LocalBoxFuture; -use opentelemetry::metrics::{Histogram, Meter, MetricsError, Unit, UpDownCounter}; +use opentelemetry::metrics::{Histogram, Meter, MeterProvider, MetricsError, Unit, UpDownCounter}; use opentelemetry::trace::OrderMap; -use opentelemetry::{global, Context, Key, KeyValue, Value}; -use opentelemetry_prometheus::PrometheusExporter; +use opentelemetry::{global, Key, KeyValue, Value}; use opentelemetry_semantic_conventions::trace::{ - HTTP_CLIENT_IP, HTTP_FLAVOR, HTTP_METHOD, HTTP_ROUTE, HTTP_SCHEME, HTTP_STATUS_CODE, - HTTP_TARGET, HTTP_USER_AGENT, NET_HOST_PORT, NET_PEER_NAME, NET_SOCK_PEER_ADDR, + CLIENT_ADDRESS, CLIENT_SOCKET_ADDRESS, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, + HTTP_ROUTE, NETWORK_PROTOCOL_NAME, NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, + URL_PATH, URL_SCHEME, USER_AGENT_ORIGINAL, }; use prometheus::{Encoder, TextEncoder}; use std::sync::Arc; @@ -18,6 +19,8 @@ use std::time::SystemTime; use crate::metrics::route_formatter::RouteFormatter; const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests"; const HTTP_SERVER_DURATION: &str = "http.server.duration"; +const HTTP_SERVER_REQUEST_SIZE: &str = "http.server.request.size"; +const HTTP_SERVER_RESPONSE_SIZE: &str = "http.server.response.size"; #[inline] pub(super) fn http_method_str(method: &Method) -> Value { @@ -36,13 +39,13 @@ pub(super) fn http_method_str(method: &Method) -> Value { } #[inline] -pub(super) fn http_flavor(version: Version) -> Value { +pub(super) fn http_version(version: Version) -> Value { match version { - Version::HTTP_09 => "HTTP/0.9".into(), - Version::HTTP_10 => "HTTP/1.0".into(), - Version::HTTP_11 => "HTTP/1.1".into(), - Version::HTTP_2 => "HTTP/2".into(), - Version::HTTP_3 => "HTTP/3".into(), + Version::HTTP_09 => "0.9".into(), + Version::HTTP_10 => "1.0".into(), + Version::HTTP_11 => "1.1".into(), + Version::HTTP_2 => "2.0".into(), + Version::HTTP_3 => "3.0".into(), other => format!("{:?}", other).into(), } } @@ -63,15 +66,16 @@ pub(crate) fn trace_attributes_from_request( let conn_info = req.connection_info(); let mut attributes = OrderMap::with_capacity(11); - attributes.insert(HTTP_METHOD, http_method_str(req.method())); - attributes.insert(HTTP_FLAVOR, http_flavor(req.version())); - attributes.insert(NET_PEER_NAME, conn_info.host().to_string().into()); + attributes.insert(HTTP_REQUEST_METHOD, http_method_str(req.method())); + attributes.insert(NETWORK_PROTOCOL_NAME, "http".into()); + attributes.insert(NETWORK_PROTOCOL_VERSION, http_version(req.version())); + attributes.insert(CLIENT_ADDRESS, conn_info.host().to_string().into()); attributes.insert(HTTP_ROUTE, http_route.to_owned().into()); - attributes.insert(HTTP_SCHEME, http_scheme(conn_info.scheme())); + attributes.insert(URL_SCHEME, http_scheme(conn_info.scheme())); let server_name = req.app_config().host(); if server_name != conn_info.host() { - attributes.insert(NET_PEER_NAME, server_name.to_string().into()); + attributes.insert(SERVER_ADDRESS, server_name.to_string().into()); } if let Some(port) = conn_info .host() @@ -80,27 +84,27 @@ pub(crate) fn trace_attributes_from_request( .and_then(|port| port.parse::().ok()) { if port != 80 && port != 443 { - attributes.insert(NET_HOST_PORT, port.into()); + attributes.insert(SERVER_PORT, port.into()); } } if let Some(path) = req.uri().path_and_query() { - attributes.insert(HTTP_TARGET, path.as_str().to_string().into()); + attributes.insert(URL_PATH, path.as_str().to_string().into()); } if let Some(user_agent) = req .headers() .get(header::USER_AGENT) .and_then(|s| s.to_str().ok()) { - attributes.insert(HTTP_USER_AGENT, user_agent.to_string().into()); + attributes.insert(USER_AGENT_ORIGINAL, user_agent.to_string().into()); } let remote_addr = conn_info.realip_remote_addr(); if let Some(remote) = remote_addr { - attributes.insert(HTTP_CLIENT_IP, remote.to_string().into()); + attributes.insert(CLIENT_ADDRESS, remote.to_string().into()); } if let Some(peer_addr) = req.peer_addr().map(|socket| socket.ip().to_string()) { if Some(peer_addr.as_str()) != remote_addr { // Client is going through a proxy - attributes.insert(NET_SOCK_PEER_ADDR, peer_addr.into()); + attributes.insert(CLIENT_SOCKET_ADDRESS, peer_addr.into()); } } @@ -111,20 +115,27 @@ pub(super) fn metrics_attributes_from_request( req: &ServiceRequest, http_target: &str, ) -> Vec { - use opentelemetry_semantic_conventions::trace::NET_SOCK_HOST_ADDR; + use opentelemetry_semantic_conventions::trace::SERVER_SOCKET_ADDRESS; let conn_info = req.connection_info(); let mut attributes = Vec::with_capacity(11); - attributes.push(KeyValue::new(HTTP_METHOD, http_method_str(req.method()))); - attributes.push(KeyValue::new(HTTP_FLAVOR, http_flavor(req.version()))); - attributes.push(NET_SOCK_HOST_ADDR.string(conn_info.host().to_string())); - attributes.push(HTTP_TARGET.string(http_target.to_owned())); - attributes.push(KeyValue::new(HTTP_SCHEME, http_scheme(conn_info.scheme()))); + attributes.push(KeyValue::new( + HTTP_REQUEST_METHOD, + http_method_str(req.method()), + )); + attributes.push(KeyValue::new(NETWORK_PROTOCOL_NAME, "http")); + attributes.push(KeyValue::new( + NETWORK_PROTOCOL_VERSION, + http_version(req.version()), + )); + attributes.push(SERVER_SOCKET_ADDRESS.string(conn_info.host().to_string())); + attributes.push(URL_PATH.string(http_target.to_owned())); + attributes.push(KeyValue::new(URL_SCHEME, http_scheme(conn_info.scheme()))); let server_name = req.app_config().host(); if server_name != conn_info.host() { - attributes.push(NET_PEER_NAME.string(server_name.to_string())); + attributes.push(SERVER_ADDRESS.string(server_name.to_string())); } if let Some(port) = conn_info .host() @@ -132,14 +143,14 @@ pub(super) fn metrics_attributes_from_request( .nth(1) .and_then(|port| port.parse().ok()) { - attributes.push(NET_HOST_PORT.i64(port)) + attributes.push(SERVER_PORT.i64(port)) } let remote_addr = conn_info.realip_remote_addr(); if let Some(peer_addr) = req.peer_addr().map(|socket| socket.ip().to_string()) { if Some(peer_addr.as_str()) != remote_addr { // Client is going through a proxy - attributes.push(NET_SOCK_PEER_ADDR.string(peer_addr)) + attributes.push(CLIENT_SOCKET_ADDRESS.string(peer_addr)) } } @@ -150,6 +161,8 @@ pub(super) fn metrics_attributes_from_request( struct Metrics { http_server_active_requests: UpDownCounter, http_server_duration: Histogram, + http_server_request_size: Histogram, + http_server_response_size: Histogram, } impl Metrics { @@ -166,9 +179,23 @@ impl Metrics { .with_unit(Unit::new("ms")) .init(); + let http_server_request_size = meter + .u64_histogram(HTTP_SERVER_REQUEST_SIZE) + .with_description("Measures the size of HTTP request messages (compressed).") + .with_unit(Unit::new("By")) + .init(); + + let http_server_response_size = meter + .u64_histogram(HTTP_SERVER_RESPONSE_SIZE) + .with_description("Measures the size of HTTP request messages (compressed).") + .with_unit(Unit::new("By")) + .init(); + Metrics { http_server_active_requests, http_server_duration, + http_server_request_size, + http_server_response_size, } } } @@ -177,6 +204,7 @@ impl Metrics { #[derive(Clone, Debug, Default)] pub struct RequestMetricsBuilder { route_formatter: Option>, + meter: Option, } impl RequestMetricsBuilder { @@ -194,8 +222,18 @@ impl RequestMetricsBuilder { self } + /// Set the meter provider this middleware should use to construct meters + pub fn with_meter_provider(mut self, meter_provider: impl MeterProvider) -> Self { + self.meter = Some(get_versioned_meter(meter_provider)); + self + } + /// Build the `RequestMetrics` middleware - pub fn build(self, meter: Meter) -> RequestMetrics { + pub fn build(self) -> RequestMetrics { + let meter = self + .meter + .unwrap_or_else(|| get_versioned_meter(global::meter_provider())); + RequestMetrics { route_formatter: self.route_formatter, metrics: Arc::new(Metrics::new(meter)), @@ -203,6 +241,15 @@ impl RequestMetricsBuilder { } } +/// construct meters for this crate +fn get_versioned_meter(meter_provider: impl MeterProvider) -> Meter { + meter_provider.versioned_meter( + "actix_web_opentelemetry", + Some(env!("CARGO_PKG_VERSION")), + Some(opentelemetry_semantic_conventions::SCHEMA_URL), + None, + ) +} /// Request metrics tracking /// /// # Examples @@ -310,34 +357,48 @@ where fn call(&self, req: dev::ServiceRequest) -> Self::Future { let timer = SystemTime::now(); - let mut http_target = req.match_pattern().unwrap_or_else(|| "default".to_string()); + let mut http_target = req + .match_pattern() + .map(std::borrow::Cow::Owned) + .unwrap_or(std::borrow::Cow::Borrowed("default")); + if let Some(formatter) = &self.route_formatter { - http_target = formatter.format(&http_target); + http_target = std::borrow::Cow::Owned(formatter.format(&http_target)); } let mut attributes = metrics_attributes_from_request(&req, &http_target); - let cx = Context::current(); + self.metrics.http_server_active_requests.add(1, &attributes); + let content_length = req + .headers() + .get(CONTENT_LENGTH) + .and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok())) + .unwrap_or(0); self.metrics - .http_server_active_requests - .add(&cx, 1, &attributes); + .http_server_request_size + .record(content_length, &attributes); let request_metrics = self.metrics.clone(); Box::pin(self.service.call(req).map(move |res| { request_metrics .http_server_active_requests - .add(&cx, -1, &attributes); + .add(-1, &attributes); // Ignore actix errors for metrics if let Ok(res) = res { - attributes.push(HTTP_STATUS_CODE.string(res.status().as_str().to_owned())); + attributes.push(HTTP_RESPONSE_STATUS_CODE.i64(res.status().as_u16() as i64)); + let response_size = res + .response() + .headers() + .get(CONTENT_LENGTH) + .and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok())) + .unwrap_or(0); + request_metrics + .http_server_response_size + .record(response_size, &attributes); request_metrics.http_server_duration.record( - &cx, - timer - .elapsed() - .map(|t| t.as_secs_f64() * 1000.0) - .unwrap_or_default(), + timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(), &attributes, ); @@ -351,22 +412,20 @@ where #[derive(Clone, Debug)] pub struct PrometheusMetricsHandler { - prometheus_exporter: PrometheusExporter, + registry: prometheus::Registry, } impl PrometheusMetricsHandler { /// Build a route to serve Prometheus metrics - pub fn new(exporter: PrometheusExporter) -> Self { - Self { - prometheus_exporter: exporter, - } + pub fn new(registry: prometheus::Registry) -> Self { + Self { registry } } } impl PrometheusMetricsHandler { fn metrics(&self) -> String { let encoder = TextEncoder::new(); - let metric_families = self.prometheus_exporter.registry().gather(); + let metric_families = self.registry.gather(); let mut buf = Vec::new(); if let Err(err) = encoder.encode(&metric_families[..], &mut buf) { global::handle_error(MetricsError::Other(err.to_string())); diff --git a/server/src/middleware/request_tracing.rs b/server/src/middleware/request_tracing.rs index 3c2b202e..a7e8d643 100644 --- a/server/src/middleware/request_tracing.rs +++ b/server/src/middleware/request_tracing.rs @@ -9,7 +9,7 @@ use futures_core::future::LocalBoxFuture; use opentelemetry::global; use opentelemetry::propagation::Extractor; use opentelemetry::trace::{FutureExt, SpanKind, Status, TraceContextExt, Tracer, TracerProvider}; -use opentelemetry_semantic_conventions::trace::HTTP_STATUS_CODE; +use opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE; use std::borrow::Cow; use std::rc::Rc; use std::task::Poll; @@ -112,6 +112,7 @@ where global::tracer_provider().versioned_tracer( "unleash-edge", Some(env!("CARGO_PKG_VERSION")), + Some("https://opentelemetry.io/schema/1.0.0"), None, ), service, @@ -192,7 +193,9 @@ where .map(move |res| match res { Ok(ok_res) => { let span = cx.span(); - span.set_attribute(HTTP_STATUS_CODE.i64(ok_res.status().as_u16() as i64)); + span.set_attribute( + HTTP_RESPONSE_STATUS_CODE.i64(ok_res.status().as_u16() as i64), + ); if ok_res.status().is_server_error() { span.set_status(Status::error( ok_res diff --git a/server/src/prom_metrics.rs b/server/src/prom_metrics.rs index 144f8742..bc88da3b 100644 --- a/server/src/prom_metrics.rs +++ b/server/src/prom_metrics.rs @@ -1,10 +1,4 @@ -use opentelemetry::{ - global, - sdk::{ - export::metrics::aggregation, - metrics::{controllers, processors, selectors}, - }, -}; +use opentelemetry_sdk::metrics::MeterProvider; #[cfg(target_os = "linux")] use prometheus::process_collector::ProcessCollector; use tracing_subscriber::layer::SubscriberExt; @@ -37,25 +31,17 @@ pub fn instantiate( fn instantiate_prometheus_metrics_handler( registry: prometheus::Registry, ) -> (PrometheusMetricsHandler, RequestMetrics) { - let controller = controllers::basic(processors::factory( - selectors::simple::histogram([0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), // Will give histogram for with resolution in n ms - aggregation::cumulative_temporality_selector(), - )) - .with_resource(opentelemetry::sdk::Resource::new(vec![ + let resource = opentelemetry::sdk::Resource::new(vec![ opentelemetry::KeyValue::new("service.name", "unleash-edge"), opentelemetry::KeyValue::new("edge.version", crate::types::build::PKG_VERSION), opentelemetry::KeyValue::new("edge.githash", crate::types::build::SHORT_COMMIT), - ])) - .build(); - - let exporter = opentelemetry_prometheus::exporter(controller) - .with_registry(registry) - .init(); - let meter = global::meter("edge_web"); - + ]); + let provider = MeterProvider::builder().with_resource(resource).build(); ( - PrometheusMetricsHandler::new(exporter), - RequestMetricsBuilder::new().build(meter), + PrometheusMetricsHandler::new(registry), + RequestMetricsBuilder::new() + .with_meter_provider(provider) + .build(), ) }