Skip to content

Commit

Permalink
feat: on-device bus with rumqttd and data joiner on uplink (#348)
Browse files Browse the repository at this point in the history
* doc: example config

* feat: config types to deserialize example

* feat: service bus traits

* feat: on device bus and data joiner

* feat: simulator as a service on bus

* style: fmt toml

* remove dbg

* fix: spawn blocking thread task

* fix: routing setup

* refactor: pass whole json

* feat: `select_fields = "all"`

* feat: instant data push

* fix: deserialization error message

* fix: actually push data instantly

* fix: handle timestamp and sequence from incoming stream

* remove dbg

* ci: clippy suggestion

* Bus at same level as TcpJson

* test: data and status

* Make structs PartialEq

* fix: default subscribe to action_status

* refactor: separate out async part

* test: merge streams, but not data

* test: merge streams with data

* test: select from a stream

* test: select from two streams

* test: null after timeout

* test: run them together

* test: previous value after flush

* doc: describe topic structure

* wait 2s for output

* test: use different port

* test: await data with no push and change to push with qos 0

* test: similar inputs

* test: fix port

* doc: describe tests

* test: renaming fields

* test: publish back on bus

* doc: testing publish back on bus

* test: move to tests/bus.rs

* refactor: feature gate bus

* refactor: use `LinkRx::next()`

* chore: use rumqtt main

* ci: test all features

* refactor: split joins out

* refactor: redo without trait

* fix: use the correct topics

* feat: expose rumqttd console

* feat: no route for unsubscribed actions

* test: fail unregistered action

* refactor: tests/bus.rs

* fix: `/subscriptions` might return empty list

* fix: directly push `on_new_data` sequence numbers

* test: fix port occupied

* fix: `NoRoute` if no subscription only

* test: addr occupied

* style: note the unit in field name

* test: fix joins

* Revert "ci: test all features"

This reverts commit 650c871.

* Revert "feat: simulator as a service on bus"

This reverts commit f8885ab.

* style: name makes sense

* style: name types

* fix: include bus config
  • Loading branch information
Devdutt Shenoi authored Sep 16, 2024
1 parent 0e9e6a9 commit ca07900
Show file tree
Hide file tree
Showing 14 changed files with 2,121 additions and 33 deletions.
611 changes: 588 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

49 changes: 48 additions & 1 deletion configs/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,52 @@
"port": 3333
},

"device_shadow": { "interval": 30 }
"device_shadow": { "interval": 30 },

"bus": {
"port": 1883,
"console_port": 3030,
"joins": {
"output_streams": [
{
"name": "location",
"construct_from": [
{
"input_stream": "gps",
"select_fields": ["latitude", "longitude"]
},
{ "input_stream": "altimeter", "select_fields": ["altitude"] }
],
"push_interval_s": 60,
"no_data_action": "null",
"publish_on_service_bus": true
},
{
"name": "device_shadow",
"construct_from": [
{ "input_stream": "device_shadow", "select_fields": "all" }
],
"push_interval_s": "on_new_data",
"no_data_action": "previous_value",
"publish_on_service_bus": true
},
{
"name": "example",
"construct_from": [
{
"input_stream": "stream_one",
"select_fields": ["field_x", "field_y"]
},
{
"input_stream": "stream_two",
"select_fields": [{ "field_z": "field_x" }]
}
],
"push_interval_s": 120,
"no_data_action": "previous_value",
"publish_on_service_bus": false
}
]
}
}
}
32 changes: 32 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,35 @@ port = 3333
# - interval: time in seconds after which device-shadow is pushed onto platform, default value is 60
[device_shadow]
interval = 30

