Skip to content

Commit

Permalink
Merge pull request #182 from nbuffon/rust_mqtt_telemetry
Browse files Browse the repository at this point in the history
Rust  MQTT telemetry

Signed-off by: Nicolas Buffon <[email protected]>
  • Loading branch information
nbuffon authored Oct 31, 2024
2 parents 1e20195 + 20f183c commit a962a88
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 62 deletions.
6 changes: 6 additions & 0 deletions rust/examples/copycat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ use libits::transport::packet::Packet;
use log::{debug, info, warn};
use timer::MessageTimer;

#[cfg(feature = "telemetry")]
use libits::transport::telemetry::init_tracer;

pub struct CopyCat {
configuration: Arc<Configuration>,
item_receiver: Receiver<Packet<GeoTopic, Exchange>>,
Expand Down Expand Up @@ -233,6 +236,9 @@ async fn main() {
.set_credentials(username, password.unwrap_or(&String::new()));
}

#[cfg(feature = "telemetry")]
init_tracer(&configuration.telemetry, "copycat").expect("Failed to init telemetry");

pipeline::run::<CopyCat, NoContext, GeoTopic>(
Arc::new(configuration),
Arc::new(RwLock::new(context)),
Expand Down
25 changes: 15 additions & 10 deletions rust/examples/json_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libits::transport::mqtt::mqtt_client::MqttClient;
use libits::transport::mqtt::mqtt_router::MqttRouter;
use libits::transport::mqtt::topic::Topic;
use log::{error, info};
use rumqttc::v5::mqttbytes::v5::Publish;
use rumqttc::v5::mqttbytes::v5::{Publish, PublishProperties};

#[derive(Clone, Default, Debug, Hash, PartialEq, Eq)]
struct StrTopic {
Expand Down Expand Up @@ -116,19 +116,24 @@ async fn main() {

router.add_route(
StrTopic::from_str("#").unwrap(),
|publish: Publish| -> Option<Box<dyn Any + Send + 'static>> {
|publish: Publish| -> Option<(Box<dyn Any + 'static + Send>, PublishProperties)> {
if let Ok(payload) = std::str::from_utf8(publish.payload.as_ref()) {
if serde_json::from_str::<String>(payload).is_ok() {
Some(Box::new(Ok::<(), &'static str>(())))
Some((
Box::new(Ok::<(), &'static str>),
publish.properties.unwrap_or_default(),
))
} else {
Some(Box::new(Err::<(), &'static str>(
"Failed to parse payload as JSON",
)))
Some((
Box::new(Err::<(), &'static str>("Failed to parse payload as JSON")),
publish.properties.unwrap_or_default(),
))
}
} else {
Some(Box::new(Err::<(), &'static str>(
"Failed to parse payload as UTF-8",
)))
Some((
Box::new(Err::<(), &'static str>("Failed to parse payload as UTF-8")),
publish.properties.unwrap_or_default(),
))
}
},
);
Expand All @@ -142,7 +147,7 @@ async fn main() {
match event_loop.poll().await {
Ok(event) => {
if let Some((_, result)) = router.handle_event::<StrTopic>(event) {
let result = result.downcast::<Result<(), &'static str>>();
let result = result.0.downcast::<Result<(), &'static str>>();
if result.is_ok() {
json += 1;
}
Expand Down
72 changes: 48 additions & 24 deletions rust/examples/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use flexi_logger::{with_thread, Cleanup, Criterion, FileSpec, Logger, Naming, Wr
use ini::Ini;
use log::{info, warn};
use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
use opentelemetry::trace::{mark_span_as_active, TraceContextExt};
use opentelemetry::trace::{mark_span_as_active, SpanKind, TraceContextExt};
use opentelemetry::{global, Context};
use opentelemetry_sdk::propagation::TraceContextPropagator;

Expand Down Expand Up @@ -127,37 +127,61 @@ async fn main() {
init_tracer(&configuration.telemetry, "iot3").expect("Failed to configure telemetry");

info!("Send a trace with a single span 'ping' root span");
let ping_data = execute_in_span(TRACER_NAME, "example/ping", None::<&Data>, || {
let context = Context::current();
trace_span_context_info!("└─ Ping", context);
let ping_data = execute_in_span(
TRACER_NAME,
"example/ping",
Some(SpanKind::Producer),
None::<&Data>,
|| {
let context = Context::current();
trace_span_context_info!("└─ Ping", context);

let mut data = Data::default();
let mut data = Data::default();

let propagator = TraceContextPropagator::new();
propagator.inject(&mut data);
let propagator = TraceContextPropagator::new();
propagator.inject(&mut data);

data
});
data
},
);

info!("Send a trace with a single span 'pong' root span linked with the previous one 'ping'");
execute_in_span(TRACER_NAME, "example/pong", Some(&ping_data), || {
let context = Context::current();
trace_span_context_info!("└─ Pong", context);
});
execute_in_span(
TRACER_NAME,
"example/pong",
Some(SpanKind::Consumer),
Some(&ping_data),
|| {
let context = Context::current();
trace_span_context_info!("└─ Pong", context);
},
);

info!("Send a single trace with two spans");
execute_in_span(TRACER_NAME, "example/nested_root", None::<&Data>, || {
let context = Context::current();
trace_span_context_info!("└─ Root", context);

execute_in_span(TRACER_NAME, "example/nested_child", None::<&Data>, || {
execute_in_span(
TRACER_NAME,
"example/nested_root",
None,
None::<&Data>,
|| {
let context = Context::current();
trace_span_context_info!(" └─ Child", context);
})
});
trace_span_context_info!("└─ Root", context);

execute_in_span(
TRACER_NAME,
"example/nested_child",
None,
None::<&Data>,
|| {
let context = Context::current();
trace_span_context_info!(" └─ Child", context);
},
)
},
);

info!("Send a trace with 3 spans from 3 threads");
let root_span = get_span(TRACER_NAME, "main_thread", None::<&Data>);
let root_span = get_span(TRACER_NAME, "main_thread", None);
let guard = mark_span_as_active(root_span);
let cxt = Context::current();
trace_span_context_info!("└─ Main thread", &cxt);
Expand All @@ -171,7 +195,7 @@ async fn main() {
let cxt: Context = rx.recv().unwrap();
let _guard = cxt.attach();

execute_in_span(TRACER_NAME, "listener_thread", None::<&Data>, || {
execute_in_span(TRACER_NAME, "listener_thread", None, None::<&Data>, || {
let cxt = Context::current();
trace_span_context_info!(" └─ Listener thread", cxt);
});
Expand All @@ -189,7 +213,7 @@ async fn main() {
let cxt: Context = rx.recv().unwrap();
let _guard = cxt.clone().attach();

execute_in_span(TRACER_NAME, "sender_thread", None::<&Data>, || {
execute_in_span(TRACER_NAME, "sender_thread", None, None::<&Data>, || {
let inner_cxt = Context::current();
trace_span_context_info!(" ├─ Sender thread", inner_cxt);
listener_tx
Expand Down
12 changes: 5 additions & 7 deletions rust/src/client/application/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::exchange::Exchange;
use crate::monitor::trace_exchange;
use crate::transport::mqtt::mqtt_client::{listen, MqttClient};
use crate::transport::mqtt::mqtt_router;
use crate::transport::mqtt::mqtt_router::BoxedReception;
use crate::transport::mqtt::topic::Topic;
use crate::transport::packet::Packet;
use crate::transport::payload::Payload;
Expand All @@ -26,7 +27,6 @@ use log::{debug, error, info, trace, warn};
use rumqttc::v5::mqttbytes::v5::PublishProperties;
use rumqttc::v5::{Event, EventLoop};
use serde::de::DeserializeOwned;
use std::any::Any;
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::JoinHandle;
Expand Down Expand Up @@ -370,14 +370,14 @@ where

for event in event_receiver {
match router.handle_event(event) {
Some((topic, reception)) => {
Some((topic, (reception, properties))) => {
// TODO use the From Trait
if reception.is::<Exchange>() {
if let Ok(exchange) = reception.downcast::<Exchange>() {
let item = Packet {
topic,
payload: *exchange,
properties: PublishProperties::default(),
properties,
};
//assumed clone, we send to 2 channels
match monitoring_sender.send((item.clone(), None)) {
Expand Down Expand Up @@ -424,9 +424,7 @@ where
)
}

fn deserialize<T>(
publish: rumqttc::v5::mqttbytes::v5::Publish,
) -> Option<Box<dyn Any + 'static + Send>>
fn deserialize<T>(publish: rumqttc::v5::mqttbytes::v5::Publish) -> Option<BoxedReception>
where
T: DeserializeOwned + Payload + 'static + Send,
{
Expand All @@ -437,7 +435,7 @@ where
match serde_json::from_str::<T>(message_str) {
Ok(message) => {
trace!("message parsed");
return Some(Box::new(message));
return Some((Box::new(message), publish.properties.unwrap_or_default()));
}
Err(e) => warn!("parse error({}) on: {}", e, message_str),
}
Expand Down
35 changes: 35 additions & 0 deletions rust/src/transport/mqtt/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ use rumqttc::v5::mqttbytes::v5::Filter;
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::{AsyncClient, Event, EventLoop, MqttOptions};

#[cfg(feature = "telemetry")]
use {
crate::transport::telemetry::get_mqtt_span,
opentelemetry::propagation::TextMapPropagator,
opentelemetry::trace::{SpanKind, TraceContextExt},
opentelemetry::Context,
opentelemetry_sdk::propagation::TraceContextPropagator,
};

pub struct MqttClient {
client: AsyncClient,
}
Expand Down Expand Up @@ -48,7 +57,33 @@ impl<'client> MqttClient {
};
}

#[cfg(feature = "telemetry")]
pub async fn publish<T: Topic, P: Payload>(&self, mut packet: Packet<T, P>) {
debug!("Publish with context");
let payload = serde_json::to_string(&packet.payload).unwrap();

let span = get_mqtt_span(
SpanKind::Producer,
&packet.topic.to_string(),
payload.as_bytes().len() as i64,
);

let cx = Context::current().with_span(span);
let _guard = cx.attach();

let propagator = TraceContextPropagator::new();
propagator.inject(&mut packet);

self.do_publish(packet).await
}

#[cfg(not(feature = "telemetry"))]
pub async fn publish<T: Topic, P: Payload>(&self, packet: Packet<T, P>) {
debug!("Publish without context");
self.do_publish(packet).await
}

async fn do_publish<T: Topic, P: Payload>(&self, packet: Packet<T, P>) {
let payload = serde_json::to_string(&packet.payload).unwrap();

match self
Expand Down
10 changes: 8 additions & 2 deletions rust/src/transport/mqtt/mqtt_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
use std::collections::HashMap;

use log::{error, info, trace, warn};
use rumqttc::v5::mqttbytes::v5::Publish;
use rumqttc::v5::mqttbytes::v5::{Publish, PublishProperties};
use rumqttc::v5::{Event, Incoming};

use crate::transport::mqtt::topic::Topic;
use std::any::{type_name, Any};
use std::str::from_utf8;

type BoxedReception = Box<dyn Any + 'static + Send>;
pub type BoxedReception = (Box<dyn Any + 'static + Send>, PublishProperties);

type BoxedCallback = Box<dyn Fn(Publish) -> Option<BoxedReception>>;

#[cfg(feature = "telemetry")]
use crate::transport::telemetry::get_reception_mqtt_span;

#[derive(Default)]
pub struct MqttRouter {
route_map: HashMap<String, BoxedCallback>,
Expand All @@ -44,6 +47,9 @@ impl MqttRouter {
Incoming::Publish(publish) => {
match from_utf8(&publish.topic) {
Ok(str_topic) => {
#[cfg(feature = "telemetry")]
let _span = get_reception_mqtt_span(&publish);

trace!(
"Publish received for the packet {:?} on the topic {}",
publish.pkid,
Expand Down
Loading

0 comments on commit a962a88

Please sign in to comment.