From a883def0fb63668ad6bf4c8b7ce574f5a6e48a0b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 23 Aug 2023 10:00:47 +0530 Subject: [PATCH 1/4] refactor: built-in single device sim from `tools/` --- Cargo.lock | 79 +++- uplink/Cargo.toml | 1 + uplink/src/base/mod.rs | 4 - uplink/src/base/mqtt/mod.rs | 5 +- uplink/src/collector/simulator.rs | 621 ------------------------- uplink/src/collector/simulator/data.rs | 415 +++++++++++++++++ uplink/src/collector/simulator/mod.rs | 150 ++++++ uplink/src/lib.rs | 9 - uplink/src/main.rs | 2 +- 9 files changed, 646 insertions(+), 640 deletions(-) delete mode 100644 uplink/src/collector/simulator.rs create mode 100644 uplink/src/collector/simulator/data.rs create mode 100644 uplink/src/collector/simulator/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 60945101..004eb5a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,7 +236,7 @@ dependencies = [ "ansi_term", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", @@ -370,6 +370,41 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "darling" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 2.0.18", +] + +[[package]] +name = "darling_macro" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.18", +] + [[package]] name = "digest" version = "0.9.0" @@ -379,6 +414,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "dummy" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ba338a15d93c01c2f117a2b0bd1bfa3c780fe771e7db7c69fc70bda265e2115" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.18", +] + [[package]] name = "either" version = "1.8.1" @@ -459,6 +506,17 @@ dependencies = [ "libc", ] +[[package]] +name = "fake" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9af7b0c58ac9d03169e27f080616ce9f64004edca3d2ef4147a811c21b23b319" +dependencies = [ + "dummy", + "rand 0.8.5", + "unidecode", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -889,6 +947,12 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.4.0" @@ -2284,6 +2348,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "structopt" version = "0.3.26" @@ -2908,6 +2978,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unidecode" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "402bb19d8e03f1d1a7450e2bd613980869438e0666331be3e073089124aa1adc" + [[package]] name = "untrusted" version = "0.7.1" @@ -2923,6 +2999,7 @@ dependencies = [ "axum", "bytes 1.4.0", "config", + "fake", "flume", "futures-util", "lazy_static", diff --git a/uplink/Cargo.toml b/uplink/Cargo.toml index 8a2675e1..63174afd 100644 --- a/uplink/Cargo.toml +++ b/uplink/Cargo.toml @@ -38,6 +38,7 @@ tracing-subscriber = { version="=0.3.14", features=["env-filter"] } tokio-compat-02 = "0.2.0" tunshell-client = { git = "https://github.com/bytebeamio/tunshell.git", branch = "android_patch" } # simulator +fake = { version = "2.5.0", features = ["derive"] } rand = "0.8" # downloader futures-util = "0.3" diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index 3a6b8cb0..7a796b05 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -117,14 +117,10 @@ pub struct Stats { #[derive(Debug, Clone, Deserialize, Default)] pub struct SimulatorConfig { - /// number of devices to be simulated - pub num_devices: u32, /// path to directory containing files with gps paths to be used in simulation pub gps_paths: String, /// actions that are to be routed to simulator pub actions: Vec, - #[serde(skip)] - pub actions_subscriptions: Vec, } #[derive(Debug, Clone, Deserialize, Default)] diff --git a/uplink/src/base/mqtt/mod.rs b/uplink/src/base/mqtt/mod.rs index 7b431363..c7dbeb05 100644 --- a/uplink/src/base/mqtt/mod.rs +++ b/uplink/src/base/mqtt/mod.rs @@ -61,10 +61,7 @@ impl Mqtt { let options = mqttoptions(&config); let (client, mut eventloop) = AsyncClient::new(options, 10); eventloop.network_options.set_connection_timeout(config.mqtt.network_timeout); - let mut actions_subscriptions = vec![config.actions_subscription.clone()]; - if let Some(sim_cfg) = &config.simulator { - actions_subscriptions.extend_from_slice(&sim_cfg.actions_subscriptions); - } + let actions_subscriptions = vec![config.actions_subscription.clone()]; Mqtt { config, diff --git a/uplink/src/collector/simulator.rs b/uplink/src/collector/simulator.rs deleted file mode 100644 index 37b066f9..00000000 --- a/uplink/src/collector/simulator.rs +++ /dev/null @@ -1,621 +0,0 @@ -use log::{error, info, trace}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use thiserror::Error; -use tokio::select; -use tokio_util::codec::LinesCodecError; - -use std::collections::BinaryHeap; -use std::time::{Duration, Instant}; -use std::{cmp::Ordering, fs, io, sync::Arc}; - -use crate::base::bridge::BridgeTx; -use crate::base::{clock, SimulatorConfig}; -use crate::{Action, ActionResponse, Payload}; - -use rand::Rng; - -#[derive(Error, Debug)] -pub enum Error { - #[error("Io error {0}")] - Io(#[from] io::Error), - #[error("Stream done")] - StreamDone, - #[error("Lines codec error {0}")] - Codec(#[from] LinesCodecError), - #[error("Serde error {0}")] - Json(#[from] serde_json::error::Error), - #[error("flume error {0}")] - Recv(#[from] flume::RecvError), -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Location { - latitude: f64, - longitude: f64, -} - -#[derive(Clone)] -pub struct DeviceData { - device_id: String, - path: Arc>, - path_offset: u32, - time_offset: Duration, -} - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum DataEventType { - GenerateGPS, - GenerateIMU, - GenerateVehicleData, - GeneratePeripheralData, - GenerateMotor, - GenerateBMS, -} - -#[derive(Clone)] -pub struct DataEvent { - timestamp: Instant, - event_type: DataEventType, - device: DeviceData, - sequence: u32, -} - -#[derive(Clone, PartialEq, Eq)] -pub struct ActionResponseEvent { - timestamp: Instant, - action_id: String, - device_id: String, - status: String, - progress: u8, -} - -#[derive(Clone)] -pub enum Event { - DataEvent(DataEvent), - ActionResponseEvent(ActionResponseEvent), -} - -impl PartialEq for Event { - fn eq(&self, other: &Self) -> bool { - match self { - Event::DataEvent(e1) => match other { - Event::DataEvent(e2) => { - e1.timestamp == e2.timestamp - && e1.event_type == e2.event_type - && e1.device.device_id == e2.device.device_id - } - Event::ActionResponseEvent(_) => false, - }, - Event::ActionResponseEvent(e1) => match other { - Event::ActionResponseEvent(e2) => e1 == e2, - Event::DataEvent(_) => false, - }, - } - } -} - -impl Eq for Event {} - -fn event_timestamp(event: &Event) -> Instant { - match event { - Event::DataEvent(e) => e.timestamp, - Event::ActionResponseEvent(e) => e.timestamp, - } -} - -impl Ord for Event { - fn cmp(&self, other: &Event) -> Ordering { - let t1 = event_timestamp(self); - let t2 = event_timestamp(other); - - t1.cmp(&t2) - } -} - -impl PartialOrd for Event { - fn partial_cmp(&self, other: &Event) -> Option { - Some(other.cmp(self)) - } -} - -pub fn generate_gps_data(device: &DeviceData, sequence: u32) -> Payload { - let path_len = device.path.len() as u32; - let path_index = ((device.path_offset + sequence) % path_len) as usize; - let position = device.path.get(path_index).unwrap(); - - Payload { - timestamp: clock() as u64, - device_id: Some(device.device_id.to_owned()), - sequence, - stream: "gps".to_string(), - payload: json!(position), - } -} - -pub fn generate_float(start: f64, end: f64) -> f64 { - let mut rng = rand::thread_rng(); - - rng.gen_range(start..end) -} - -pub fn generate_int(start: i32, end: i32) -> i64 { - rand::thread_rng().gen_range(start..end) as i64 -} - -pub fn generate_bool_string(p: f64) -> String { - if rand::thread_rng().gen_bool(p) { - "on".to_owned() - } else { - "off".to_owned() - } -} - -#[derive(Debug, Serialize)] -struct Bms { - periodicity_ms: i32, - mosfet_temperature: f64, - ambient_temperature: f64, - mosfet_status: i32, - cell_voltage_count: i32, - cell_voltage_1: f64, - cell_voltage_2: f64, - cell_voltage_3: f64, - cell_voltage_4: f64, - cell_voltage_5: f64, - cell_voltage_6: f64, - cell_voltage_7: f64, - cell_voltage_8: f64, - cell_voltage_9: f64, - cell_voltage_10: f64, - cell_voltage_11: f64, - cell_voltage_12: f64, - cell_voltage_13: f64, - cell_voltage_14: f64, - cell_voltage_15: f64, - cell_voltage_16: f64, - cell_thermistor_count: i32, - cell_temp_1: f64, - cell_temp_2: f64, - cell_temp_3: f64, - cell_temp_4: f64, - cell_temp_5: f64, - cell_temp_6: f64, - cell_temp_7: f64, - cell_temp_8: f64, - cell_balancing_status: i32, - pack_voltage: f64, - pack_current: f64, - pack_soc: f64, - pack_soh: f64, - pack_sop: f64, - pack_cycle_count: i64, - pack_available_energy: i64, - pack_consumed_energy: i64, - pack_fault: i32, - pack_status: i32, -} - -pub fn generate_bms_data(device: &DeviceData, sequence: u32) -> Payload { - let payload = Bms { - periodicity_ms: 250, - mosfet_temperature: generate_float(40f64, 45f64), - ambient_temperature: generate_float(35f64, 40f64), - mosfet_status: 1, - cell_voltage_count: 16, - cell_voltage_1: generate_float(3.0f64, 3.2f64), - cell_voltage_2: generate_float(3.0f64, 3.2f64), - cell_voltage_3: generate_float(3.0f64, 3.2f64), - cell_voltage_4: generate_float(3.0f64, 3.2f64), - cell_voltage_5: generate_float(3.0f64, 3.2f64), - cell_voltage_6: generate_float(3.0f64, 3.2f64), - cell_voltage_7: generate_float(3.0f64, 3.2f64), - cell_voltage_8: generate_float(3.0f64, 3.2f64), - cell_voltage_9: generate_float(3.0f64, 3.2f64), - cell_voltage_10: generate_float(3.0f64, 3.2f64), - cell_voltage_11: generate_float(3.0f64, 3.2f64), - cell_voltage_12: generate_float(3.0f64, 3.2f64), - cell_voltage_13: generate_float(3.0f64, 3.2f64), - cell_voltage_14: generate_float(3.0f64, 3.2f64), - cell_voltage_15: generate_float(3.0f64, 3.2f64), - cell_voltage_16: generate_float(3.0f64, 3.2f64), - cell_thermistor_count: 8, - cell_temp_1: generate_float(40.0f64, 43.0f64), - cell_temp_2: generate_float(40.0f64, 43.0f64), - cell_temp_3: generate_float(40.0f64, 43.0f64), - cell_temp_4: generate_float(40.0f64, 43.0f64), - cell_temp_5: generate_float(40.0f64, 43.0f64), - cell_temp_6: generate_float(40.0f64, 43.0f64), - cell_temp_7: generate_float(40.0f64, 43.0f64), - cell_temp_8: generate_float(40.0f64, 43.0f64), - cell_balancing_status: 1, - pack_voltage: generate_float(95f64, 96f64), - pack_current: generate_float(15f64, 20f64), - pack_soc: generate_float(80f64, 90f64), - pack_soh: generate_float(9.5f64, 9.9f64), - pack_sop: generate_float(9.5f64, 9.9f64), - pack_cycle_count: generate_int(100, 150), - pack_available_energy: generate_int(2000, 3000), - pack_consumed_energy: generate_int(2000, 3000), - pack_fault: 0, - pack_status: 1, - }; - - Payload { - timestamp: clock() as u64, - device_id: Some(device.device_id.to_owned()), - sequence, - stream: "bms".to_string(), - payload: json!(payload), - } -} - -#[derive(Debug, Serialize)] -struct Imu { - ax: f64, - ay: f64, - az: f64, - pitch: f64, - roll: f64, - yaw: f64, - magx: f64, - magy: f64, - magz: f64, -} - -pub fn generate_imu_data(device: &DeviceData, sequence: u32) -> Payload { - let payload = Imu { - ax: generate_float(1f64, 2.8f64), - ay: generate_float(1f64, 2.8f64), - az: generate_float(9.79f64, 9.82f64), - pitch: generate_float(0.8f64, 1f64), - roll: generate_float(0.8f64, 1f64), - yaw: generate_float(0.8f64, 1f64), - magx: generate_float(-45f64, -15f64), - magy: generate_float(-45f64, -15f64), - magz: generate_float(-45f64, -15f64), - }; - - Payload { - timestamp: clock() as u64, - device_id: Some(device.device_id.to_owned()), - sequence, - stream: "imu".to_string(), - payload: json!(payload), - } -} - -#[derive(Debug, Serialize)] -struct Motor { - motor_temperature1: f64, - motor_temperature2: f64, - motor_temperature3: f64, - motor_voltage: f64, - motor_current: f64, - motor_rpm: i64, -} - -pub fn generate_motor_data(device: &DeviceData, sequence: u32) -> Payload { - let payload = Motor { - motor_temperature1: generate_float(40f64, 45f64), - motor_temperature2: generate_float(40f64, 45f64), - motor_temperature3: generate_float(40f64, 45f64), - - motor_voltage: generate_float(95f64, 96f64), - motor_current: generate_float(20f64, 25f64), - motor_rpm: generate_int(1000, 9000), - }; - - Payload { - timestamp: clock() as u64, - device_id: Some(device.device_id.to_owned()), - sequence, - stream: "motor".to_string(), - payload: json!(payload), - } -} - -#[derive(Debug, Serialize)] -struct PeripheralState { - gps_status: String, - gsm_status: String, - imu_status: String, - left_indicator: String, - right_indicator: String, - headlamp: String, - horn: String, - left_brake: String, - right_brake: String, -} - -pub fn generate_peripheral_state_data(device: &DeviceData, sequence: u32) -> Payload { - let payload = PeripheralState { - gps_status: generate_bool_string(0.99), - gsm_status: generate_bool_string(0.99), - imu_status: generate_bool_string(0.99), - left_indicator: generate_bool_string(0.1), - right_indicator: generate_bool_string(0.1), - headlamp: generate_bool_string(0.9), - horn: generate_bool_string(0.05), - left_brake: generate_bool_string(0.1), - right_brake: generate_bool_string(0.1), - }; - - Payload { - timestamp: clock() as u64, - device_id: Some(device.device_id.to_owned()), - sequence, - stream: "peripheral_state".to_string(), - payload: json!(payload), - } -} - -#[derive(Debug, Serialize)] -struct DeviceShadow { - mode: String, - status: String, - firmware_version: String, - config_version: String, - distance_travelled: i64, - range: i64, - #[serde(rename(serialize = "SOC"))] - soc: f64, -} - -pub fn generate_device_shadow_data(device: &DeviceData, sequence: u32) -> Payload { - let payload = DeviceShadow { - mode: "economy".to_owned(), - status: "Locked".to_owned(), - firmware_version: "1.33-Aug-2020b1".to_owned(), - config_version: "1.23".to_owned(), - distance_travelled: generate_int(20000, 30000), - range: generate_int(50000, 60000), - soc: generate_float(50f64, 90f64), - }; - - Payload { - timestamp: clock() as u64, - device_id: Some(device.device_id.to_owned()), - sequence, - stream: "device_shadow".to_string(), - payload: json!(payload), - } -} - -pub fn read_gps_paths(paths_dir: &str) -> Vec>> { - (0..10) - .map(|i| { - let file_name = format!("{paths_dir}/path{i}.json"); - - let contents = fs::read_to_string(file_name).expect("Oops, failed ot read path"); - - let parsed: Vec = serde_json::from_str(&contents).unwrap(); - - Arc::new(parsed) - }) - .collect::>() -} - -pub fn new_device_data(device_id: String, paths: &[Arc>]) -> DeviceData { - let mut rng = rand::thread_rng(); - - let n = rng.gen_range(0..10); - let path = paths.get(n).unwrap().clone(); - let path_index = rng.gen_range(0..path.len()) as u32; - - DeviceData { - device_id, - path, - path_offset: path_index, - time_offset: Duration::from_millis(rng.gen_range(0..10000)), - } -} - -pub fn generate_initial_events( - events: &mut BinaryHeap, - timestamp: Instant, - devices: &[DeviceData], -) { - for device in devices.iter() { - let timestamp = timestamp + device.time_offset; - - events.push(Event::DataEvent(DataEvent { - event_type: DataEventType::GenerateGPS, - device: device.clone(), - timestamp, - sequence: 1, - })); - - events.push(Event::DataEvent(DataEvent { - event_type: DataEventType::GenerateVehicleData, - device: device.clone(), - timestamp, - sequence: 1, - })); - - events.push(Event::DataEvent(DataEvent { - event_type: DataEventType::GeneratePeripheralData, - device: device.clone(), - timestamp, - sequence: 1, - })); - - events.push(Event::DataEvent(DataEvent { - event_type: DataEventType::GenerateMotor, - device: device.clone(), - timestamp, - sequence: 1, - })); - - events.push(Event::DataEvent(DataEvent { - event_type: DataEventType::GenerateBMS, - device: device.clone(), - timestamp, - sequence: 1, - })); - - events.push(Event::DataEvent(DataEvent { - event_type: DataEventType::GenerateIMU, - device: device.clone(), - timestamp, - sequence: 1, - })); - } -} - -pub fn next_event_duration(event_type: DataEventType) -> Duration { - match event_type { - DataEventType::GenerateGPS => Duration::from_millis(1000), - DataEventType::GenerateIMU => Duration::from_millis(100), - DataEventType::GenerateVehicleData => Duration::from_millis(1000), - DataEventType::GeneratePeripheralData => Duration::from_millis(1000), - DataEventType::GenerateMotor => Duration::from_millis(250), - DataEventType::GenerateBMS => Duration::from_millis(250), - } -} - -pub async fn process_data_event( - event: &DataEvent, - events: &mut BinaryHeap, - bridge_tx: &BridgeTx, -) { - let data = match event.event_type { - DataEventType::GenerateGPS => generate_gps_data(&event.device, event.sequence), - DataEventType::GenerateIMU => generate_imu_data(&event.device, event.sequence), - DataEventType::GenerateVehicleData => { - generate_device_shadow_data(&event.device, event.sequence) - } - DataEventType::GeneratePeripheralData => { - generate_peripheral_state_data(&event.device, event.sequence) - } - DataEventType::GenerateMotor => generate_motor_data(&event.device, event.sequence), - DataEventType::GenerateBMS => generate_bms_data(&event.device, event.sequence), - }; - - bridge_tx.send_payload(data).await; - - let duration = next_event_duration(event.event_type); - - events.push(Event::DataEvent(DataEvent { - sequence: event.sequence + 1, - timestamp: event.timestamp + duration, - device: event.device.clone(), - event_type: event.event_type, - })); -} - -async fn process_action_response_event(event: &ActionResponseEvent, bridge_tx: &BridgeTx) { - //info!("Sending action response {:?} {} {} {}", event.device_id, event.action_id, event.progress, event.status); - let mut response = ActionResponse::progress(&event.action_id, &event.status, event.progress); - response.device_id = Some(event.device_id.to_owned()); - - info!( - "Sending action response {:?} {} {} {}", - event.device_id, event.action_id, event.progress, event.status - ); - - bridge_tx.send_action_response(response).await; - info!("Successfully sent action response"); -} - -pub async fn process_events(events: &mut BinaryHeap, bridge_tx: &BridgeTx) { - if let Some(e) = events.pop() { - let current_time = Instant::now(); - let timestamp = event_timestamp(&e); - - if timestamp > current_time { - let time_left = timestamp.duration_since(current_time); - - if time_left > Duration::from_millis(50) { - tokio::time::sleep(time_left).await; - } - } - - match e { - Event::DataEvent(event) => { - process_data_event(&event, events, bridge_tx).await; - } - - Event::ActionResponseEvent(event) => { - process_action_response_event(&event, bridge_tx).await; - } - } - } else { - tokio::time::sleep(Duration::from_millis(100)).await; - } -} - -pub fn generate_action_events(action: Action, events: &mut BinaryHeap) { - let action_id = action.action_id; - let device_id = action.device_id.unwrap(); - - info!("Generating action events for action: {action_id} on device: {device_id}"); - let now = Instant::now() + Duration::from_millis(rand::thread_rng().gen_range(0..5000)); - - for i in 1..100 { - events.push(Event::ActionResponseEvent(ActionResponseEvent { - action_id: action_id.clone(), - device_id: device_id.clone(), - progress: i, - status: String::from("in_progress"), - timestamp: now + Duration::from_secs(i as u64), - })); - } - - events.push(Event::ActionResponseEvent(ActionResponseEvent { - action_id, - device_id, - progress: 100, - status: String::from("Completed"), - timestamp: now + Duration::from_secs(100), - })); -} - -#[tokio::main(flavor = "current_thread")] -pub async fn start(bridge_tx: BridgeTx, simulator_config: &SimulatorConfig) -> Result<(), Error> { - let paths = read_gps_paths(&simulator_config.gps_paths); - let num_devices = simulator_config.num_devices; - let actions_rx = bridge_tx.register_action_routes(&simulator_config.actions).await; - - let devices = - (1..(num_devices + 1)).map(|i| new_device_data(i.to_string(), &paths)).collect::>(); - - let mut events = BinaryHeap::new(); - - generate_initial_events(&mut events, Instant::now(), &devices); - let mut time = Instant::now(); - let mut i = 0; - - loop { - let current_time = Instant::now(); - if time.elapsed() > Duration::from_secs(1) { - if let Some(event) = events.peek() { - let timestamp = event_timestamp(event); - - if current_time > timestamp { - trace!("Time delta {:?} {:?} {:?}", num_devices, current_time - timestamp, i); - } else { - trace!("Time delta {:?} -{:?} {:?}", num_devices, timestamp - current_time, i); - } - - i += 1; - } - time = Instant::now(); - } - - if let Some(actions_rx) = &actions_rx { - select! { - action = actions_rx.recv_async() => { - let action = action?; - generate_action_events(action, &mut events); - } - _ = process_events(&mut events, &bridge_tx) => { - } - } - } else { - loop { - process_events(&mut events, &bridge_tx).await - } - } - } -} diff --git a/uplink/src/collector/simulator/data.rs b/uplink/src/collector/simulator/data.rs new file mode 100644 index 00000000..13a1e73e --- /dev/null +++ b/uplink/src/collector/simulator/data.rs @@ -0,0 +1,415 @@ +use fake::{Dummy, Fake, Faker}; +use flume::Sender; +use log::{error, trace}; +use rand::seq::SliceRandom; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tokio::time::interval; + +use std::sync::Arc; +use std::time::Duration; + +use crate::base::clock; +use crate::Payload; + +const RESET_LIMIT: u32 = 1500; + +#[inline] +fn next_sequence(sequence: &mut u32) { + if *sequence > RESET_LIMIT { + *sequence = 1 + } else { + *sequence += 1 + }; +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum DataType { + Gps, + Imu, + DeviceShadow, + PeripheralData, + Motor, + Bms, +} + +impl DataType { + fn duration(&self) -> Duration { + match self { + DataType::Gps => Duration::from_millis(1000), + DataType::Imu => Duration::from_millis(100), + DataType::DeviceShadow => Duration::from_millis(1000), + DataType::PeripheralData => Duration::from_millis(1000), + DataType::Motor => Duration::from_millis(250), + DataType::Bms => Duration::from_millis(250), + } + } +} + +#[derive(Clone, PartialEq)] +pub struct DeviceData { + pub path: Arc>, + pub path_offset: u32, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub struct Gps { + latitude: f64, + longitude: f64, +} + +impl Gps { + pub async fn simulate(tx: Sender, device: DeviceData) { + let mut sequence = 0; + let mut interval = interval(DataType::Gps.duration()); + let path_len = device.path.len() as u32; + + loop { + interval.tick().await; + next_sequence(&mut sequence); + let path_index = ((device.path_offset + sequence) % path_len) as usize; + let payload = device.path.get(path_index).unwrap(); + + trace!("Data Event: {:?}", payload); + + if let Err(e) = tx + .send_async(Payload { + device_id: None, + timestamp: clock() as u64, + stream: "gps".to_string(), + sequence, + payload: json!(payload), + }) + .await + { + error!("{e}"); + break; + } + } + } +} + +struct BoolString; + +impl Dummy for String { + fn dummy_with_rng(_: &BoolString, rng: &mut R) -> String { + const NAMES: &[&str] = &["on", "off"]; + NAMES.choose(rng).unwrap().to_string() + } +} + +struct VerString; + +impl Dummy for String { + fn dummy_with_rng(_: &VerString, rng: &mut R) -> String { + const NAMES: &[&str] = &["v0.0.1", "v0.5.0", "v1.0.1"]; + NAMES.choose(rng).unwrap().to_string() + } +} + +#[derive(Debug, Serialize, Dummy)] +pub struct Bms { + #[dummy(faker = "250")] + periodicity_ms: i32, + #[dummy(faker = "40.0 .. 45.0")] + mosfet_temperature: f64, + #[dummy(faker = "35.0 .. 40.0")] + ambient_temperature: f64, + #[dummy(faker = "1")] + mosfet_status: i32, + #[dummy(faker = "16")] + cell_voltage_count: i32, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_1: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_2: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_3: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_4: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_5: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_6: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_7: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_8: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_9: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_10: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_11: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_12: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_13: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_14: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_15: f64, + #[dummy(faker = "3.0 .. 3.2")] + cell_voltage_16: f64, + #[dummy(faker = "8")] + cell_thermistor_count: i32, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_1: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_2: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_3: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_4: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_5: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_6: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_7: f64, + #[dummy(faker = "40.0 .. 43.0")] + cell_temp_8: f64, + #[dummy(faker = "1")] + cell_balancing_status: i32, + #[dummy(faker = "95.0 .. 96.0")] + pack_voltage: f64, + #[dummy(faker = "15.0 .. 20.0")] + pack_current: f64, + #[dummy(faker = "80.0 .. 90.0")] + pack_soc: f64, + #[dummy(faker = "9.5 .. 9.9")] + pack_soh: f64, + #[dummy(faker = "9.5 .. 9.9")] + pack_sop: f64, + #[dummy(faker = "100 .. 150")] + pack_cycle_count: i64, + #[dummy(faker = "2000 .. 3000")] + pack_available_energy: i64, + #[dummy(faker = "2000 .. 3000")] + pack_consumed_energy: i64, + #[dummy(faker = "0")] + pack_fault: i32, + #[dummy(faker = "1")] + pack_status: i32, +} + +impl Bms { + pub async fn simulate(tx: Sender) { + let mut sequence = 0; + let mut interval = interval(DataType::Bms.duration()); + loop { + interval.tick().await; + let payload: Bms = Faker.fake(); + next_sequence(&mut sequence); + + trace!("Data Event: {:?}", payload); + + if let Err(e) = tx + .send_async(Payload { + device_id: None, + timestamp: clock() as u64, + stream: "bms".to_string(), + sequence, + payload: json!(payload), + }) + .await + { + error!("{e}"); + break; + } + } + } +} + +#[derive(Debug, Serialize, Dummy)] +pub struct Imu { + #[dummy(faker = "1.0 .. 2.8")] + ax: f64, + #[dummy(faker = "1.0 .. 2.8")] + ay: f64, + #[dummy(faker = "9.79 .. 9.82")] + az: f64, + #[dummy(faker = "0.8 .. 1.0")] + pitch: f64, + #[dummy(faker = "0.8 .. 1.0")] + roll: f64, + #[dummy(faker = "0.8 .. 1.0")] + yaw: f64, + #[dummy(faker = "-45.0 .. -15.0")] + magx: f64, + #[dummy(faker = "-45.0 .. -15.0")] + magy: f64, + #[dummy(faker = "-45.0 .. -15.0")] + magz: f64, +} + +impl Imu { + pub async fn simulate(tx: Sender) { + let mut sequence = 0; + let mut interval = interval(DataType::Imu.duration()); + loop { + interval.tick().await; + let payload: Imu = Faker.fake(); + next_sequence(&mut sequence); + + trace!("Data Event: {:?}", payload); + + if let Err(e) = tx + .send_async(Payload { + device_id: None, + timestamp: clock() as u64, + stream: "imu".to_string(), + sequence, + payload: json!(payload), + }) + .await + { + error!("{e}"); + break; + } + } + } +} + +#[derive(Debug, Serialize, Dummy)] +pub struct Motor { + #[dummy(faker = "40.0 .. 45.0")] + motor_temperature1: f64, + #[dummy(faker = "40.0 .. 45.0")] + motor_temperature2: f64, + #[dummy(faker = "40.0 .. 45.0")] + motor_temperature3: f64, + #[dummy(faker = "95.0 .. 96.0")] + motor_voltage: f64, + #[dummy(faker = "20.0 .. 25.0")] + motor_current: f64, + #[dummy(faker = "1000 .. 9000")] + motor_rpm: i64, +} + +impl Motor { + pub async fn simulate(tx: Sender) { + let mut sequence = 0; + let mut interval = interval(DataType::Motor.duration()); + loop { + interval.tick().await; + let payload: Motor = Faker.fake(); + next_sequence(&mut sequence); + + trace!("Data Event: {:?}", payload); + + if let Err(e) = tx + .send_async(Payload { + device_id: None, + timestamp: clock() as u64, + stream: "motor".to_string(), + sequence, + payload: json!(payload), + }) + .await + { + error!("{e}"); + break; + } + } + } +} + +#[derive(Debug, Serialize, Dummy)] +pub struct PeripheralState { + #[dummy(faker = "BoolString")] + gps_status: String, + #[dummy(faker = "BoolString")] + gsm_status: String, + #[dummy(faker = "BoolString")] + imu_status: String, + #[dummy(faker = "BoolString")] + left_indicator: String, + #[dummy(faker = "BoolString")] + right_indicator: String, + #[dummy(faker = "BoolString")] + headlamp: String, + #[dummy(faker = "BoolString")] + horn: String, + #[dummy(faker = "BoolString")] + left_brake: String, + #[dummy(faker = "BoolString")] + right_brake: String, +} + +impl PeripheralState { + pub async fn simulate(tx: Sender) { + let mut sequence = 0; + let mut interval = interval(DataType::PeripheralData.duration()); + loop { + interval.tick().await; + let payload: PeripheralState = Faker.fake(); + next_sequence(&mut sequence); + + trace!("Data Event: {:?}", payload); + + if let Err(e) = tx + .send_async(Payload { + device_id: None, + timestamp: clock() as u64, + stream: "peripheral_state".to_string(), + sequence, + payload: json!(payload), + }) + .await + { + error!("{e}"); + break; + } + } + } +} + +#[derive(Debug, Serialize, Dummy)] +pub struct DeviceShadow { + #[dummy(faker = "BoolString")] + mode: String, + #[dummy(faker = "BoolString")] + status: String, + #[dummy(faker = "VerString")] + firmware_version: String, + #[dummy(faker = "VerString")] + config_version: String, + #[dummy(faker = "20000..30000")] + distance_travelled: i64, + #[dummy(faker = "50000..60000")] + range: i64, + #[serde(rename(serialize = "SOC"))] + #[dummy(faker = "50.0..90.0")] + soc: f64, +} + +impl DeviceShadow { + pub async fn simulate(tx: Sender) { + let mut sequence = 0; + let mut interval = interval(DataType::DeviceShadow.duration()); + loop { + interval.tick().await; + let payload: DeviceShadow = Faker.fake(); + next_sequence(&mut sequence); + + trace!("Data Event: {:?}", payload); + + if let Err(e) = tx + .send_async(Payload { + device_id: None, + timestamp: clock() as u64, + stream: "device_shadow".to_string(), + sequence, + payload: json!(payload), + }) + .await + { + error!("{e}"); + + break; + } + } + } +} diff --git a/uplink/src/collector/simulator/mod.rs b/uplink/src/collector/simulator/mod.rs new file mode 100644 index 00000000..7866ece2 --- /dev/null +++ b/uplink/src/collector/simulator/mod.rs @@ -0,0 +1,150 @@ +use crate::base::bridge::{BridgeTx, Payload}; +use crate::base::{clock, SimulatorConfig}; +use crate::Action; +use data::{Bms, DeviceData, DeviceShadow, Gps, Imu, Motor, PeripheralState}; +use flume::{bounded, Sender}; +use log::{error, info}; +use rand::Rng; +use serde::Serialize; +use serde_json::json; +use thiserror::Error; +use tokio::time::interval; +use tokio::{select, spawn}; + +use std::time::Duration; +use std::{fs, io, sync::Arc}; + +mod data; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Io error {0}")] + Io(#[from] io::Error), + #[error("Recv error {0}")] + Recv(#[from] flume::RecvError), + #[error("Serde error {0}")] + Json(#[from] serde_json::error::Error), +} + +#[derive(Serialize)] +pub struct ActionResponse { + action_id: String, + state: String, + progress: u8, + errors: Vec, +} + +impl ActionResponse { + pub async fn simulate(action: Action, tx: Sender) { + let action_id = action.action_id; + info!("Generating action events for action: {action_id}"); + let mut sequence = 0; + let mut interval = interval(Duration::from_secs(1)); + + // Action response, 10% completion per second + for i in 1..10 { + let response = ActionResponse { + action_id: action_id.clone(), + progress: i * 10 + rand::thread_rng().gen_range(0..10), + state: String::from("in_progress"), + errors: vec![], + }; + sequence += 1; + if let Err(e) = tx + .send_async(Payload { + stream: "action_status".to_string(), + sequence, + payload: json!(response), + device_id: None, + timestamp: clock() as u64, + }) + .await + { + error!("{e}"); + break; + } + + interval.tick().await; + } + + let response = ActionResponse { + action_id, + progress: 100, + state: String::from("Completed"), + errors: vec![], + }; + sequence += 1; + if let Err(e) = tx + .send_async(Payload { + stream: "action_status".to_string(), + sequence, + payload: json!(response), + device_id: None, + timestamp: clock() as u64, + }) + .await + { + error!("{e}"); + } + info!("Successfully sent all action responses"); + } +} + +pub fn read_gps_path(paths_dir: &str) -> Arc> { + let i = rand::thread_rng().gen_range(0..10); + let file_name: String = format!("{}/path{}.json", paths_dir, i); + + let contents = fs::read_to_string(file_name).expect("Oops, failed ot read path"); + + let parsed: Vec = serde_json::from_str(&contents).unwrap(); + + Arc::new(parsed) +} + +pub fn new_device_data(path: Arc>) -> DeviceData { + let mut rng = rand::thread_rng(); + + let path_index = rng.gen_range(0..path.len()) as u32; + + DeviceData { path, path_offset: path_index } +} + +pub fn spawn_data_simulators(device: DeviceData, tx: Sender) { + spawn(Gps::simulate(tx.clone(), device)); + spawn(Bms::simulate(tx.clone())); + spawn(Imu::simulate(tx.clone())); + spawn(Motor::simulate(tx.clone())); + spawn(PeripheralState::simulate(tx.clone())); + spawn(DeviceShadow::simulate(tx.clone())); +} + +#[tokio::main(flavor = "current_thread")] +pub async fn start(config: SimulatorConfig, bridge_tx: BridgeTx) -> Result<(), Error> { + let path = read_gps_path(&config.gps_paths); + let device = new_device_data(path); + + let actions_rx = bridge_tx.register_action_routes(&config.actions).await; + + let (tx, rx) = bounded(10); + spawn_data_simulators(device, tx.clone()); + + loop { + select! { + action = actions_rx.as_ref().unwrap().recv_async(), if actions_rx.is_some() => { + let action = action?; + spawn(ActionResponse::simulate(action, tx.clone())); + } + p = rx.recv_async() => { + let payload = match p { + Ok(p) => p, + Err(_) => { + error!("All generators have stopped!"); + return Ok(()) + } + }; + + bridge_tx.send_payload(payload).await; + } + } + } +} diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index f4775baa..a0cfde08 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -222,15 +222,6 @@ pub mod config { replace_topic_placeholders(&mut device_action_topic, tenant_id, device_id); config.actions_subscription = device_action_topic; - // Add topics to be subscribed to for simulation purposes, if in simulator mode - if let Some(sim_cfg) = &mut config.simulator { - for n in 1..=sim_cfg.num_devices { - let mut topic = action_topic_template.to_string(); - replace_topic_placeholders(&mut topic, tenant_id, &n.to_string()); - sim_cfg.actions_subscriptions.push(topic); - } - } - Ok(config) } diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 56576cd2..a9386efb 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -123,7 +123,7 @@ fn main() -> Result<(), Error> { if let Some(config) = config.simulator.clone() { let bridge = bridge.clone(); thread::spawn(move || { - simulator::start(bridge, &config).unwrap(); + simulator::start(config, bridge).unwrap(); }); } From 1ba2320c84f19a204455b9590e9d216022377358 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 23 Aug 2023 10:05:04 +0530 Subject: [PATCH 2/4] refactor: simulator.sh works with built-in-sim --- simulator.sh | 31 ++++++++----------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/simulator.sh b/simulator.sh index d7131682..8b2fad37 100755 --- a/simulator.sh +++ b/simulator.sh @@ -6,27 +6,21 @@ start_devices() { kill_devices; mkdir -p devices - echo "Starting uplink and simulator" + echo "Starting uplink with simulator" devices=($(seq $start $stop)) first=${devices[0]} rest=${devices[@]:1} - printf -v port "5%04d" $first download_auth_config $first - create_uplink_config $first $port + create_uplink_config $first start_uplink 1 $first "-vv" "devices/uplink_$first.log" - sleep 1 - start_simulator 1 $first $port "-vv" "devices/simulator_$first.log" - for id in $rest do - printf -v port "5%04d" $id + sleep 1 + download_auth_config $id - create_uplink_config $id $port + create_uplink_config $id start_uplink 0 $id - - sleep 1 - start_simulator 0 $id $port done echo DONE @@ -39,18 +33,17 @@ start_devices() { create_uplink_config() { id=${1:?"Missing id"} - port=${2:?"Missing port number"} printf "$(cat << EOF processes = [] action_redirections = { send_file = \"load_file\", update_firmware = \"install_firmware\" } +persistence_path = \"/var/tmp/persistence/$id\" [persistence] -path = \"/var/tmp/persistence/$id\" max_file_size = 104857600 max_file_count = 3 -[tcpapps.1] -port = $port +[simulator] +gps_paths = "./paths" actions= [{ name = \"load_file\" }, { name = \"install_firmware\" }, { name = \"update_config\" }, { name = \"unlock\" }, { name = \"lock\" }] [downloader] @@ -88,14 +81,6 @@ start_uplink() { echo $! >> "devices/$2.pid" } -start_simulator() { - id=${2:?"Missing id"} - port=${3:?"Missing port number"} - cmd="simulator -p $port -g ./paths" - run $1 "$cmd" "$4" "$5" - # simulator runs only as long as associated uplink instance runs and need not be tracked -} - kill_devices() { echo "Killing all devices in pids file" for file in $(find ./devices -type f -name "*.pid") From 7ba5eb2b8efd3f08098d2ebcf2e223a5380b39ab Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 23 Aug 2023 10:20:39 +0530 Subject: [PATCH 3/4] refactor: remove code supporting multi-device sim --- uplink/src/base/actions.rs | 6 ----- uplink/src/base/bridge/mod.rs | 12 ---------- uplink/src/base/bridge/streams.rs | 22 ++++++++---------- uplink/src/base/mqtt/mod.rs | 31 +++++--------------------- uplink/src/base/serializer/mod.rs | 1 - uplink/src/collector/device_shadow.rs | 1 - uplink/src/collector/downloader.rs | 2 -- uplink/src/collector/journalctl.rs | 2 +- uplink/src/collector/script_runner.rs | 24 +++++++++++++++----- uplink/src/collector/simulator/data.rs | 6 ----- uplink/src/collector/simulator/mod.rs | 2 -- uplink/src/collector/systemstats.rs | 6 ----- 12 files changed, 34 insertions(+), 81 deletions(-) diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 32e14f31..46bf6c79 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -10,8 +10,6 @@ use super::clock; /// said device, in this case, uplink. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Action { - #[serde(skip)] - pub device_id: Option, // action id #[serde(alias = "id")] pub action_id: String, @@ -27,8 +25,6 @@ pub struct Action { pub struct ActionResponse { #[serde(alias = "id")] pub action_id: String, - #[serde(skip)] - pub device_id: Option, // sequence number pub sequence: u32, // timestamp @@ -49,7 +45,6 @@ impl ActionResponse { ActionResponse { action_id: id.to_owned(), - device_id: None, sequence: 0, timestamp, state: state.to_owned(), @@ -113,7 +108,6 @@ impl From<&ActionResponse> for Payload { fn from(resp: &ActionResponse) -> Self { Self { stream: "action_status".to_owned(), - device_id: resp.device_id.to_owned(), sequence: resp.sequence, timestamp: resp.timestamp, payload: json!({ diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index 73096b25..8be65cad 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -68,8 +68,6 @@ pub trait Package: Send + Debug { pub struct Payload { #[serde(skip_serializing)] pub stream: String, - #[serde(skip)] - pub device_id: Option, pub sequence: u32, pub timestamp: u64, #[serde(flatten)] @@ -608,7 +606,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action_1 = Action { - device_id: None, action_id: "1".to_string(), kind: "test".to_string(), name: "route_1".to_string(), @@ -630,7 +627,6 @@ mod tests { assert_eq!(elapsed / 1000, 10); let action_2 = Action { - device_id: None, action_id: "2".to_string(), kind: "test".to_string(), name: "route_2".to_string(), @@ -668,7 +664,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action_1 = Action { - device_id: None, action_id: "1".to_string(), kind: "test".to_string(), name: "test".to_string(), @@ -681,7 +676,6 @@ mod tests { assert_eq!(status.state, "Received".to_owned()); let action_2 = Action { - device_id: None, action_id: "2".to_string(), kind: "test".to_string(), name: "test".to_string(), @@ -715,7 +709,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action = Action { - device_id: None, action_id: "1".to_string(), kind: "test".to_string(), name: "test".to_string(), @@ -768,7 +761,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action = Action { - device_id: None, action_id: "1".to_string(), kind: "test".to_string(), name: "test".to_string(), @@ -826,7 +818,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action = Action { - device_id: None, action_id: "1".to_string(), kind: "tunshell".to_string(), name: "launch_shell".to_string(), @@ -837,7 +828,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action = Action { - device_id: None, action_id: "2".to_string(), kind: "test".to_string(), name: "test".to_string(), @@ -905,7 +895,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action = Action { - device_id: None, action_id: "1".to_string(), kind: "test".to_string(), name: "test".to_string(), @@ -916,7 +905,6 @@ mod tests { std::thread::sleep(Duration::from_secs(1)); let action = Action { - device_id: None, action_id: "2".to_string(), kind: "tunshell".to_string(), name: "launch_shell".to_string(), diff --git a/uplink/src/base/bridge/streams.rs b/uplink/src/base/bridge/streams.rs index bfdeea81..309fd11d 100644 --- a/uplink/src/base/bridge/streams.rs +++ b/uplink/src/base/bridge/streams.rs @@ -47,29 +47,25 @@ impl Streams { } pub async fn forward(&mut self, data: Payload) { - let stream_name = &data.stream; - let (stream_id, device_id) = match &data.device_id { - Some(device_id) => (stream_name.to_owned() + "/" + device_id, device_id.to_owned()), - _ => (stream_name.to_owned(), self.config.device_id.to_owned()), - }; + let stream_name = data.stream.to_owned(); - let stream = match self.map.get_mut(&stream_id) { + let stream = match self.map.get_mut(&stream_name) { Some(partition) => partition, None => { if self.config.simulator.is_none() && self.map.keys().len() > 20 { - error!("Failed to create {:?} stream. More than max 20 streams", stream_id); + error!("Failed to create {:?} stream. More than max 20 streams", stream_name); return; } let stream = Stream::dynamic( - stream_name, + &stream_name, &self.config.project_id, - &device_id, + &self.config.device_id, MAX_BUFFER_SIZE, self.data_tx.clone(), ); - self.map.entry(stream_id.to_owned()).or_insert(stream) + self.map.entry(stream_name.to_owned()).or_insert(stream) } }; @@ -87,10 +83,10 @@ impl Streams { // Warn in case stream flushed stream was not in the queue. if max_stream_size > 1 { match state { - StreamStatus::Flushed => self.stream_timeouts.remove(&stream_id), + StreamStatus::Flushed => self.stream_timeouts.remove(&stream_name), StreamStatus::Init(flush_period) => { - trace!("Initialized stream buffer for {stream_id}"); - self.stream_timeouts.insert(&stream_id, flush_period); + trace!("Initialized stream buffer for {stream_name}"); + self.stream_timeouts.insert(&stream_name, flush_period); } StreamStatus::Partial(_l) => {} } diff --git a/uplink/src/base/mqtt/mod.rs b/uplink/src/base/mqtt/mod.rs index c7dbeb05..07026aad 100644 --- a/uplink/src/base/mqtt/mod.rs +++ b/uplink/src/base/mqtt/mod.rs @@ -43,8 +43,6 @@ pub struct Mqtt { eventloop: EventLoop, /// Handles to channels between threads native_actions_tx: Sender, - /// List of currently subscribed topics - actions_subscriptions: Vec, /// Metrics metrics: MqttMetrics, /// Metrics tx @@ -61,14 +59,12 @@ impl Mqtt { let options = mqttoptions(&config); let (client, mut eventloop) = AsyncClient::new(options, 10); eventloop.network_options.set_connection_timeout(config.mqtt.network_timeout); - let actions_subscriptions = vec![config.actions_subscription.clone()]; Mqtt { config, client, eventloop, native_actions_tx: actions_tx, - actions_subscriptions, metrics: MqttMetrics::new(), metrics_tx, } @@ -85,7 +81,7 @@ impl Mqtt { match self.eventloop.poll().await { Ok(Event::Incoming(Incoming::ConnAck(connack))) => { info!("Connected to broker. Session present = {}", connack.session_present); - let subscriptions = self.actions_subscriptions.clone(); + let subscription = self.config.actions_subscription.clone(); let client = self.client(); self.metrics.add_connection(); @@ -93,11 +89,9 @@ impl Mqtt { // This can potentially block when client from other threads // have already filled the channel due to bad network. So we spawn task::spawn(async move { - for subscription in subscriptions { - match client.subscribe(&subscription, QoS::AtLeastOnce).await { - Ok(..) => info!("Subscribe -> {:?}", subscription), - Err(e) => error!("Failed to send subscription. Error = {:?}", e), - } + match client.subscribe(&subscription, QoS::AtLeastOnce).await { + Ok(..) => info!("Subscribe -> {:?}", subscription), + Err(e) => error!("Failed to send subscription. Error = {:?}", e), } }); } @@ -143,25 +137,12 @@ impl Mqtt { } fn handle_incoming_publish(&mut self, publish: Publish) -> Result<(), Error> { - if !self.actions_subscriptions.contains(&publish.topic) { + if self.config.actions_subscription != publish.topic { error!("Unsolicited publish on {}", publish.topic); return Ok(()); } - let mut action: Action = serde_json::from_slice(&publish.payload)?; - - // Collect device_id information from publish topic for simulation purpose - if self.config.simulator.is_some() { - let tokens: Vec<&str> = publish.topic.split('/').collect(); - let mut tokens = tokens.iter(); - while let Some(token) = tokens.next() { - if token == &"devices" { - let device_id = tokens.next().unwrap().to_string(); - action.device_id = Some(device_id); - } - } - } - + let action: Action = serde_json::from_slice(&publish.payload)?; info!("Action = {:?}", action); self.native_actions_tx.try_send(action)?; diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 944ccd7c..1db1c32d 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -781,7 +781,6 @@ mod test { fn send(&mut self, i: u32) -> Result<(), Error> { let payload = Payload { stream: "hello".to_owned(), - device_id: None, sequence: i, timestamp: 0, payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}")?, diff --git a/uplink/src/collector/device_shadow.rs b/uplink/src/collector/device_shadow.rs index 6623fcb6..96d95818 100644 --- a/uplink/src/collector/device_shadow.rs +++ b/uplink/src/collector/device_shadow.rs @@ -38,7 +38,6 @@ impl DeviceShadow { Ok(Payload { stream: "device_shadow".to_owned(), - device_id: None, sequence: self.sequence, timestamp: clock() as u64, payload, diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 91717d04..7ce0877d 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -399,7 +399,6 @@ mod test { let mut expected_forward = download_update.clone(); expected_forward.download_path = Some(downloader_cfg.path + "/firmware_update/test.txt"); let download_action = Action { - device_id: None, action_id: "1".to_string(), kind: "firmware_update".to_string(), name: "firmware_update".to_string(), @@ -475,7 +474,6 @@ mod test { let mut expected_forward = download_update.clone(); expected_forward.download_path = Some(downloader_cfg.path + "/firmware_update/test.txt"); let download_action = Action { - device_id: None, action_id: "1".to_string(), kind: "firmware_update".to_string(), name: "firmware_update".to_string(), diff --git a/uplink/src/collector/journalctl.rs b/uplink/src/collector/journalctl.rs index b5ea6a26..c934576e 100644 --- a/uplink/src/collector/journalctl.rs +++ b/uplink/src/collector/journalctl.rs @@ -95,7 +95,7 @@ impl LogEntry { // Convert from microseconds to milliseconds let timestamp = self.log_timestamp.parse::()? / 1000; - Ok(Payload { stream: "logs".to_string(), device_id: None, sequence, timestamp, payload }) + Ok(Payload { stream: "logs".to_string(), sequence, timestamp, payload }) } } diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index f49e3975..5412eb79 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -177,10 +177,13 @@ mod tests { }) }); - let Event::RegisterActionRoute(_, ActionRouter{actions_tx,..}) = events_rx.recv().unwrap() else { unreachable!()}; + let Event::RegisterActionRoute(_, ActionRouter { actions_tx, .. }) = + events_rx.recv().unwrap() + else { + unreachable!() + }; actions_tx .send(Action { - device_id: None, action_id: "1".to_string(), kind: "1".to_string(), name: "test".to_string(), @@ -188,7 +191,10 @@ mod tests { }) .unwrap(); - let Event::ActionResponse(ActionResponse{state, errors,..}) = events_rx.recv().unwrap() else { unreachable!()}; + let Event::ActionResponse(ActionResponse { state, errors, .. }) = events_rx.recv().unwrap() + else { + unreachable!() + }; assert_eq!(state, "Failed"); assert_eq!(errors, ["Failed to deserialize action payload: \"EOF while parsing a value at line 1 column 0\"; payload: \"\""]); } @@ -206,10 +212,13 @@ mod tests { }) }); - let Event::RegisterActionRoute(_, ActionRouter{actions_tx,..}) = events_rx.recv().unwrap() else { unreachable!()}; + let Event::RegisterActionRoute(_, ActionRouter { actions_tx, .. }) = + events_rx.recv().unwrap() + else { + unreachable!() + }; actions_tx .send(Action { - device_id: None, action_id: "1".to_string(), kind: "1".to_string(), name: "test".to_string(), @@ -218,7 +227,10 @@ mod tests { }) .unwrap(); - let Event::ActionResponse(ActionResponse{state, errors,..}) = events_rx.recv().unwrap() else { unreachable!()}; + let Event::ActionResponse(ActionResponse { state, errors, .. }) = events_rx.recv().unwrap() + else { + unreachable!() + }; assert_eq!(state, "Failed"); assert_eq!(errors, ["Action payload doesn't contain path for script execution; payload: \"{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}\""]); } diff --git a/uplink/src/collector/simulator/data.rs b/uplink/src/collector/simulator/data.rs index 13a1e73e..12527b54 100644 --- a/uplink/src/collector/simulator/data.rs +++ b/uplink/src/collector/simulator/data.rs @@ -75,7 +75,6 @@ impl Gps { if let Err(e) = tx .send_async(Payload { - device_id: None, timestamp: clock() as u64, stream: "gps".to_string(), sequence, @@ -207,7 +206,6 @@ impl Bms { if let Err(e) = tx .send_async(Payload { - device_id: None, timestamp: clock() as u64, stream: "bms".to_string(), sequence, @@ -257,7 +255,6 @@ impl Imu { if let Err(e) = tx .send_async(Payload { - device_id: None, timestamp: clock() as u64, stream: "imu".to_string(), sequence, @@ -301,7 +298,6 @@ impl Motor { if let Err(e) = tx .send_async(Payload { - device_id: None, timestamp: clock() as u64, stream: "motor".to_string(), sequence, @@ -351,7 +347,6 @@ impl PeripheralState { if let Err(e) = tx .send_async(Payload { - device_id: None, timestamp: clock() as u64, stream: "peripheral_state".to_string(), sequence, @@ -398,7 +393,6 @@ impl DeviceShadow { if let Err(e) = tx .send_async(Payload { - device_id: None, timestamp: clock() as u64, stream: "device_shadow".to_string(), sequence, diff --git a/uplink/src/collector/simulator/mod.rs b/uplink/src/collector/simulator/mod.rs index 7866ece2..eca338a2 100644 --- a/uplink/src/collector/simulator/mod.rs +++ b/uplink/src/collector/simulator/mod.rs @@ -55,7 +55,6 @@ impl ActionResponse { stream: "action_status".to_string(), sequence, payload: json!(response), - device_id: None, timestamp: clock() as u64, }) .await @@ -79,7 +78,6 @@ impl ActionResponse { stream: "action_status".to_string(), sequence, payload: json!(response), - device_id: None, timestamp: clock() as u64, }) .await diff --git a/uplink/src/collector/systemstats.rs b/uplink/src/collector/systemstats.rs index 9080f4bc..97b8838f 100644 --- a/uplink/src/collector/systemstats.rs +++ b/uplink/src/collector/systemstats.rs @@ -86,7 +86,6 @@ impl From<&System> for Payload { Payload { stream: "uplink_system_stats".to_owned(), - device_id: None, sequence: *sequence, timestamp: *timestamp, payload: json!({ @@ -156,7 +155,6 @@ impl From<&mut Network> for Payload { Payload { stream: "uplink_network_stats".to_owned(), - device_id: None, sequence: *sequence, timestamp: *timestamp, payload: json!({ @@ -218,7 +216,6 @@ impl From<&mut Disk> for Payload { Payload { stream: "uplink_disk_stats".to_owned(), - device_id: None, sequence: *sequence, timestamp: *timestamp, payload: json!({ @@ -276,7 +273,6 @@ impl From<&mut Processor> for Payload { Payload { stream: "uplink_processor_stats".to_owned(), - device_id: None, sequence: *sequence, timestamp: *timestamp, payload: json!({ @@ -330,7 +326,6 @@ impl From<&mut Component> for Payload { Payload { stream: "uplink_component_stats".to_owned(), - device_id: None, sequence: *sequence, timestamp: *timestamp, payload: json!({ @@ -409,7 +404,6 @@ impl From<&mut Process> for Payload { Payload { stream: "uplink_process_stats".to_owned(), - device_id: None, sequence: *sequence, timestamp: *timestamp, payload: json!({ From 0dae98966f9171dc3627d78aa8e688461f191d29 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 23 Aug 2023 10:39:37 +0530 Subject: [PATCH 4/4] fix: `logcat.rs` --- uplink/src/collector/logcat.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/uplink/src/collector/logcat.rs b/uplink/src/collector/logcat.rs index 8a52c192..5e3bd934 100644 --- a/uplink/src/collector/logcat.rs +++ b/uplink/src/collector/logcat.rs @@ -136,7 +136,6 @@ impl LogEntry { pub fn to_payload(&self, sequence: u32) -> anyhow::Result { Ok(Payload { stream: "logs".to_string(), - device_id: None, sequence, timestamp: self.log_timestamp, payload: serde_json::json!({