# The on-device service bus exposes an MQTT interface for inter-process communication
# Required Parameters
# - port: the port on which the broker/server is listening for incoming service connections
# - joins: description of how uplink will join incoming data and push outgoing data streams
# - console_port: port on which the rumqttd console API is exposed
[bus]
port = 1883
console_port = 3030
joins = { output_streams = [
{ name = "location", construct_from = [
{ input_stream = "gps", select_fields = [
"latitude",
"longitude",
] },
{ input_stream = "altimeter", select_fields = [
"altitude",
] },
], push_interval_s = 60, no_data_action = "null", publish_on_service_bus = true },
{ name = "device_shadow", construct_from = [
{ input_stream = "device_shadow", select_fields = "all" },
], push_interval_s = "on_new_data", no_data_action = "previous_value", publish_on_service_bus = true },
{ name = "example", construct_from = [
{ input_stream = "stream_one", select_fields = [
"field_x",
"field_y",
] },
{ input_stream = "stream_two", select_fields = [
{ "field_z" = "field_x" },
] },
], push_interval_s = 120, no_data_action = "previous_value", publish_on_service_bus = false },
] }
10 changes: 10 additions & 0 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
bytes = { workspace = true }
flume = { workspace = true }
rumqttc = { workspace = true }
rumqttd = { git = "https://github.com/bytebeamio/rumqtt", default-features = false, optional = true, branch = "console-response" }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = "3.3.0"
Expand Down Expand Up @@ -76,3 +77,12 @@ vergen = { version = "7", features = ["git", "build", "time"] }

[dev-dependencies]
tempdir = { workspace = true }

[features]
default = ["bus"]
bus = ["rumqttd"]

