diff --git a/modpoll/__init__.py b/modpoll/__init__.py new file mode 100644 index 0000000..d4c6a34 --- /dev/null +++ b/modpoll/__init__.py @@ -0,0 +1,8 @@ +try: + import importlib.metadata as importlib_metadata +except ModuleNotFoundError: + import importlib_metadata +__version__ = importlib_metadata.version(__name__) + +from .main import app +from . import arg_parser diff --git a/modpoll/__main__.py b/modpoll/__main__.py new file mode 100644 index 0000000..2310408 --- /dev/null +++ b/modpoll/__main__.py @@ -0,0 +1,2 @@ +from .main import app +app() diff --git a/modpoll/arg_parser.py b/modpoll/arg_parser.py new file mode 100644 index 0000000..ec6f2d4 --- /dev/null +++ b/modpoll/arg_parser.py @@ -0,0 +1,32 @@ +import argparse + +from . import __version__ + +def get_parser(): + parser = argparse.ArgumentParser(description=f'modpoll v{__version__} - A command line tool to communicate with modbus devices') + parser.add_argument('--version', action='version', version=f'modpoll v{__version__}') + parser.add_argument('--loglevel', default='INFO', help='log level (DEBUG/INFO/WARNING/ERROR/CRITICAL), Defaults to INFO') + # parser.add_argument('--config', required=True, help='Configuration file. Required!') + parser.add_argument('--config', required=True, help='Modbus configuration file. Required!') + parser.add_argument('--rate', default=5.0, type=float, help='The sampling rate (s) to poll modbus device, Defaults to 5.0') + parser.add_argument('--interval', default=1.0, type=float, help='The time interval (s) between two polling, Defaults to 1.0') + parser.add_argument('--tcp', help='Act as a Modbus TCP master, connecting to host TCP') + parser.add_argument('--tcp-port', default=502, type=int, help='Port for MODBUS TCP. Defaults to 502') + parser.add_argument('--rtu', help='pyserial URL (or port name) for RTU serial port') + parser.add_argument('--rtu-baud', default=9600, type=int, help='Baud rate for serial port. Defaults to 9600') + parser.add_argument('--rtu-parity', default='none', choices=['even', 'odd', 'none'], help='Parity for serial port. Defaults to none') + parser.add_argument('--timeout', default=3.0, type=float, help='Response time-out for MODBUS devices, Defaults to 3.0') + parser.add_argument('--export', default=None, help='Export references/registers to local csv file') + parser.add_argument('--mqtt-host', default=None, help='MQTT server address. Skip MQTT setup if not specified') + parser.add_argument('--mqtt-port', default=1883, type=int, help='1833 for non-TLS or 8883 for TLS, Defaults to 1883') + parser.add_argument('--mqtt-topic-prefix', default='modbus/', help='Topic prefix to be used for subscribing/publishing. Defaults to "modbus/"') + parser.add_argument('--mqtt-qos', default=0, type=int, help='MQTT QoS value (0/1/2). Defaults to 0') + parser.add_argument('--mqtt-user', default=None, help='Username for authentication (optional)') + parser.add_argument('--mqtt-pass', default=None, help='Password for authentication (optional)') + parser.add_argument('--mqtt-use-tls', action='store_true', help='Use TLS') + parser.add_argument('--mqtt-insecure', action='store_true', help='Use TLS without providing certificates') + parser.add_argument('--mqtt-cacerts', default=None, help="Path to keychain including ") + parser.add_argument('--mqtt-tls-version', default=None, help='TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1') + parser.add_argument('--diagnostics-rate', default=0, type=float, help='Time in seconds after which for each device diagnostics are published via mqtt.') + parser.add_argument('--autoremove', action='store_true', help='Automatically remove poller if modbus communication has failed 3 times.') + return parser diff --git a/modpoll/main.py b/modpoll/main.py new file mode 100644 index 0000000..c6e3100 --- /dev/null +++ b/modpoll/main.py @@ -0,0 +1,79 @@ +import logging +import time +import sys +import signal + +# from modpoll import __version__ +from modpoll.arg_parser import get_parser +from modpoll.mqtt_task import mqttc_setup, mqttc_publish, mqttc_close +from modpoll.modbus_task import modbus_setup, modbus_poll, modbus_export, modbus_close + +# global objects +run_loop = True + +# logging format +LOG_SIMPLE = "%(asctime)s | %(levelname).1s | %(name)s | %(message)s" +LOG_PROCESS = "%(asctime)s | %(levelname).1s | %(processName)s | %(name)s | %(message)s" +LOG_THREAD = "%(asctime)s | %(levelname).1s | %(threadName)s | %(name)s | %(message)s" +log = None + +def _signal_handler(signal, frame): + print('Exiting ' + sys.argv[0]) + _shutdown() + + +def app(name="modpoll"): + # log.info(f"Starting {name} v{__version__}") + signal.signal(signal.SIGINT, _signal_handler) + + # parse args + args = get_parser().parse_args() + + # get logger + logging.basicConfig(level=args.loglevel, format=LOG_SIMPLE) + global log + log = logging.getLogger(__name__) + + # setup mqtt + if args.mqtt_host: + log.info(f"Setup MQTT connection to {args.mqtt_host}") + if not mqttc_setup(args.mqtt_host, port=args.mqtt_port, user=args.mqtt_user, password=args.mqtt_pass, qos=args.mqtt_qos): + log.error("fail to setup MQTT client") + mqttc_close() + exit(1) + else: + log.info("No MQTT host specified, skip MQTT setup.") + + # setup modbus + if not modbus_setup(args): + log.error("fail to setup modbus client(master)") + modbus_close() + mqttc_close() + exit(1) + + # main loop + last_check = 0 + while run_loop: + now = time.time() + if last_check == 0: + last_check = now + continue + # routine check + if now > last_check + args.rate: + elapsed = now - last_check + last_check = now + log.info(f"looping at rate:{args.rate}, actual:{elapsed}") + modbus_poll() + if args.export: + modbus_export(args.export) + modbus_close() + mqttc_close() + + +def _shutdown(): + global run_loop + run_loop = False + + +if __name__ == "__main__": + app() diff --git a/modpoll/modbus_task.py b/modpoll/modbus_task.py new file mode 100644 index 0000000..cd5e8fd --- /dev/null +++ b/modpoll/modbus_task.py @@ -0,0 +1,381 @@ +import csv +import json +import logging +import math +import time + +from pymodbus.client.sync import ModbusSerialClient +from pymodbus.client.sync import ModbusTcpClient +from pymodbus.constants import Endian +from pymodbus.exceptions import ModbusException +from pymodbus.payload import BinaryPayloadDecoder + +from modpoll.arg_parser import get_parser +from modpoll.mqtt_task import mqttc_publish + +# global objects +master = None +args = None +log = None +deviceList = [] +referenceList = [] +pollers = [] + + +class Device: + def __init__(self, name, devid): + self.name = name + self.devid = devid + self.occupiedTopics = [] + self.writableReferences = [] + self.errorCount = 0 + self.pollCount = 0 + self.next_due = time.time() + args.diagnostics_rate + log.info(f"Added new device {self.name}") + + def publish_diagnostics(self): + if args.diagnostics_rate > 0: + now = time.time() + if now > self.next_due: + self.next_due = now + args.diagnostics_rate + try: + error_rate = float(self.errorCount) / float(self.pollCount) + except ValueError: + error_rate = 0 + mqttc_publish( + f"{args.mqtt_topic_prefix}{self.name}/diagnostics/error_rate", str(error_rate)) + mqttc_publish( + f"{args.mqtt_topic_prefix}{self.name}/diagnostics/total_poll", str(self.pollCount)) + self.pollCount = 0 + self.errorCount = 0 + + +class Poller: + def __init__(self, name, devid, reference, size, functioncode, endian): + self.name = name + self.devid = int(devid) + self.reference = int(reference) + self.size = int(size) + self.functioncode = int(functioncode) + self.endian = endian + self.device = None + self.readableReferences = [] + self.disabled = False + self.failcounter = 0 + + for myDev in deviceList: + if myDev.name == self.name: + self.device = myDev + break + if not self.device: + device = Device(self.name, devid) + deviceList.append(device) + self.device = device + + def count_success(self, success): + self.device.pollCount += 1 + if success: + self.failcounter = 0 + else: + self.failcounter += 1 + self.device.errorCount += 1 + if self.failcounter >= 3: + if args.autoremove: + self.disabled = True + log.info(f"Poller {self.name} disabled (functioncode: {self.functioncode}, " + f"reference: {self.reference}, size: {self.size}).") + # else: + # if master.connect(): + # self.failcounter = 0 + # log.info("Reconnecting to device... SUCCESS") + # else: + # log.info("Reconnecting to device... FAILED") + + def poll(self): + if self.disabled or not master: + return + try: + result = None + data = None + if self.functioncode == 1: + result = master.read_coils( + self.reference, self.size, unit=self.devid) + if not result.isError(): + data = result.bits + elif self.functioncode == 2: + result = master.read_discrete_inputs( + self.reference, self.size, unit=self.devid) + if not result.isError(): + data = result.bits + elif self.functioncode == 3: + result = master.read_holding_registers( + self.reference, self.size, unit=self.devid) + if not result.isError(): + data = result.registers + elif self.functioncode == 4: + result = master.read_input_registers( + self.reference, self.size, unit=self.devid) + if not result.isError(): + data = result.registers + if not data: + self.count_success(False) + log.warning(f"Reading device:{self.devid}, FuncCode:{self.functioncode}, " + f"Ref:{self.reference}, Size:{self.size}... ERROR") + log.debug(result) + return + if "BE_BE" == self.endian.upper(): + decoder = BinaryPayloadDecoder.fromRegisters( + data, Endian.Big, wordorder=Endian.Big) + elif "LE_BE" == self.endian.upper(): + decoder = BinaryPayloadDecoder.fromRegisters( + data, Endian.Little, wordorder=Endian.Big) + elif "LE_LE" == self.endian.upper(): + decoder = BinaryPayloadDecoder.fromRegisters( + data, Endian.Little, wordorder=Endian.Little) + else: + decoder = BinaryPayloadDecoder.fromRegisters( + data, Endian.Big, wordorder=Endian.Little) + cur_ref = self.reference + for ref in self.readableReferences: + while cur_ref < ref.reference and cur_ref < self.reference + self.size: + decoder.skip_bytes(2) + cur_ref += 1 + if cur_ref >= self.reference + self.size: + break + if "uint16" == ref.dtype: + ref.update_value(decoder.decode_16bit_uint()) + cur_ref += 1 + elif "int16" == ref.dtype: + ref.update_value(decoder.decode_16bit_int()) + cur_ref += 1 + elif "uint32" == ref.dtype: + ref.update_value(decoder.decode_32bit_uint()) + cur_ref += 2 + elif "int32" == ref.dtype: + ref.update_value(decoder.decode_32bit_int()) + cur_ref += 2 + elif "float32" == ref.dtype: + ref.update_value(decoder.decode_32bit_float()) + cur_ref += 2 + # elif "bool" == ref.dtype: + # ref.update_value(decoder.decode_bits()) + # cur_ref += ref.length + # elif ref.dtype.startswith("string"): + # ref.update_value(decoder.decode_string()) + # cur_ref += ref.length + else: + decoder.decode_16bit_uint() + cur_ref += 1 + self.count_success(True) + log.info(f"Reading device:{self.devid}, FuncCode:{self.functioncode}, " + f"Ref:{self.reference}, Size:{self.size}... SUCCESS") + return True + except ModbusException as ex: + self.count_success(False) + log.warning(f"Reading device:{self.devid}, FuncCode:{self.functioncode}, " + f"Ref:{self.reference}, Size:{self.size}... FAILED") + log.debug(ex) + return False + + def add_readable_reference(self, ref): + if ref.name not in self.device.occupiedTopics: + self.device.occupiedTopics.append(ref.name) + ref.device = self.device + log.debug(f"Added new reference {ref.name} to poller {self.name}") + if ref.check_sanity(self.reference, self.size): + self.readableReferences.append(ref) + referenceList.append(ref) + else: + log.warning( + f"Reference name {ref.name} failed to pass sanity check, therefore ignoring it.") + else: + log.warning( + f"Reference name ({ref.name}) is already occupied, therefore ignoring it.") + + def publish_data(self, timestamp=None, on_change=False): + payload = {} + for ref in self.readableReferences: + if on_change and ref.val == ref.last_val: + continue + payload[f'{ref.name}|{ref.unit}'] = ref.val + if timestamp: + payload['timestamp'] = timestamp + topic = f"{args.mqtt_topic_prefix}{self.device.name}" + mqttc_publish(topic, json.dumps(payload)) + + +class Reference: + def __init__(self, name, unit, reference, dtype, scale): + self.name = name + self.unit = unit + self.reference = int(reference) + self.dtype = dtype + if "int16" in dtype: + self.length = 1 + elif "uint16" in dtype: + self.length = 1 + elif "int32" in dtype: + self.length = 2 + elif "uint32" in dtype: + self.length = 2 + elif "float32" == dtype: + self.length = 2 + elif "bool" == dtype: + self.length = 1 + elif dtype.startswith("string"): + try: + self.length = int(dtype[6:9]) + except ValueError: + self.length = 2 + if self.length > 100: + log.warning("Data type string: length too long") + self.length = 100 + if math.fmod(self.length, 2) != 0: + self.length = self.length - 1 + log.warning("Data type string: length must be divisible by 2") + else: + log.error(f"unknown data type: {dtype}") + self.scale = scale + self.val = None + self.lastval = None + self.device = None + + def check_sanity(self, reference, size): + if self.reference in range(reference, size + reference) \ + and self.reference + self.length - 1 in range(reference, size + reference): + return True + return False + + def update_value(self, v): + if self.scale: + try: + v = v * float(self.scale) + except ValueError: + pass + self.lastval = self.val + self.val = v + + +def load_config(file): + with open(file, "r") as f: + f.seek(0) + csv_reader = csv.reader(f) + current_poller = None + for row in csv_reader: + if not row or len(row) == 0: + continue + if "poll" in row[0]: + name = row[1] + devid = int(row[2]) + reference = int(row[3]) + size = int(row[4]) + endian = row[6] + if "coil" == row[5]: + functioncode = 1 + if size > 2000: # some implementations don't seem to support 2008 coils/inputs + current_poller = None + log.error( + "Too many coils (max. 2000). Ignoring poller " + row[1] + ".") + continue + elif "input_status" == row[5]: + functioncode = 2 + if size > 2000: + current_poller = None + log.error( + "Too many inputs (max. 2000). Ignoring poller " + row[1] + ".") + continue + elif "holding_register" == row[5]: + functioncode = 3 + if size > 123: # applies to TCP, RTU should support 125 registers. But let's be safe. + current_poller = None + log.error( + "Too many registers (max. 123). Ignoring poller " + row[1] + ".") + continue + elif "input_register" == row[5]: + functioncode = 4 + if size > 123: + current_poller = None + log.error( + "Too many registers (max. 123). Ignoring poller " + row[1] + ".") + continue + else: + log.warning("Unknown function code (" + + row[5] + " ignoring poller " + row[1] + ".") + current_poller = None + continue + current_poller = Poller( + name, devid, reference, size, functioncode, endian) + pollers.append(current_poller) + log.info(f"Added new poller {current_poller.name}, {current_poller.devid}, " + f"{current_poller.reference}, {current_poller.size}") + elif "ref" in row[0]: + if current_poller: + name = row[1].replace(" ", "_") + unit = row[2] + ref = row[3] + dtype = row[4] + scale = row[5] + current_poller.add_readable_reference( + Reference(name, unit, ref, dtype, scale)) + else: + log.debug(f"No poller for reference {name}.") + + +def modbus_setup(config): + global master + global args + global log + args = config + log = logging.getLogger(__name__) + log.info(f"Loading config from file: {args.config}") + load_config(args.config) + if args.rtu: + if args.rtu_parity == "odd": + parity = "O" + elif args.rtu_parity == "even": + parity = "E" + else: + parity = "N" + master = ModbusSerialClient(method="rtu", port=args.rtu, stopbits=1, bytesize=8, parity=parity, + baudrate=int(args.rtu_baud), reset_socket=True) + elif args.tcp: + master = ModbusTcpClient(args.tcp, args.tcp_port, timeout=args.timeout, reset_socket=True) + else: + log.error("You must specify a modbus access method, either --rtu or --tcp") + return False + return True + + +def modbus_poll(): + global master + if not master: + return + master.connect() + for p in pollers: + if not p.disabled: + ret = p.poll() + t = time.time() + if ret: + p.publish_data() + while time.time() < t + args.interval: + time.sleep(0.001) + master.close() + for d in deviceList: + d.publish_diagnostics() + + +def modbus_export(file): + with open(file, 'w') as f: + writer = csv.writer(f) + header = ['name', 'unit', 'reference', 'value'] + writer.writerow(header) + for r in referenceList: + row = [r.name, r.unit, r.reference, r.val] + writer.writerow(row) + log.info(f"Saved references/registers to {file}") + + +def modbus_close(): + global master + if master: + master.close() diff --git a/modpoll/mqtt_task.py b/modpoll/mqtt_task.py new file mode 100644 index 0000000..e04897c --- /dev/null +++ b/modpoll/mqtt_task.py @@ -0,0 +1,140 @@ +import logging +import uuid + +import paho.mqtt.client as mqtt + +# get logger +from paho.mqtt import MQTTException + +log = logging.getLogger(__name__) + +# global objects +_mqttc = None +_initial_connection_made = False + + +# Callbacks +def _on_connect(client, userdata, flags, rc, properties=None): + if rc == 0: + log.info("Connection successful") + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + # client.subscribe(subscribe_topic) + elif rc == 1: + log.warning("Connection refused - incorrect protocol version") + elif rc == 2: + log.warning("Connection refused - invalid client identifier") + elif rc == 3: + log.warning("Connection refused - server unavailable") + elif rc == 4: + log.warning("Connection refused - bad username or password") + elif rc == 5: + log.warning("Connection refused - not authorised") + else: + log.warning("Unknown error") + + +def _on_disconnect(client, userdata, rc): + log.info(f"disconnected. rc={rc}.") + + +def _on_message(client, obj, msg): + if msg.retain == 0: + log.info(f"Receive message ({msg.topic}): {msg.payload}") + else: + log.info(f"Receive retained message ({msg.topic}): {msg.payload}") + + +def _on_publish(client, obj, mid): + log.debug("Sent.") + + +def _on_subscribe(client, obj, mid, granted_qos): + log.debug("Subscribed.") + + +def _on_log(client, obj, level, string): + log.debug(f"{level} | {string}") + + +def mqttc_setup(host: str, port=1883, user=None, password="", qos=0, mqtt_debug=False): + try: + # If you want to use a specific client id, use + # mqttc = mqtt.Client("client-id") + # but note that the client id must be unique on the broker. Leaving the client + # id parameter empty will generate a random id for you. + global _mqttc, _initial_connection_made + _mqttc = mqtt.Client(str(uuid.uuid4()), clean_session=(qos == 0)) + # if args.mqtt_use_tls: + # if args.mqtt_tls_version == "tlsv1.2": + # tlsVersion = ssl.PROTOCOL_TLSv1_2 + # elif args.mqtt_tls_version == "tlsv1.1": + # tlsVersion = ssl.PROTOCOL_TLSv1_1 + # elif args.mqtt_tls_version == "tlsv1": + # tlsVersion = ssl.PROTOCOL_TLSv1 + # elif args.mqtt_tls_version is None: + # tlsVersion = None + # else: + # print("Unknown TLS version - ignoring") + # tlsVersion = None + # + # if not args.mqtt_insecure: + # cert_required = ssl.CERT_REQUIRED + # else: + # cert_required = ssl.CERT_NONE + # + # _mqttc.tls_set(ca_certs=args.cacerts, certfile=None, keyfile=None, cert_reqs=cert_required, + # tls_version=tlsVersion) + # + # if args.mqtt_insecure: + # _mqttc.tls_insecure_set(True) + + if user and password: + _mqttc.username_pw_set(user, password) + + _mqttc.on_message = _on_message + _mqttc.on_connect = _on_connect + _mqttc.on_disconnect = _on_disconnect + _mqttc.on_publish = _on_publish + _mqttc.on_subscribe = _on_subscribe + + if mqtt_debug: + _mqttc.on_log = _on_log + + # try to connect + _mqttc.connect(host=host, port=port, keepalive=60) + + # start loop - let paho manage connection + _mqttc.loop_start() + + _initial_connection_made = True + return True + + except Exception as ex: + log.error("mqtt connection error") + # raise ex + return False + + +def mqttc_publish(topic, msg, qos=0, retain=False): + global _mqttc + try: + if not _mqttc: + return + if not _mqttc.is_connected() and qos == 0: + return + pubinfo = _mqttc.publish(topic, msg, qos, retain) + # pubinfo.wait_for_publish() + log.debug(f"publishing MQTT topic: {topic}, msg: {msg}, qos: {qos}, RC: {pubinfo.rc}") + log.info(f"published message to topic: {topic}") + return pubinfo + except MQTTException as ex: + log.error(f"Error publishing MQTT topic: {topic}, msg: {msg}, qos: {qos}") + raise ex + + +def mqttc_close(): + global _mqttc + if _mqttc: + _mqttc.loop_stop() + _mqttc.disconnect()