diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..cefb937 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..d084162 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/mqtt-jmeter.iml b/mqtt-jmeter.iml new file mode 100644 index 0000000..46e0edd --- /dev/null +++ b/mqtt-jmeter.iml @@ -0,0 +1,77 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/mqtt_jmeter/pom.xml b/mqtt_jmeter/pom.xml index c4b41ec..768acc3 100644 --- a/mqtt_jmeter/pom.xml +++ b/mqtt_jmeter/pom.xml @@ -1,74 +1,73 @@ - - 4.0.0 - net.xmeter - mqtt-jmeter - 2.0.2 + + 4.0.0 + net.xmeter + mqtt-jmeter + 2.0.3 - - 5.4.3 - + + 5.5 + - - - org.apache.jmeter - ApacheJMeter_core - ${jmeter-version} - provided - + + + org.apache.jmeter + ApacheJMeter_core + ${jmeter-version} + provided + - - org.apache.jmeter - ApacheJMeter_java - ${jmeter-version} - provided - + + org.apache.jmeter + ApacheJMeter_java + ${jmeter-version} + provided + - - org.fusesource.mqtt-client - mqtt-client - 1.14 - + + com.hivemq + hivemq-mqtt-client + 1.3.0 + - - com.hivemq - hivemq-mqtt-client - 1.1.3 - + + javax.xml.bind + jaxb-api + 2.3.0 + - + - - mqtt-xmeter-${project.version} - install - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.0 - - 1.8 - 1.8 - - - - org.apache.maven.plugins - maven-assembly-plugin - - - jar-with-dependencies - - - - - assemble-all - package - - single - - - - - - - + + mqtt-xmeter-${project.version} + install + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + assemble-all + package + + single + + + + + + + \ No newline at end of file diff --git a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java index e207efb..8293123 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java @@ -53,6 +53,7 @@ public interface Constants { public static final String MQTT_VERSION_3_1_1 = "3.1.1"; public static final String MQTT_VERSION_3_1 = "3.1"; + public static final String MQTT_VERSION_5 = "5.0"; public static final String SAMPLE_ON_CONDITION_OPTION1 = "specified elapsed time (ms)"; public static final String SAMPLE_ON_CONDITION_OPTION2 = "number of received messages"; @@ -60,7 +61,7 @@ public interface Constants { public static final int MAX_CLIENT_ID_LENGTH = 23; public static final String DEFAULT_SERVER = "127.0.0.1"; - public static final String DEFAULT_MQTT_VERSION = "3.1"; + public static final String DEFAULT_MQTT_VERSION = "5.0"; public static final String DEFAULT_PORT = "1883"; public static final String DEFAULT_CONN_TIME_OUT = "10"; public static final String TCP_PROTOCOL = "TCP"; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java index 8759352..51a76ec 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java @@ -33,7 +33,7 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ private final JLabeledTextField serverAddr = new JLabeledTextField("Server name or IP:"); private final JLabeledTextField serverPort = new JLabeledTextField("Port number:", 5); - private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1 }, false, false);; + private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_5, MQTT_VERSION_3_1, MQTT_VERSION_3_1_1 }, false, false);; private final JLabeledTextField timeout = new JLabeledTextField("Timeout(s):", 5); private final JLabeledTextField userNameAuth = new JLabeledTextField("User name:"); @@ -257,9 +257,11 @@ public void configure(AbstractMQTTSampler sampler) { serverAddr.setText(sampler.getServer()); serverPort.setText(sampler.getPort()); if(sampler.getMqttVersion().equals(MQTT_VERSION_3_1)) { - mqttVersion.setSelectedIndex(0); - } else if(sampler.getMqttVersion().equals(MQTT_VERSION_3_1_1)) { mqttVersion.setSelectedIndex(1); + } else if(sampler.getMqttVersion().equals(MQTT_VERSION_3_1_1)) { + mqttVersion.setSelectedIndex(2); + } else if (sampler.getMqttVersion().equals(MQTT_VERSION_5)) { + mqttVersion.setSelectedIndex(0); } timeout.setText(sampler.getConnTimeout()); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java index da72e2a..9b43208 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientConnectSamplerUI.java @@ -131,7 +131,7 @@ public String getLabelResource() { @Override public String getStaticLabel() { - return "Efficient MQTT Connect"; + return "MQTT Connect"; } @Override diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientDisConnectSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientDisConnectSamplerUI.java index fe74a17..8e7fe79 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientDisConnectSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/EfficientDisConnectSamplerUI.java @@ -45,7 +45,7 @@ public String getLabelResource() { @Override public String getStaticLabel() { - return "Efficient MQTT DisConnect"; + return "MQTT DisConnect"; } @Override diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java index 556d6b2..a5cae2a 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientConnectSampler.java @@ -35,7 +35,7 @@ public class EfficientConnectSampler extends AbstractMQTTSampler { public SampleResult sample(Entry entry) { lock = new Object(); SampleResult result = new SampleResult(); - result.setSampleLabel(getLabelPrefix() + getName()); + result.setSampleLabel(getName()); result.setSuccessful(true); JMeterVariables vars = JMeterContextService.getContext().getVariables(); @@ -75,7 +75,7 @@ public SampleResult sample(Entry entry) { } else { clientId = getConnPrefix(); if (clientId != null && !clientId.isEmpty()) { - clientId += "-xmeter-suffix-" + i; + clientId += "-xmeter-suffix-" + i + JMeterContextService.getContext().getThreadNum(); } } @@ -96,6 +96,8 @@ public SampleResult sample(Entry entry) { suc = handleSubscription(connection); } if (suc) { + vars.putObject("conn", connection); // save connection object as thread local variable !! + vars.putObject("clientId", client.getClientId()); //save client id as thread local variable subResult.setSuccessful(true); subResult.setResponseData("Successful.".getBytes()); subResult.setResponseMessage(MessageFormat.format("Connection {0} established successfully.", connection)); @@ -129,6 +131,7 @@ public SampleResult sample(Entry entry) { } if (!connections.isEmpty()) { vars.putObject("conns", connections); + logger.info(MessageFormat.format("Connections created. Count={0}", connections.size())); } result.setSampleCount(totalSampleCount); if (result.getEndTime() == 0) { diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java index e33502d..4cc1dce 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/EfficientDisConnectSampler.java @@ -21,7 +21,7 @@ public class EfficientDisConnectSampler extends AbstractMQTTSampler { @Override public SampleResult sample(Entry entry) { SampleResult result = new SampleResult(); - result.setSampleLabel(getLabelPrefix() + getName()); + result.setSampleLabel(getName()); result.setSuccessful(true); JMeterVariables vars = JMeterContextService.getContext().getVariables(); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT3Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT3Client.java new file mode 100644 index 0000000..11db97a --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT3Client.java @@ -0,0 +1,102 @@ +package net.xmeter.samplers.mqtt.hivemq; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.hivemq.client.mqtt.MqttClientSslConfig; +import com.hivemq.client.mqtt.MqttWebSocketConfig; +import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder; +import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect; +import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; +import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder; +import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuth; +import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder; +import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder; +import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; + +import net.xmeter.samplers.mqtt.ConnectionParameters; +import net.xmeter.samplers.mqtt.MQTTClient; +import net.xmeter.samplers.mqtt.MQTTClientException; +import net.xmeter.samplers.mqtt.MQTTConnection; + +class HiveMQTT3Client implements MQTTClient { + private static final Logger logger = Logger.getLogger(HiveMQTT3Client.class.getCanonicalName()); + private final ConnectionParameters parameters; + private final Mqtt3BlockingClient client; + + HiveMQTT3Client(ConnectionParameters parameters) throws Exception { + this.parameters = parameters; + Mqtt3ClientBuilder mqtt3ClientBuilder = Mqtt3Client.builder() + .identifier(parameters.getClientId()) + .serverHost(parameters.getHost()) + .serverPort(parameters.getPort()); + mqtt3ClientBuilder = applyAdditionalConfig(mqtt3ClientBuilder, parameters); + client = mqtt3ClientBuilder + .buildBlocking(); + } + + private Mqtt3ClientBuilder applyAdditionalConfig(Mqtt3ClientBuilder builder, ConnectionParameters parameters) + throws Exception { + if (parameters.getReconnectMaxAttempts() > 0) { + builder = builder.automaticReconnect(MqttClientAutoReconnect.builder().build()); + } + if (parameters.isSecureProtocol()) { + MqttClientSslConfig sslConfig = ((HiveMQTTSsl) parameters.getSsl()).getSslConfig(); + builder = builder.sslConfig(sslConfig); + } + if (parameters.isWebSocketProtocol()) { + MqttWebSocketConfigBuilder wsConfigBuilder = MqttWebSocketConfig.builder(); + if (parameters.getPath() != null) { + wsConfigBuilder.serverPath(parameters.getPath()); + } + builder = builder.webSocketConfig(wsConfigBuilder.build()); + } + return builder; + } + + @Override + public String getClientId() { + return parameters.getClientId(); + } + + @Override + public MQTTConnection connect() throws Exception { + Mqtt3ConnectBuilder.Send> connectSend = client.toAsync().connectWith() + .cleanSession(parameters.isCleanSession()) + .keepAlive(parameters.getKeepAlive()); + Mqtt3SimpleAuth auth = createAuth(); + if (auth != null) { + connectSend = connectSend.simpleAuth(auth); + } + logger.info(() -> "Connect client: " + parameters.getClientId()); + CompletableFuture connectFuture = connectSend.send(); + try { + Mqtt3ConnAck connAck = connectFuture.get(parameters.getConnectTimeout(), TimeUnit.SECONDS); + logger.info(() -> "Connected client: " + parameters.getClientId()); + return new HiveMQTT3Connection(client, parameters.getClientId(), connAck); + } catch (TimeoutException timeoutException) { + try { + client.disconnect(); + } catch (Exception e) { + logger.log(Level.FINE, "Disconnect on timeout failed " + client, e); + } + throw new MQTTClientException("Connection timeout " + client, timeoutException); + } + } + + private Mqtt3SimpleAuth createAuth() { + if (parameters.getUsername() != null) { + Mqtt3SimpleAuthBuilder.Complete simpleAuth = Mqtt3SimpleAuth.builder() + .username(parameters.getUsername()); + if (parameters.getPassword() != null) { + simpleAuth.password(parameters.getPassword().getBytes()); + } + return simpleAuth.build(); + } + return null; + } +} \ No newline at end of file diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT3Connection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT3Connection.java new file mode 100644 index 0000000..417fc73 --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT3Connection.java @@ -0,0 +1,132 @@ +package net.xmeter.samplers.mqtt.hivemq; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.util.List; +import java.util.function.Consumer; +import java.util.logging.Logger; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient; +import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe; +import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3SubscribeBuilder; +import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription; +import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode; + +import net.xmeter.samplers.mqtt.MQTTClientException; +import net.xmeter.samplers.mqtt.MQTTConnection; +import net.xmeter.samplers.mqtt.MQTTPubResult; +import net.xmeter.samplers.mqtt.MQTTQoS; +import net.xmeter.samplers.mqtt.MQTTSubListener; + +class HiveMQTT3Connection implements MQTTConnection { + private static final Logger logger = Logger.getLogger(HiveMQTT3Connection.class.getCanonicalName()); + + private static final Charset charset = Charset.forName("UTF-8"); + private static ThreadLocal decoder = ThreadLocal.withInitial(() -> charset.newDecoder()); + + private final Mqtt3BlockingClient client; + private final String clientId; + private final Mqtt3ConnAck connAck; + private MQTTSubListener listener; + + HiveMQTT3Connection(Mqtt3BlockingClient client, String clientId, Mqtt3ConnAck connAck) { + this.client = client; + this.clientId = clientId; + this.connAck = connAck; + } + + @Override + public boolean isConnectionSucc() { + return !connAck.getReturnCode().isError(); + } + + @Override + public String getClientId() { + return clientId; + } + + @Override + public void disconnect() { + client.disconnect(); + } + + @Override + public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained) { + try { + client.publishWith() + .topic(topicName) + .payload(message) + .qos(HiveUtil.map(qoS)) + .retain(retained) + .send(); + return new MQTTPubResult(true); + } catch (Exception error) { + return new MQTTPubResult(false, error.getMessage()); + } + } + + @Override + public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure) { + MqttQos hiveQos = HiveUtil.map(qos); + Mqtt3Subscribe subscribe = null; + Mqtt3SubscribeBuilder builder = Mqtt3Subscribe.builder(); + for (int i = 0; i < topicNames.length; i++) { + String topicName = topicNames[i]; + Mqtt3Subscription subscription = Mqtt3Subscription.builder().topicFilter(topicName).qos(hiveQos).build(); + if (i < topicNames.length - 1) { + builder.addSubscription(subscription); + } else { + subscribe = builder.addSubscription(subscription).build(); + } + } + + Mqtt3AsyncClient asyncClient = client.toAsync(); + asyncClient.subscribe(subscribe, this::handlePublishReceived).whenComplete((ack, error) -> { + if (error != null) { + onFailure.accept(error); + } else { + List ackCodes = ack.getReturnCodes(); + for (int i = 0; i < ackCodes.size(); i++) { + Mqtt3SubAckReturnCode ackCode = ackCodes.get(i); + if (ackCode.isError()) { + int index = i; + logger.warning(() -> "Failed to subscribe " + topicNames[index] + " code: " + ackCode.name()); + } + } + onSuccess.run(); + } + }); + } + + private void handlePublishReceived(Mqtt3Publish received) { + String topic = decode(received.getTopic().toByteBuffer()); + String payload = received.getPayload().map(this::decode).orElse(""); + this.listener.accept(topic, payload, () -> {}); + } + + private String decode(ByteBuffer value) { + try { + return decoder.get().decode(value).toString(); + } catch (CharacterCodingException e) { + throw new RuntimeException(new MQTTClientException("Failed to decode", e)); + } + } + + @Override + public void setSubListener(MQTTSubListener listener) { + this.listener = listener; + } + + @Override + public String toString() { + return "HiveMQTT3Connection{" + + "clientId='" + clientId + '\'' + + '}'; + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT5Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT5Client.java new file mode 100644 index 0000000..c4e67ce --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT5Client.java @@ -0,0 +1,107 @@ +package net.xmeter.samplers.mqtt.hivemq; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.hivemq.client.mqtt.MqttClientSslConfig; +import com.hivemq.client.mqtt.MqttWebSocketConfig; +import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder; +import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; +import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth; +import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuthBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; + +import net.xmeter.samplers.mqtt.ConnectionParameters; +import net.xmeter.samplers.mqtt.MQTTClient; +import net.xmeter.samplers.mqtt.MQTTClientException; +import net.xmeter.samplers.mqtt.MQTTConnection; + +class HiveMQTT5Client implements MQTTClient { + private static final Logger logger = Logger.getLogger(HiveMQTT5Client.class.getCanonicalName()); + private final ConnectionParameters parameters; + private final Mqtt5BlockingClient client; + + HiveMQTT5Client(ConnectionParameters parameters) throws Exception { + this.parameters = parameters; + + Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder() + .identifier(parameters.getClientId()) + .serverHost(parameters.getHost()) + .serverPort(parameters.getPort()); + + mqtt5ClientBuilder = applyAdditionalConfig(mqtt5ClientBuilder, parameters); + client = mqtt5ClientBuilder.buildBlocking(); + + } + + private Mqtt5ClientBuilder applyAdditionalConfig(Mqtt5ClientBuilder builder, ConnectionParameters parameters) + throws Exception + { + if (parameters.getReconnectMaxAttempts() > 0) { + builder = builder.automaticReconnect(MqttClientAutoReconnect.builder().build()); + } + if (parameters.isSecureProtocol()) { + MqttClientSslConfig sslConfig = ((HiveMQTTSsl) parameters.getSsl()).getSslConfig(); + builder = builder.sslConfig(sslConfig); + } + if (parameters.isWebSocketProtocol()) { + MqttWebSocketConfigBuilder wsConfigBuilder = MqttWebSocketConfig.builder(); + if (parameters.getPath() != null) { + wsConfigBuilder.serverPath(parameters.getPath()); + } + builder = builder.webSocketConfig(wsConfigBuilder.build()); + } + + return builder; + } + + + @Override + public String getClientId() { + return parameters.getClientId(); + } + + @Override + public MQTTConnection connect() throws Exception { + Mqtt5ConnectBuilder.Send> connectSend = client.toAsync().connectWith() + .cleanStart(parameters.isCleanSession()) + .keepAlive(parameters.getKeepAlive()); + Mqtt5SimpleAuth auth = createAuth(); + if (auth != null) { + connectSend = connectSend.simpleAuth(auth); + } + logger.info(() -> "Connect client: " + parameters.getClientId()); + CompletableFuture connectFuture = connectSend.send(); + try { + Mqtt5ConnAck connAck = connectFuture.get(parameters.getConnectTimeout(), TimeUnit.SECONDS); + logger.info(() -> "Connected client: " + parameters.getClientId()); + return new HiveMQTT5Connection(client, parameters.getClientId(), connAck); + } catch (TimeoutException timeoutException) { + try { + client.disconnect(); + } catch (Exception e) { + logger.log(Level.FINE, "Disconnect on timeout failed " + client, e); + } + throw new MQTTClientException("Connection timeout " + client, timeoutException); + } + } + + private Mqtt5SimpleAuth createAuth() { + if (parameters.getUsername() != null) { + Mqtt5SimpleAuthBuilder.Complete simpleAuth = Mqtt5SimpleAuth.builder() + .username(parameters.getUsername()); + if (parameters.getPassword() != null) { + simpleAuth.password(parameters.getPassword().getBytes()); + } + return simpleAuth.build(); + } + return null; + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT5Connection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT5Connection.java new file mode 100644 index 0000000..ca6a895 --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTT5Connection.java @@ -0,0 +1,133 @@ +package net.xmeter.samplers.mqtt.hivemq; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.util.List; +import java.util.function.Consumer; +import java.util.logging.Logger; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5SubscribeBuilder; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscription; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; + +import net.xmeter.samplers.mqtt.MQTTClientException; +import net.xmeter.samplers.mqtt.MQTTConnection; +import net.xmeter.samplers.mqtt.MQTTPubResult; +import net.xmeter.samplers.mqtt.MQTTQoS; +import net.xmeter.samplers.mqtt.MQTTSubListener; + +class HiveMQTT5Connection implements MQTTConnection { + private static final Logger logger = Logger.getLogger(HiveMQTT5Connection.class.getCanonicalName()); + + private static final Charset charset = Charset.forName("UTF-8"); + private static ThreadLocal decoder = ThreadLocal.withInitial(() -> charset.newDecoder()); + + private final Mqtt5BlockingClient client; + private final String clientId; + private final Mqtt5ConnAck connAck; + private MQTTSubListener listener; + + HiveMQTT5Connection(Mqtt5BlockingClient client, String clientId, Mqtt5ConnAck connAck) { + this.client = client; + this.clientId = clientId; + this.connAck = connAck; + } + + @Override + public boolean isConnectionSucc() { + return !connAck.getReasonCode().isError(); + } + + @Override + public String getClientId() { + return clientId; + } + + @Override + public void disconnect() { + client.disconnect(); + } + + @Override + public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained) { + try { + client.publishWith() + .topic(topicName) + .payload(message) + .qos(HiveUtil.map(qoS)) + .retain(retained) + .send(); + return new MQTTPubResult(true); + } catch (Exception error) { + return new MQTTPubResult(false, error.getMessage()); + } + } + + @Override + public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure) { + MqttQos hiveQos = HiveUtil.map(qos); + Mqtt5Subscribe subscribe = null; + Mqtt5SubscribeBuilder builder = Mqtt5Subscribe.builder(); + for (int i = 0; i < topicNames.length; i++) { + String topicName = topicNames[i]; + Mqtt5Subscription subscription = Mqtt5Subscription.builder().topicFilter(topicName).qos(hiveQos).build(); + if (i < topicNames.length - 1) { + builder.addSubscription(subscription); + } else { + subscribe = builder.addSubscription(subscription).build(); + } + } + + Mqtt5AsyncClient asyncClient = client.toAsync(); + + asyncClient.subscribe(subscribe, this::handlePublishReceived).whenComplete((ack, error) -> { + if (error != null) { + onFailure.accept(error); + } else { + List ackCodes = ack.getReasonCodes(); + for (int i = 0; i < ackCodes.size(); i++) { + Mqtt5SubAckReasonCode ackCode = ackCodes.get(i); + if (ackCode.isError()) { + int index = i; + logger.warning(() -> "Failed to subscribe " + topicNames[index] + " code: " + ackCode.name()); + } + } + onSuccess.run(); + } + }); + } + + private void handlePublishReceived(Mqtt5Publish received) { + String topic = decode(received.getTopic().toByteBuffer()); + String payload = received.getPayload().map(this::decode).orElse(""); + this.listener.accept(topic, payload, () -> {}); + } + + private String decode(ByteBuffer value) { + try { + return decoder.get().decode(value).toString(); + } catch (CharacterCodingException e) { + throw new RuntimeException(new MQTTClientException("Failed to decode", e)); + } + } + + @Override + public void setSubListener(MQTTSubListener listener) { + this.listener = listener; + } + + @Override + public String toString() { + return "HiveMQTT5Connection{" + + "clientId='" + clientId + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java index d514035..37a59c3 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTFactory.java @@ -14,6 +14,7 @@ import com.hivemq.client.util.KeyStoreUtil; import net.xmeter.AcceptAllTrustManagerFactory; +import net.xmeter.Constants; import net.xmeter.Util; import net.xmeter.samplers.AbstractMQTTSampler; import net.xmeter.samplers.mqtt.ConnectionParameters; @@ -36,7 +37,11 @@ public List getSupportedProtocols() { @Override public MQTTClient createClient(ConnectionParameters parameters) throws Exception { - return new HiveMQTTClient(parameters); + if(parameters.getVersion().equals(Constants.MQTT_VERSION_5)) { + return new HiveMQTT5Client(parameters); + } + + return new HiveMQTT3Client(parameters); } @Override @@ -56,6 +61,7 @@ public MQTTSsl createSsl(AbstractMQTTSampler sampler) throws Exception { // String keyStorePass = sampler.getKeyStorePassword(); File clientCertFile = Util.getClientCertFile(sampler); String clientPass = sampler.getClientCertPassword(); + sslBuilder = sslBuilder.keyManagerFactory(KeyStoreUtil.keyManagerFromKeystore(clientCertFile, clientPass, clientPass)) .trustManagerFactory(AcceptAllTrustManagerFactory.getInstance()); } diff --git a/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi b/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi index 4ccdcc5..ebb66d8 100644 --- a/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi +++ b/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi @@ -1,2 +1 @@ -net.xmeter.samplers.mqtt.fuse.FuseMQTTSpi net.xmeter.samplers.mqtt.hivemq.HiveMQTTSpi