[[test]]
name = "joins"
path = "tests/joins.rs"
required-features = ["bus"]
4 changes: 2 additions & 2 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::clock;
/// On the Bytebeam platform, an Action is how beamd and through it,
/// the end-user, can communicate the tasks they want to perform on
/// said device, in this case, uplink.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Action {
// action id
#[serde(alias = "id")]
Expand All @@ -18,7 +18,7 @@ pub struct Action {
pub payload: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ActionResponse {
#[serde(alias = "id")]
pub action_id: String,
Expand Down
17 changes: 16 additions & 1 deletion uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,18 @@ impl ActionsBridge {
/// Handle received actions
fn try_route_action(&mut self, action: Action) -> Result<(), Error> {
let Some(route) = self.action_routes.get(&action.name) else {
return Err(Error::NoRoute(action.name));
// actions that can't be routed should go onto the broker if enabled
let deadline = self
.action_routes
.get("*")
.ok_or_else(|| Error::NoRoute(action.name.clone()))?
.try_send(action.clone())
.map_err(|_| Error::UnresponsiveReceiver)?;
debug!("Action routed to broker");

self.current_action = Some(CurrentAction::new(action, deadline));

return Ok(());
};

let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
Expand Down Expand Up @@ -567,6 +578,10 @@ impl StatusTx {
pub async fn send_action_response(&self, response: ActionResponse) {
self.inner.send_async(response).await.unwrap()
}

pub fn send_action_response_sync(&self, response: ActionResponse) {
self.inner.send(response).unwrap()
}
}

/// Handle to send control messages to action lane
Expand Down
6 changes: 5 additions & 1 deletion uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait Package: Send + Debug {

// TODO Don't do any deserialization on payload. Read it a Vec<u8> which is in turn a json
// TODO which cloud will double deserialize (Batch 1st and messages next)
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Payload {
#[serde(skip_serializing)]
pub stream: String,
Expand Down Expand Up @@ -148,4 +148,8 @@ impl BridgeTx {
pub async fn send_action_response(&self, response: ActionResponse) {
self.status_tx.send_action_response(response).await
}

pub fn send_action_response_sync(&self, response: ActionResponse) {
self.status_tx.send_action_response_sync(response)
}
}
201 changes: 201 additions & 0 deletions uplink/src/collector/bus/joins.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use std::collections::HashMap;

use flume::{bounded, Receiver, Sender};
use log::{error, warn};
use serde_json::{json, Map, Value};
use tokio::{select, task::JoinSet, time::interval};

use crate::{
base::{
bridge::{BridgeTx, Payload},
clock,
},
config::{Field, JoinConfig, NoDataAction, PushInterval, SelectConfig},
};

// A JSON object: the payload of one data point on an input stream.
type Json = Map<String, Value>;
// Name of a stream whose data feeds into one or more joins.
type InputStream = String;
// Name of a field within an input stream's JSON payload.
type FieldName = String;

/// Routes incoming bus payloads to the `Joiner` tasks interested in their stream.
pub struct Router {
    // Maps an input stream name to the send-half of every joiner consuming it;
    // one incoming payload fans out to all of them.
    map: HashMap<InputStream, Vec<Sender<(InputStream, Json)>>>,
    // Handles to the spawned joiner tasks, so the owner can await/abort them.
    pub tasks: JoinSet<()>,
}

impl Router {
    /// Builds a `Router` from the configured joins, spawning one background
    /// `Joiner` task per configured output stream.
    ///
    /// * `configs` - join descriptions: which input streams feed which output.
    /// * `bridge_tx` - handle joiners use to push joined data uplink.
    /// * `back_tx` - channel on which joined data is also published back onto
    ///   the service bus for joins with `publish_on_service_bus` set.
    pub async fn new(
        configs: Vec<JoinConfig>,
        bridge_tx: BridgeTx,
        back_tx: Sender<Payload>,
    ) -> Self {
        let mut map: HashMap<InputStream, Vec<Sender<(InputStream, Json)>>> = HashMap::new();
        let mut tasks = JoinSet::new();
        for config in configs {
            // bounded(1) applies backpressure from a slow joiner to the router.
            let (tx, rx) = bounded(1);
            // Per input stream, remember which fields are selected and how they
            // are renamed; `SelectConfig::All` leaves no entry here, which
            // `Joiner::update` interprets as "select every field".
            let mut fields = HashMap::new();
            for stream in &config.construct_from {
                if let SelectConfig::Fields(selected_fields) = &stream.select_fields {
                    let renames: &mut HashMap<FieldName, Field> =
                        fields.entry(stream.input_stream.to_owned()).or_default();
                    for field in selected_fields {
                        renames.insert(field.original.to_owned(), field.to_owned());
                    }
                }
                // Register this joiner as a consumer of the input stream.
                // The entry API performs a single map lookup where the previous
                // get_mut-or-insert pair performed two (clippy: map_entry).
                map.entry(stream.input_stream.to_owned()).or_default().push(tx.clone());
            }
            let joiner = Joiner {
                rx,
                joined: Json::new(),
                config,
                tx: bridge_tx.clone(),
                fields,
                back_tx: back_tx.clone(),
                sequence: 0,
            };
            tasks.spawn(joiner.start());
        }

        Router { map, tasks }
    }

    /// Forwards an incoming payload to every joiner subscribed to its stream.
    /// Streams with no subscribers are silently dropped, as are send failures
    /// (the joiner may have already exited).
    pub async fn map(&mut self, input_stream: InputStream, json: Json) {
        let Some(senders) = self.map.get(&input_stream) else { return };
        for tx in senders {
            _ = tx.send_async((input_stream.clone(), json.clone())).await;
        }
    }
}

/// Accumulates data from one or more input streams into a single joined
/// output payload, pushing it uplink per the configured interval.
struct Joiner {
    // Receives (input stream name, payload) pairs from the `Router`.
    rx: Receiver<(InputStream, Json)>,
    // The join-in-progress: all fields merged so far across input streams.
    joined: Json,
    // Describes the output stream: name, push interval, no-data action, etc.
    config: JoinConfig,
    // Per input stream, the selected fields keyed by their original name.
    fields: HashMap<InputStream, HashMap<FieldName, Field>>,
    // Pushes joined payloads onto the uplink bridge.
    tx: BridgeTx,
    // Publishes joined payloads back onto the service bus when configured.
    back_tx: Sender<Payload>,
    // Internal sequence counter, used when incoming data carries none.
    sequence: u32,
}

impl Joiner {
    /// Drives the join loop until the input channel closes.
    ///
    /// Two push modes, selected by `config.push_interval_s`:
    /// - `OnNewData`: every received payload is merged and flushed immediately.
    /// - `OnTimeout(period)`: payloads are merged as they arrive and flushed
    ///   only when the periodic ticker fires.
    async fn start(mut self) {
        let PushInterval::OnTimeout(period) = self.config.push_interval_s else {
            // Instant-push mode: flush after every update.
            loop {
                match self.rx.recv_async().await {
                    Ok((input_stream, json)) => self.update(input_stream, json),
                    Err(e) => {
                        // All senders dropped; nothing more to join.
                        error!("{e}");
                        return;
                    }
                }
                self.send_data().await;
            }
        };
        // Periodic mode: merge on receive, flush on tick.
        // NOTE(review): tokio's `interval` fires its first tick immediately;
        // harmless here since `send_data` is a no-op while `joined` is empty.
        let mut ticker = interval(period);
        loop {
            select! {
                r = self.rx.recv_async() => {
                    match r {
                        Ok((input_stream, json)) => self.update(input_stream, json),
                        Err(e) => {
                            error!("{e}");
                            return;
                        }
                    }
                }

                _ = ticker.tick() => {
                    self.send_data().await
                }
            }
        }
    }

    // Use data sequence and timestamp if data is to be pushed instantly:
    // `timestamp`/`sequence` keys from incoming data are only merged in
    // instant-push (`OnNewData`) mode, where `send_data` passes them through;
    // in periodic mode they are dropped so generated values are used instead.
    fn is_insertable(&self, key: &str) -> bool {
        match key {
            "timestamp" | "sequence" => self.config.push_interval_s == PushInterval::OnNewData,
            _ => true,
        }
    }

    /// Merges one payload into the accumulated `joined` object, applying the
    /// configured field selection/renaming for its input stream. Later values
    /// overwrite earlier ones for the same (possibly renamed) key.
    fn update(&mut self, input_stream: InputStream, json: Json) {
        if let Some(map) = self.fields.get(&input_stream) {
            for (mut key, value) in json {
                // drop unenumerated keys from json
                let Some(field) = map.get(&key) else { continue };
                if let Some(name) = &field.renamed {
                    name.clone_into(&mut key);
                }

                if self.is_insertable(&key) {
                    self.joined.insert(key, value);
                }
            }
        } else {
            // Select All if no mapping exists
            for (key, value) in json {
                if self.is_insertable(&key) {
                    self.joined.insert(key, value);
                }
            }
        }
    }

    /// Pushes the joined object uplink (and, if configured, back onto the
    /// service bus), then clears it when `no_data_action` is `Null` so the
    /// next push starts empty. Does nothing while no data has been joined.
    async fn send_data(&mut self) {
        if self.joined.is_empty() {
            return;
        }

        // timestamp and sequence values should be passed as is for instant push, else use generated values
        // NOTE(review): `v as u64`/`v as u32` wrap for negative i64 values —
        // the warn-and-default path only covers non-integer JSON values, not
        // negatives. Confirm incoming timestamps/sequences are non-negative.
        let timestamp = self
            .joined
            .remove("timestamp")
            .and_then(|value| {
                value.as_i64().map_or_else(
                    || {
                        warn!(
                            "timestamp: {value:?} has unexpected type; defaulting to system time"
                        );
                        None
                    },
                    |v| Some(v as u64),
                )
            })
            .unwrap_or_else(|| clock() as u64);
        let sequence = self
            .joined
            .remove("sequence")
            .and_then(|value| {
                value.as_i64().map_or_else(
                    || {
                        warn!(
                            "sequence: {value:?} has unexpected type; defaulting to internal sequence"
                        );
                        None
                    },
                    |v| Some(v as u32),
                )
            })
            .unwrap_or_else(|| {
                // Internal counter keeps sequence numbers monotonic per joiner.
                self.sequence += 1;
                self.sequence
            });
        let payload = Payload {
            stream: self.config.name.clone(),
            sequence,
            timestamp,
            payload: json!(self.joined),
        };
        if self.config.publish_on_service_bus {
            // Best-effort: a closed bus channel must not block the uplink push.
            _ = self.back_tx.send_async(payload.clone()).await;
        }
        self.tx.send_payload(payload).await;
        if self.config.no_data_action == NoDataAction::Null {
            self.joined.clear();
        }
    }
}
Loading

0 comments on commit ca07900

Please sign in to comment.