From fe1ddde4dbc40e1a534dc8daf2b55e03069087ae Mon Sep 17 00:00:00 2001 From: Stefano Pagnottelli Date: Sat, 25 May 2024 08:24:33 +0200 Subject: [PATCH 1/4] add ability ro receive async commands from mqtt --- mppsolar/__init__.py | 91 ++++++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 37 deletions(-) diff --git a/mppsolar/__init__.py b/mppsolar/__init__.py index 841fd082..fc8657e0 100755 --- a/mppsolar/__init__.py +++ b/mppsolar/__init__.py @@ -253,6 +253,9 @@ def main(): push_url = args.pushurl _commands = [] + _setup_in_commands = [] + _curr_in_commands = [] + # Initialize Daemon if args.daemon: import time @@ -316,6 +319,7 @@ def main(): mongo_db = config[section].get("mongo_db", fallback=None) push_url = config[section].get("push_url", fallback=push_url) mqtt_topic = config[section].get("mqtt_topic", fallback=mqtt_topic) + direction = config[section].get("direction", fallback="out") # device_class = get_device_class(_type) log.debug(f"device_class {device_class}") @@ -338,7 +342,10 @@ def main(): commands = _command.split("#") for command in commands: - _commands.append((device, command, tag, outputs, filter, excl_filter)) + _commands.append((device, command, tag, outputs, filter, excl_filter, direction)) + if direction == 'in': + for command in commands: + _setup_in_commands.append((device, command, tag, outputs, filter, excl_filter, direction)) log.debug(f"Commands from config file {_commands}") if args.daemon: @@ -404,52 +411,62 @@ def main(): tag = args.tag else: tag = command - _commands.append((device, command, tag, outputs, filter, excl_filter)) + _commands.append((device, command, tag, outputs, filter, excl_filter, 'out')) log.debug(f"Commands {_commands}") + def mqtt_on_message(client: mqtt_client, userdata, msg: mqtt_client.MQTTMessage): + _payload = msg.payload.decode() + for _device, _command, _tag, _outputs, filter, excl_filter, direction in _setup_in_commands: + if _command == _payload: + _curr_in_commands.append((_device, _command, _tag, _outputs, filter, excl_filter, direction)) + + def execute_command(_device, _command, _tag, _outputs, filter, excl_filter): + log.info(f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}") + results = _device.run_command(command=_command) + log.debug(f"results: {results}") + # send to output processor(s) + outputs = get_outputs(_outputs) + for op in outputs: + # maybe include the command and what the command is im the output + # eg QDI run, Display Inverter Default Settings + log.debug(f"Using output filter: {filter}") + op.output( + data=results, + tag=_tag, + name=_device._name, + mqtt_broker=mqtt_broker, + udp_port=udp_port, + postgres_url=postgres_url, + mongo_url=mongo_url, + mongo_db=mongo_db, + push_url=push_url, + # mqtt_port=mqtt_port, + # mqtt_user=mqtt_user, + # mqtt_pass=mqtt_pass, + mqtt_topic=mqtt_topic, + filter=filter, + excl_filter=excl_filter, + keep_case=keep_case, + ) + + mqtt_broker.subscribe("Inverter/command", mqtt_on_message) + while True: # Loop through the configured commands if not args.daemon: log.info(f"Looping {len(_commands)} commands") - for _device, _command, _tag, _outputs, filter, excl_filter in _commands: + if len(_curr_in_commands): + for _device, _command, _tag, _outputs, filter, excl_filter, direction in _curr_in_commands: + execute_command(_device, _command, _tag, _outputs, filter, excl_filter) + _curr_in_commands = [] + for _device, _command, _tag, _outputs, filter, excl_filter, direction in _commands: # for item in mppUtilArray: # Tell systemd watchdog we are still alive if args.daemon: systemd.daemon.notify("WATCHDOG=1") - print( - f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}" - ) - else: - log.info( - f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}" - ) - results = _device.run_command(command=_command) - log.debug(f"results: {results}") - # send to output processor(s) - outputs = get_outputs(_outputs) - for op in outputs: - # maybe include the command and what the command is im the output - # eg QDI run, Display Inverter Default Settings - log.debug(f"Using output filter: {filter}") - op.output( - data=results, - tag=_tag, - name=_device._name, - mqtt_broker=mqtt_broker, - udp_port=udp_port, - postgres_url=postgres_url, - mongo_url=mongo_url, - mongo_db=mongo_db, - push_url=push_url, - # mqtt_port=mqtt_port, - # mqtt_user=mqtt_user, - # mqtt_pass=mqtt_pass, - mqtt_topic=mqtt_topic, - filter=filter, - excl_filter=excl_filter, - keep_case=keep_case, - ) - # Tell systemd watchdog we are still alive + if direction == 'out': + execute_command(_device, _command, _tag, _outputs, filter, excl_filter) + # Tell systemd watchdog we are still alive if args.daemon: systemd.daemon.notify("WATCHDOG=1") print(f"Sleeping for {pause} sec") From 996fe3e27883ba086a5070cad6f780927081b287 Mon Sep 17 00:00:00 2001 From: Stefano Pagnottelli Date: Sat, 25 May 2024 08:43:55 +0200 Subject: [PATCH 2/4] get input command topic from tag attribute --- mppsolar/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mppsolar/__init__.py b/mppsolar/__init__.py index fc8657e0..89858c2d 100755 --- a/mppsolar/__init__.py +++ b/mppsolar/__init__.py @@ -449,7 +449,12 @@ def execute_command(_device, _command, _tag, _outputs, filter, excl_filter): keep_case=keep_case, ) - mqtt_broker.subscribe("Inverter/command", mqtt_on_message) + _in_tags = [] + for _device, _command, _tag, _outputs, filter, excl_filter, direction in _setup_in_commands: + if not tag in _in_tags: + _in_tags.append(tag) + for _tag in _in_tags: + mqtt_broker.subscribe(f"{_tag}/command", mqtt_on_message) while True: # Loop through the configured commands From 5931a59f2a7064b790713ad707acec296e40d3fa Mon Sep 17 00:00:00 2001 From: Stefano Pagnottelli Date: Sat, 25 May 2024 08:52:12 +0200 Subject: [PATCH 3/4] missed import of mqtt_client --- mppsolar/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mppsolar/__init__.py b/mppsolar/__init__.py index 89858c2d..d96becf1 100755 --- a/mppsolar/__init__.py +++ b/mppsolar/__init__.py @@ -10,6 +10,8 @@ from .outputs import get_outputs, list_outputs from .protocols import list_protocols +import paho.mqtt.client as mqtt_client + # Set-up logger log = logging.getLogger("") FORMAT = "%(asctime)-15s:%(levelname)s:%(module)s:%(funcName)s@%(lineno)d: %(message)s" From d638504d706c2c78c5b40538f3210b172b440709 Mon Sep 17 00:00:00 2001 From: Stefano Pagnottelli Date: Sat, 25 May 2024 10:04:08 +0200 Subject: [PATCH 4/4] use threading to avoid to sleep when a command is received --- mppsolar/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mppsolar/__init__.py b/mppsolar/__init__.py index d96becf1..01493a02 100755 --- a/mppsolar/__init__.py +++ b/mppsolar/__init__.py @@ -1,5 +1,6 @@ # !/usr/bin/python3 import logging +import threading from argparse import ArgumentParser from platform import python_version @@ -254,6 +255,7 @@ def main(): mqtt_topic = args.mqtttopic push_url = args.pushurl + _event = threading.Event() _commands = [] _setup_in_commands = [] _curr_in_commands = [] @@ -421,6 +423,8 @@ def mqtt_on_message(client: mqtt_client, userdata, msg: mqtt_client.MQTTMessage) for _device, _command, _tag, _outputs, filter, excl_filter, direction in _setup_in_commands: if _command == _payload: _curr_in_commands.append((_device, _command, _tag, _outputs, filter, excl_filter, direction)) + if len(_curr_in_commands) > 0: + _event.set() def execute_command(_device, _command, _tag, _outputs, filter, excl_filter): log.info(f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}") @@ -477,7 +481,8 @@ def execute_command(_device, _command, _tag, _outputs, filter, excl_filter): if args.daemon: systemd.daemon.notify("WATCHDOG=1") print(f"Sleeping for {pause} sec") - time.sleep(pause) + _event.wait(timeout=pause) + _event.clear() else: # Dont loop unless running as daemon log.debug("Not daemon, so not looping")