Skip to content

Commit

Permalink
feat: support paho v2 (#52)
Browse files Browse the repository at this point in the history
* feat: Support eclipse paho v2

* feat: Add option for MQTT v5.0

* feat: Show app version during bootstrap

* chore: add sanity check for mqtt-topic-prefix

* chore: update docker commands for local use
  • Loading branch information
gavinying authored Jul 13, 2024
1 parent 6e40ef6 commit 3e6681a
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 81 deletions.
20 changes: 8 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,17 @@ publish: ## Publish a release to PyPI.
build-and-publish: build publish ## Build and publish.

.PHONY: docker-build-dev
docker-build-dev: ## Build docker using docker buildx
@echo "🚀 Login to docker registry"
@echo "${DOCKER_PASSWORD}" | docker login -u "${DOCKER_USERNAME}" --password-stdin "${DOCKER_REGISTRY}"
@echo "🚀 Set up QEMU"
@docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
@echo "🚀 Create the builder if not exists"
@docker buildx inspect mybuilder &>/dev/null || docker buildx create --name mybuilder ; docker buildx use mybuilder
docker-build-dev: ## Build docker for local dev
@echo "🚀 Creating docker image file"
@docker buildx build --platform linux/amd64,linux/arm64 -t ${DOCKER_REGISTRY}/${DOCKER_USERNAME}/${PROJECT_NAME}:dev --push .
@docker build -t ${PROJECT_NAME}:dev .

.PHONY: docker-run-dev
docker-run-dev: docker-build-dev ## run in docker
@echo "🚀 Docker run: ${DOCKER_REGISTRY}/${DOCKER_USERNAME}/${PROJECT_NAME}:dev"
@docker run --rm ${DOCKER_REGISTRY}/${DOCKER_USERNAME}/${PROJECT_NAME}:dev \
modpoll -1 --tcp modsim.topmaker.net \
docker-run-dev: docker-build-dev ## Run docker for local dev
@echo "🚀 Docker run: ${PROJECT_NAME}:dev"
@docker run --rm ${PROJECT_NAME}:dev \
modpoll -d --tcp modsim.topmaker.net \
--mqtt-host mqtt.eclipseprojects.io \
--mqtt-topic-prefix modpoll/dev/ \
-f https://raw.githubusercontent.com/gavinying/modpoll/master/examples/modsim.csv

.PHONY: docs
Expand Down
6 changes: 6 additions & 0 deletions modpoll/arg_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ def get_parser():
default=None,
help="The file name to export references/registers",
)
parser.add_argument(
"--mqtt-version",
choices=["3.1.1", "5.0"],
default="3.1.1",
help="MQTT version. Defaults to MQTT v3.1.1",
)
parser.add_argument(
"--mqtt-host",
default=None,
Expand Down
10 changes: 9 additions & 1 deletion modpoll/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
)
from modpoll.mqtt_task import mqttc_close, mqttc_receive, mqttc_setup

from . import __version__

LOG_SIMPLE = "%(asctime)s | %(levelname).1s | %(name)s | %(message)s"
log = None
event_exit = threading.Event()
Expand All @@ -37,12 +39,18 @@ def get_utc_time():


def app(name="modpoll"):
print("\nmodpoll - A New Command-line Tool for Modbus and MQTT\n", flush=True)
print(
f"\nModpoll v{__version__} - A New Command-line Tool for Modbus and MQTT\n",
flush=True,
)

signal.signal(signal.SIGINT, _signal_handler)

# parse args
args = get_parser().parse_args()
# sanity check
if args.mqtt_topic_prefix.endswith("/"):
args.mqtt_topic_prefix = args.mqtt_topic_prefix[:-1]

