diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java index 2e1dbb8541..c9194bb860 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshMQTTServer.java @@ -76,7 +76,7 @@ public class EventMeshMQTTServer extends AbstractRemotingServer { private final Acl acl; - protected final transient Map processorTable = + protected final Map processorTable = new ConcurrentHashMap<>(64); private final transient AtomicBoolean started = new AtomicBoolean(false); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java index 101e07cd22..8178cad8fd 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/mqtt/processor/PublishProcessor.java @@ -51,7 +51,6 @@ public PublishProcessor(EventMeshMQTTServer eventMeshMQTTServer, Executor execut @Override public void process(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws MqttException { - // clientManager.getOrRegisterClient(ctx, mqttMessage); MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();