Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input asynchronous commands from mqtt to the inverter #495

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 67 additions & 38 deletions mppsolar/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# !/usr/bin/python3
import logging
import threading
from argparse import ArgumentParser
from platform import python_version

Expand All @@ -10,6 +11,8 @@
from .outputs import get_outputs, list_outputs
from .protocols import list_protocols

import paho.mqtt.client as mqtt_client

# Set-up logger
log = logging.getLogger("")
FORMAT = "%(asctime)-15s:%(levelname)s:%(module)s:%(funcName)s@%(lineno)d: %(message)s"
Expand Down Expand Up @@ -252,7 +255,11 @@ def main():
mqtt_topic = args.mqtttopic
push_url = args.pushurl

_event = threading.Event()
_commands = []
_setup_in_commands = []
_curr_in_commands = []

# Initialize Daemon
if args.daemon:
import time
Expand Down Expand Up @@ -316,6 +323,7 @@ def main():
mongo_db = config[section].get("mongo_db", fallback=None)
push_url = config[section].get("push_url", fallback=push_url)
mqtt_topic = config[section].get("mqtt_topic", fallback=mqtt_topic)
direction = config[section].get("direction", fallback="out")
#
device_class = get_device_class(_type)
log.debug(f"device_class {device_class}")
Expand All @@ -338,7 +346,10 @@ def main():
commands = _command.split("#")

for command in commands:
_commands.append((device, command, tag, outputs, filter, excl_filter))
_commands.append((device, command, tag, outputs, filter, excl_filter, direction))
if direction == 'in':
for command in commands:
_setup_in_commands.append((device, command, tag, outputs, filter, excl_filter, direction))
log.debug(f"Commands from config file {_commands}")

if args.daemon:
Expand Down Expand Up @@ -404,56 +415,74 @@ def main():
tag = args.tag
else:
tag = command
_commands.append((device, command, tag, outputs, filter, excl_filter))
_commands.append((device, command, tag, outputs, filter, excl_filter, 'out'))
log.debug(f"Commands {_commands}")

def mqtt_on_message(client: mqtt_client, userdata, msg: mqtt_client.MQTTMessage):
_payload = msg.payload.decode()
for _device, _command, _tag, _outputs, filter, excl_filter, direction in _setup_in_commands:
if _command == _payload:
_curr_in_commands.append((_device, _command, _tag, _outputs, filter, excl_filter, direction))
if len(_curr_in_commands) > 0:
_event.set()

def execute_command(_device, _command, _tag, _outputs, filter, excl_filter):
log.info(f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}")
results = _device.run_command(command=_command)
log.debug(f"results: {results}")
# send to output processor(s)
outputs = get_outputs(_outputs)
for op in outputs:
# maybe include the command and what the command is im the output
# eg QDI run, Display Inverter Default Settings
log.debug(f"Using output filter: {filter}")
op.output(
data=results,
tag=_tag,
name=_device._name,
mqtt_broker=mqtt_broker,
udp_port=udp_port,
postgres_url=postgres_url,
mongo_url=mongo_url,
mongo_db=mongo_db,
push_url=push_url,
# mqtt_port=mqtt_port,
# mqtt_user=mqtt_user,
# mqtt_pass=mqtt_pass,
mqtt_topic=mqtt_topic,
filter=filter,
excl_filter=excl_filter,
keep_case=keep_case,
)

_in_tags = []
for _device, _command, _tag, _outputs, filter, excl_filter, direction in _setup_in_commands:
if not tag in _in_tags:
_in_tags.append(tag)
for _tag in _in_tags:
mqtt_broker.subscribe(f"{_tag}/command", mqtt_on_message)

while True:
# Loop through the configured commands
if not args.daemon:
log.info(f"Looping {len(_commands)} commands")
for _device, _command, _tag, _outputs, filter, excl_filter in _commands:
if len(_curr_in_commands):
for _device, _command, _tag, _outputs, filter, excl_filter, direction in _curr_in_commands:
execute_command(_device, _command, _tag, _outputs, filter, excl_filter)
_curr_in_commands = []
for _device, _command, _tag, _outputs, filter, excl_filter, direction in _commands:
# for item in mppUtilArray:
# Tell systemd watchdog we are still alive
if args.daemon:
systemd.daemon.notify("WATCHDOG=1")
print(
f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}"
)
else:
log.info(
f"Getting results from device: {_device} for command: {_command}, tag: {_tag}, outputs: {_outputs}"
)
results = _device.run_command(command=_command)
log.debug(f"results: {results}")
# send to output processor(s)
outputs = get_outputs(_outputs)
for op in outputs:
# maybe include the command and what the command is im the output
# eg QDI run, Display Inverter Default Settings
log.debug(f"Using output filter: {filter}")
op.output(
data=results,
tag=_tag,
name=_device._name,
mqtt_broker=mqtt_broker,
udp_port=udp_port,
postgres_url=postgres_url,
mongo_url=mongo_url,
mongo_db=mongo_db,
push_url=push_url,
# mqtt_port=mqtt_port,
# mqtt_user=mqtt_user,
# mqtt_pass=mqtt_pass,
mqtt_topic=mqtt_topic,
filter=filter,
excl_filter=excl_filter,
keep_case=keep_case,
)
# Tell systemd watchdog we are still alive
if direction == 'out':
execute_command(_device, _command, _tag, _outputs, filter, excl_filter)
# Tell systemd watchdog we are still alive
if args.daemon:
systemd.daemon.notify("WATCHDOG=1")
print(f"Sleeping for {pause} sec")
time.sleep(pause)
_event.wait(timeout=pause)
_event.clear()
else:
# Dont loop unless running as daemon
log.debug("Not daemon, so not looping")
Expand Down