From 404cda3cecfe96d8a835129fbfee400c2889dea7 Mon Sep 17 00:00:00 2001 From: solley Date: Sun, 15 Jan 2023 19:45:08 +0800 Subject: [PATCH] =?UTF-8?q?add:=E6=B7=BB=E5=8A=A0=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E4=B8=8A=E4=B8=8B=E7=BA=BF=E7=B3=BB=E7=BB=9Ftopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/compiler.xml | 4 +-- .../monkey/mmq/protocol/BrokerHandler.java | 19 ++++++++++- .../org/monkey/mmq/protocol/DisConnect.java | 32 ++++++++++++++++++- .../org/monkey/mmq/core/common/Constants.java | 4 +++ 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/.idea/compiler.xml b/.idea/compiler.xml index ce6099b..0e2a437 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -6,13 +6,13 @@ - + - + diff --git a/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java b/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java index 0f2b4c0..010e868 100644 --- a/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java +++ b/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java @@ -31,10 +31,14 @@ import org.monkey.mmq.core.exception.MmqException; import org.monkey.mmq.core.actor.metadata.message.SessionMateData; import org.monkey.mmq.core.actor.message.SystemMessage; +import org.monkey.mmq.core.utils.JacksonUtils; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; -import static org.monkey.mmq.core.common.Constants.MODULES; +import static org.monkey.mmq.core.common.Constants.*; /** * MQTT消息处理 @@ -94,7 +98,20 @@ public void channelRead(ChannelHandlerContext ctx, Object obj) { case CONNECT: //protocolProcess.getConnectExecutor().submit(() -> { try { + MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) msg; protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage) msg); + SystemMessage systemMessage = new SystemMessage(); + systemMessage.setTopic(CLIENT_CONNECT + "/" + mqttConnectMessage.payload().clientIdentifier()); + Map payload = new HashMap(); + payload.put("clientId", mqttConnectMessage.payload().clientIdentifier()); +// payload.put("username", mqttConnectMessage.payload().userName()); + InetSocketAddress clientIpSocket = (InetSocketAddress)ctx.channel().remoteAddress(); + String clientIp = clientIpSocket.getAddress().getHostAddress(); + payload.put("ip", clientIp); + systemMessage.setPayload(JacksonUtils.toJson(payload)); + systemMessage.setMqttQoS(MqttQoS.AT_LEAST_ONCE); + ActorSelection actorRef = actorSystem.actorSelection("/user/" + ((MqttConnectMessage) msg).payload().clientIdentifier()); + actorRef.tell(systemMessage, ActorRef.noSender()); } catch (MmqException e) { Loggers.BROKER_SERVER.error(e.getErrMsg()); SystemMessage systemMessage = new SystemMessage(); diff --git a/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java b/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java index 7c69e8d..3abce2e 100644 --- a/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java +++ b/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java @@ -16,11 +16,16 @@ package org.monkey.mmq.protocol; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.handler.codec.mqtt.*; import io.netty.util.AttributeKey; import org.monkey.mmq.config.Loggers; +import org.monkey.mmq.core.actor.metadata.subscribe.SubscribeMateData; import org.monkey.mmq.core.exception.MmqException; +import org.monkey.mmq.core.utils.JacksonUtils; import org.monkey.mmq.core.utils.LoggerUtils; import org.monkey.mmq.core.actor.metadata.message.SessionMateData; import org.monkey.mmq.service.DupPubRelMessageStoreService; @@ -28,6 +33,13 @@ import org.monkey.mmq.service.SessionStoreService; import org.monkey.mmq.service.SubscribeStoreService; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.monkey.mmq.core.common.Constants.CLIENT_DISCONNECT; + /** * DISCONNECT连接处理 @@ -66,6 +78,24 @@ public void processDisConnect(Channel channel, MqttMessage msg) throws MmqExcept } LoggerUtils.printIfDebugEnabled(Loggers.BROKER_PROTOCOL,"DISCONNECT - clientId: {}, cleanSession: {}", clientId, sessionStore.isCleanSession()); sessionStoreService.delete(clientId, channel.id().asLongText()); + + // 发送设备下线通知 + List subscribeStores = subscribeStoreService.search(CLIENT_DISCONNECT + "/" + clientId); + subscribeStores.forEach(subscribeStore -> { + Map payload = new HashMap(); + payload.put("clientId", clientId); + InetSocketAddress clientIpSocket = (InetSocketAddress) channel.remoteAddress(); + String clientIp = clientIpSocket.getAddress().getHostAddress(); + payload.put("ip", clientIp); + MqttQoS respQoS = MqttQoS.AT_LEAST_ONCE; + MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( + new MqttFixedHeader(MqttMessageType.PUBLISH, false, respQoS, false, 0), + new MqttPublishVariableHeader(CLIENT_DISCONNECT + "/" + clientId, 0), Unpooled.buffer().writeBytes(JacksonUtils.toJson(payload).getBytes())); + SessionMateData subSession = sessionStoreService.get(subscribeStore.getClientId()); + if (subSession != null) { + subSession.getChannel().writeAndFlush(publishMessage); + } + }); channel.close(); } diff --git a/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java b/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java index f79fe47..fb5acd2 100644 --- a/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java +++ b/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java @@ -50,4 +50,8 @@ public class Constants { public static final String RULE_ENGINE = SYSTOPIC + "/RULE"; public static final String MODULES = SYSTOPIC + "/MODULES"; + + public static final String CLIENT_CONNECT = SYSTOPIC + "/client/connected"; + + public static final String CLIENT_DISCONNECT = SYSTOPIC + "/client/disconnected"; }