From ca079009181f529769dddb1310811ba8fc11fdb6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 16 Sep 2024 15:14:56 +0530 Subject: [PATCH] feat: on-device bus with `rumqttd` and data joiner on uplink (#348) * doc: example config * feat: config types to deserialize example * feat: service bus traits * feat: on device bus and data joiner * feat: simulator as a service on bus * style: fmt toml * remove dbg * fix: spawn blocking thread task * fix: routing setup * refactor: pass whole json * feat: `select_ fields = "all"` * feat: instant data push * fix: deserialization error message * fix: actually push data instantly * fix: handle timestamp and sequence from incoming stream * remove dbg * ci: clippy suggestion * Bus at same level as TcpJson * test: data and status * Make structs PartialEq * fix: default subscribe to action_status * refactor: separate out async part * test: merge streams, but not data * test: merge streams with data * test: select from a stream * test: select from two streams * test: null after timeout * test: run them together * test: previous value after flush * doc: describe topic structure * wait 2s for output * test: use different port * test: await data with no push and change to push with qos 0 * test: similar inputs * test: fix port * doc: describe tests * test: renaming fields * test: publish back on bus * doc: testing publish back on bus * test: move to tests/bus.rs * refactor: feature gate bus * refactor: use `LinkRx::next()` * chore: use rumqtt main * ci: test all features * refactor: split joins out * refactor: redo without trait * fix: use the correct topics * feat: expose rumqttd console * feat: no route for unsubscribed actions * test: fail unregistered action * refactor: test/bus.rs * fix: `/subscriptions` might return empty list * fix: directly push `on_new_data` sequence numbers * test: fix port occupied * fix: `NoRoute` if no subscription only * test: addr occupied * style: note the unit in field name * test: fix joins * Revert "ci: test all features" This reverts commit 650c8718228293de1189d74c3ff854c72338efe3. * Revert "feat: simulator as a service on bus" This reverts commit f8885abb32db9cdc54e890879efdb35810b7cb0e. 
* style: name makes sense * style: name types * fix: include bus config --- Cargo.lock | 611 ++++++++++++++++++++++- configs/config.json | 49 +- configs/config.toml | 32 ++ uplink/Cargo.toml | 10 + uplink/src/base/actions.rs | 4 +- uplink/src/base/bridge/actions_lane.rs | 17 +- uplink/src/base/bridge/mod.rs | 6 +- uplink/src/collector/bus/joins.rs | 201 ++++++++ uplink/src/collector/bus/mod.rs | 222 +++++++++ uplink/src/collector/mod.rs | 2 + uplink/src/config.rs | 228 ++++++++- uplink/src/main.rs | 26 +- uplink/tests/bus.rs | 91 ++++ uplink/tests/joins.rs | 655 +++++++++++++++++++++++++ 14 files changed, 2121 insertions(+), 33 deletions(-) create mode 100644 uplink/src/collector/bus/joins.rs create mode 100644 uplink/src/collector/bus/mod.rs create mode 100644 uplink/tests/bus.rs create mode 100644 uplink/tests/joins.rs diff --git a/Cargo.lock b/Cargo.lock index b23a4baa..cf0697b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,12 +24,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2797f92fb146a37560af914b5d5328f8330d6a39b6eaf00f5b184ac73c0c81e7" dependencies = [ "cc", - "clap", + "clap 2.34.0", "libc", "rustc_version 0.2.3", "xdg", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -63,6 +75,55 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" + +[[package]] +name = "anstyle-parse" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.89" @@ -118,13 +179,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes 1.7.1", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.30", "itoa", "matchit", "memchr", @@ -136,13 +197,47 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "tokio 1.40.0", "tower", "tower-layer", "tower-service", ] +[[package]] +name = "axum" 
+version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "bytes 1.7.1", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite 0.2.14", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio 1.40.0", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -152,14 +247,35 @@ dependencies = [ "async-trait", "bytes 1.7.1", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes 1.7.1", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite 0.2.14", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -235,6 +351,9 @@ name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +dependencies = [ + "serde", +] [[package]] name = "block-buffer" @@ -277,6 +396,9 @@ name = "bytes" version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -329,6 +451,52 @@ dependencies = [ "vec_map", ] +[[package]] +name = "clap" +version = "4.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + +[[package]] +name = "colorchoice" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" + [[package]] name = "config" version = "0.13.4" @@ -344,6 +512,26 @@ dependencies = [ "toml 0.5.11", ] +[[package]] +name = "config" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be" +dependencies = [ + "async-trait", + "convert_case", + "json5", + 
"lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml 0.8.19", + "yaml-rust", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -370,6 +558,35 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -445,6 +662,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -537,6 +760,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "dummy" version = "0.7.0" @@ -963,7 +1195,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.5.0", "slab", "tokio 1.40.0", @@ -977,11 +1209,20 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] [[package]] name = "heck" @@ -992,6 +1233,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1024,6 +1271,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes 1.7.1", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1031,7 +1289,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes 1.7.1", - "http", + "http 0.2.12", + "pin-project-lite 
0.2.14", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes 1.7.1", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes 1.7.1", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite 0.2.14", ] @@ -1073,8 +1354,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1086,6 +1367,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes 1.7.1", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite 0.2.14", + "smallvec", + "tokio 1.40.0", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1093,13 +1393,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.30", "rustls 0.21.12", "tokio 1.40.0", "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" +dependencies = [ + "bytes 1.7.1", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.4.1", + "pin-project-lite 0.2.14", + "tokio 1.40.0", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1194,6 +1509,12 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itoa" version = "1.0.11" @@ -1218,6 +1539,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -1284,6 +1616,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -1342,6 +1680,48 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8452105ba047068f40ff7093dd1d9da90898e63dd61736462e9cdda6a90ad3c3" +[[package]] +name = "metrics" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" +dependencies = [ + "ahash", + "portable-atomic", +] + 
+[[package]] +name = "metrics-exporter-prometheus" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bf4e7146e30ad172c42c39b3246864bd2d3c6396780711a1baf749cfe423e21" +dependencies = [ + "base64 0.21.7", + "hyper 0.14.30", + "indexmap 2.5.0", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio 1.40.0", +] + +[[package]] +name = "metrics-util" +version = "0.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.14.5", + "metrics", + "num_cpus", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1669,6 +2049,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-multimap" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" +dependencies = [ + "dlv-list", + "hashbrown 0.13.2", +] + [[package]] name = "overload" version = "0.1.1" @@ -1719,6 +2109,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c73c26c01b8c87956cea613c907c9d6ecffd8d18a2a5908e5de0adfaa185cea" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "664d22978e2815783adbdd2c588b455b1bd625299ce36b2a99881ac9627e6d8d" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2d5487022d5d33f4c30d91c22afa240ce2a644e87fe08caad974d4eab6badbe" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "pest_meta" +version = "2.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0091754bbd0ea592c4deb3a122ce8ecbb0753b738aa82bc055fcc2eccc8d8174" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "0.4.30" @@ -1846,6 +2281,12 @@ dependencies = [ "pnet_macros_support", ] +[[package]] +name = "portable-atomic" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" + [[package]] name = "portable-pty" version = "0.3.1" @@ -1955,6 +2396,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2069,6 +2525,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" 
version = "1.10.0" @@ -2195,9 +2660,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.30", "hyper-rustls", "ipnet", "js-sys", @@ -2211,7 +2676,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio 1.40.0", "tokio-rustls 0.24.1", @@ -2256,6 +2721,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ron" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" +dependencies = [ + "base64 0.21.7", + "bitflags 2.6.0", + "serde", + "serde_derive", +] + [[package]] name = "rsa" version = "0.9.6" @@ -2297,6 +2774,41 @@ dependencies = [ "tokio-util 0.7.12", ] +[[package]] +name = "rumqttd" +version = "0.19.0" +source = "git+https://github.com/bytebeamio/rumqtt?branch=console-response#c2b925c1b94eaf6f4026bc5ba38dd3ff8a3b53f4" +dependencies = [ + "axum 0.7.5", + "bytes 1.7.1", + "clap 4.5.17", + "config 0.14.0", + "flume 0.11.0", + "metrics", + "metrics-exporter-prometheus", + "parking_lot", + "rand 0.8.5", + "serde", + "serde_json", + "slab", + "subtle", + "thiserror", + "tokio 1.40.0", + "tracing", + "tracing-subscriber", + "uuid", +] + +[[package]] +name = "rust-ini" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" +dependencies = [ + "cfg-if 1.0.0", + "ordered-multimap", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2778,6 +3290,12 @@ dependencies = [ "time", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.9" @@ -2866,7 +3384,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap", + "clap 2.34.0", "lazy_static", "structopt-derive", ] @@ -2877,7 +3395,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro-error", "proc-macro2", "quote", @@ -2934,6 +3452,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "sysinfo" version = "0.26.9" @@ -3125,6 +3649,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -3447,7 +3980,7 @@ dependencies = [ "base64 0.12.3", "byteorder", "bytes 0.5.6", - "http", + "http 0.2.12", "httparse", "input_buffer", "log", @@ -3531,6 +4064,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "uds_windows" version = "0.1.6" @@ -3592,9 +4131,9 @@ version = "2.14.1" dependencies = [ "anyhow", "async-trait", - "axum", + "axum 0.6.20", "bytes 1.7.1", - "config", + "config 0.13.4", "fake", "flume 0.10.14", "fs2", @@ -3610,6 +4149,7 @@ dependencies = [ "reqwest", "rsa", "rumqttc", + "rumqttd", "serde", "serde_json", "serde_with", @@ -3651,6 +4191,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "utils" version = "0.1.0" @@ -3665,6 +4211,16 @@ dependencies = [ "uplink", ] +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom 0.2.15", + "rand 0.8.5", +] + [[package]] name = "valuable" version = "0.1.0" @@ -4107,6 +4663,15 @@ version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213b7324336b53d2414b2db8537e56544d981803139155afa84f76eeebb7a546" +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/configs/config.json b/configs/config.json index 3984a4a9..8e4a6440 100644 --- a/configs/config.json +++ b/configs/config.json @@ -101,5 +101,52 @@ "port": 3333 }, - "device_shadow": { "interval": 30 } + "device_shadow": { "interval": 30 }, + + "bus": { + "port": 1883, + "console_port": 3030, + "joins": { + "output_streams": [ + { + "name": "location", + "construct_from": [ + { + "input_stream": "gps", + "select_fields": ["latitude", "longitude"] + }, + { "input_stream": "altimeter", "select_fields": ["altitude"] } + ], + "push_interval_s": 60, + "no_data_action": "null", + "publish_on_service_bus": true + }, + { + "name": "device_shadow", + "construct_from": [ + { "input_stream": "device_shadow", "select_fields": "all" } + ], + "push_interval_s": "on_new_data", + "no_data_action": "previous_value", + "publish_on_service_bus": true + }, + { + "name": "example", + "construct_from": [ + { + "input_stream": "stream_one", + "select_fields": ["field_x", "field_y"] + }, + { + "input_stream": "stream_two", + "select_fields": [{ "field_z": "field_x" }] + } + ], + "push_interval_s": 120, + "no_data_action": "previous_value", + "publish_on_service_bus": false + } + ] + } + } } diff --git a/configs/config.toml b/configs/config.toml index 8991a1f9..1e328f75 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -202,3 +202,35 @@ port = 3333 # - interval: time in seconds after which device-shadow is pushed onto platform, default value is 60 [device_shadow] interval = 30 + +# The on-device service bus exposes an MQTT interface for inter-process communication +# Required Parameters +# - port: the port on which the broker/server is listening for 
incoming service connections +# - joins: description of how uplink will join incoming data and push outgoing data streams +# - console_port: port on which the rumqttd console API is exposed +[bus] +port = 1883 +console_port = 3030 +joins = { output_streams = [ + { name = "location", construct_from = [ + { input_stream = "gps", select_fields = [ + "latitude", + "longitude", + ] }, + { input_stream = "altimeter", select_fields = [ + "altitude", + ] }, + ], push_interval_s = 60, no_data_action = "null", publish_on_service_bus = true }, + { name = "device_shadow", construct_from = [ + { input_stream = "device_shadow", select_fields = "all" }, + ], push_interval_s = "on_new_data", no_data_action = "previous_value", publish_on_service_bus = true }, + { name = "example", construct_from = [ + { input_stream = "stream_one", select_fields = [ + "field_x", + "field_y", + ] }, + { input_stream = "stream_two", select_fields = [ + { "field_z" = "field_x" }, + ] }, + ], push_interval_s = 120, no_data_action = "previous_value", publish_on_service_bus = false }, +] } diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index d7ee3244..d817c659 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" bytes = { workspace = true } flume = { workspace = true } rumqttc = { workspace = true } +rumqttd = { git = "https://github.com/bytebeamio/rumqtt", default-features = false, optional = true, branch = "console-response" } serde = { workspace = true } serde_json = { workspace = true } serde_with = "3.3.0" @@ -76,3 +77,12 @@ vergen = { version = "7", features = ["git", "build", "time"] } [dev-dependencies] tempdir = { workspace = true } + +[features] +default = ["bus"] +bus = ["rumqttd"] + +[[test]] +name = "joins" +path = "tests/joins.rs" +required-features = ["bus"] diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index d9c07e4a..aaf12856 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -7,7 +7,7 @@ use super::clock; /// On the Bytebeam platform, an Action is how beamd and through it, /// the end-user, can communicate the tasks they want to perform on /// said device, in this case, uplink. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct Action { // action id #[serde(alias = "id")] @@ -18,7 +18,7 @@ pub struct Action { pub payload: String, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct ActionResponse { #[serde(alias = "id")] pub action_id: String, diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index a614b305..cad5a081 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -365,7 +365,18 @@ impl ActionsBridge { /// Handle received actions fn try_route_action(&mut self, action: Action) -> Result<(), Error> { let Some(route) = self.action_routes.get(&action.name) else { - return Err(Error::NoRoute(action.name)); + // actions that can't be routed should go onto the broker if enabled + let deadline = self + .action_routes + .get("*") + .ok_or_else(|| Error::NoRoute(action.name.clone()))? 
+ .try_send(action.clone()) + .map_err(|_| Error::UnresponsiveReceiver)?; + debug!("Action routed to broker"); + + self.current_action = Some(CurrentAction::new(action, deadline)); + + return Ok(()); }; let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?; @@ -567,6 +578,10 @@ impl StatusTx { pub async fn send_action_response(&self, response: ActionResponse) { self.inner.send_async(response).await.unwrap() } + + pub fn send_action_response_sync(&self, response: ActionResponse) { + self.inner.send(response).unwrap() + } } /// Handle to send control messages to action lane diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index b8ec952f..196cf852 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -42,7 +42,7 @@ pub trait Package: Send + Debug { // TODO Don't do any deserialization on payload. Read it a Vec which is in turn a json // TODO which cloud will double deserialize (Batch 1st and messages next) -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Payload { #[serde(skip_serializing)] pub stream: String, @@ -148,4 +148,8 @@ impl BridgeTx { pub async fn send_action_response(&self, response: ActionResponse) { self.status_tx.send_action_response(response).await } + + pub fn send_action_response_sync(&self, response: ActionResponse) { + self.status_tx.send_action_response_sync(response) + } } diff --git a/uplink/src/collector/bus/joins.rs b/uplink/src/collector/bus/joins.rs new file mode 100644 index 00000000..ede2331d --- /dev/null +++ b/uplink/src/collector/bus/joins.rs @@ -0,0 +1,201 @@ +use std::collections::HashMap; + +use flume::{bounded, Receiver, Sender}; +use log::{error, warn}; +use serde_json::{json, Map, Value}; +use tokio::{select, task::JoinSet, time::interval}; + +use crate::{ + base::{ + bridge::{BridgeTx, Payload}, + clock, + }, + config::{Field, JoinConfig, NoDataAction, PushInterval, SelectConfig}, +}; + +type Json = Map; +type InputStream = String; +type FieldName = String; + +pub struct Router { + map: HashMap>>, + pub tasks: JoinSet<()>, +} + +impl Router { + pub async fn new( + configs: Vec, + bridge_tx: BridgeTx, + back_tx: Sender, + ) -> Self { + let mut map: HashMap>> = HashMap::new(); + let mut tasks = JoinSet::new(); + for config in configs { + let (tx, rx) = bounded(1); + let mut fields = HashMap::new(); + for stream in &config.construct_from { + if let SelectConfig::Fields(selected_fields) = &stream.select_fields { + let renames: &mut HashMap = + fields.entry(stream.input_stream.to_owned()).or_default(); + for field in selected_fields { + renames.insert(field.original.to_owned(), field.to_owned()); + } + } + if let Some(senders) = map.get_mut(&stream.input_stream) { + senders.push(tx.clone()); + continue; + } + map.insert(stream.input_stream.to_owned(), vec![tx.clone()]); + } + let joiner = Joiner { + rx, + joined: Json::new(), + config, + tx: bridge_tx.clone(), + fields, + back_tx: back_tx.clone(), + sequence: 0, + }; + tasks.spawn(joiner.start()); + } + + Router { map, tasks } + } + + pub async fn map(&mut self, input_stream: InputStream, json: Json) { + let Some(iter) = self.map.get(&input_stream) else { return }; + for tx in iter { + _ = tx.send_async((input_stream.clone(), json.clone())).await; + } + } +} + +struct Joiner { + rx: Receiver<(InputStream, Json)>, + joined: Json, + config: JoinConfig, + fields: HashMap>, + tx: BridgeTx, + back_tx: Sender, + sequence: u32, +} + +impl Joiner { + async fn start(mut self) { + 
let PushInterval::OnTimeout(period) = self.config.push_interval_s else { + loop { + match self.rx.recv_async().await { + Ok((input_stream, json)) => self.update(input_stream, json), + Err(e) => { + error!("{e}"); + return; + } + } + self.send_data().await; + } + }; + let mut ticker = interval(period); + loop { + select! { + r = self.rx.recv_async() => { + match r { + Ok((input_stream, json)) => self.update(input_stream, json), + Err(e) => { + error!("{e}"); + return; + } + } + } + + _ = ticker.tick() => { + self.send_data().await + } + } + } + } + + // Use data sequence and timestamp if data is to be pushed instantly + fn is_insertable(&self, key: &str) -> bool { + match key { + "timestamp" | "sequence" => self.config.push_interval_s == PushInterval::OnNewData, + _ => true, + } + } + + fn update(&mut self, input_stream: InputStream, json: Json) { + if let Some(map) = self.fields.get(&input_stream) { + for (mut key, value) in json { + // drop unenumerated keys from json + let Some(field) = map.get(&key) else { continue }; + if let Some(name) = &field.renamed { + name.clone_into(&mut key); + } + + if self.is_insertable(&key) { + self.joined.insert(key, value); + } + } + } else { + // Select All if no mapping exists + for (key, value) in json { + if self.is_insertable(&key) { + self.joined.insert(key, value); + } + } + } + } + + async fn send_data(&mut self) { + if self.joined.is_empty() { + return; + } + + // timestamp and sequence values should be passed as is for instant push, else use generated values + let timestamp = self + .joined + .remove("timestamp") + .and_then(|value| { + value.as_i64().map_or_else( + || { + warn!( + "timestamp: {value:?} has unexpected type; defaulting to system time" + ); + None + }, + |v| Some(v as u64), + ) + }) + .unwrap_or_else(|| clock() as u64); + let sequence = self + .joined + .remove("sequence") + .and_then(|value| { + value.as_i64().map_or_else( + || { + warn!( + "sequence: {value:?} has unexpected type; defaulting to internal sequence" + ); + None + }, + |v| Some(v as u32), + ) + }) + .unwrap_or_else(|| { + self.sequence += 1; + self.sequence + }); + let payload = Payload { + stream: self.config.name.clone(), + sequence, + timestamp, + payload: json!(self.joined), + }; + if self.config.publish_on_service_bus { + _ = self.back_tx.send_async(payload.clone()).await; + } + self.tx.send_payload(payload).await; + if self.config.no_data_action == NoDataAction::Null { + self.joined.clear(); + } + } +} diff --git a/uplink/src/collector/bus/mod.rs b/uplink/src/collector/bus/mod.rs new file mode 100644 index 00000000..30dc4a78 --- /dev/null +++ b/uplink/src/collector/bus/mod.rs @@ -0,0 +1,222 @@ +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, +}; + +use flume::{bounded, Receiver}; +use joins::Router; +use log::error; +use reqwest::get; +use rumqttd::{ + local::{LinkRx, LinkTx}, + protocol::Publish, + Broker, Config, ConnectionSettings, ConsoleSettings, Forward, Notification, RouterConfig, + ServerSettings, +}; +use serde_json::{Map, Value}; +use tokio::select; + +use crate::{ + base::bridge::{BridgeTx, Payload}, + config::BusConfig, + spawn_named_thread, Action, ActionResponse, +}; + +mod joins; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Serde error: {0}")] + Serde(#[from] serde_json::Error), + #[error("Link error: {0}")] + Link(#[from] rumqttd::local::LinkError), + #[error("Parse error: {0}")] + Parse(#[from] std::net::AddrParseError), + #[error("Recv error: {0}")] + Recv(#[from] flume::RecvError), + #[error("Req error: 
{0}")] + Req(#[from] reqwest::Error), + #[error("Action was not expected")] + NoRoute, +} + +pub struct BusRx { + rx: LinkRx, +} + +impl BusRx { + async fn recv_async(&mut self) -> Option { + loop { + return match self.rx.next().await { + Ok(Some(Notification::Forward(Forward { publish, .. }))) => Some(publish), + Err(_) => None, + _ => continue, + }; + } + } +} + +pub struct BusTx { + tx: LinkTx, + console_url: String, +} + +impl BusTx { + fn publish_data(&mut self, data: Payload) -> Result<(), Error> { + let topic = format!("/streams/{}", data.stream); + let payload = serde_json::to_vec(&data)?; + self.tx.publish(topic, payload)?; + + Ok(()) + } + + fn subscribe_to_streams( + &mut self, + streams: impl IntoIterator>, + ) -> Result<(), Error> { + for stream in streams { + let filter = format!("/streams/{}", stream.into()); + self.tx.subscribe(filter)?; + } + + Ok(()) + } + + async fn run_action(&mut self, action: &Action) -> Result<(), Error> { + let topic = format!("/actions/{}", action.name); + + let url = format!("http://{}/subscriptions", self.console_url); + let body = get(url).await?.bytes().await?; + let subscriptions: HashMap> = serde_json::from_slice(&body)?; + + if !subscriptions.get(&topic).is_some_and(|s| !s.is_empty()) { + return Err(Error::NoRoute); + } + + let response_topic = format!("/actions/{}/status", action.action_id); + let payload = serde_json::to_vec(action)?; + + self.tx.subscribe(response_topic)?; + self.tx.publish(topic, payload)?; + + Ok(()) + } +} + +pub struct Bus { + rx: BusRx, + tx: BusTx, + bridge_tx: BridgeTx, + actions_rx: Receiver, + config: BusConfig, +} + +impl Bus { + pub fn new(config: BusConfig, bridge_tx: BridgeTx, actions_rx: Receiver) -> Self { + let router = RouterConfig { + max_segment_size: 1024, + max_connections: 10, + max_segment_count: 10, + max_outgoing_packet_count: 1024, + ..Default::default() + }; + let connections = ConnectionSettings { + connection_timeout_ms: 10000, + max_payload_size: 1073741824, + max_inflight_count: 10, + auth: None, + external_auth: None, + dynamic_filters: false, + }; + let server = ServerSettings { + name: "service_bus".to_owned(), + listen: format!("127.0.0.1:{}", config.port).parse::().unwrap(), + tls: None, + next_connection_delay_ms: 0, + connections, + }; + let mut console = ConsoleSettings::default(); + let console_url = format!("127.0.0.1:{}", config.console_port); + console_url.clone_into(&mut console.listen); + let servers = [("service_bus".to_owned(), server)].into_iter().collect(); + let mut broker = Broker::new(Config { + id: 0, + router, + v4: Some(servers), + console: Some(console), + ..Default::default() + }); + let (tx, rx) = broker.link("uplink").unwrap(); + spawn_named_thread("Broker", move || { + if let Err(e) = broker.start() { + error!("{e}") + } + }); + + Self { tx: BusTx { tx, console_url }, rx: BusRx { rx }, bridge_tx, actions_rx, config } + } + + #[tokio::main(flavor = "current_thread")] + pub async fn start(mut self) { + let (publish_back_tx, publish_back_rx) = bounded(0); + let mut router = Router::new( + self.config.joins.output_streams.clone(), + self.bridge_tx.clone(), + publish_back_tx, + ) + .await; + let mut input_streams = HashSet::new(); + for join in &self.config.joins.output_streams { + for input in &join.construct_from { + input_streams.insert(input.input_stream.to_owned()); + } + } + if let Err(e) = self.tx.subscribe_to_streams(input_streams) { + error!("{e}"); + return; + } + + loop { + select! 
{ + Ok(action) = self.actions_rx.recv_async() => { + if let Err(e) = self.tx.run_action(&action).await { + error!("{e}"); + let status = ActionResponse::failure(&action.action_id, e.to_string()); + self.bridge_tx.send_action_response(status).await; + } + } + + Some(publish) = self.rx.recv_async() => { + if publish.topic.starts_with(b"/actions/") && publish.topic.ends_with(b"/status") { + let Ok(status) = serde_json::from_slice(&publish.payload) else { + error!("Couldn't parse payload as action status"); + continue; + }; + self.bridge_tx.send_action_response(status).await; + continue; + } + + let Ok(data) = serde_json::from_slice::>(&publish.payload) else { + error!("Couldn't parse payload as data payload"); + continue; + }; + let topic = String::from_utf8(publish.topic.to_vec()).unwrap(); + // Expected topic structure: `streams/{stream_name}` + let Some (stream_name) = topic.split('/').last() else { + error!("unexpected topic structure: {topic}"); + continue + }; + router.map(stream_name.to_owned(), data).await; + } + + Ok(data) = publish_back_rx.recv_async() => { + if let Err(e) = self.tx.publish_data(data) { + error!("{e}"); + } + } + + _ = router.tasks.join_next() => {} + } + } + } +} diff --git a/uplink/src/collector/mod.rs b/uplink/src/collector/mod.rs index 903663c3..ba26b70e 100644 --- a/uplink/src/collector/mod.rs +++ b/uplink/src/collector/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "bus")] +pub mod bus; pub mod device_shadow; pub mod downloader; pub mod installer; diff --git a/uplink/src/config.rs b/uplink/src/config.rs index 6249c26f..b7d1aff6 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -4,9 +4,11 @@ use std::fs::File; use std::io::Write; use std::path::PathBuf; use std::time::Duration; -use std::{collections::HashMap, fmt::Debug}; +use std::{collections::HashMap, fmt}; -use serde::{Deserialize, Serialize}; +use serde::de::{self, Visitor}; +use serde::ser::{SerializeMap, SerializeSeq}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::{serde_as, DurationSeconds}; pub use crate::base::bridge::stream::MAX_BATCH_SIZE; @@ -262,6 +264,227 @@ pub struct PreconditionCheckerConfig { pub actions: Vec, } +#[derive(Debug, Clone)] +pub struct Field { + pub original: String, + pub renamed: Option, +} + +struct FieldVisitor; + +impl<'de> Visitor<'de> for FieldVisitor { + type Value = Field; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str(r#"a string or a map with a single key-value pair"#) + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + Ok(Field { original: value.to_string(), renamed: None }) + } + + fn visit_map(self, mut map: M) -> Result + where + M: serde::de::MapAccess<'de>, + { + let entry = map.next_entry::()?; + if let Some((renamed, original)) = entry { + Ok(Field { original, renamed: Some(renamed) }) + } else { + Err(de::Error::custom("Expected a single key-value pair in the map")) + } + } +} + +impl<'de> Deserialize<'de> for Field { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(FieldVisitor) + } +} + +impl Serialize for Field { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match &self.renamed { + Some(renamed) => { + let mut map = serializer.serialize_map(Some(1))?; + map.serialize_entry(renamed, &self.original)?; + map.end() + } + None => serializer.serialize_str(&self.original), + } + } +} + +#[derive(Debug, Clone)] +pub enum SelectConfig { + All, + Fields(Vec), 
+} + +struct SelectVisitor; + +impl<'de> Visitor<'de> for SelectVisitor { + type Value = SelectConfig; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str(r#"the string "all" or a list of `Field`s"#) + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + "all" => Ok(SelectConfig::All), + _ => Err(de::Error::custom(r#"Expected the string "all""#)), + } + } + + fn visit_seq(self, mut seq: S) -> Result + where + S: serde::de::SeqAccess<'de>, + { + let mut fields = vec![]; + while let Some(field) = seq.next_element()? { + fields.push(field); + } + + Ok(SelectConfig::Fields(fields)) + } +} + +impl<'de> Deserialize<'de> for SelectConfig { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(SelectVisitor) + } +} + +impl Serialize for SelectConfig { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + SelectConfig::All => serializer.serialize_str("all"), + SelectConfig::Fields(fields) => { + let mut seq = serializer.serialize_seq(Some(fields.len()))?; + for field in fields { + seq.serialize_element(field)?; + } + seq.end() + } + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct InputConfig { + pub input_stream: String, + pub select_fields: SelectConfig, +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum NoDataAction { + #[default] + Null, + PreviousValue, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum PushInterval { + OnNewData, + OnTimeout(Duration), +} + +struct PushVisitor; + +impl<'de> Visitor<'de> for PushVisitor { + type Value = PushInterval; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str(r#"the string "on_new_data" or a unsigned integer"#) + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + match value { + "on_new_data" => Ok(PushInterval::OnNewData), + _ => Err(de::Error::custom(r#"Expected the string "on_new_data""#)), + } + } + + fn visit_u64(self, secs: u64) -> Result + where + E: de::Error, + { + Ok(PushInterval::OnTimeout(Duration::from_secs(secs))) + } + + fn visit_i64(self, secs: i64) -> Result + where + E: de::Error, + { + self.visit_u64(secs as u64) + } +} + +impl<'de> Deserialize<'de> for PushInterval { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(PushVisitor) + } +} + +impl Serialize for PushInterval { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + PushInterval::OnNewData => serializer.serialize_str("on_new_data"), + PushInterval::OnTimeout(duration) => serializer.serialize_u64(duration.as_secs()), + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct JoinConfig { + pub name: String, + pub construct_from: Vec, + pub no_data_action: NoDataAction, + pub push_interval_s: PushInterval, + pub publish_on_service_bus: bool, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct JoinerConfig { + pub output_streams: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct BusConfig { + pub port: u16, + pub console_port: u16, + pub joins: JoinerConfig, +} + #[derive(Debug, Clone, Deserialize, Default)] pub struct DeviceConfig { pub project_id: String, @@ -312,6 +535,7 @@ pub struct Config { #[cfg(target_os = "android")] pub logging: Option, pub precondition_checks: Option, + pub bus: 
Option, } impl Config { diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 6e65598f..1ca904e1 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -19,9 +19,11 @@ use tracing_subscriber::{EnvFilter, Registry}; pub type ReloadHandle = Handle>, Registry>>; -use uplink::config::{ - ActionRoute, AppConfig, Config, DeviceConfig, StreamConfig, DEFAULT_TIMEOUT, MAX_BATCH_SIZE, -}; +#[cfg(feature = "bus")] +use uplink::collector::bus::Bus; +#[cfg(feature = "bus")] +use uplink::config::{ActionRoute, DEFAULT_TIMEOUT}; +use uplink::config::{AppConfig, Config, DeviceConfig, StreamConfig, MAX_BATCH_SIZE}; use uplink::{simulator, spawn_named_thread, TcpJson, Uplink}; const DEFAULT_CONFIG: &str = r#" @@ -346,6 +348,19 @@ fn main() -> Result<(), Error> { _ => None, }; + #[cfg(feature = "bus")] + let bus = config.bus.as_ref().map(|cfg| { + let actions_rx = bridge + .register_action_routes([ActionRoute { + name: "*".to_string(), + timeout: DEFAULT_TIMEOUT, + cancellable: false, + }]) + .unwrap(); + + Bus::new(cfg.clone(), bridge_tx.clone(), actions_rx) + }); + let update_config_actions = bridge.register_action_route(ActionRoute { name: "update_uplink_config".to_owned(), timeout: DEFAULT_TIMEOUT, @@ -357,6 +372,11 @@ fn main() -> Result<(), Error> { let ctrl_tx = uplink.spawn(&device_config, bridge, downloader_disable.clone(), network_up.clone())?; + #[cfg(feature = "bus")] + if let Some(bus) = bus { + spawn_named_thread("Bus Interface", move || bus.start()) + }; + if let Some(config) = config.simulator.clone() { spawn_named_thread("Simulator", || { simulator::start(config, bridge_tx, simulator_actions).unwrap(); diff --git a/uplink/tests/bus.rs b/uplink/tests/bus.rs new file mode 100644 index 00000000..f45ee2ff --- /dev/null +++ b/uplink/tests/bus.rs @@ -0,0 +1,91 @@ +use std::{ + thread::{sleep, spawn}, + time::Duration, +}; + +use flume::{bounded, Receiver, Sender}; +use rumqttc::{Client, Connection, Event, MqttOptions, Packet, QoS}; +use serde_json::json; + +use uplink::{ + base::bridge::{BridgeTx, DataTx, StatusTx}, + collector::bus::Bus, + config::{BusConfig, JoinerConfig}, + Action, ActionResponse, +}; + +fn setup(offset: u16) -> (Sender, Receiver, Client, Connection) { + let (port, console_port) = (1873 + offset, 3020 + offset); + let config = BusConfig { port, console_port, joins: JoinerConfig { output_streams: vec![] } }; + + let (data_tx, _data_rx) = bounded(1); + let (status_tx, status_rx) = bounded(1); + let bridge_tx = + BridgeTx { data_tx: DataTx { inner: data_tx }, status_tx: StatusTx { inner: status_tx } }; + let (actions_tx, actions_rx) = bounded(1); + spawn(|| Bus::new(config, bridge_tx, actions_rx).start()); + + let opt = MqttOptions::new("test", "localhost", port); + let (client, conn) = Client::new(opt, 1); + + (actions_tx, status_rx, client, conn) +} + +/// This test verifies that action status messages published to the bus are correctly received. 
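For orientation before the test code: the sketch below is not part of the patch, it is an illustration of roughly what the service side of this exchange looks like over the bus, using the same `rumqttc` client the tests use. It assumes the defaults from `configs/config.toml` (bus on port 1883), a hypothetical action named `abc`, and the topic layout from `collector/bus/mod.rs`: actions are delivered on `/actions/{name}` and status is reported back on `/actions/{action_id}/status`. The client id and status values are placeholders.

```rust
use rumqttc::{Client, Event, MqttOptions, Packet, QoS};
use serde_json::{json, Value};

fn main() {
    // Connect to the on-device service bus (127.0.0.1:1883 in configs/config.toml).
    let opts = MqttOptions::new("example-service", "127.0.0.1", 1883);
    let (client, mut connection) = Client::new(opts, 10);

    // Subscribing is what makes the action routable: `BusTx::run_action` checks
    // the broker's /subscriptions endpoint before publishing the action.
    client.subscribe("/actions/abc", QoS::AtMostOnce).unwrap();

    for event in connection.iter() {
        let Ok(Event::Incoming(Packet::Publish(publish))) = event else { continue };

        // The payload is the JSON-serialized `Action`.
        let action: Value = serde_json::from_slice(&publish.payload).unwrap();
        let id = action["action_id"].as_str().unwrap_or_default().to_owned();

        // Report a terminal status; uplink parses this back into an `ActionResponse`
        // (the same shape the test below constructs).
        let status = json!({
            "action_id": id,
            "sequence": 1,
            "timestamp": 0,
            "state": "Completed",
            "progress": 100,
            "errors": [],
            "done_response": null,
        });
        client
            .publish(format!("/actions/{id}/status"), QoS::AtMostOnce, false, status.to_string())
            .unwrap();
    }
}
```

The wildcard `"*"` route registered in `main.rs` is what hands such otherwise-unrouted actions over to the bus in the first place.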
+#[test] +fn recv_action_and_respond() { + let (actions_tx, status_rx, client, mut conn) = setup(0); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + client.subscribe("/actions/abc", QoS::AtMostOnce).unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + let Event::Incoming(_) = conn.recv().unwrap().unwrap() else { panic!() }; + sleep(Duration::from_millis(100)); + + let action = + Action { action_id: "123".to_owned(), name: "abc".to_owned(), payload: "".to_owned() }; + let expected_action = action.clone(); + actions_tx.send(action).unwrap(); + + let Event::Incoming(Packet::Publish(publish)) = + conn.recv_timeout(Duration::from_millis(500)).unwrap().unwrap() + else { + panic!() + }; + let action = serde_json::from_slice(&publish.payload).unwrap(); + assert_eq!(expected_action, action); + + let action_status = ActionResponse { + action_id: "123".to_owned(), + sequence: 1, + timestamp: 0, + state: "abc".to_owned(), + progress: 234, + errors: vec!["Testing".to_owned()], + done_response: None, + }; + client + .publish("/actions/123/status", QoS::AtMostOnce, false, json!(action_status).to_string()) + .unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + + assert_eq!(action_status, status_rx.recv_timeout(Duration::from_millis(200)).unwrap()); +} + +/// This test verifies that action status is set to failed for actions which are not subscribed to on the bus +#[test] +fn mark_unregistered_action_as_failed() { + let (actions_tx, status_rx, _, _) = setup(1); + + let action = + Action { action_id: "123".to_owned(), name: "abc".to_owned(), payload: "".to_owned() }; + actions_tx.send(action).unwrap(); + + let ActionResponse { action_id, state, errors, .. } = + status_rx.recv_timeout(Duration::from_millis(200)).unwrap(); + assert_eq!(action_id, "123"); + assert_eq!(state, "Failed"); + assert_eq!(errors, ["Action was not expected"]); +} diff --git a/uplink/tests/joins.rs b/uplink/tests/joins.rs new file mode 100644 index 00000000..886f0027 --- /dev/null +++ b/uplink/tests/joins.rs @@ -0,0 +1,655 @@ +//! Each test follows a similar structure: +//! - Setup Configuration: Define the bus configuration, including join configurations, push intervals, and other parameters. +//! - Initialize Channels: Create bounded channels for data and status transmission. +//! - Start the Bus: Spawn a new thread to start the bus with the given configuration and channels. +//! - Setup MQTT Client: Configure and connect the MQTT client to the bus. +//! - Publish Messages: Publish JSON messages to the defined input streams. +//! - Receive and Verify: Receive the output messages and verify that they match the expected results. 
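As a companion to the steps above, here is an illustrative sketch (not part of the patch) of a data producer on the bus, modeled on the `location` join from `configs/config.json`: the joiner selects `latitude` and `longitude` from the `gps` stream and `altitude` from `altimeter`, merges them, and pushes the result on the `location` output stream every 60 seconds; because that join sets `publish_on_service_bus`, the merged payload is also republished on `/streams/location`. Broker address, client id, and readings are placeholder values.

```rust
use rumqttc::{Client, MqttOptions, QoS};
use serde_json::json;

fn main() {
    // The bus listens on 127.0.0.1:1883 per the [bus] section of configs/config.toml.
    let opts = MqttOptions::new("example-producer", "127.0.0.1", 1883);
    let (client, mut connection) = Client::new(opts, 10);

    // Drive the MQTT event loop so the publishes below actually go out.
    std::thread::spawn(move || {
        for _ in connection.iter() {}
    });

    // Input payloads are flat JSON objects keyed by field name.
    client
        .publish(
            "/streams/gps",
            QoS::AtMostOnce,
            false,
            json!({ "latitude": 12.9716, "longitude": 77.5946 }).to_string(),
        )
        .unwrap();
    client
        .publish(
            "/streams/altimeter",
            QoS::AtMostOnce,
            false,
            json!({ "altitude": 920.0 }).to_string(),
        )
        .unwrap();

    // Expected merged object on the `location` stream (the joiner attaches its
    // own timestamp and sequence):
    // {"latitude": 12.9716, "longitude": 77.5946, "altitude": 920.0}

    // Give the background event loop a moment to flush before exiting (sketch only).
    std::thread::sleep(std::time::Duration::from_millis(100));
}
```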
+ +use std::{ + thread::{sleep, spawn}, + time::Duration, +}; + +use flume::{bounded, Receiver, Sender}; +use rumqttc::{Client, Connection, Event, MqttOptions, Packet, Publish, QoS}; +use serde::Deserialize; + +use serde_json::{json, Value}; +use uplink::{ + base::bridge::{BridgeTx, DataTx, Payload, StatusTx}, + collector::bus::Bus, + config::{ + BusConfig, Field, InputConfig, JoinConfig, JoinerConfig, NoDataAction, PushInterval, + SelectConfig, + }, + Action, ActionResponse, +}; + +fn setup( + joins: JoinerConfig, + offset: u16, +) -> (Receiver, Receiver, Sender, Client, Connection) { + let (port, console_port) = (1883 + offset, 3030 + offset); + + let config = BusConfig { console_port, port, joins }; + + let (data_tx, data_rx) = bounded(1); + let (status_tx, status_rx) = bounded(1); + let bridge_tx = + BridgeTx { data_tx: DataTx { inner: data_tx }, status_tx: StatusTx { inner: status_tx } }; + let (actions_tx, actions_rx) = bounded(1); + spawn(|| Bus::new(config, bridge_tx, actions_rx).start()); + + let opt = MqttOptions::new("test", "localhost", port); + let (client, conn) = Client::new(opt, 1); + + (data_rx, status_rx, actions_tx, client, conn) +} + +/// This test checks if data published to the input stream is received as-is on the output stream. +#[test] +fn as_is_data_from_bus() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "as_is".to_owned(), + construct_from: vec![InputConfig { + input_stream: "input".to_owned(), + select_fields: SelectConfig::All, + }], + no_data_action: NoDataAction::Null, + push_interval_s: PushInterval::OnNewData, + publish_on_service_bus: false, + }], + }, + 0, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input = json!({"field_1": 123, "field_2": "abc"}); + client.publish("/streams/input", QoS::AtMostOnce, false, input.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_millis(100)).unwrap() + else { + panic!() + }; + assert_eq!(stream, "as_is"); + assert_eq!(payload, input); +} + +/// This test ensures that data from two different input streams is joined correctly and published to the output stream when new data is received. 
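The next two tests differ only in `push_interval_s`, which decides when the joined map is flushed (see `Joiner::start` in `uplink/src/collector/bus/joins.rs`). A small illustration of the two modes, with values mirroring these tests; the function name is for exposition only:

```rust
use std::time::Duration;
use uplink::config::PushInterval;

fn push_modes() -> (PushInterval, PushInterval) {
    // "on_new_data": every incoming publish is flushed immediately, so the two
    // input publishes in the test below produce two separate output payloads,
    // first with the fields of input_one, then with the fields of input_two.
    let instant = PushInterval::OnNewData;

    // A number is a timeout in seconds: inputs accumulate in the joined map and
    // a single merged payload containing fields from both inputs is flushed
    // when the ticker fires.
    let periodic = PushInterval::OnTimeout(Duration::from_secs(1));

    (instant, periodic)
}
```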
+#[test] +fn join_two_streams_on_new_data_from_bus() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::All, + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::All, + }, + ], + no_data_action: NoDataAction::Null, + push_interval_s: PushInterval::OnNewData, + publish_on_service_bus: false, + }], + }, + 1, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_one = json!({"field_1": 123, "field_2": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_two = json!({"field_x": 456, "field_y": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_millis(200)).unwrap() + else { + panic!() + }; + assert_eq!(stream, "output"); + assert_eq!(payload, input_one); + + let Payload { stream, sequence: 2, payload, .. } = + data_rx.recv_timeout(Duration::from_millis(100)).unwrap() + else { + panic!() + }; + assert_eq!(stream, "output"); + assert_eq!(payload, input_two); +} + +/// This test checks the joining of data from two streams based on a timeout interval, ensuring the correct output even if data is received after the timeout. +#[test] +fn join_two_streams_on_timeout_from_bus() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::All, + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::All, + }, + ], + no_data_action: NoDataAction::Null, + push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)), + publish_on_service_bus: false, + }], + }, + 2, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_one = json!({"field_1": 123, "field_2": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let input_two = json!({"field_x": 456, "field_y": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_millis(1000)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 123, "field_2": "abc", "field_x": 456, "field_y": "xyz"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); +} + +/// This test validates that only selected fields from an input stream are published to the output stream. 
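The two selection tests that follow exercise the `SelectConfig`/`Field` deserializers from `uplink/src/config.rs`: a bare string keeps a field under its original name, while a single-entry map renames it in the joined output. The sketch below (illustration only, field names taken from the joins in `configs/config.json`) shows both forms:

```rust
use uplink::config::SelectConfig;

fn select_forms() -> (SelectConfig, SelectConfig) {
    // ["latitude", "longitude"]: keep only these keys, under the same names.
    let keep: SelectConfig =
        serde_json::from_str(r#"["latitude", "longitude"]"#).unwrap();

    // [{ "field_z": "field_x" }]: pick the input key `field_x` but emit it as
    // `field_z` in the joined payload, e.g. so it cannot clobber another
    // stream's `field_x`.
    let rename: SelectConfig =
        serde_json::from_str(r#"[{ "field_z": "field_x" }]"#).unwrap();

    (keep, rename)
}
```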
+#[test]
+fn select_from_stream_on_bus() {
+    let (data_rx, _, _, client, mut conn) = setup(
+        JoinerConfig {
+            output_streams: vec![JoinConfig {
+                name: "output".to_owned(),
+                construct_from: vec![InputConfig {
+                    input_stream: "input".to_owned(),
+                    select_fields: SelectConfig::Fields(vec![Field {
+                        original: "field_1".to_owned(),
+                        renamed: None,
+                    }]),
+                }],
+                no_data_action: NoDataAction::Null,
+                push_interval_s: PushInterval::OnNewData,
+                publish_on_service_bus: false,
+            }],
+        },
+        3,
+    );
+
+    sleep(Duration::from_millis(100));
+    let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() };
+
+    let input = json!({"field_1": 123, "field_2": "abc"});
+    client.publish("/streams/input", QoS::AtMostOnce, false, input.to_string()).unwrap();
+    let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else {
+        panic!()
+    };
+
+    let Payload { stream, sequence: 1, payload, .. } =
+        data_rx.recv_timeout(Duration::from_millis(500)).unwrap()
+    else {
+        panic!()
+    };
+    let output = json!({"field_1": 123});
+    assert_eq!(stream, "output");
+    assert_eq!(payload, output);
+}
+
+/// This test checks that selected fields from two different streams are combined and published correctly to the output stream.
+#[test]
+fn select_from_two_streams_on_bus() {
+    let (data_rx, _, _, client, mut conn) = setup(
+        JoinerConfig {
+            output_streams: vec![JoinConfig {
+                name: "output".to_owned(),
+                construct_from: vec![
+                    InputConfig {
+                        input_stream: "input_one".to_owned(),
+                        select_fields: SelectConfig::Fields(vec![Field {
+                            original: "field_1".to_owned(),
+                            renamed: None,
+                        }]),
+                    },
+                    InputConfig {
+                        input_stream: "input_two".to_owned(),
+                        select_fields: SelectConfig::Fields(vec![Field {
+                            original: "field_x".to_owned(),
+                            renamed: None,
+                        }]),
+                    },
+                ],
+                no_data_action: NoDataAction::Null,
+                push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)),
+                publish_on_service_bus: false,
+            }],
+        },
+        4,
+    );
+
+    sleep(Duration::from_millis(100));
+    let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() };
+
+    let input_one = json!({"field_1": 123, "field_2": "abc"});
+    client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap();
+    let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else {
+        panic!()
+    };
+
+    let input_two = json!({"field_x": 456, "field_y": "xyz"});
+    client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap();
+    let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() };
+
+    let Payload { stream, sequence: 1, payload, .. } =
+        data_rx.recv_timeout(Duration::from_millis(1000)).unwrap()
+    else {
+        panic!()
+    };
+    let output = json!({"field_1": 123, "field_x": 456});
+    assert_eq!(stream, "output");
+    assert_eq!(payload, output);
+}
+
+/// This test verifies that the system correctly handles flushing of streams, ensuring that when no new data arrives, keys are dropped/set to null.
+#[test] +fn null_after_flush() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::All, + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::All, + }, + ], + no_data_action: NoDataAction::Null, + push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)), + publish_on_service_bus: false, + }], + }, + 5, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_one = json!({"field_1": 123, "field_2": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 123, "field_2": "abc"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let input_two = json!({"field_3": 456, "field_4": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 2, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_3": 456, "field_4": "xyz"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let input_one = json!({"field_1": 789, "field_2": "efg"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let input_two = json!({"field_3": 098, "field_4": "zyx"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 3, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 789, "field_2": "efg","field_3": 098, "field_4": "zyx"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); +} + +/// This test checks that the system correctly handles data when configured with PreviousValue, ensuring that the last known values are used when no new data arrives. 
+#[test] +fn previous_value_after_flush() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::All, + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::All, + }, + ], + no_data_action: NoDataAction::PreviousValue, + push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)), + publish_on_service_bus: false, + }], + }, + 6, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_one = json!({"field_1": 123, "field_2": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 123, "field_2": "abc"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let input_two = json!({"field_3": 456, "field_4": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 2, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 123, "field_2": "abc", "field_3": 456, "field_4": "xyz"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let input_one = json!({"field_1": 789, "field_2": "efg"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let input_two = json!({"field_3": 098, "field_4": "zyx"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 3, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 789, "field_2": "efg", "field_3": 098, "field_4": "zyx"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); +} + +/// This test checks how the system handles two input streams that have similar fields without renaming them. +/// The expected behavior is to merge the fields, with the latest value for duplicated fields being used. 
+#[test] +fn two_streams_with_similar_fields_no_rename() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::Fields(vec![ + Field { original: "field_a".to_owned(), renamed: None }, + Field { original: "field_b".to_owned(), renamed: None }, + ]), + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::Fields(vec![ + Field { original: "field_a".to_owned(), renamed: None }, + Field { original: "field_c".to_owned(), renamed: None }, + ]), + }, + ], + no_data_action: NoDataAction::PreviousValue, + push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)), + publish_on_service_bus: false, + }], + }, + 7, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_one = json!({"field_a": 123, "field_b": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_a": 123, "field_b": "abc"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let input_two = json!({"field_a": 456, "field_c": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 2, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_a": 456, "field_b": "abc", "field_c": "xyz"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); +} + +/// This test checks how the system handles two input streams that have similar fields but with renaming to avoid field conflicts. +/// The expected behavior is to have the fields renamed as specified and merged into the output. 
+#[test] +fn two_streams_with_similar_fields_renamed() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::Fields(vec![ + Field { + original: "field_a".to_owned(), + renamed: Some("field_a1".to_owned()), + }, + Field { original: "field_b".to_owned(), renamed: None }, + ]), + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::Fields(vec![ + Field { + original: "field_a".to_owned(), + renamed: Some("field_a2".to_owned()), + }, + Field { original: "field_c".to_owned(), renamed: None }, + ]), + }, + ], + no_data_action: NoDataAction::PreviousValue, + push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)), + publish_on_service_bus: false, + }], + }, + 8, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + + let input_one = json!({"field_a": 123, "field_b": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_a1": 123, "field_b": "abc"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let input_two = json!({"field_a": 456, "field_c": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let Payload { stream, sequence: 2, payload, .. } = + data_rx.recv_timeout(Duration::from_secs(2)).unwrap() + else { + panic!() + }; + let output = json!({"field_a1": 123, "field_a2": 456, "field_b": "abc", "field_c": "xyz"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); +} + +/// This test is to validate the behavior of the bus when it is configured to push a joined stream back onto the bus. +/// In this test the client subscribes to the output stream on the bus, publishes data onto input streams and then expects the joined data back from the bus. 
+#[test] +fn publish_joined_stream_back_on_bus() { + let (data_rx, _, _, client, mut conn) = setup( + JoinerConfig { + output_streams: vec![JoinConfig { + name: "output".to_owned(), + construct_from: vec![ + InputConfig { + input_stream: "input_one".to_owned(), + select_fields: SelectConfig::All, + }, + InputConfig { + input_stream: "input_two".to_owned(), + select_fields: SelectConfig::All, + }, + ], + no_data_action: NoDataAction::Null, + push_interval_s: PushInterval::OnTimeout(Duration::from_secs(1)), + publish_on_service_bus: true, + }], + }, + 9, + ); + + sleep(Duration::from_millis(100)); + let Event::Incoming(Packet::ConnAck(_)) = conn.recv().unwrap().unwrap() else { panic!() }; + client.subscribe("/streams/output", QoS::AtMostOnce).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + let Event::Incoming(Packet::SubAck(_)) = + conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() + else { + panic!() + }; + + let input_one = json!({"field_1": 123, "field_2": "abc"}); + client.publish("/streams/input_one", QoS::AtMostOnce, false, input_one.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() else { + panic!() + }; + + let input_two = json!({"field_x": 456, "field_y": "xyz"}); + client.publish("/streams/input_two", QoS::AtMostOnce, false, input_two.to_string()).unwrap(); + let Event::Outgoing(_) = conn.recv().unwrap().unwrap() else { panic!() }; + + let Payload { stream, sequence: 1, payload, .. } = + data_rx.recv_timeout(Duration::from_millis(1000)).unwrap() + else { + panic!() + }; + let output = json!({"field_1": 123, "field_2": "abc", "field_x": 456, "field_y": "xyz"}); + assert_eq!(stream, "output"); + assert_eq!(payload, output); + + let Event::Incoming(Packet::Publish(Publish { topic, payload, .. })) = + conn.recv_timeout(Duration::from_millis(200)).unwrap().unwrap() + else { + panic!() + }; + { + #[derive(Deserialize)] + struct Payload { + sequence: u32, + #[allow(dead_code)] + timestamp: u64, + #[serde(flatten)] + payload: Value, + } + let Payload { sequence, payload, .. } = serde_json::from_slice(&payload).unwrap(); + assert_eq!(topic, "/streams/output"); + assert_eq!(sequence, 1); + assert_eq!(payload, output); + } +}
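+
+// Conventions exercised by the tests above: input data is published as plain
+// JSON objects (e.g. `{"field_1": 123, "field_2": "abc"}`) on
+// `/streams/{input_stream}` topics; the joiner merges the configured inputs
+// into the named output stream, pushing either on new data or on a timeout and
+// handling missing inputs per `no_data_action`; and when
+// `publish_on_service_bus` is set, the joined payload is also published back on
+// `/streams/{name}` with `sequence` and `timestamp` fields added, as checked in
+// `publish_joined_stream_back_on_bus`.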