From 36cc423f70081886ae306c5b9fd9486b9d96bbeb Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 22 Apr 2021 15:16:33 +0200 Subject: [PATCH 01/17] Refactoring MQTT to support telemetry --- Cargo.lock | 6 +- Cargo.toml | 4 +- src/bin/dual-iir.rs | 22 +++--- src/bin/lockin-external.rs | 22 +++--- src/net/mod.rs | 150 ++++++++++++++++++++++++++++++------- 5 files changed, 147 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 970506d47..ff026ff82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,7 +203,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?branch=feature/mqtt-removal#6b6b9d7973ac5b4771c211c775c51f82d4a7727f" dependencies = [ "proc-macro2", "quote", @@ -416,11 +416,10 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?branch=feature/mqtt-removal#6b6b9d7973ac5b4771c211c775c51f82d4a7727f" dependencies = [ "derive_miniconf", "heapless 0.6.1", - "minimq", "serde", "serde-json-core", ] @@ -742,6 +741,7 @@ dependencies = [ "log", "mcp23017", "miniconf", + "minimq", "nb 1.0.0", "panic-semihosting", "paste", diff --git a/Cargo.toml b/Cargo.toml index f4db4d8ca..db89ef5dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,13 +56,13 @@ version = "0.9.0" [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" -rev = "314fa5587d" +branch = "feature/mqtt-removal" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" rev = "8468f11" -[patch.crates-io.minimq] +[dependencies.minimq] git = "https://github.com/quartiq/minimq.git" rev = "933687c2e4b" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index eaa82aed9..b275de0cd 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -13,7 +13,7 @@ use hardware::{ InputPin, AFE0, AFE1, }; -use net::{Action, MiniconfInterface}; +use net::{Action, MqttInterface}; const SCALE: f32 = i16::MAX as _; @@ -46,7 +46,7 @@ const APP: () = { digital_input1: DigitalInput1, adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, // Format: iir_state[ch][cascade-no][coeff] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] @@ -59,7 +59,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -86,7 +86,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - mqtt_config, + mqtt, digital_input1: stabilizer.digital_inputs.1, settings: Settings::default(), } @@ -143,14 +143,10 @@ const APP: () = { } } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -160,12 +156,12 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, afes, settings])] + #[task(priority = 1, resources=[mqtt, afes, settings])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); // Update the IIR channels. - c.resources.settings.lock(|current| *current = *settings); + c.resources.settings.lock(|current| *current = settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); diff --git a/src/bin/lockin-external.rs b/src/bin/lockin-external.rs index 082b0d23f..f9e9ae452 100644 --- a/src/bin/lockin-external.rs +++ b/src/bin/lockin-external.rs @@ -16,7 +16,7 @@ use stabilizer::hardware::{ }; use miniconf::Miniconf; -use stabilizer::net::{Action, MiniconfInterface}; +use stabilizer::net::{Action, MqttInterface}; #[derive(Copy, Clone, Debug, Deserialize, Miniconf)] enum Conf { @@ -60,7 +60,7 @@ const APP: () = { afes: (AFE0, AFE1), adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, settings: Settings, timestamper: InputStamper, @@ -73,7 +73,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -113,7 +113,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - mqtt_config, + mqtt, timestamper: stabilizer.timestamper, settings, @@ -195,14 +195,10 @@ const APP: () = { } } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -212,14 +208,14 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, settings, afes])] + #[task(priority = 1, resources=[mqtt, settings, afes])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); - c.resources.settings.lock(|current| *current = *settings); + c.resources.settings.lock(|current| *current = settings); } #[task(binds = ETH, priority = 1)] diff --git a/src/net/mod.rs b/src/net/mod.rs index 13b514cbd..e2398b8a9 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2,10 +2,10 @@ use crate::hardware::{ design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, }; -use core::fmt::Write; +use core::{cell::RefCell, fmt::Write}; -use heapless::{consts, String}; -use miniconf::minimq; +use heapless::{consts, String, Vec}; +use serde::Serialize; /// Potential actions for firmware to take. pub enum Action { @@ -17,19 +17,22 @@ pub enum Action { } /// MQTT settings interface. -pub struct MiniconfInterface +pub struct MqttInterface where - S: miniconf::Miniconf + Default, + S: miniconf::Miniconf + Default + Clone, { - pub mqtt: miniconf::MqttInterface, + telemetry_topic: String, + mqtt: RefCell>, + miniconf: RefCell>, clock: CycleCounter, phy: EthernetPhy, network_was_reset: bool, + subscribed: bool, } -impl MiniconfInterface +impl MqttInterface where - S: miniconf::Miniconf + Default, + S: miniconf::Miniconf + Default + Clone, { /// Construct a new MQTT settings interface. /// @@ -46,21 +49,23 @@ where phy: EthernetPhy, clock: CycleCounter, ) -> Self { - let mqtt = { - let mqtt_client = { - minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) - .unwrap() - }; - - miniconf::MqttInterface::new(mqtt_client, prefix, S::default()) - .unwrap() - }; + let mqtt_client = + minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) + .unwrap(); + let config = + miniconf::MiniconfInterface::new(prefix, S::default()).unwrap(); + + let mut telemetry_topic: String = String::new(); + write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); Self { - mqtt, + mqtt: RefCell::new(mqtt_client), + miniconf: RefCell::new(config), clock, phy, + telemetry_topic, network_was_reset: false, + subscribed: false, } } @@ -72,7 +77,7 @@ where let now = self.clock.current_ms(); // First, service the network stack to process and inbound and outbound traffic. - let sleep = match self.mqtt.network_stack().poll(now) { + let sleep = match self.mqtt.borrow_mut().network_stack.poll(now) { Ok(updated) => !updated, Err(err) => { log::info!("Network error: {:?}", err); @@ -87,19 +92,93 @@ where // sending an excessive number of DHCP requests. if !self.network_was_reset { self.network_was_reset = true; - self.mqtt.network_stack().handle_link_reset(); + self.mqtt.borrow_mut().network_stack.handle_link_reset(); } } else { self.network_was_reset = false; } - // Finally, service the MQTT interface and handle any necessary messages. - match self.mqtt.update() { - Ok(true) => Some(Action::UpdateSettings), - Ok(false) if sleep => Some(Action::Sleep), - Ok(_) => None, + // If we're no longer subscribed to the settings topic, but we are connected to the broker, + // resubscribe. + if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { + self.mqtt + .borrow_mut() + .subscribe( + self.miniconf.borrow_mut().get_listening_topic(), + &[], + ) + .unwrap(); + self.subscribed = true; + } - Err(miniconf::MqttError::Network( + let mut update = false; + + // Handle any MQTT traffic. + match self.mqtt.borrow_mut().poll( + |client, topic, message, properties| { + // Find correlation-data and response topics. + let correlation_data = properties.iter().find_map(|prop| { + if let minimq::Property::CorrelationData(data) = prop { + Some(*data) + } else { + None + } + }); + let response_topic = properties.iter().find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(*topic) + } else { + None + } + }); + + let incoming = miniconf::Message { + data: message, + correlation_data, + response_topic, + }; + + if let Some(response) = + self.miniconf.borrow_mut().process(topic, incoming) + { + let mut response_properties: Vec< + minimq::Property, + consts::U1, + > = Vec::new(); + if let Some(data) = response.correlation_data { + response_properties + .push(minimq::Property::CorrelationData(data)) + .unwrap(); + } + + // Make a best-effort attempt to send the response. + client + .publish( + response.topic, + &response.data.into_bytes(), + minimq::QoS::AtMostOnce, + &response_properties, + ) + .ok(); + update = true; + } + }, + ) { + // If settings updated, + Ok(_) => { + if update { + Some(Action::UpdateSettings) + } else if sleep { + Some(Action::Sleep) + } else { + None + } + } + Err(minimq::Error::Disconnected) => { + self.subscribed = false; + None + } + Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, )) => None, @@ -109,6 +188,25 @@ where } } } + + pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { + let telemetry = + miniconf::serde_json_core::to_string::(telemetry) + .unwrap(); + self.mqtt + .borrow_mut() + .publish( + &self.telemetry_topic, + telemetry.as_bytes(), + minimq::QoS::AtMostOnce, + &[], + ) + .ok(); + } + + pub fn settings(&self) -> S { + self.miniconf.borrow().settings.clone() + } } /// Get the MQTT prefix of a device. From f38e3b96084c1fadf3459fdd212cf33e5e7c899b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 22 Apr 2021 15:57:24 +0200 Subject: [PATCH 02/17] Simplifying MQTT handling --- Cargo.lock | 3 +-- Cargo.toml | 11 ++++++++--- src/net/mod.rs | 29 ++++------------------------- 3 files changed, 13 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff026ff82..13c7828f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,7 +203,6 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?branch=feature/mqtt-removal#6b6b9d7973ac5b4771c211c775c51f82d4a7727f" dependencies = [ "proc-macro2", "quote", @@ -416,10 +415,10 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?branch=feature/mqtt-removal#6b6b9d7973ac5b4771c211c775c51f82d4a7727f" dependencies = [ "derive_miniconf", "heapless 0.6.1", + "minimq", "serde", "serde-json-core", ] diff --git a/Cargo.toml b/Cargo.toml index db89ef5dc..00ba5c6ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ paste = "1" dsp = { path = "dsp" } ad9959 = { path = "ad9959" } generic-array = "0.14" -miniconf = "0.1.0" +miniconf = { version = "0.1.0", features = ["minimq-support"] } [dependencies.mcp23017] git = "https://github.com/mrd0ll4r/mcp23017.git" @@ -55,8 +55,13 @@ features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"] version = "0.9.0" [patch.crates-io.miniconf] -git = "https://github.com/quartiq/miniconf.git" -branch = "feature/mqtt-removal" +path = "../miniconf" +# git = "https://github.com/quartiq/miniconf.git" +# branch = "feature/mqtt-removal" + +[patch.crates-io.minimq] +git = "https://github.com/quartiq/minimq.git" +rev = "933687c2e4b" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" diff --git a/src/net/mod.rs b/src/net/mod.rs index e2398b8a9..de55d1a8d 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -116,31 +116,10 @@ where // Handle any MQTT traffic. match self.mqtt.borrow_mut().poll( |client, topic, message, properties| { - // Find correlation-data and response topics. - let correlation_data = properties.iter().find_map(|prop| { - if let minimq::Property::CorrelationData(data) = prop { - Some(*data) - } else { - None - } - }); - let response_topic = properties.iter().find_map(|prop| { - if let minimq::Property::ResponseTopic(topic) = prop { - Some(*topic) - } else { - None - } - }); - - let incoming = miniconf::Message { - data: message, - correlation_data, - response_topic, - }; - - if let Some(response) = - self.miniconf.borrow_mut().process(topic, incoming) - { + if let Some(response) = self.miniconf.borrow_mut().process( + topic, + miniconf::Message::from(message, properties), + ) { let mut response_properties: Vec< minimq::Property, consts::U1, From 0922cc42af194d79a0c8fc88f2fa2836fe10b3d4 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 28 Apr 2021 21:03:38 +0200 Subject: [PATCH 03/17] Adding new miniconf implementation --- Cargo.lock | 16 ++- Cargo.toml | 17 ++- src/net/mod.rs | 184 ++------------------------------- src/net/mqtt_interface.rs | 212 ++++++++++++++++++++++++++++++++++++++ src/net/router.rs | 91 ++++++++++++++++ 5 files changed, 327 insertions(+), 193 deletions(-) create mode 100644 src/net/mqtt_interface.rs create mode 100644 src/net/router.rs diff --git a/Cargo.lock b/Cargo.lock index 13c7828f7..9007083b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,6 +203,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" +source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" dependencies = [ "proc-macro2", "quote", @@ -415,10 +416,10 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" +source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" dependencies = [ "derive_miniconf", "heapless 0.6.1", - "minimq", "serde", "serde-json-core", ] @@ -426,7 +427,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38" +source = "git+https://github.com/quartiq/minimq.git?branch=rs/issue-40/copyable-properties#c95c758b620ee98752852bb643df8557a7200f3f" dependencies = [ "bit_field", "embedded-nal", @@ -656,6 +657,12 @@ dependencies = [ "semver", ] +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + [[package]] name = "semver" version = "0.9.0" @@ -683,9 +690,10 @@ dependencies = [ [[package]] name = "serde-json-core" version = "0.2.0" -source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2" +source = "git+https://github.com/quartiq/serde-json-core.git?branch=feature/dependency-update#a304506a1efb4a90a6ef3faf71ec3ef5f8433fb4" dependencies = [ - "heapless 0.5.6", + "heapless 0.6.1", + "ryu", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 00ba5c6ba..29480d573 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ paste = "1" dsp = { path = "dsp" } ad9959 = { path = "ad9959" } generic-array = "0.14" -miniconf = { version = "0.1.0", features = ["minimq-support"] } +miniconf = "0.1.0" [dependencies.mcp23017] git = "https://github.com/mrd0ll4r/mcp23017.git" @@ -55,13 +55,8 @@ features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"] version = "0.9.0" [patch.crates-io.miniconf] -path = "../miniconf" -# git = "https://github.com/quartiq/miniconf.git" -# branch = "feature/mqtt-removal" - -[patch.crates-io.minimq] -git = "https://github.com/quartiq/minimq.git" -rev = "933687c2e4b" +git = "https://github.com/quartiq/miniconf.git" +rev = "c8d819c" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" @@ -69,11 +64,11 @@ rev = "8468f11" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" -rev = "933687c2e4b" +branch = "rs/issue-40/copyable-properties" [patch.crates-io.serde-json-core] -git = "https://github.com/rust-embedded-community/serde-json-core.git" -rev = "ee06ac91bc" +git = "https://github.com/quartiq/serde-json-core.git" +branch = "feature/dependency-update" [features] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] diff --git a/src/net/mod.rs b/src/net/mod.rs index de55d1a8d..a1e139949 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,11 +1,11 @@ -use crate::hardware::{ - design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, -}; +use heapless::{consts, String}; -use core::{cell::RefCell, fmt::Write}; +use core::fmt::Write; -use heapless::{consts, String, Vec}; -use serde::Serialize; +mod mqtt_interface; +mod router; +pub use mqtt_interface::MqttInterface; +use router::{RouteResult, SettingsResponse}; /// Potential actions for firmware to take. pub enum Action { @@ -16,178 +16,6 @@ pub enum Action { UpdateSettings, } -/// MQTT settings interface. -pub struct MqttInterface -where - S: miniconf::Miniconf + Default + Clone, -{ - telemetry_topic: String, - mqtt: RefCell>, - miniconf: RefCell>, - clock: CycleCounter, - phy: EthernetPhy, - network_was_reset: bool, - subscribed: bool, -} - -impl MqttInterface -where - S: miniconf::Miniconf + Default + Clone, -{ - /// Construct a new MQTT settings interface. - /// - /// # Args - /// * `stack` - The network stack to use for communication. - /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. - /// * `prefix` - The MQTT device prefix to use for this device. - /// * `phy` - The PHY driver for querying the link state. - /// * `clock` - The clock to utilize for querying the current system time. - pub fn new( - stack: NetworkStack, - client_id: &str, - prefix: &str, - phy: EthernetPhy, - clock: CycleCounter, - ) -> Self { - let mqtt_client = - minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) - .unwrap(); - let config = - miniconf::MiniconfInterface::new(prefix, S::default()).unwrap(); - - let mut telemetry_topic: String = String::new(); - write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); - - Self { - mqtt: RefCell::new(mqtt_client), - miniconf: RefCell::new(config), - clock, - phy, - telemetry_topic, - network_was_reset: false, - subscribed: false, - } - } - - /// Update the MQTT interface and service the network - /// - /// # Returns - /// An option containing an action that should be completed as a result of network servicing. - pub fn update(&mut self) -> Option { - let now = self.clock.current_ms(); - - // First, service the network stack to process and inbound and outbound traffic. - let sleep = match self.mqtt.borrow_mut().network_stack.poll(now) { - Ok(updated) => !updated, - Err(err) => { - log::info!("Network error: {:?}", err); - false - } - }; - - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - if self.phy.poll_link() == false { - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - if !self.network_was_reset { - self.network_was_reset = true; - self.mqtt.borrow_mut().network_stack.handle_link_reset(); - } - } else { - self.network_was_reset = false; - } - - // If we're no longer subscribed to the settings topic, but we are connected to the broker, - // resubscribe. - if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { - self.mqtt - .borrow_mut() - .subscribe( - self.miniconf.borrow_mut().get_listening_topic(), - &[], - ) - .unwrap(); - self.subscribed = true; - } - - let mut update = false; - - // Handle any MQTT traffic. - match self.mqtt.borrow_mut().poll( - |client, topic, message, properties| { - if let Some(response) = self.miniconf.borrow_mut().process( - topic, - miniconf::Message::from(message, properties), - ) { - let mut response_properties: Vec< - minimq::Property, - consts::U1, - > = Vec::new(); - if let Some(data) = response.correlation_data { - response_properties - .push(minimq::Property::CorrelationData(data)) - .unwrap(); - } - - // Make a best-effort attempt to send the response. - client - .publish( - response.topic, - &response.data.into_bytes(), - minimq::QoS::AtMostOnce, - &response_properties, - ) - .ok(); - update = true; - } - }, - ) { - // If settings updated, - Ok(_) => { - if update { - Some(Action::UpdateSettings) - } else if sleep { - Some(Action::Sleep) - } else { - None - } - } - Err(minimq::Error::Disconnected) => { - self.subscribed = false; - None - } - Err(minimq::Error::Network( - smoltcp_nal::NetworkError::NoIpAddress, - )) => None, - - Err(error) => { - log::info!("Unexpected error: {:?}", error); - None - } - } - } - - pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { - let telemetry = - miniconf::serde_json_core::to_string::(telemetry) - .unwrap(); - self.mqtt - .borrow_mut() - .publish( - &self.telemetry_topic, - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok(); - } - - pub fn settings(&self) -> S { - self.miniconf.borrow().settings.clone() - } -} - /// Get the MQTT prefix of a device. /// /// # Args diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs new file mode 100644 index 000000000..bcc6ae71c --- /dev/null +++ b/src/net/mqtt_interface.rs @@ -0,0 +1,212 @@ +use crate::hardware::{ + design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, +}; + +use core::{cell::RefCell, fmt::Write}; + +use heapless::{consts, String}; +use serde::Serialize; + +use super::{Action, RouteResult, SettingsResponse}; + +/// MQTT settings interface. +pub struct MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + telemetry_topic: String, + default_response_topic: String, + mqtt: RefCell>, + settings: RefCell, + clock: CycleCounter, + phy: EthernetPhy, + network_was_reset: bool, + subscribed: bool, + id: String, +} + +impl MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + /// Construct a new MQTT settings interface. + /// + /// # Args + /// * `stack` - The network stack to use for communication. + /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. + /// * `prefix` - The MQTT device prefix to use for this device. + /// * `phy` - The PHY driver for querying the link state. + /// * `clock` - The clock to utilize for querying the current system time. + pub fn new( + stack: NetworkStack, + client_id: &str, + prefix: &str, + phy: EthernetPhy, + clock: CycleCounter, + ) -> Self { + let mqtt_client = + minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) + .unwrap(); + + let mut telemetry_topic: String = String::new(); + write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); + + let mut response_topic: String = String::new(); + write!(&mut response_topic, "{}/log", prefix).unwrap(); + + Self { + mqtt: RefCell::new(mqtt_client), + settings: RefCell::new(S::default()), + id: String::from(prefix), + clock, + phy, + telemetry_topic, + default_response_topic: response_topic, + network_was_reset: false, + subscribed: false, + } + } + + /// Update the MQTT interface and service the network + /// + /// # Returns + /// An option containing an action that should be completed as a result of network servicing. + pub fn update(&mut self) -> Option { + // First, service the network stack to process any inbound and outbound traffic. + let sleep = match self + .mqtt + .borrow_mut() + .network_stack + .poll(self.clock.current_ms()) + { + Ok(updated) => !updated, + Err(err) => { + log::info!("Network error: {:?}", err); + false + } + }; + + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + if self.phy.poll_link() == false { + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + if !self.network_was_reset { + self.network_was_reset = true; + self.mqtt.borrow_mut().network_stack.handle_link_reset(); + } + } else { + self.network_was_reset = false; + } + + // If we're no longer subscribed to the settings topic, but we are connected to the broker, + // resubscribe. + if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { + let mut settings_topic: String = String::new(); + write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) + .unwrap(); + + self.mqtt + .borrow_mut() + .subscribe(&settings_topic, &[]) + .unwrap(); + self.subscribed = true; + } + + // Handle any MQTT traffic. + let mut update = false; + match self.mqtt.borrow_mut().poll( + |client, topic, message, properties| { + let (response, settings_update) = + self.route_message(topic, message, properties); + client + .publish( + response.response_topic, + &response.message, + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + update = settings_update; + }, + ) { + // If settings updated, + Ok(_) => { + if update { + Some(Action::UpdateSettings) + } else if sleep { + Some(Action::Sleep) + } else { + None + } + } + Err(minimq::Error::Disconnected) => { + self.subscribed = false; + None + } + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => None, + + Err(error) => { + log::info!("Unexpected error: {:?}", error); + None + } + } + } + + fn route_message<'a, 'me: 'a>( + &'me self, + topic: &str, + message: &[u8], + properties: &[minimq::Property<'a>], + ) -> (RouteResult<'a>, bool) { + let mut response = + RouteResult::new(properties, &self.default_response_topic); + let mut update = false; + + if let Some(path) = topic.strip_prefix(self.id.as_str()) { + let mut parts = path[1..].split('/'); + match parts.next() { + Some("settings") => { + let result = self + .settings + .borrow_mut() + .string_set(parts.peekable(), message); + update = result.is_ok(); + response.set_message(SettingsResponse::new(result, topic)); + } + Some(_) => response.set_message(SettingsResponse::custom( + "Unknown topic", + 255, + )), + _ => response + .set_message(SettingsResponse::custom("No topic", 254)), + } + } else { + response + .set_message(SettingsResponse::custom("Invalid prefix", 253)); + } + + (response, update) + } + + pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { + let telemetry = + miniconf::serde_json_core::to_string::(telemetry) + .unwrap(); + self.mqtt + .borrow_mut() + .publish( + &self.telemetry_topic, + telemetry.as_bytes(), + minimq::QoS::AtMostOnce, + &[], + ) + .ok(); + } + + pub fn settings(&self) -> S { + self.settings.borrow().clone() + } +} diff --git a/src/net/router.rs b/src/net/router.rs new file mode 100644 index 000000000..c8b777644 --- /dev/null +++ b/src/net/router.rs @@ -0,0 +1,91 @@ +use heapless::{consts, String, Vec}; +use serde::Serialize; + +use core::fmt::Write; + +pub struct RouteResult<'a> { + pub response_topic: &'a str, + pub message: Vec, + pub properties: Vec, consts::U1>, +} + +#[derive(Serialize)] +pub struct SettingsResponse { + code: u8, + msg: String, +} + +impl<'a> RouteResult<'a> { + pub fn new<'b: 'a>( + properties: &[minimq::Property<'a>], + default_response: &'b str, + ) -> Self { + // Extract the MQTT response topic. + let response_topic = properties + .iter() + .find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(topic) + } else { + None + } + }) + .unwrap_or(&default_response); + + // Associate any provided correlation data with the response. + let mut correlation_data: Vec, consts::U1> = + Vec::new(); + if let Some(data) = properties + .iter() + .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) + { + // Note(unwrap): Unwrap can not fail, as we only ever push one value. + correlation_data.push(*data).unwrap(); + } + + RouteResult { + response_topic, + message: Vec::new(), + properties: correlation_data, + } + } + + pub fn set_message(&mut self, response: impl Serialize) { + self.message = miniconf::serde_json_core::to_vec(&response).unwrap(); + } +} + +impl SettingsResponse { + pub fn new(result: Result<(), miniconf::Error>, path: &str) -> Self { + match result { + Ok(_) => { + let mut msg: String = String::new(); + if write!(&mut msg, "{} updated", path).is_err() { + msg = String::from("Latest update succeeded"); + } + + Self { msg, code: 0 } + } + Err(error) => { + let mut msg: String = String::new(); + if write!(&mut msg, "{} update failed: {:?}", path, error) + .is_err() + { + if write!(&mut msg, "Latest update failed: {:?}", error) + .is_err() + { + msg = String::from("Latest update failed"); + } + } + Self { msg, code: 5 } + } + } + } + + pub fn custom(msg: &str, code: u8) -> Self { + Self { + code, + msg: String::from(msg), + } + } +} From aad026161f97aedb103f718a37fe6ceb84e5d060 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 29 Apr 2021 11:28:35 +0200 Subject: [PATCH 04/17] Refactoring to support DHCP --- Cargo.lock | 4 ++-- Cargo.toml | 6 +++--- src/net/mqtt_interface.rs | 19 +++++++++++++++++-- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a002739db..aef5a92e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -427,7 +427,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?branch=rs/issue-40/copyable-properties#c95c758b620ee98752852bb643df8557a7200f3f" +source = "git+https://github.com/quartiq/minimq.git?rev=b3f364d#b3f364d55dea35da6572f78ddb91c87bfbb453bf" dependencies = [ "bit_field", "embedded-nal", @@ -690,7 +690,7 @@ dependencies = [ [[package]] name = "serde-json-core" version = "0.2.0" -source = "git+https://github.com/quartiq/serde-json-core.git?branch=feature/dependency-update#a304506a1efb4a90a6ef3faf71ec3ef5f8433fb4" +source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=da460d1#da460d123e217f0e822a3977eb2170ed5d279d5e" dependencies = [ "heapless 0.6.1", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 095fb7428..4ab885e30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,11 +64,11 @@ rev = "8468f11" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" -branch = "rs/issue-40/copyable-properties" +rev = "b3f364d" [patch.crates-io.serde-json-core] -git = "https://github.com/quartiq/serde-json-core.git" -branch = "feature/dependency-update" +git = "https://github.com/rust-embedded-community/serde-json-core.git" +rev = "da460d1" [features] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index bcc6ae71c..374714db0 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -22,7 +22,7 @@ where phy: EthernetPhy, network_was_reset: bool, subscribed: bool, - id: String, + id: String, } impl MqttInterface @@ -99,9 +99,24 @@ where self.network_was_reset = false; } + let mqtt_connected = match self.mqtt.borrow_mut().is_connected() { + Ok(connected) => connected, + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => false, + Err(minimq::Error::Network(error)) => { + log::info!("Unexpected network error: {:?}", error); + false + } + Err(error) => { + log::warn!("Unexpected MQTT error: {:?}", error); + false + } + }; + // If we're no longer subscribed to the settings topic, but we are connected to the broker, // resubscribe. - if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { + if !self.subscribed && mqtt_connected { let mut settings_topic: String = String::new(); write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) .unwrap(); From 7ddf2271f3779dddbf892c42a560c0fa9f975105 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 29 Apr 2021 11:54:16 +0200 Subject: [PATCH 05/17] Refactoring net module --- src/net/messages.rs | 124 ++++++++++++++++++++++++++++++++++++++ src/net/mod.rs | 11 +++- src/net/mqtt_interface.rs | 62 ++++++++++--------- src/net/router.rs | 91 ---------------------------- 4 files changed, 168 insertions(+), 120 deletions(-) create mode 100644 src/net/messages.rs delete mode 100644 src/net/router.rs diff --git a/src/net/messages.rs b/src/net/messages.rs new file mode 100644 index 000000000..2f54f5223 --- /dev/null +++ b/src/net/messages.rs @@ -0,0 +1,124 @@ +use heapless::{consts, String, Vec}; +use serde::Serialize; + +use core::fmt::Write; + +#[derive(Debug, Copy, Clone)] +pub enum SettingsResponseCode { + NoError = 0, + NoTopic = 1, + InvalidPrefix = 2, + UnknownTopic = 3, + UpdateFailure = 4, +} + +/// Represents a generic MQTT message. +pub struct MqttMessage<'a> { + pub topic: &'a str, + pub message: Vec, + pub properties: Vec, consts::U1>, +} + +/// The payload of the MQTT response message to a settings update request. +#[derive(Serialize)] +pub struct SettingsResponse { + code: u8, + msg: String, +} + +impl<'a> MqttMessage<'a> { + /// Construct a new MQTT message from an incoming message. + /// + /// # Args + /// * `properties` - A list of properties associated with the inbound message. + /// * `default_response` - The default response topic for the message + /// * `msg` - The response associated with the message. Must fit within 128 bytes. + pub fn new<'b: 'a>( + properties: &[minimq::Property<'a>], + default_response: &'b str, + msg: &impl Serialize, + ) -> Self { + // Extract the MQTT response topic. + let topic = properties + .iter() + .find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(topic) + } else { + None + } + }) + .unwrap_or(&default_response); + + // Associate any provided correlation data with the response. + let mut correlation_data: Vec, consts::U1> = + Vec::new(); + if let Some(data) = properties + .iter() + .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) + { + // Note(unwrap): Unwrap can not fail, as we only ever push one value. + correlation_data.push(*data).unwrap(); + } + + Self { + topic, + // Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector. + message: miniconf::serde_json_core::to_vec(msg).unwrap(), + properties: correlation_data, + } + } +} + +impl SettingsResponse { + /// Construct a settings response upon successful settings update. + /// + /// # Args + /// * `path` - The path of the setting that was updated. + pub fn update_success(path: &str) -> Self { + let mut msg: String = String::new(); + if write!(&mut msg, "{} updated", path).is_err() { + msg = String::from("Latest update succeeded"); + } + + Self { + msg, + code: SettingsResponseCode::NoError as u8, + } + } + + /// Construct a response when a settings update failed. + /// + /// # Args + /// * `path` - The settings path that configuration failed for. + /// * `err` - The settings update error that occurred. + pub fn update_failure(path: &str, err: miniconf::Error) -> Self { + let mut msg: String = String::new(); + if write!(&mut msg, "{} update failed: {:?}", path, err).is_err() { + if write!(&mut msg, "Latest update failed: {:?}", err).is_err() { + msg = String::from("Latest update failed"); + } + } + + Self { + msg, + code: SettingsResponseCode::UpdateFailure as u8, + } + } + + /// Construct a response from a custom response code. + /// + /// # Args + /// * `code` - The response code to provide. + pub fn code(code: SettingsResponseCode) -> Self { + let mut msg: String = String::new(); + + // Note(unwrap): All code debug names shall fit in the 64 byte string. + write!(&mut msg, "{:?}", code).unwrap(); + + Self { + code: code as u8, + msg, + } + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index a1e139949..9a86b8547 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,11 +1,18 @@ +///! Stabilizer network management module +///! +///! # Design +///! The stabilizer network architecture supports numerous layers to permit transmission of +///! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and live data +///! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines +///! related to Stabilizer networking operations. use heapless::{consts, String}; use core::fmt::Write; +mod messages; mod mqtt_interface; -mod router; +use messages::{MqttMessage, SettingsResponse, SettingsResponseCode}; pub use mqtt_interface::MqttInterface; -use router::{RouteResult, SettingsResponse}; /// Potential actions for firmware to take. pub enum Action { diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index 374714db0..209a67d36 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -7,7 +7,7 @@ use core::{cell::RefCell, fmt::Write}; use heapless::{consts, String}; use serde::Serialize; -use super::{Action, RouteResult, SettingsResponse}; +use super::{Action, MqttMessage, SettingsResponse, SettingsResponseCode}; /// MQTT settings interface. pub struct MqttInterface @@ -136,7 +136,7 @@ where self.route_message(topic, message, properties); client .publish( - response.response_topic, + response.topic, &response.message, minimq::QoS::AtMostOnce, &response.properties, @@ -175,33 +175,41 @@ where topic: &str, message: &[u8], properties: &[minimq::Property<'a>], - ) -> (RouteResult<'a>, bool) { - let mut response = - RouteResult::new(properties, &self.default_response_topic); + ) -> (MqttMessage<'a>, bool) { let mut update = false; - - if let Some(path) = topic.strip_prefix(self.id.as_str()) { - let mut parts = path[1..].split('/'); - match parts.next() { - Some("settings") => { - let result = self - .settings - .borrow_mut() - .string_set(parts.peekable(), message); - update = result.is_ok(); - response.set_message(SettingsResponse::new(result, topic)); + let response_msg = + if let Some(path) = topic.strip_prefix(self.id.as_str()) { + let mut parts = path[1..].split('/'); + match parts.next() { + Some("settings") => { + match self + .settings + .borrow_mut() + .string_set(parts.peekable(), message) + { + Ok(_) => { + update = true; + SettingsResponse::update_success(path) + } + Err(error) => { + SettingsResponse::update_failure(path, error) + } + } + } + Some(_) => SettingsResponse::code( + SettingsResponseCode::UnknownTopic, + ), + _ => SettingsResponse::code(SettingsResponseCode::NoTopic), } - Some(_) => response.set_message(SettingsResponse::custom( - "Unknown topic", - 255, - )), - _ => response - .set_message(SettingsResponse::custom("No topic", 254)), - } - } else { - response - .set_message(SettingsResponse::custom("Invalid prefix", 253)); - } + } else { + SettingsResponse::code(SettingsResponseCode::InvalidPrefix) + }; + + let response = MqttMessage::new( + properties, + &self.default_response_topic, + &response_msg, + ); (response, update) } diff --git a/src/net/router.rs b/src/net/router.rs deleted file mode 100644 index c8b777644..000000000 --- a/src/net/router.rs +++ /dev/null @@ -1,91 +0,0 @@ -use heapless::{consts, String, Vec}; -use serde::Serialize; - -use core::fmt::Write; - -pub struct RouteResult<'a> { - pub response_topic: &'a str, - pub message: Vec, - pub properties: Vec, consts::U1>, -} - -#[derive(Serialize)] -pub struct SettingsResponse { - code: u8, - msg: String, -} - -impl<'a> RouteResult<'a> { - pub fn new<'b: 'a>( - properties: &[minimq::Property<'a>], - default_response: &'b str, - ) -> Self { - // Extract the MQTT response topic. - let response_topic = properties - .iter() - .find_map(|prop| { - if let minimq::Property::ResponseTopic(topic) = prop { - Some(topic) - } else { - None - } - }) - .unwrap_or(&default_response); - - // Associate any provided correlation data with the response. - let mut correlation_data: Vec, consts::U1> = - Vec::new(); - if let Some(data) = properties - .iter() - .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) - { - // Note(unwrap): Unwrap can not fail, as we only ever push one value. - correlation_data.push(*data).unwrap(); - } - - RouteResult { - response_topic, - message: Vec::new(), - properties: correlation_data, - } - } - - pub fn set_message(&mut self, response: impl Serialize) { - self.message = miniconf::serde_json_core::to_vec(&response).unwrap(); - } -} - -impl SettingsResponse { - pub fn new(result: Result<(), miniconf::Error>, path: &str) -> Self { - match result { - Ok(_) => { - let mut msg: String = String::new(); - if write!(&mut msg, "{} updated", path).is_err() { - msg = String::from("Latest update succeeded"); - } - - Self { msg, code: 0 } - } - Err(error) => { - let mut msg: String = String::new(); - if write!(&mut msg, "{} update failed: {:?}", path, error) - .is_err() - { - if write!(&mut msg, "Latest update failed: {:?}", error) - .is_err() - { - msg = String::from("Latest update failed"); - } - } - Self { msg, code: 5 } - } - } - } - - pub fn custom(msg: &str, code: u8) -> Self { - Self { - code, - msg: String::from(msg), - } - } -} From 5499fc6b478d06a9379d75b62b2c19cc45fe0ed6 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 29 Apr 2021 12:11:35 +0200 Subject: [PATCH 06/17] Updating miniconf to use updated API --- miniconf.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/miniconf.py b/miniconf.py index b059f78f0..837b2d57f 100644 --- a/miniconf.py +++ b/miniconf.py @@ -9,6 +9,7 @@ import asyncio import json import logging +import sys from gmqtt import Client as MqttClient @@ -51,7 +52,8 @@ def _handle_response(self, _client, topic, payload, *_args, **_kwargs): logger.warning('Unexpected response on topic: %s', topic) return - self.inflight[topic].set_result(payload.decode('ascii')) + response = json.loads(payload) + self.inflight[topic].set_result((response['code'], response['msg'])) del self.inflight[topic] async def command(self, path, value): @@ -62,7 +64,8 @@ async def command(self, path, value): value: The value to write to the path. Returns: - The received response to the command. + (code, msg) tuple as a response to the command. `code` is zero for success and `msg` is + a use-readable message indicating further information. """ setting_topic = f'{self.prefix}/settings/{path}' response_topic = f'{self.prefix}/response/{path}' @@ -107,12 +110,17 @@ def main(): async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) + failures = 0 for kv in args.settings: path, value = kv.split("=", 1) - response = await interface.command(path, json.loads(value)) - print(response) + code, response = await interface.command(path, json.loads(value)) + if code != 0: + failures += 1 + logging.warning('%s update failed!', path) - loop.run_until_complete(configure_settings()) + return failures + + sys.exit(loop.run_until_complete(configure_settings())) if __name__ == '__main__': From d742350bfdbdb0e46803eca204f89e0805abec16 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 29 Apr 2021 12:12:46 +0200 Subject: [PATCH 07/17] Updating failure printing --- miniconf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miniconf.py b/miniconf.py index 837b2d57f..77e491cc9 100644 --- a/miniconf.py +++ b/miniconf.py @@ -114,9 +114,9 @@ async def configure_settings(): for kv in args.settings: path, value = kv.split("=", 1) code, response = await interface.command(path, json.loads(value)) + print(response) if code != 0: failures += 1 - logging.warning('%s update failed!', path) return failures From fa1f501f0b715a1629211354c72db317d176ac60 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 29 Apr 2021 12:58:04 +0200 Subject: [PATCH 08/17] Updating dependencies --- Cargo.toml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4ab885e30..226e4d4e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ version = "0.9.0" [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" -rev = "c8d819c" +rev = "c6f2b28" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" @@ -66,10 +66,6 @@ rev = "8468f11" git = "https://github.com/quartiq/minimq.git" rev = "b3f364d" -[patch.crates-io.serde-json-core] -git = "https://github.com/rust-embedded-community/serde-json-core.git" -rev = "da460d1" - [features] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] bkpt = [ ] From eeb15efdff95657e2917ccb6585bcb72c8be0e86 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 29 Apr 2021 13:20:26 +0200 Subject: [PATCH 09/17] Adding lockfile --- Cargo.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aef5a92e6..ae9ef69e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,7 +203,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" +source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd" dependencies = [ "proc-macro2", "quote", @@ -416,10 +416,9 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" +source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd" dependencies = [ "derive_miniconf", - "heapless 0.6.1", "serde", "serde-json-core", ] @@ -689,8 +688,9 @@ dependencies = [ [[package]] name = "serde-json-core" -version = "0.2.0" -source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=da460d1#da460d123e217f0e822a3977eb2170ed5d279d5e" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39af17f40c2a28d2c9a7918663ddc8a10f54cc6f109ead5c3f010869761df186" dependencies = [ "heapless 0.6.1", "ryu", From 5c4ba78dd112834d380bbb0e397a03cca0c648b7 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 4 May 2021 13:13:44 +0200 Subject: [PATCH 10/17] Refactoring MQTT architecture --- src/bin/dual-iir.rs | 2 +- src/bin/lockin-external.rs | 2 +- src/net/messages.rs | 67 ++++----------- src/net/mod.rs | 2 +- src/net/mqtt_interface.rs | 171 ++++++++++++++----------------------- 5 files changed, 86 insertions(+), 158 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 65d3f2e6e..3b451f979 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -168,7 +168,7 @@ const APP: () = { let settings = c.resources.mqtt.settings(); // Update the IIR channels. - c.resources.settings.lock(|current| *current = settings); + c.resources.settings.lock(|current| *current = *settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); diff --git a/src/bin/lockin-external.rs b/src/bin/lockin-external.rs index f9e9ae452..26634c047 100644 --- a/src/bin/lockin-external.rs +++ b/src/bin/lockin-external.rs @@ -215,7 +215,7 @@ const APP: () = { c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); - c.resources.settings.lock(|current| *current = settings); + c.resources.settings.lock(|current| *current = *settings); } #[task(binds = ETH, priority = 1)] diff --git a/src/net/messages.rs b/src/net/messages.rs index 2f54f5223..167e44079 100644 --- a/src/net/messages.rs +++ b/src/net/messages.rs @@ -6,10 +6,7 @@ use core::fmt::Write; #[derive(Debug, Copy, Clone)] pub enum SettingsResponseCode { NoError = 0, - NoTopic = 1, - InvalidPrefix = 2, - UnknownTopic = 3, - UpdateFailure = 4, + MiniconfError = 1, } /// Represents a generic MQTT message. @@ -70,55 +67,25 @@ impl<'a> MqttMessage<'a> { } } -impl SettingsResponse { - /// Construct a settings response upon successful settings update. - /// - /// # Args - /// * `path` - The path of the setting that was updated. - pub fn update_success(path: &str) -> Self { - let mut msg: String = String::new(); - if write!(&mut msg, "{} updated", path).is_err() { - msg = String::from("Latest update succeeded"); - } +impl From> for SettingsResponse { + fn from(result: Result<(), miniconf::Error>) -> Self { + match result { + Ok(_) => Self { + msg: String::from("OK"), + code: SettingsResponseCode::NoError as u8, + }, - Self { - msg, - code: SettingsResponseCode::NoError as u8, - } - } + Err(error) => { + let mut msg = String::new(); + if write!(&mut msg, "{:?}", error).is_err() { + msg = String::from("Miniconf Error"); + } - /// Construct a response when a settings update failed. - /// - /// # Args - /// * `path` - The settings path that configuration failed for. - /// * `err` - The settings update error that occurred. - pub fn update_failure(path: &str, err: miniconf::Error) -> Self { - let mut msg: String = String::new(); - if write!(&mut msg, "{} update failed: {:?}", path, err).is_err() { - if write!(&mut msg, "Latest update failed: {:?}", err).is_err() { - msg = String::from("Latest update failed"); + Self { + code: SettingsResponseCode::MiniconfError as u8, + msg, + } } } - - Self { - msg, - code: SettingsResponseCode::UpdateFailure as u8, - } - } - - /// Construct a response from a custom response code. - /// - /// # Args - /// * `code` - The response code to provide. - pub fn code(code: SettingsResponseCode) -> Self { - let mut msg: String = String::new(); - - // Note(unwrap): All code debug names shall fit in the 64 byte string. - write!(&mut msg, "{:?}", code).unwrap(); - - Self { - code: code as u8, - msg, - } } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 9a86b8547..75f92d61f 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -11,7 +11,7 @@ use core::fmt::Write; mod messages; mod mqtt_interface; -use messages::{MqttMessage, SettingsResponse, SettingsResponseCode}; +use messages::{MqttMessage, SettingsResponse}; pub use mqtt_interface::MqttInterface; /// Potential actions for firmware to take. diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index 209a67d36..dabcbe8f5 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -2,27 +2,25 @@ use crate::hardware::{ design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, }; -use core::{cell::RefCell, fmt::Write}; +use core::fmt::Write; use heapless::{consts, String}; -use serde::Serialize; -use super::{Action, MqttMessage, SettingsResponse, SettingsResponseCode}; +use super::{Action, MqttMessage, SettingsResponse}; /// MQTT settings interface. pub struct MqttInterface where S: miniconf::Miniconf + Default + Clone, { - telemetry_topic: String, default_response_topic: String, - mqtt: RefCell>, - settings: RefCell, + mqtt: minimq::MqttClient, + settings: S, clock: CycleCounter, phy: EthernetPhy, network_was_reset: bool, subscribed: bool, - id: String, + settings_prefix: String, } impl MqttInterface @@ -44,23 +42,24 @@ where phy: EthernetPhy, clock: CycleCounter, ) -> Self { - let mqtt_client = + let mqtt = minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) .unwrap(); - let mut telemetry_topic: String = String::new(); - write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); - let mut response_topic: String = String::new(); write!(&mut response_topic, "{}/log", prefix).unwrap(); + let mut settings_prefix: String = String::new(); + write!(&mut settings_prefix, "{}/settings", prefix).unwrap(); + + // Ensure we have two remaining spaces + Self { - mqtt: RefCell::new(mqtt_client), - settings: RefCell::new(S::default()), - id: String::from(prefix), + mqtt, + settings: S::default(), + settings_prefix, clock, phy, - telemetry_topic, default_response_topic: response_topic, network_was_reset: false, subscribed: false, @@ -73,11 +72,7 @@ where /// An option containing an action that should be completed as a result of network servicing. pub fn update(&mut self) -> Option { // First, service the network stack to process any inbound and outbound traffic. - let sleep = match self - .mqtt - .borrow_mut() - .network_stack - .poll(self.clock.current_ms()) + let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms()) { Ok(updated) => !updated, Err(err) => { @@ -93,13 +88,13 @@ where // sending an excessive number of DHCP requests. if !self.network_was_reset { self.network_was_reset = true; - self.mqtt.borrow_mut().network_stack.handle_link_reset(); + self.mqtt.network_stack.handle_link_reset(); } } else { self.network_was_reset = false; } - let mqtt_connected = match self.mqtt.borrow_mut().is_connected() { + let mqtt_connected = match self.mqtt.is_connected() { Ok(connected) => connected, Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, @@ -117,34 +112,59 @@ where // If we're no longer subscribed to the settings topic, but we are connected to the broker, // resubscribe. if !self.subscribed && mqtt_connected { - let mut settings_topic: String = String::new(); - write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) - .unwrap(); + // Note(unwrap): We construct a string with two more characters than the prefix + // strucutre, so we are guaranteed to have space for storage. + let mut settings_topic: String = + String::from(self.settings_prefix.as_str()); + settings_topic.push_str("/#").unwrap(); - self.mqtt - .borrow_mut() - .subscribe(&settings_topic, &[]) - .unwrap(); + self.mqtt.subscribe(&settings_topic, &[]).unwrap(); self.subscribed = true; } // Handle any MQTT traffic. + let settings = &mut self.settings; + let mqtt = &mut self.mqtt; + let prefix = self.settings_prefix.as_str(); + let default_response_topic = self.default_response_topic.as_str(); + let mut update = false; - match self.mqtt.borrow_mut().poll( - |client, topic, message, properties| { - let (response, settings_update) = - self.route_message(topic, message, properties); - client - .publish( - response.topic, - &response.message, - minimq::QoS::AtMostOnce, - &response.properties, - ) - .ok(); - update = settings_update; - }, - ) { + match mqtt.poll(|client, topic, message, properties| { + let path = match topic.strip_prefix(prefix) { + // For paths, we do not want to include the leading slash. + Some(path) => { + if path.len() > 0 { + &path[1..] + } else { + path + } + } + None => { + info!("Unexpected MQTT topic: {}", topic); + return; + } + }; + + let message: SettingsResponse = settings + .string_set(path.split('/').peekable(), message) + .and_then(|_| { + update = true; + Ok(()) + }) + .into(); + + let response = + MqttMessage::new(properties, default_response_topic, &message); + + client + .publish( + response.topic, + &response.message, + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + }) { // If settings updated, Ok(_) => { if update { @@ -170,66 +190,7 @@ where } } - fn route_message<'a, 'me: 'a>( - &'me self, - topic: &str, - message: &[u8], - properties: &[minimq::Property<'a>], - ) -> (MqttMessage<'a>, bool) { - let mut update = false; - let response_msg = - if let Some(path) = topic.strip_prefix(self.id.as_str()) { - let mut parts = path[1..].split('/'); - match parts.next() { - Some("settings") => { - match self - .settings - .borrow_mut() - .string_set(parts.peekable(), message) - { - Ok(_) => { - update = true; - SettingsResponse::update_success(path) - } - Err(error) => { - SettingsResponse::update_failure(path, error) - } - } - } - Some(_) => SettingsResponse::code( - SettingsResponseCode::UnknownTopic, - ), - _ => SettingsResponse::code(SettingsResponseCode::NoTopic), - } - } else { - SettingsResponse::code(SettingsResponseCode::InvalidPrefix) - }; - - let response = MqttMessage::new( - properties, - &self.default_response_topic, - &response_msg, - ); - - (response, update) - } - - pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { - let telemetry = - miniconf::serde_json_core::to_string::(telemetry) - .unwrap(); - self.mqtt - .borrow_mut() - .publish( - &self.telemetry_topic, - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok(); - } - - pub fn settings(&self) -> S { - self.settings.borrow().clone() + pub fn settings(&self) -> &S { + &self.settings } } From 1f8f63018ae8b156294626aff7be100f3cf5f2e6 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 4 May 2021 13:50:17 +0200 Subject: [PATCH 11/17] Cleaning up API --- src/net/mqtt_interface.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index dabcbe8f5..cef26dd0b 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -2,8 +2,6 @@ use crate::hardware::{ design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, }; -use core::fmt::Write; - use heapless::{consts, String}; use super::{Action, MqttMessage, SettingsResponse}; @@ -46,13 +44,11 @@ where minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) .unwrap(); - let mut response_topic: String = String::new(); - write!(&mut response_topic, "{}/log", prefix).unwrap(); - - let mut settings_prefix: String = String::new(); - write!(&mut settings_prefix, "{}/settings", prefix).unwrap(); + let mut response_topic: String = String::from(prefix); + response_topic.push_str("/log").unwrap(); - // Ensure we have two remaining spaces + let mut settings_prefix: String = String::from(prefix); + settings_prefix.push_str("/settings").unwrap(); Self { mqtt, @@ -83,16 +79,17 @@ where // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network // stack. - if self.phy.poll_link() == false { + match self.phy.poll_link() { + true => self.network_was_reset = false, + // Only reset the network stack once per link reconnection. This prevents us from // sending an excessive number of DHCP requests. - if !self.network_was_reset { + false if !self.network_was_reset => { self.network_was_reset = true; self.mqtt.network_stack.handle_link_reset(); } - } else { - self.network_was_reset = false; - } + _ => {}, + }; let mqtt_connected = match self.mqtt.is_connected() { Ok(connected) => connected, @@ -160,6 +157,8 @@ where .publish( response.topic, &response.message, + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. minimq::QoS::AtMostOnce, &response.properties, ) From a3ff8aae752f5140d723162cee8b0c90963a7db0 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 4 May 2021 14:21:00 +0200 Subject: [PATCH 12/17] Updating miniconf utility to use correlation data --- miniconf.py | 84 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/miniconf.py b/miniconf.py index 77e491cc9..40f163ef2 100644 --- a/miniconf.py +++ b/miniconf.py @@ -10,6 +10,7 @@ import json import logging import sys +import uuid from gmqtt import Client as MqttClient @@ -33,28 +34,52 @@ def __init__(self, client, prefix): client: A connected MQTT5 client. prefix: The MQTT toptic prefix of the device to control. """ + self.uuid = uuid.uuid1(prefix) + self.request_id = 0 self.client = client self.prefix = prefix self.inflight = {} self.client.on_message = self._handle_response self.client.subscribe(f'{prefix}/response/#') - def _handle_response(self, _client, topic, payload, *_args, **_kwargs): + def _handle_response(self, _client, _topic, payload, _qos, properties): """Callback function for when messages are received over MQTT. Args: _client: The MQTT client. - topic: The topic that the message was received on. + _topic: The topic that the message was received on. payload: The payload of the message. + _qos: The quality-of-service level of the received packet + properties: A dictionary of properties associated with the message. """ - if topic not in self.inflight: - # TODO use correlation_data to distinguish clients and requests - logger.warning('Unexpected response on topic: %s', topic) + # Extract corrleation data from the properties + try: + correlation_data = json.loads(properties['correlation_data']) + except (json.decoder.JSONDecodeError, KeyError): + logger.warning('Ignoring message with invalid correlation data') return - response = json.loads(payload) - self.inflight[topic].set_result((response['code'], response['msg'])) - del self.inflight[topic] + # Validate the correlation data. + try: + if correlation_data['id'] != self.uuid.hex: + logger.info('Ignoring correlation data for different ID') + return + pid = correlation_data['pid'] + except KeyError: + logger.warning('Ignoring unknown correlation data: %s', correlation_data) + return + + if pid not in self.inflight: + logger.warning('Unexpected pid: %s', pid) + return + + try: + response = json.loads(payload) + self.inflight[pid].set_result((response['code'], response['msg'])) + del self.inflight[pid] + except json.decoder.JSONDecodeError: + logger.warning('Invalid response format: %s', payload) + async def command(self, path, value): """Write the provided data to the specified path. @@ -69,25 +94,34 @@ async def command(self, path, value): """ setting_topic = f'{self.prefix}/settings/{path}' response_topic = f'{self.prefix}/response/{path}' - if response_topic in self.inflight: - # TODO use correlation_data to distinguish clients and requests - raise NotImplementedError( - 'Only one in-flight message per topic is supported') + + # Assign a unique identifier to this update request. + pid = self.request_id + self.request_id += 1 + assert pid not in self.inflight, 'Invalid PID encountered' + + correlation_data = json.dumps({ + 'id': self.uuid.hex, + 'pid': pid, + }) value = json.dumps(value) logger.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() - self.inflight[response_topic] = fut + + self.inflight[pid] = fut self.client.publish(setting_topic, payload=value, qos=0, retain=True, - response_topic=response_topic) + response_topic=response_topic, + correlation_data=correlation_data) return await fut def main(): + """ Main program entry point. """ parser = argparse.ArgumentParser( - description='Miniconf command line interface.', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog='''Examples: + description='Miniconf command line interface.', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog='''Examples: %(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\ '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' ''') @@ -103,22 +137,20 @@ def main(): args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - level=logging.WARN - 10*args.verbose) + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + level=logging.WARN - 10*args.verbose) loop = asyncio.get_event_loop() async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) - failures = 0 - for kv in args.settings: - path, value = kv.split("=", 1) + for key_value in args.settings: + path, value = key_value.split("=", 1) code, response = await interface.command(path, json.loads(value)) - print(response) + print(f'{path}: {response}') if code != 0: - failures += 1 - - return failures + return code + return 0 sys.exit(loop.run_until_complete(configure_settings())) From 919f68c93445e59fb5bf9d371bc832f1a2c877b2 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 5 May 2021 14:30:32 +0200 Subject: [PATCH 13/17] Updating miniconf utility after testing --- miniconf.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/miniconf.py b/miniconf.py index 40f163ef2..36ea0dba5 100644 --- a/miniconf.py +++ b/miniconf.py @@ -14,8 +14,6 @@ from gmqtt import Client as MqttClient -logger = logging.getLogger(__name__) - class Miniconf: """An asynchronous API for controlling Miniconf devices using MQTT.""" @@ -34,13 +32,14 @@ def __init__(self, client, prefix): client: A connected MQTT5 client. prefix: The MQTT toptic prefix of the device to control. """ - self.uuid = uuid.uuid1(prefix) + self.uuid = uuid.uuid1() self.request_id = 0 self.client = client self.prefix = prefix self.inflight = {} self.client.on_message = self._handle_response self.client.subscribe(f'{prefix}/response/#') + self.logger = logging.getLogger(__name__) def _handle_response(self, _client, _topic, payload, _qos, properties): """Callback function for when messages are received over MQTT. @@ -54,23 +53,23 @@ def _handle_response(self, _client, _topic, payload, _qos, properties): """ # Extract corrleation data from the properties try: - correlation_data = json.loads(properties['correlation_data']) + correlation_data = json.loads(properties['correlation_data'][0].decode('ascii')) except (json.decoder.JSONDecodeError, KeyError): - logger.warning('Ignoring message with invalid correlation data') + self.logger.warning('Ignoring message with invalid correlation data') return # Validate the correlation data. try: if correlation_data['id'] != self.uuid.hex: - logger.info('Ignoring correlation data for different ID') + self.logger.info('Ignoring correlation data for different ID') return pid = correlation_data['pid'] except KeyError: - logger.warning('Ignoring unknown correlation data: %s', correlation_data) + self.logger.warning('Ignoring unknown correlation data: %s', correlation_data) return if pid not in self.inflight: - logger.warning('Unexpected pid: %s', pid) + self.logger.warning('Unexpected pid: %s', pid) return try: @@ -78,7 +77,7 @@ def _handle_response(self, _client, _topic, payload, _qos, properties): self.inflight[pid].set_result((response['code'], response['msg'])) del self.inflight[pid] except json.decoder.JSONDecodeError: - logger.warning('Invalid response format: %s', payload) + self.logger.warning('Invalid response format: %s', payload) async def command(self, path, value): @@ -103,10 +102,10 @@ async def command(self, path, value): correlation_data = json.dumps({ 'id': self.uuid.hex, 'pid': pid, - }) + }).encode('ascii') value = json.dumps(value) - logger.info('Sending %s to "%s"', value, setting_topic) + self.logger.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() self.inflight[pid] = fut From f9b1b8df13cbda6e979729c4b63f1809c7dd820c Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 5 May 2021 14:33:34 +0200 Subject: [PATCH 14/17] Adding comment about subscription failures --- src/net/mqtt_interface.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index cef26dd0b..42ab41e1a 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -115,6 +115,8 @@ where String::from(self.settings_prefix.as_str()); settings_topic.push_str("/#").unwrap(); + // We do not currently handle or process potential subscription failures. Instead, this + // failure will be logged through the stabilizer logging interface. self.mqtt.subscribe(&settings_topic, &[]).unwrap(); self.subscribed = true; } From e6180de147d70aaf3943baa359b05291ff7b9bb2 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 5 May 2021 14:38:43 +0200 Subject: [PATCH 15/17] Fixing style --- src/net/mqtt_interface.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index 42ab41e1a..2a55b900a 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -88,7 +88,7 @@ where self.network_was_reset = true; self.mqtt.network_stack.handle_link_reset(); } - _ => {}, + _ => {} }; let mqtt_connected = match self.mqtt.is_connected() { From 1ad3f1d1a80720fa839a09220e4c9ccec14fe3f3 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 5 May 2021 17:09:51 +0200 Subject: [PATCH 16/17] Cleaning up python script --- miniconf.py | 52 ++++++++++++++++------------------------------------ 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/miniconf.py b/miniconf.py index 36ea0dba5..4878419b9 100644 --- a/miniconf.py +++ b/miniconf.py @@ -38,7 +38,7 @@ def __init__(self, client, prefix): self.prefix = prefix self.inflight = {} self.client.on_message = self._handle_response - self.client.subscribe(f'{prefix}/response/#') + self.client.subscribe(f'{prefix}/response/{self.uuid.hex}') self.logger = logging.getLogger(__name__) def _handle_response(self, _client, _topic, payload, _qos, properties): @@ -52,32 +52,13 @@ def _handle_response(self, _client, _topic, payload, _qos, properties): properties: A dictionary of properties associated with the message. """ # Extract corrleation data from the properties - try: - correlation_data = json.loads(properties['correlation_data'][0].decode('ascii')) - except (json.decoder.JSONDecodeError, KeyError): - self.logger.warning('Ignoring message with invalid correlation data') - return - - # Validate the correlation data. - try: - if correlation_data['id'] != self.uuid.hex: - self.logger.info('Ignoring correlation data for different ID') - return - pid = correlation_data['pid'] - except KeyError: - self.logger.warning('Ignoring unknown correlation data: %s', correlation_data) - return - - if pid not in self.inflight: - self.logger.warning('Unexpected pid: %s', pid) - return - - try: - response = json.loads(payload) - self.inflight[pid].set_result((response['code'], response['msg'])) - del self.inflight[pid] - except json.decoder.JSONDecodeError: - self.logger.warning('Invalid response format: %s', payload) + correlation_data = json.loads(properties['correlation_data'][0].decode('ascii')) + + # Get the request ID from the correlation data + request_id = correlation_data['request_id'] + + self.inflight[request_id].set_result(json.loads(payload)) + del self.inflight[request_id] async def command(self, path, value): @@ -92,23 +73,22 @@ async def command(self, path, value): a use-readable message indicating further information. """ setting_topic = f'{self.prefix}/settings/{path}' - response_topic = f'{self.prefix}/response/{path}' + response_topic = f'{self.prefix}/response/{self.uuid.hex}' # Assign a unique identifier to this update request. - pid = self.request_id + request_id = self.request_id self.request_id += 1 - assert pid not in self.inflight, 'Invalid PID encountered' + assert request_id not in self.inflight, 'Invalid ID encountered' correlation_data = json.dumps({ - 'id': self.uuid.hex, - 'pid': pid, + 'request_id': request_id, }).encode('ascii') value = json.dumps(value) self.logger.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() - self.inflight[pid] = fut + self.inflight[request_id] = fut self.client.publish(setting_topic, payload=value, qos=0, retain=True, response_topic=response_topic, correlation_data=correlation_data) @@ -145,10 +125,10 @@ async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) for key_value in args.settings: path, value = key_value.split("=", 1) - code, response = await interface.command(path, json.loads(value)) + response = await interface.command(path, json.loads(value)) print(f'{path}: {response}') - if code != 0: - return code + if response['code'] != 0: + return response['code'] return 0 sys.exit(loop.run_until_complete(configure_settings())) From 05dc80709e93669e016c6e2f9b32f0f886759e10 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 5 May 2021 17:32:07 +0200 Subject: [PATCH 17/17] Updating docs and logger --- miniconf.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/miniconf.py b/miniconf.py index 4878419b9..7f88f6553 100644 --- a/miniconf.py +++ b/miniconf.py @@ -14,6 +14,7 @@ from gmqtt import Client as MqttClient +LOGGER = logging.getLogger(__name__) class Miniconf: """An asynchronous API for controlling Miniconf devices using MQTT.""" @@ -39,7 +40,6 @@ def __init__(self, client, prefix): self.inflight = {} self.client.on_message = self._handle_response self.client.subscribe(f'{prefix}/response/{self.uuid.hex}') - self.logger = logging.getLogger(__name__) def _handle_response(self, _client, _topic, payload, _qos, properties): """Callback function for when messages are received over MQTT. @@ -69,8 +69,7 @@ async def command(self, path, value): value: The value to write to the path. Returns: - (code, msg) tuple as a response to the command. `code` is zero for success and `msg` is - a use-readable message indicating further information. + The response to the command as a dictionary. """ setting_topic = f'{self.prefix}/settings/{path}' response_topic = f'{self.prefix}/response/{self.uuid.hex}' @@ -85,7 +84,7 @@ async def command(self, path, value): }).encode('ascii') value = json.dumps(value) - self.logger.info('Sending %s to "%s"', value, setting_topic) + LOGGER.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() self.inflight[request_id] = fut