subscribe to a topic fails #195

Open
kamyarz-aws opened this issue Oct 8, 2024 · 4 comments


@kamyarz-aws


import os
import ssl
import time

import base64
import json
from datetime import datetime
from datetime import timezone
import random

from locust import task, TaskSet
from locust.user.wait_time import constant_throughput, between
from locust_plugins.users.mqtt import MqttUser
import logging
logging.basicConfig(level=logging.DEBUG)

class OneTpsPublish(MqttUser):
    protocol_version = 3
    host = "XXXXX.amazonaws.com"
    port = 8883

    unique_topic = "locust/test/topic"
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_cert_chain("XXX.crt", "mqtt/XXX.key", None)
    context.load_verify_locations("mqtt/AmazonRootCA1.pem")

    def on_start(self):
        self.client.tls_set_context(self.context)
        self.client.subscribe(self.unique_topic)
        time.sleep(2)

    def on_connect(self, client, userdata, flags, rc):
        time.sleep(1)

    @task
    def publish(self):
        payload = '{"message": "Hello, 123!"}'
        self.client.publish(self.unique_topic, payload)

I'm using this script to test against an AWS IoT message broker.

It connects and publishes successfully, but it is unable to subscribe to a topic.

If I move the subscribe call into on_connect, no failure is reported.
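
One possible explanation, not confirmed anywhere in this thread: connect_async returns before the connection is established, so a subscribe issued straight from on_start may run while the client is still disconnected. Below is a minimal sketch of waiting for the connection first, assuming the underlying paho-mqtt client exposes is_connected() (paho-mqtt 1.5 and later). wait_until is a hypothetical helper, not part of locust_plugins.

```python
import time


def wait_until(predicate, timeout=10.0, interval=0.1):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True if the predicate became true, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One last check so a predicate that turns true at the deadline still counts.
    return bool(predicate())


# Hypothetical use inside the user class from the script above:
#
#     def on_start(self):
#         if wait_until(self.client.is_connected, timeout=10):
#             self.client.subscribe(self.unique_topic)
```

If the wait times out, the connection itself is the thing to investigate rather than the subscription.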

In the MqttUser source code there is a comment on line 25 that says "# indicates a failure to subscribe".

Does this mean it is normal for subscribe to fail?
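
For context on that comment: in MQTT 3.1.1 the broker answers a SUBSCRIBE with a SUBACK carrying one return code per topic filter, where 0x00, 0x01 and 0x02 grant a QoS level and 0x80 signals that the subscription was rejected. Below is a small sketch of checking for that code. failed_subscriptions is a hypothetical helper, and the callback signature in the trailing comment is the paho-mqtt 1.x one for MQTT 3.1.1.

```python
# MQTT 3.1.1 SUBACK return codes: 0x00, 0x01, 0x02 grant a QoS level; 0x80 means failure.
SUBACK_FAILURE = 0x80


def failed_subscriptions(topics, granted_qos):
    """Return the topics whose SUBACK return code marks a rejected subscription."""
    return [topic for topic, code in zip(topics, granted_qos) if code == SUBACK_FAILURE]


# Hypothetical wiring with a paho-mqtt 1.x client (MQTT 3.1.1 callback signature):
#
#     def on_subscribe(client, userdata, mid, granted_qos):
#         rejected = failed_subscriptions(["locust/test/topic"], granted_qos)
#         if rejected:
#             print("broker rejected:", rejected)
```

A 0x80 return code means the broker received the request and refused it, which usually points at authorization or topic restrictions on the broker side rather than at the client.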

The same does NOT happen when I use a public message broker like HiveMQ.

@kamyarz-aws
Author

kamyarz-aws commented Oct 9, 2024

One of my suspicions about the cause of this behaviour is that the keepalive value is not set when self.client.connect_async is called through locust_plugins.

@kamyarz-aws
Author

Any feedback?

@cyberw
Collaborator

cyberw commented Nov 28, 2024

Hi! Sorry, I don't use this User; it was written by @ionutab.

But "it's just Python", so maybe you can ask on Stack Overflow. Maybe strip away some of the locust-specific stuff and ask in the paho-mqtt repo instead.

@ionutab
Contributor

ionutab commented Nov 28, 2024

Hi,
I did not write the MqttUser entirely.
I merely added an extensibility option to allow for better display on the interface and support for MQTT v5.

@kamyarz-aws a couple of ideas.

I'm not sure why you would need to subscribe to a topic in a performance test.
I usually use it to push a high volume of messages to an MQTT broker to test ingestion and response times.

However, you can create your own custom MqttUser: copy and paste the code from the existing one, then alter the params for connect_async and add a keepalive param with a value bigger than the current one.

I'm skeptical that that is the reason, though.
I would rather look into your AWS configuration to see if there are subscription restrictions.
Since it's working with HiveMQ, I'm guessing the configuration issue is somewhere else and not in the locust plugins.
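
As a sketch of what checking for such a restriction might involve: an AWS IoT policy has to allow iot:Subscribe on a topicfilter resource and iot:Receive on the matching topic resource, separately from iot:Publish. The region, account ID, and client ID below are placeholders, and build_policy is a hypothetical helper that only builds a JSON document to compare against the policy attached to the certificate.

```python
import json


def build_policy(region, account_id, client_id, topic):
    """Build an AWS IoT policy document allowing connect, publish, receive, subscribe."""
    base = f"arn:aws:iot:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "iot:Connect",
             "Resource": f"{base}:client/{client_id}"},
            {"Effect": "Allow", "Action": ["iot:Publish", "iot:Receive"],
             "Resource": f"{base}:topic/{topic}"},
            # Subscribe is authorized against topicfilter/, not topic/.
            {"Effect": "Allow", "Action": "iot:Subscribe",
             "Resource": f"{base}:topicfilter/{topic}"},
        ],
    }


if __name__ == "__main__":
    # Placeholder values; substitute the real region, account and client ID.
    policy = build_policy("us-east-1", "123456789012", "locust-client", "locust/test/topic")
    print(json.dumps(policy, indent=2))
```

A policy that grants iot:Publish but lacks the iot:Subscribe statement would match the symptom described here, where publishing works and subscribing does not.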

Thanks!

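import typing

import paho.mqtt.client as mqtt
from locust import User
from locust.env import Environment
from locust_plugins.users.mqtt import MqttClient


# Copy of the plugin's MqttUser with an explicit, larger keepalive passed to connect_async.
class KamyarzMqttUser(User):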
    abstract = True

    host = "localhost"
    port = 1883
    transport = "tcp"
    ws_path = "/mqtt"
    tls_context = None
    client_cls: typing.Type[MqttClient] = MqttClient
    client_id = None
    username = None
    password = None
    protocol = mqtt.MQTTv311

    def __init__(self, environment: Environment):
        super().__init__(environment)
        self.client: MqttClient = self.client_cls(
            environment=self.environment, transport=self.transport, client_id=self.client_id, protocol=self.protocol
        )

        if self.tls_context:
            self.client.tls_set_context(self.tls_context)

        if self.transport == "websockets" and self.ws_path:
            self.client.ws_set_options(path=self.ws_path)

        if self.username and self.password:
            self.client.username_pw_set(
                username=self.username,
                password=self.password,
            )

        self.client.connect_async(
            host=self.host,
            port=self.port,
            keepalive=3600
        )
        self.client.loop_start()