# get logger
logging.basicConfig(level=args.loglevel, format=LOG_SIMPLE)
Expand Down
6 changes: 3 additions & 3 deletions modpoll/modbus_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ def modbus_publish(timestamp=None, on_change=False):
else:
payload[f"{ref.name}"] = ref.val
if args.mqtt_single:
topic = f"{args.mqtt_topic_prefix}{dev.name}/{ref.name}"
topic = f"{args.mqtt_topic_prefix}/{dev.name}/{ref.name}"
if isinstance(ref.val, list):
for i, ref_val_entry in enumerate(ref.val):
mqttc_publish(
Expand All @@ -569,15 +569,15 @@ def modbus_publish(timestamp=None, on_change=False):
if timestamp:
payload["timestamp_ms"] = int(timestamp * 1000)
if not args.mqtt_single:
topic = f"{args.mqtt_topic_prefix}{dev.name}"
topic = f"{args.mqtt_topic_prefix}/{dev.name}"
mqttc_publish(topic, json.dumps(payload), qos=args.mqtt_qos)


def modbus_publish_diagnostics():
for dev in deviceList:
log.debug(f"Publishing diagnostics for device {dev.name} ...")
payload = {"pollCount": dev.pollCount, "errorCount": dev.errorCount}
topic = f"{args.mqtt_topic_prefix}diagnostics/{dev.name}"
topic = f"{args.mqtt_topic_prefix}/diagnostics/{dev.name}"
mqttc_publish(topic, json.dumps(payload), qos=args.mqtt_qos)


Expand Down
146 changes: 95 additions & 51 deletions modpoll/mqtt_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import ssl
from multiprocessing import Queue

import paho.mqtt.client as mqtt
from paho.mqtt.client import (
Client as MQTTClient,
CallbackAPIVersion,
ReasonCode,
MQTTProtocolVersion,
)
from paho.mqtt import MQTTException

args = None
Expand All @@ -21,53 +26,69 @@ def __init__(self, topic, payload):


# Callbacks
def _on_connect(client, userdata, flags, rc, properties=None):
def _on_connect(client, userdata, flags, reason_code, properties):
# check if the broker already has a persistent session for this client
if isinstance(flags, dict): # MQTTv5
session_present = flags.get("session present", False)
else: # MQTTv3
session_present = flags.session_present

if isinstance(reason_code, ReasonCode):
rc = reason_code.value
else:
rc = reason_code

if rc == 0:
log.info("Connection successful")
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
qos = userdata.get("qos", 0) # Default to QoS 0 if not provided
client.subscribe(f"{args.mqtt_topic_prefix}+/set", qos)
elif rc == 1:
log.warning("Connection refused - incorrect protocol version")
elif rc == 2:
log.warning("Connection refused - invalid client identifier")
elif rc == 3:
log.warning("Connection refused - server unavailable")
elif rc == 4:
log.warning("Connection refused - bad username or password")
elif rc == 5:
log.warning("Connection refused - not authorised")
if session_present:
log.info("Session present, reusing existing session")
else:
log.info("New session created")
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
if "subscribe_topic" in userdata:
topic = userdata.get("subscribe_topic")
qos = userdata.get("subscribe_qos", 0) # Default to QoS 0
log.info(f"Subscribe to topic: {topic} with QoS: {qos}")
client.subscribe(topic, qos)
else:
log.warning("Unknown error")
log.warning(f"Connection failed with reason code: {reason_code}")


def _on_subscribe(client, userdata, mid, reason_codes, properties):
for sub_result in reason_codes:
if sub_result == 1:
log.info("Subscribed.")
# Any reason code >= 128 is a failure.
if sub_result >= 128:
log.warning("Failed to subscribe.")

def _on_disconnect(client, userdata, rc):
log.info(f"disconnected. rc={rc}.")

def _on_publish(client, userdata, mid, reason_codes, properties):
log.debug("Sent.")


def _on_message(client, userdata, msg):
if msg.retain == 0:
log.info(f"Receive message ({msg.topic}): {msg.payload}")
def _on_message(client, userdata, message):
if message.retain == 0:
log.info(f"Receive message ({message.topic}): {message.payload}")
else:
log.info(f"Receive retained message ({msg.topic}): {msg.payload}")
obj = msg.topic, msg.payload
log.info(f"Receive retained message ({message.topic}): {message.payload}")
obj = message.topic, message.payload
try:
rx_queue.put(obj, block=False)
except queue.Full:
log.warning("MQTT receiving queue is full, ignoring new message.")


def _on_publish(client, userdata, mid):
log.debug("Sent.")

def _on_disconnect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
log.info("Disconnected.")
if reason_code > 0:
log.warning(f"Disconnected with error, reason_code={reason_code}.")

def _on_subscribe(client, userdata, mid, granted_qos):
log.debug("Subscribed.")


def _on_log(client, userdata, level, string):
log.debug(f"{level} | {string}")
def _on_log(client, userdata, level, buf):
log.debug(f"{level} | {buf}")


def mqttc_setup(config):
Expand All @@ -84,11 +105,7 @@ def mqttc_setup(config):
else:
clientid = args.mqtt_clientid
global mqttc
mqttc = mqtt.Client(
clientid,
clean_session=(args.mqtt_qos == 0),
userdata={"qos": args.mqtt_qos},
)

if args.mqtt_use_tls:
if args.mqtt_tls_version == "tlsv1.2":
tlsVersion = ssl.PROTOCOL_TLSv1_2
Expand Down Expand Up @@ -116,17 +133,46 @@ def mqttc_setup(config):
if args.mqtt_user:
mqttc.username_pw_set(args.mqtt_user, args.mqtt_pass)

mqttc.on_message = _on_message
mqttc.on_connect = _on_connect
mqttc.on_disconnect = _on_disconnect
mqttc.on_publish = _on_publish
mqttc.on_subscribe = _on_subscribe
clean_start_or_session = args.mqtt_qos == 0

if "DEBUG" == args.loglevel.upper():
mqttc.on_log = _on_log
if args.mqtt_version == "5.0":
mqttc = MQTTClient(
CallbackAPIVersion.VERSION2,
client_id=clientid,
userdata={"qos": args.mqtt_qos},
protocol=MQTTProtocolVersion.MQTTv5,
)
mqttc.on_connect = _on_connect
mqttc.on_subscribe = _on_subscribe
mqttc.on_message = _on_message
mqttc.on_publish = _on_publish
mqttc.on_disconnect = _on_disconnect

if "DEBUG" == args.loglevel.upper():
mqttc.on_log = _on_log
mqttc.connect(
host=args.mqtt_host,
port=args.mqtt_port,
keepalive=60,
clean_start=clean_start_or_session,
)
else:
mqttc = MQTTClient(
CallbackAPIVersion.VERSION2,
client_id=clientid,
clean_session=clean_start_or_session,
userdata={"qos": args.mqtt_qos},
protocol=MQTTProtocolVersion.MQTTv311,
)
mqttc.on_connect = _on_connect
mqttc.on_subscribe = _on_subscribe
mqttc.on_message = _on_message
mqttc.on_publish = _on_publish
mqttc.on_disconnect = _on_disconnect

# try to connect
mqttc.connect(host=args.mqtt_host, port=args.mqtt_port, keepalive=60)
if "DEBUG" == args.loglevel.upper():
mqttc.on_log = _on_log
mqttc.connect(host=args.mqtt_host, port=args.mqtt_port, keepalive=60)

# start loop - let paho manage connection
mqttc.loop_start()
Expand All @@ -137,7 +183,6 @@ def mqttc_setup(config):

except Exception as ex:
log.error(f"mqtt connection error: {ex}")
# raise ex
return False


Expand All @@ -148,14 +193,13 @@ def mqttc_publish(topic, msg, qos=0, retain=False):
if not mqttc.is_connected() and qos == 0:
return
pubinfo = mqttc.publish(topic, msg, qos, retain)
# pubinfo.wait_for_publish()
log.debug(
f"publishing MQTT topic: {topic}, msg: {msg}, qos: {qos}, RC: {pubinfo.rc}"
f"Publishing MQTT topic: {topic}, msg: {msg}, qos: {qos}, RC: {pubinfo.rc}"
)
log.info(f"published message to topic: {topic}")
log.info(f"Publish message to topic: {topic}")
return pubinfo
except MQTTException as ex:
log.error(f"Error publishing MQTT topic: {topic}, msg: {msg}, qos: {qos}")
log.error(f"Failed to publish MQTT topic: {topic}, msg: {msg}, qos: {qos}")
raise ex


Expand Down
23 changes: 12 additions & 11 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ packages = [
python = ">=3.8.2,<4.0"
importlib-metadata = "^8.0.0"
pymodbus = "~3.5.4"
paho-mqtt = "^1.6.1"
paho-mqtt = "^2.1.0"
prettytable = "^3.6.0"
requests = "^2.28.1"
pyserial = { version = "^3.5", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ certifi==2024.7.4 ; python_full_version >= "3.8.2" and python_version < "4.0"
charset-normalizer==3.3.2 ; python_full_version >= "3.8.2" and python_version < "4.0"
idna==3.7 ; python_full_version >= "3.8.2" and python_version < "4.0"
importlib-metadata==8.0.0 ; python_full_version >= "3.8.2" and python_version < "4.0"
paho-mqtt==1.6.1 ; python_full_version >= "3.8.2" and python_version < "4.0"
prettytable==3.10.0 ; python_full_version >= "3.8.2" and python_version < "4.0"
paho-mqtt==2.1.0 ; python_full_version >= "3.8.2" and python_version < "4.0"
prettytable==3.10.2 ; python_full_version >= "3.8.2" and python_version < "4.0"
pymodbus==3.5.4 ; python_full_version >= "3.8.2" and python_version < "4.0"
requests==2.32.3 ; python_full_version >= "3.8.2" and python_version < "4.0"
urllib3==2.2.2 ; python_full_version >= "3.8.2" and python_version < "4.0"
Expand Down

0 comments on commit 3e6681a

Please sign in to comment.