diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java index d9983b42..e30070d5 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/DefaultMqttClientProcessor.java @@ -68,6 +68,7 @@ public void processConAck(ChannelContext context, MqttConnAckMessage message) { switch (returnCode) { case CONNECTION_ACCEPTED: // 1. 连接成功的日志 + context.setAccepted(true); if (logger.isInfoEnabled()) { Node node = context.getServerNode(); logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort()); @@ -104,7 +105,7 @@ private void publishConnectEvent(ChannelContext context) { // 触发客户端连接事件 executor.submit(() -> { try { - connectListener.onConnected(context, context.isReconnect); + connectListener.onConnected(context, context.isReconnect()); } catch (Throwable e) { logger.error(e.getMessage(), e); } diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java index 7e163648..df5aac79 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClient.java @@ -390,7 +390,7 @@ public void reconnect() { } try { // 判断是否 removed - if (channelContext.isRemoved) { + if (channelContext.isRemoved()) { channelContext.setRemoved(false); } tioClient.reconnect(channelContext, config.getTimeout()); @@ -538,7 +538,7 @@ public ClientChannelContext getContext() { */ public boolean isConnected() { ClientChannelContext channelContext = getContext(); - return channelContext != null && !channelContext.isClosed; + return channelContext != null && !channelContext.isAccepted(); } /** @@ -570,7 +570,7 @@ private void startHeartbeatTask() { long currTime = System.currentTimeMillis(); for (ChannelContext entry : set) { ClientChannelContext channelContext = (ClientChannelContext) entry; - if (channelContext.isClosed || channelContext.isRemoved) { + if (channelContext.isClosed() || channelContext.isRemoved()) { continue; } long interval = currTime - channelContext.stat.latestTimeOfSentPacket; diff --git a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java index b0db8d6f..70250d48 100644 --- a/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java +++ b/mica-mqtt-client/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientAioListener.java @@ -57,7 +57,8 @@ public void onAfterConnected(ChannelContext context, boolean isConnected, boolea } @Override - public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) { + public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { + context.setAccepted(false); // 先判断是否配置监听 if (connectListener == null) { return; @@ -65,7 +66,7 @@ public void onBeforeClose(ChannelContext channelContext, Throwable throwable, St // 2. 触发客户断开连接事件 executor.submit(() -> { try { - connectListener.onDisconnect(channelContext, throwable, remark, isRemove); + connectListener.onDisconnect(context, throwable, remark, isRemove); } catch (Throwable e) { logger.error(e.getMessage(), e); } diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java index bbed4611..70c03ed0 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttConst.java @@ -23,10 +23,6 @@ */ public interface MqttConst { - /** - * 正常断开连接 - */ - String DIS_CONNECTED = "disconnected"; /** * 是 http 协议 */ diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java index e42f4004..01f6bcfc 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java @@ -161,7 +161,7 @@ public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qo TopicUtil.validateTopicName(topic); // 获取 context ChannelContext context = Tio.getByBsId(getServerConfig(), clientId); - if (context == null || context.isClosed) { + if (context == null || context.isClosed()) { logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId); return false; } @@ -267,7 +267,7 @@ public boolean publishAll(String topic, byte[] payload, MqttQoS qos, boolean ret for (Subscribe subscribe : subscribeList) { String clientId = subscribe.getClientId(); ChannelContext context = Tio.getByBsId(getServerConfig(), clientId); - if (context == null || context.isClosed) { + if (context == null || context.isClosed()) { logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId); continue; } diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioHandler.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioHandler.java index 535d8f1c..9d653625 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioHandler.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioHandler.java @@ -99,9 +99,8 @@ public void handler(Packet packet, ChannelContext context) { processor.processConnect(context, (MqttConnectMessage) mqttMessage); return; } - // 3. 客户端 id 是创建连接之后才有的,如果客户端 id 为空,直接关闭 - String clientId = context.getBsId(); - if (StrUtil.isBlank(clientId)) { + // 3. 判定是否认证成功 + if (!context.isAccepted()) { Tio.remove(context, "Mqtt connected but clientId is blank."); return; } diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java index ac47bc3e..e9eaf64b 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServerAioListener.java @@ -65,6 +65,8 @@ public boolean onHeartbeatTimeout(ChannelContext context, long interval, int hea @Override public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) { + // 标记认证为 false + context.setAccepted(false); // 1. http 请求跳过 boolean isHttpRequest = context.get(MqttConst.IS_HTTP) != null; if (isHttpRequest) { @@ -74,7 +76,8 @@ public void onBeforeClose(ChannelContext context, Throwable throwable, String re // 2. 业务 id String clientId = context.getBsId(); // 3. 判断是否正常断开 - boolean isNotNormalDisconnect = context.get(MqttConst.DIS_CONNECTED) == null; + boolean isNotNormalDisconnect = context.isBizStatus(); + context.setBizStatus(false); if (isNotNormalDisconnect || throwable != null) { // 避免网络异常时短期照成大量异常打印,会导致内存突增 if (throwable instanceof IOException) { @@ -95,7 +98,6 @@ public void onBeforeClose(ChannelContext context, Throwable throwable, String re } // 6. 会话清理 cleanSession(clientId); - context.remove(MqttConst.DIS_CONNECTED); // 7. 下线事件 String username = context.get(MqttConst.USER_NAME_KEY); context.remove(MqttConst.USER_NAME_KEY); diff --git a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java index f0aec259..2806da86 100644 --- a/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-server/src/main/java/net/dreamlu/iot/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -115,6 +115,8 @@ public void processConnect(ChannelContext context, MqttConnectMessage mqttMessag connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); return; } + // 认证成功 + context.setAccepted(true); // 4. 判断 uniqueId 是否在多个地方使用,如果在其他地方有使用,先解绑 ChannelContext otherContext = Tio.getByBsId(context.getTioConfig(), uniqueId); if (otherContext != null) { @@ -440,7 +442,7 @@ public void processDisConnect(ChannelContext context) { String clientId = context.getBsId(); logger.info("DisConnect - clientId:{} contextId:{}", clientId, context.getId()); // 设置正常断开的标识 - context.set(MqttConst.DIS_CONNECTED, (byte) 1); + context.setBizStatus(true); Tio.remove(context, "Mqtt DisConnect"); } diff --git a/pom.xml b/pom.xml index dcca15fb..ce11f27f 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ UTF-8 1.2.2 - 0.1.6 + 0.1.8 2.7.18 2.7.18 1.2.83