Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust MQTT telemetry #182

Merged
merged 7 commits into from
Oct 31, 2024
Merged

Conversation

nbuffon
Copy link
Member

@nbuffon nbuffon commented Oct 24, 2024

What's new

  • Trace MQTT message reception
  • Trace MQTT message publishing
  • Set the W3C context as MQTTv5 user property in published message
  • Extract W3C context from received message if any

Closes #124

How to test

  1. Start a Jaeger/OTLP collector image

    docker run --rm --name jaeger \
     -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
     -p 6831:6831/udp \
     -p 6832:6832/udp \
     -p 5778:5778 \
     -p 16686:16686 \
     -p 4317:4317 \
     -p 4318:4318 \
     -p 14250:14250 \
     -p 14268:14268 \
     -p 14269:14269 \
     -p 9411:9411 -d\
     jaegertracing/all-in-one:1.58
    
  2. Create the test script

    wget https://github.com/user-attachments/files/17507082/test_copycat.txt &&
      mv test_copycat.txt /tmp/test_copycat.sh &&
      chmod +x /tmp/test_copycat.sh
    

    (or download it as txt test_copycat.txt)

  3. Listen to MQTT messages with MQTTv5 properties

    mosquitto_sub -h test.mosquitto.org -p 1884 -u rw -P readwrite -v -t "default/+/v2x/cam/#" -V mqttv5 -F %j | jq -C
    
  4. Edit the examples/config.ini file to fill the telemetry section with the following content

    [telemetry]
    host=localhost
    port=4318
  5. Start the copycat example with the telemetry feature

    cargo run --example copycat --features geo_routing --features telemetry
    
  6. Launch the test script

    /tmp/test_copycat.sh
    

    => Several reception must be logged

    DEBUG [copycat] item received: Packet { topic: GeoTopic { prefix: "default", queue: Out, suffix: "v2x", message_type: CAM, uuid: "ora_car_testcon2", geo_extension: Quadkey { tiles: [Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two, Two] } }, payload: Exchange { type_field: "cam", origin: "self", version: "1.2.0", source_uuid: "ora_car_2222", timestamp: 1618591358923, path: [], message: CAM(CooperativeAwarenessMessage { protocol_version: 2, station_id: 1077952580, generation_delta_time: 31571, basic_container: BasicContainer { station_type: Some(5), reference_position: ReferencePosition { latitude: 486230934, longitude: 22419064, altitude: 10000 }, confidence: Some(PositionConfidence { position_confidence_ellipse: Some(PositionConfidenceEllipse { semi_major_confidence: Some(0), semi_minor_confidence: Some(0), semi_major_orientation: Some(0) }), altitude: Some(1) }) }, high_frequency_container: HighFrequencyContainer { heading: Some(2375), speed: Some(137), drive_direction: Some(0), vehicle_length: Some(47), vehicle_width: Some(24), curvature: Some(0), curvature_calculation_mode: None, longitudinal_acceleration: Some(1), yaw_rate: Some(-27), acceleration_control: None, lane_position: Some(0), lateral_acceleration: Some(0), vertical_acceleration: Some(0), confidence: Some(HighFrequencyConfidence { heading: Some(2), speed: Some(3), vehicle_length: Some(1), yaw_rate: Some(6), longitudinal_acceleration: Some(5), curvature: Some(7), lateral_acceleration: None, vertical_acceleration: None }) }, low_frequency_container: Some(LowFrequencyContainer { vehicle_role: Some(0), exterior_lights: "00000000", path_history: [PathHistory { path_position: PathPosition { delta_latitude: None, delta_longitude: None, delta_altitude: None }, path_delta_time: Some(1) }] }) }) }, properties: PublishProperties { payload_format_indicator: None, message_expiry_interval: None, topic_alias: None, response_topic: None, correlation_data: None, user_properties: [], subscription_identifiers: [], content_type: None } }

    => One message publication must be logged

    DEBUG [libits::client::application::pipeline] Packet to publish...
    INFO [libits::client::application::pipeline] Cannot trace exchange, missing gateway component name in node configuration
    DEBUG [libits::client::application::pipeline] Packet published!
    DEBUG [rumqttc::v5::state] Publish. Topic = default/inQueue/v2x/cam/com_orange_its-client_10000/2/2/2/2/2/2/2/2/2/2/2/2/2/ 2/2/2/2/2/2, Pkid = 2, Payload Size = 962

    The published message should be carrying the W3C context (logged by mosquitto_sub command)

    {
      "tst": "2024-10-24T14:44:41.462008Z+0200",
      "topic": "default/inQueue/v2x/cam/com_orange_its-client_10000/2/2/2/2/2/2/2/2/2/2/2/2/2/2/2/2/2/2/2",
      "qos": 0,
      "retain": 0,
      "payloadlen": 962,
      "properties": {
        "user-properties": {
          "traceparent": "00-083c33ecb76bdd0f2d4b4b60e167f878-d5bb5974d8e3116b-01",
          "tracestate": ""
        }
      },
      "payload": "{\"type\":\"cam\",\"origin\":\"mec_application\",\"version\":\"1.2.0\",\"source_uuid\":\"com_orange_its-client_10000\",\"timestamp\":1729773881425,\"message\": {\"protocol_version\":2,\"station_id\":1077952580,\"generation_delta_time\":31571,\"basic_container\":{\"station_type\":5,\"reference_position\":{\"latitude\":486230934, \"longitude\":22419064,\"altitude\":10000},\"confidence\":{\"position_confidence_ellipse\":{\"semi_major_confidence\":0,\"semi_minor_confidence\":0, \"semi_major_orientation\":0},\"altitude\":1}},\"high_frequency_container\":{\"heading\":2375,\"speed\":137,\"drive_direction\":0,\"vehicle_length\":47,\"vehicle_width\":24, \"curvature\":0,\"longitudinal_acceleration\":1,\"yaw_rate\":-27,\"lane_position\":0,\"lateral_acceleration\":0,\"vertical_acceleration\":0,\"confidence\":{\"heading\":2, \"speed\":3,\"vehicle_length\":1,\"yaw_rate\":6,\"longitudinal_acceleration\":5,\"curvature\":7}},\"low_frequency_container\":{\"vehicle_role\":0, \"exterior_lights\":\"00000000\",\"path_history\":[{\"path_position\":{},\"path_delta_time\":1}]}}}"
    }
    
  7. The test scripts did prompt asking for a trace parent, answer it using the published message's one

  8. Check the spans content on the local jaeger interface
    => You must see 8 traces
    => The 7th (the second most recent) must have a link to another span

