diff --git a/rust/examples/copycat.rs b/rust/examples/copycat.rs index 67303e48..28bbd17e 100644 --- a/rust/examples/copycat.rs +++ b/rust/examples/copycat.rs @@ -30,6 +30,9 @@ use libits::transport::packet::Packet; use log::{debug, info, warn}; use timer::MessageTimer; +#[cfg(feature = "telemetry")] +use libits::transport::telemetry::init_tracer; + pub struct CopyCat { configuration: Arc, item_receiver: Receiver>, @@ -233,6 +236,9 @@ async fn main() { .set_credentials(username, password.unwrap_or(&String::new())); } + #[cfg(feature = "telemetry")] + init_tracer(&configuration.telemetry, "copycat").expect("Failed to init telemetry"); + pipeline::run::( Arc::new(configuration), Arc::new(RwLock::new(context)), diff --git a/rust/examples/json_counter.rs b/rust/examples/json_counter.rs index 9169e66f..f5ae064f 100644 --- a/rust/examples/json_counter.rs +++ b/rust/examples/json_counter.rs @@ -23,7 +23,7 @@ use libits::transport::mqtt::mqtt_client::MqttClient; use libits::transport::mqtt::mqtt_router::MqttRouter; use libits::transport::mqtt::topic::Topic; use log::{error, info}; -use rumqttc::v5::mqttbytes::v5::Publish; +use rumqttc::v5::mqttbytes::v5::{Publish, PublishProperties}; #[derive(Clone, Default, Debug, Hash, PartialEq, Eq)] struct StrTopic { @@ -116,19 +116,24 @@ async fn main() { router.add_route( StrTopic::from_str("#").unwrap(), - |publish: Publish| -> Option> { + |publish: Publish| -> Option<(Box, PublishProperties)> { if let Ok(payload) = std::str::from_utf8(publish.payload.as_ref()) { if serde_json::from_str::(payload).is_ok() { - Some(Box::new(Ok::<(), &'static str>(()))) + Some(( + Box::new(Ok::<(), &'static str>), + publish.properties.unwrap_or_default(), + )) } else { - Some(Box::new(Err::<(), &'static str>( - "Failed to parse payload as JSON", - ))) + Some(( + Box::new(Err::<(), &'static str>("Failed to parse payload as JSON")), + publish.properties.unwrap_or_default(), + )) } } else { - Some(Box::new(Err::<(), &'static str>( - "Failed to parse payload as UTF-8", - ))) + Some(( + Box::new(Err::<(), &'static str>("Failed to parse payload as UTF-8")), + publish.properties.unwrap_or_default(), + )) } }, ); @@ -142,7 +147,7 @@ async fn main() { match event_loop.poll().await { Ok(event) => { if let Some((_, result)) = router.handle_event::(event) { - let result = result.downcast::>(); + let result = result.0.downcast::>(); if result.is_ok() { json += 1; } diff --git a/rust/examples/telemetry.rs b/rust/examples/telemetry.rs index 634c16ee..ead89570 100644 --- a/rust/examples/telemetry.rs +++ b/rust/examples/telemetry.rs @@ -19,7 +19,7 @@ use flexi_logger::{with_thread, Cleanup, Criterion, FileSpec, Logger, Naming, Wr use ini::Ini; use log::{info, warn}; use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator}; -use opentelemetry::trace::{mark_span_as_active, TraceContextExt}; +use opentelemetry::trace::{mark_span_as_active, SpanKind, TraceContextExt}; use opentelemetry::{global, Context}; use opentelemetry_sdk::propagation::TraceContextPropagator; @@ -127,37 +127,61 @@ async fn main() { init_tracer(&configuration.telemetry, "iot3").expect("Failed to configure telemetry"); info!("Send a trace with a single span 'ping' root span"); - let ping_data = execute_in_span(TRACER_NAME, "example/ping", None::<&Data>, || { - let context = Context::current(); - trace_span_context_info!("└─ Ping", context); + let ping_data = execute_in_span( + TRACER_NAME, + "example/ping", + Some(SpanKind::Producer), + None::<&Data>, + || { + let context = Context::current(); + trace_span_context_info!("└─ Ping", context); - let mut data = Data::default(); + let mut data = Data::default(); - let propagator = TraceContextPropagator::new(); - propagator.inject(&mut data); + let propagator = TraceContextPropagator::new(); + propagator.inject(&mut data); - data - }); + data + }, + ); info!("Send a trace with a single span 'pong' root span linked with the previous one 'ping'"); - execute_in_span(TRACER_NAME, "example/pong", Some(&ping_data), || { - let context = Context::current(); - trace_span_context_info!("└─ Pong", context); - }); + execute_in_span( + TRACER_NAME, + "example/pong", + Some(SpanKind::Consumer), + Some(&ping_data), + || { + let context = Context::current(); + trace_span_context_info!("└─ Pong", context); + }, + ); info!("Send a single trace with two spans"); - execute_in_span(TRACER_NAME, "example/nested_root", None::<&Data>, || { - let context = Context::current(); - trace_span_context_info!("└─ Root", context); - - execute_in_span(TRACER_NAME, "example/nested_child", None::<&Data>, || { + execute_in_span( + TRACER_NAME, + "example/nested_root", + None, + None::<&Data>, + || { let context = Context::current(); - trace_span_context_info!(" └─ Child", context); - }) - }); + trace_span_context_info!("└─ Root", context); + + execute_in_span( + TRACER_NAME, + "example/nested_child", + None, + None::<&Data>, + || { + let context = Context::current(); + trace_span_context_info!(" └─ Child", context); + }, + ) + }, + ); info!("Send a trace with 3 spans from 3 threads"); - let root_span = get_span(TRACER_NAME, "main_thread", None::<&Data>); + let root_span = get_span(TRACER_NAME, "main_thread", None); let guard = mark_span_as_active(root_span); let cxt = Context::current(); trace_span_context_info!("└─ Main thread", &cxt); @@ -171,7 +195,7 @@ async fn main() { let cxt: Context = rx.recv().unwrap(); let _guard = cxt.attach(); - execute_in_span(TRACER_NAME, "listener_thread", None::<&Data>, || { + execute_in_span(TRACER_NAME, "listener_thread", None, None::<&Data>, || { let cxt = Context::current(); trace_span_context_info!(" └─ Listener thread", cxt); }); @@ -189,7 +213,7 @@ async fn main() { let cxt: Context = rx.recv().unwrap(); let _guard = cxt.clone().attach(); - execute_in_span(TRACER_NAME, "sender_thread", None::<&Data>, || { + execute_in_span(TRACER_NAME, "sender_thread", None, None::<&Data>, || { let inner_cxt = Context::current(); trace_span_context_info!(" ├─ Sender thread", inner_cxt); listener_tx diff --git a/rust/src/client/application/pipeline.rs b/rust/src/client/application/pipeline.rs index 03930102..2aecd12c 100644 --- a/rust/src/client/application/pipeline.rs +++ b/rust/src/client/application/pipeline.rs @@ -18,6 +18,7 @@ use crate::exchange::Exchange; use crate::monitor::trace_exchange; use crate::transport::mqtt::mqtt_client::{listen, MqttClient}; use crate::transport::mqtt::mqtt_router; +use crate::transport::mqtt::mqtt_router::BoxedReception; use crate::transport::mqtt::topic::Topic; use crate::transport::packet::Packet; use crate::transport::payload::Payload; @@ -26,7 +27,6 @@ use log::{debug, error, info, trace, warn}; use rumqttc::v5::mqttbytes::v5::PublishProperties; use rumqttc::v5::{Event, EventLoop}; use serde::de::DeserializeOwned; -use std::any::Any; use std::sync::{Arc, RwLock}; use std::thread; use std::thread::JoinHandle; @@ -370,14 +370,14 @@ where for event in event_receiver { match router.handle_event(event) { - Some((topic, reception)) => { + Some((topic, (reception, properties))) => { // TODO use the From Trait if reception.is::() { if let Ok(exchange) = reception.downcast::() { let item = Packet { topic, payload: *exchange, - properties: PublishProperties::default(), + properties, }; //assumed clone, we send to 2 channels match monitoring_sender.send((item.clone(), None)) { @@ -424,9 +424,7 @@ where ) } -fn deserialize( - publish: rumqttc::v5::mqttbytes::v5::Publish, -) -> Option> +fn deserialize(publish: rumqttc::v5::mqttbytes::v5::Publish) -> Option where T: DeserializeOwned + Payload + 'static + Send, { @@ -437,7 +435,7 @@ where match serde_json::from_str::(message_str) { Ok(message) => { trace!("message parsed"); - return Some(Box::new(message)); + return Some((Box::new(message), publish.properties.unwrap_or_default())); } Err(e) => warn!("parse error({}) on: {}", e, message_str), } diff --git a/rust/src/transport/mqtt/mqtt_client.rs b/rust/src/transport/mqtt/mqtt_client.rs index c87daa00..37f6134d 100644 --- a/rust/src/transport/mqtt/mqtt_client.rs +++ b/rust/src/transport/mqtt/mqtt_client.rs @@ -19,6 +19,15 @@ use rumqttc::v5::mqttbytes::v5::Filter; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::{AsyncClient, Event, EventLoop, MqttOptions}; +#[cfg(feature = "telemetry")] +use { + crate::transport::telemetry::get_mqtt_span, + opentelemetry::propagation::TextMapPropagator, + opentelemetry::trace::{SpanKind, TraceContextExt}, + opentelemetry::Context, + opentelemetry_sdk::propagation::TraceContextPropagator, +}; + pub struct MqttClient { client: AsyncClient, } @@ -48,7 +57,33 @@ impl<'client> MqttClient { }; } + #[cfg(feature = "telemetry")] + pub async fn publish(&self, mut packet: Packet) { + debug!("Publish with context"); + let payload = serde_json::to_string(&packet.payload).unwrap(); + + let span = get_mqtt_span( + SpanKind::Producer, + &packet.topic.to_string(), + payload.as_bytes().len() as i64, + ); + + let cx = Context::current().with_span(span); + let _guard = cx.attach(); + + let propagator = TraceContextPropagator::new(); + propagator.inject(&mut packet); + + self.do_publish(packet).await + } + + #[cfg(not(feature = "telemetry"))] pub async fn publish(&self, packet: Packet) { + debug!("Publish without context"); + self.do_publish(packet).await + } + + async fn do_publish(&self, packet: Packet) { let payload = serde_json::to_string(&packet.payload).unwrap(); match self diff --git a/rust/src/transport/mqtt/mqtt_router.rs b/rust/src/transport/mqtt/mqtt_router.rs index e75f2626..c98ee6c5 100644 --- a/rust/src/transport/mqtt/mqtt_router.rs +++ b/rust/src/transport/mqtt/mqtt_router.rs @@ -12,17 +12,20 @@ use std::collections::HashMap; use log::{error, info, trace, warn}; -use rumqttc::v5::mqttbytes::v5::Publish; +use rumqttc::v5::mqttbytes::v5::{Publish, PublishProperties}; use rumqttc::v5::{Event, Incoming}; use crate::transport::mqtt::topic::Topic; use std::any::{type_name, Any}; use std::str::from_utf8; -type BoxedReception = Box; +pub type BoxedReception = (Box, PublishProperties); type BoxedCallback = Box Option>; +#[cfg(feature = "telemetry")] +use crate::transport::telemetry::get_reception_mqtt_span; + #[derive(Default)] pub struct MqttRouter { route_map: HashMap, @@ -44,6 +47,9 @@ impl MqttRouter { Incoming::Publish(publish) => { match from_utf8(&publish.topic) { Ok(str_topic) => { + #[cfg(feature = "telemetry")] + let _span = get_reception_mqtt_span(&publish); + trace!( "Publish received for the packet {:?} on the topic {}", publish.pkid, diff --git a/rust/src/transport/telemetry.rs b/rust/src/transport/telemetry.rs index f2293a74..58f6fcf8 100644 --- a/rust/src/transport/telemetry.rs +++ b/rust/src/transport/telemetry.rs @@ -9,11 +9,13 @@ * Authors: see CONTRIBUTORS.md */ +use log::debug; +use std::str::from_utf8; use std::time::Duration; use opentelemetry::global::BoxedSpan; use opentelemetry::propagation::{Extractor, TextMapPropagator}; -use opentelemetry::trace::{Link, TraceContextExt, Tracer}; +use opentelemetry::trace::{Link, Span, SpanKind, TraceContextExt, Tracer}; use opentelemetry::{global, Context, KeyValue}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::propagation::TraceContextPropagator; @@ -23,6 +25,7 @@ use opentelemetry_sdk::trace::{ }; use opentelemetry_sdk::Resource; use reqwest::header; +use rumqttc::v5::mqttbytes::v5::Publish; use crate::client::configuration::telemetry_configuration::TelemetryConfiguration; @@ -94,9 +97,51 @@ pub fn init_tracer( Ok(()) } +pub fn get_span( + tracer_name: &'static str, + span_name: &'static str, + span_kind: Option, +) -> BoxedSpan { + let tracer = global::tracer(tracer_name); + let mut span_builder = tracer.span_builder(span_name); + + if let Some(kind) = span_kind { + span_builder = span_builder.with_kind(kind) + } + + span_builder.start(&tracer) +} + +pub fn get_linked_span( + tracer_name: &'static str, + span_name: &'static str, + span_kind: Option, + from: &E, +) -> BoxedSpan +where + E: Extractor, +{ + let tracer = global::tracer(tracer_name); + + let propagator = TraceContextPropagator::new(); + let trace_cx = propagator.extract(from); + let span_cx = trace_cx.span().span_context().clone(); + + let mut span_builder = tracer + .span_builder(span_name) + .with_links(vec![Link::with_context(span_cx)]); + + if let Some(kind) = span_kind { + span_builder = span_builder.with_kind(kind) + } + + span_builder.start(&tracer) +} + pub fn execute_in_span( tracer_name: &'static str, span_name: &'static str, + span_kind: Option, from: Option<&E>, block: F, ) -> R @@ -104,34 +149,89 @@ where F: FnOnce() -> R, E: Extractor, { - let span = get_span(tracer_name, span_name, from); + let span = if let Some(from) = from { + get_linked_span(tracer_name, span_name, span_kind, from) + } else { + get_span(tracer_name, span_name, span_kind) + }; + let cx = Context::current_with_span(span); let _guard = cx.attach(); block() } -pub fn get_span( - tracer_name: &'static str, - span_name: &'static str, - from: Option<&E>, -) -> BoxedSpan +pub fn add_link(linked_entity: &E, span: &mut BoxedSpan) where E: Extractor, { - let tracer = global::tracer(tracer_name); + let propagator = TraceContextPropagator::new(); + let trace_cx = propagator.extract(linked_entity); + let span_cx = trace_cx.span().span_context().clone(); - let span_builder = if let Some(packet) = from { - let propagator = TraceContextPropagator::new(); - let trace_cx = propagator.extract(packet); - let span_cx = trace_cx.span().span_context().clone(); + span.add_link(span_cx, Vec::new()); +} - tracer - .span_builder(span_name) - .with_links(vec![Link::with_context(span_cx)]) - } else { - tracer.span_builder(span_name) - }; +pub(crate) fn get_mqtt_span(span_kind: SpanKind, topic: &str, payload_size: i64) -> BoxedSpan { + debug!("Starting MQTT span..."); + let tracer = global::tracer("iot3.core"); + + tracer + .span_builder("IoT3 Core MQTT Message") + .with_kind(span_kind) + .with_attributes(vec![ + KeyValue::new("iot3.core.mqtt.topic", topic.to_string()), + KeyValue::new("iot3.core.mqtt.payload_size", payload_size), + KeyValue::new("iot3.core.sdk_language", "rust"), + ]) + .start(&tracer) +} - span_builder.start(&tracer) +pub(crate) fn get_reception_mqtt_span(publish: &Publish) -> BoxedSpan { + let tracer = global::tracer("iot3.core"); + + let topic = from_utf8(&publish.topic).unwrap_or_default().to_string(); + let size = publish.payload.len(); + + let propagator = TraceContextPropagator::new(); + let trace_cx = propagator.extract(&ExtractWrapper(publish)); + let span_cx = trace_cx.span().span_context().clone(); + + tracer + .span_builder("IoT3 Core MQTT Message") + .with_kind(SpanKind::Consumer) + .with_attributes(vec![ + KeyValue::new("iot3.core.mqtt.topic", topic), + KeyValue::new("iot3.core.mqtt.payload_size", size as i64), + KeyValue::new("iot3.core.sdk_language", "rust"), + ]) + .with_links(vec![Link::with_context(span_cx)]) + .start(&tracer) +} + +struct ExtractWrapper<'p>(&'p Publish); +impl Extractor for ExtractWrapper<'_> { + fn get(&self, key: &str) -> Option<&str> { + if let Some(properties) = &self.0.properties { + properties + .user_properties + .iter() + .find(|(k, _)| key == k) + .map(|(_, value)| value.as_str()) + } else { + None + } + } + + fn keys(&self) -> Vec<&str> { + if let Some(properties) = &self.0.properties { + properties + .user_properties + .iter() + .map(|(key, _)| key.as_str()) + .collect::>() + } else { + Vec::new() + } + } }