Skip to content

Commit

Permalink
Merge #347
Browse files Browse the repository at this point in the history
347: MQTT/Miniconf refactor r=ryan-summers a=ryan-summers

This PR refactors the MQTT/Miniconf API to make use of the refactored miniconf API. Routing has been moved to the `net` module.

**TODO**:
- [x] Test on hardware
- [x] Document modules
- [x] Merge dependency changes, update dependencies to use `rev`
- [x] Update `miniconf.py` to be `stabilizer.py`, update API

Co-authored-by: Ryan Summers <[email protected]>
  • Loading branch information
bors[bot] and ryan-summers authored May 5, 2021
2 parents 339332d + 05dc807 commit ed34b69
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 168 deletions.
23 changes: 15 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,15 @@ version = "0.9.0"

[patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git"
rev = "314fa5587d"
rev = "c6f2b28"

[dependencies.smoltcp-nal]
git = "https://github.com/quartiq/smoltcp-nal.git"
rev = "8468f11"

[patch.crates-io.minimq]
[dependencies.minimq]
git = "https://github.com/quartiq/minimq.git"
rev = "933687c2e4b"

[patch.crates-io.serde-json-core]
git = "https://github.com/rust-embedded-community/serde-json-core.git"
rev = "ee06ac91bc"
rev = "b3f364d"

[features]
semihosting = ["panic-semihosting", "cortex-m-log/semihosting"]
Expand Down
76 changes: 47 additions & 29 deletions miniconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
import asyncio
import json
import logging
import sys
import uuid

from gmqtt import Client as MqttClient

logger = logging.getLogger(__name__)

LOGGER = logging.getLogger(__name__)

class Miniconf:
"""An asynchronous API for controlling Miniconf devices using MQTT."""
Expand All @@ -32,27 +33,33 @@ def __init__(self, client, prefix):
client: A connected MQTT5 client.
prefix: The MQTT toptic prefix of the device to control.
"""
self.uuid = uuid.uuid1()
self.request_id = 0
self.client = client
self.prefix = prefix
self.inflight = {}
self.client.on_message = self._handle_response
self.client.subscribe(f'{prefix}/response/#')
self.client.subscribe(f'{prefix}/response/{self.uuid.hex}')

def _handle_response(self, _client, topic, payload, *_args, **_kwargs):
def _handle_response(self, _client, _topic, payload, _qos, properties):
"""Callback function for when messages are received over MQTT.
Args:
_client: The MQTT client.
topic: The topic that the message was received on.
_topic: The topic that the message was received on.
payload: The payload of the message.
_qos: The quality-of-service level of the received packet
properties: A dictionary of properties associated with the message.
"""
if topic not in self.inflight:
# TODO use correlation_data to distinguish clients and requests
logger.warning('Unexpected response on topic: %s', topic)
return
# Extract corrleation data from the properties
correlation_data = json.loads(properties['correlation_data'][0].decode('ascii'))

# Get the request ID from the correlation data
request_id = correlation_data['request_id']

self.inflight[request_id].set_result(json.loads(payload))
del self.inflight[request_id]

self.inflight[topic].set_result(payload.decode('ascii'))
del self.inflight[topic]

async def command(self, path, value):
"""Write the provided data to the specified path.
Expand All @@ -62,29 +69,37 @@ async def command(self, path, value):
value: The value to write to the path.
Returns:
The received response to the command.
The response to the command as a dictionary.
"""
setting_topic = f'{self.prefix}/settings/{path}'
response_topic = f'{self.prefix}/response/{path}'
if response_topic in self.inflight:
# TODO use correlation_data to distinguish clients and requests
raise NotImplementedError(
'Only one in-flight message per topic is supported')
response_topic = f'{self.prefix}/response/{self.uuid.hex}'

# Assign a unique identifier to this update request.
request_id = self.request_id
self.request_id += 1
assert request_id not in self.inflight, 'Invalid ID encountered'

correlation_data = json.dumps({
'request_id': request_id,
}).encode('ascii')

value = json.dumps(value)
logger.info('Sending %s to "%s"', value, setting_topic)
LOGGER.info('Sending %s to "%s"', value, setting_topic)
fut = asyncio.get_running_loop().create_future()
self.inflight[response_topic] = fut

self.inflight[request_id] = fut
self.client.publish(setting_topic, payload=value, qos=0, retain=True,
response_topic=response_topic)
response_topic=response_topic,
correlation_data=correlation_data)
return await fut


def main():
""" Main program entry point. """
parser = argparse.ArgumentParser(
description='Miniconf command line interface.',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''Examples:
description='Miniconf command line interface.',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''Examples:
%(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\
'{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}'
''')
Expand All @@ -100,19 +115,22 @@ def main():
args = parser.parse_args()

logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
level=logging.WARN - 10*args.verbose)
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
level=logging.WARN - 10*args.verbose)

loop = asyncio.get_event_loop()

async def configure_settings():
interface = await Miniconf.create(args.prefix, args.broker)
for kv in args.settings:
path, value = kv.split("=", 1)
for key_value in args.settings:
path, value = key_value.split("=", 1)
response = await interface.command(path, json.loads(value))
print(response)
print(f'{path}: {response}')
if response['code'] != 0:
return response['code']
return 0

loop.run_until_complete(configure_settings())
sys.exit(loop.run_until_complete(configure_settings()))


if __name__ == '__main__':
Expand Down
20 changes: 8 additions & 12 deletions src/bin/dual-iir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use hardware::{
InputPin, AFE0, AFE1,
};

use net::{Action, MiniconfInterface};
use net::{Action, MqttInterface};

const SCALE: f32 = i16::MAX as _;

Expand Down Expand Up @@ -54,7 +54,7 @@ const APP: () = {
digital_input1: DigitalInput1,
adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output),
mqtt_config: MiniconfInterface<Settings>,
mqtt: MqttInterface<Settings>,

#[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])]
iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2],
Expand All @@ -66,7 +66,7 @@ const APP: () = {
// Configure the microcontroller
let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device);

let mqtt_config = MiniconfInterface::new(
let mqtt = MqttInterface::new(
stabilizer.net.stack,
"",
&net::get_device_prefix(
Expand All @@ -93,7 +93,7 @@ const APP: () = {
afes: stabilizer.afes,
adcs: stabilizer.adcs,
dacs: stabilizer.dacs,
mqtt_config,
mqtt,
digital_input1: stabilizer.digital_inputs.1,
settings: Settings::default(),
}
Expand Down Expand Up @@ -150,14 +150,10 @@ const APP: () = {
}
}

#[idle(resources=[mqtt_config], spawn=[settings_update])]
#[idle(resources=[mqtt], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! {
loop {
match c
.resources
.mqtt_config
.lock(|config_interface| config_interface.update())
{
match c.resources.mqtt.lock(|mqtt| mqtt.update()) {
Some(Action::Sleep) => cortex_m::asm::wfi(),
Some(Action::UpdateSettings) => {
c.spawn.settings_update().unwrap()
Expand All @@ -167,9 +163,9 @@ const APP: () = {
}
}

#[task(priority = 1, resources=[mqtt_config, afes, settings])]
#[task(priority = 1, resources=[mqtt, afes, settings])]
fn settings_update(mut c: settings_update::Context) {
let settings = &c.resources.mqtt_config.mqtt.settings;
let settings = c.resources.mqtt.settings();

// Update the IIR channels.
c.resources.settings.lock(|current| *current = *settings);
Expand Down
20 changes: 8 additions & 12 deletions src/bin/lockin-external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use stabilizer::hardware::{
};

use miniconf::Miniconf;
use stabilizer::net::{Action, MiniconfInterface};
use stabilizer::net::{Action, MqttInterface};

#[derive(Copy, Clone, Debug, Deserialize, Miniconf)]
enum Conf {
Expand Down Expand Up @@ -60,7 +60,7 @@ const APP: () = {
afes: (AFE0, AFE1),
adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output),
mqtt_config: MiniconfInterface<Settings>,
mqtt: MqttInterface<Settings>,
settings: Settings,

timestamper: InputStamper,
Expand All @@ -73,7 +73,7 @@ const APP: () = {
// Configure the microcontroller
let (mut stabilizer, _pounder) = setup(c.core, c.device);

let mqtt_config = MiniconfInterface::new(
let mqtt = MqttInterface::new(
stabilizer.net.stack,
"",
&net::get_device_prefix(
Expand Down Expand Up @@ -113,7 +113,7 @@ const APP: () = {
afes: stabilizer.afes,
adcs: stabilizer.adcs,
dacs: stabilizer.dacs,
mqtt_config,
mqtt,
timestamper: stabilizer.timestamper,

settings,
Expand Down Expand Up @@ -195,14 +195,10 @@ const APP: () = {
}
}

#[idle(resources=[mqtt_config], spawn=[settings_update])]
#[idle(resources=[mqtt], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! {
loop {
match c
.resources
.mqtt_config
.lock(|config_interface| config_interface.update())
{
match c.resources.mqtt.lock(|mqtt| mqtt.update()) {
Some(Action::Sleep) => cortex_m::asm::wfi(),
Some(Action::UpdateSettings) => {
c.spawn.settings_update().unwrap()
Expand All @@ -212,9 +208,9 @@ const APP: () = {
}
}

#[task(priority = 1, resources=[mqtt_config, settings, afes])]
#[task(priority = 1, resources=[mqtt, settings, afes])]
fn settings_update(mut c: settings_update::Context) {
let settings = &c.resources.mqtt_config.mqtt.settings;
let settings = c.resources.mqtt.settings();

c.resources.afes.0.set_gain(settings.afe[0]);
c.resources.afes.1.set_gain(settings.afe[1]);
Expand Down
Loading

0 comments on commit ed34b69

Please sign in to comment.