Generics management where a pain in some cases when you just don't have
another context to link to the span you want, as the call would require
to specify the optional type you pass (e.g. `None::<MyType<P,A,I,N>>')

Remove the optional parameter from the `get_span` function and add
another one `get_linked_span` which as a non optional argument where the
context can be extracted from

Signed-off-by: Nicolas Buffon <[email protected]>
Signed-off-by: Nicolas Buffon <[email protected]>
@nbuffon nbuffon added the Rust Rust code label Oct 24, 2024
@nbuffon nbuffon self-assigned this Oct 24, 2024
@nbuffon nbuffon force-pushed the rust_mqtt_telemetry branch from 262ac32 to 5751495 Compare October 24, 2024 08:56
Add the OTLP W3C context as user property in MQTT message on publish

Signed-off-by: Nicolas Buffon <[email protected]>
@nbuffon nbuffon force-pushed the rust_mqtt_telemetry branch from 5751495 to bb116dd Compare October 24, 2024 15:00
@nbuffon nbuffon changed the title Rust mqtt telemetry Rust MQTT telemetry Oct 25, 2024
@nbuffon nbuffon requested a review from tigroo October 25, 2024 07:45
@ymorin-orange ymorin-orange self-requested a review October 29, 2024 10:16
Copy link
Collaborator

@tigroo tigroo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor comment.

No test.

pub(crate) fn get_reception_mqtt_span(publish: &Publish) -> BoxedSpan {
let tracer = global::tracer("iot3.core");

let topic = from_utf8(&publish.topic.to_vec())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let topic = from_utf8(&publish.topic)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Member

@ymorin-orange ymorin-orange left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Tests all working as described.

One comment, though: copycat reports several informational messages that look like errors:

INFO [libits::client::application::pipeline] Cannot trace exchange, missing gateway component name in node configuration

Message reception and emission are automatically traced by the SDK if
the feature is enabled
This can now be tested using the copycat example by initializing the
tracer in the example

Signed-off-by: Nicolas Buffon <[email protected]>
@nbuffon
Copy link
Member Author

nbuffon commented Oct 31, 2024

  • Tests all working as described.

One comment, though: copycat reports several informational messages that look like errors:

INFO [libits::client::application::pipeline] Cannot trace exchange, missing gateway component name in node configuration

This is because of the old KPI logging system which requires the Information message to be received once first, and test.mosquitto.org obviously does not send one
As KPI computation is planned to now be made using telemetry traces these function calls can be removed, I opened a new issue to do so (#199)

@nbuffon nbuffon merged commit a962a88 into Orange-OpenSource:master Oct 31, 2024
36 checks passed
@nbuffon nbuffon deleted the rust_mqtt_telemetry branch October 31, 2024 07:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Rust Rust code
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

Send messages to telemetry service service from application for MQTT exchanges
3 participants