diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index ce6099b..0e2a437 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -6,13 +6,13 @@
-
+
-
+
diff --git a/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java b/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java
index 0f2b4c0..010e868 100644
--- a/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java
+++ b/mmq-broker/src/main/java/org/monkey/mmq/protocol/BrokerHandler.java
@@ -31,10 +31,14 @@
import org.monkey.mmq.core.exception.MmqException;
import org.monkey.mmq.core.actor.metadata.message.SessionMateData;
import org.monkey.mmq.core.actor.message.SystemMessage;
+import org.monkey.mmq.core.utils.JacksonUtils;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
-import static org.monkey.mmq.core.common.Constants.MODULES;
+import static org.monkey.mmq.core.common.Constants.*;
/**
* MQTT消息处理
@@ -94,7 +98,20 @@ public void channelRead(ChannelHandlerContext ctx, Object obj) {
case CONNECT:
//protocolProcess.getConnectExecutor().submit(() -> {
try {
+ MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) msg;
protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage) msg);
+ SystemMessage systemMessage = new SystemMessage();
+ systemMessage.setTopic(CLIENT_CONNECT + "/" + mqttConnectMessage.payload().clientIdentifier());
+ Map payload = new HashMap();
+ payload.put("clientId", mqttConnectMessage.payload().clientIdentifier());
+// payload.put("username", mqttConnectMessage.payload().userName());
+ InetSocketAddress clientIpSocket = (InetSocketAddress)ctx.channel().remoteAddress();
+ String clientIp = clientIpSocket.getAddress().getHostAddress();
+ payload.put("ip", clientIp);
+ systemMessage.setPayload(JacksonUtils.toJson(payload));
+ systemMessage.setMqttQoS(MqttQoS.AT_LEAST_ONCE);
+ ActorSelection actorRef = actorSystem.actorSelection("/user/" + ((MqttConnectMessage) msg).payload().clientIdentifier());
+ actorRef.tell(systemMessage, ActorRef.noSender());
} catch (MmqException e) {
Loggers.BROKER_SERVER.error(e.getErrMsg());
SystemMessage systemMessage = new SystemMessage();
diff --git a/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java b/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java
index 7c69e8d..3abce2e 100644
--- a/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java
+++ b/mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java
@@ -16,11 +16,16 @@
package org.monkey.mmq.protocol;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
-import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.handler.codec.mqtt.*;
import io.netty.util.AttributeKey;
import org.monkey.mmq.config.Loggers;
+import org.monkey.mmq.core.actor.metadata.subscribe.SubscribeMateData;
import org.monkey.mmq.core.exception.MmqException;
+import org.monkey.mmq.core.utils.JacksonUtils;
import org.monkey.mmq.core.utils.LoggerUtils;
import org.monkey.mmq.core.actor.metadata.message.SessionMateData;
import org.monkey.mmq.service.DupPubRelMessageStoreService;
@@ -28,6 +33,13 @@
import org.monkey.mmq.service.SessionStoreService;
import org.monkey.mmq.service.SubscribeStoreService;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.monkey.mmq.core.common.Constants.CLIENT_DISCONNECT;
+
/**
* DISCONNECT连接处理
@@ -66,6 +78,24 @@ public void processDisConnect(Channel channel, MqttMessage msg) throws MmqExcept
}
LoggerUtils.printIfDebugEnabled(Loggers.BROKER_PROTOCOL,"DISCONNECT - clientId: {}, cleanSession: {}", clientId, sessionStore.isCleanSession());
sessionStoreService.delete(clientId, channel.id().asLongText());
+
+ // 发送设备下线通知
+ List subscribeStores = subscribeStoreService.search(CLIENT_DISCONNECT + "/" + clientId);
+ subscribeStores.forEach(subscribeStore -> {
+ Map payload = new HashMap();
+ payload.put("clientId", clientId);
+ InetSocketAddress clientIpSocket = (InetSocketAddress) channel.remoteAddress();
+ String clientIp = clientIpSocket.getAddress().getHostAddress();
+ payload.put("ip", clientIp);
+ MqttQoS respQoS = MqttQoS.AT_LEAST_ONCE;
+ MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
+ new MqttFixedHeader(MqttMessageType.PUBLISH, false, respQoS, false, 0),
+ new MqttPublishVariableHeader(CLIENT_DISCONNECT + "/" + clientId, 0), Unpooled.buffer().writeBytes(JacksonUtils.toJson(payload).getBytes()));
+ SessionMateData subSession = sessionStoreService.get(subscribeStore.getClientId());
+ if (subSession != null) {
+ subSession.getChannel().writeAndFlush(publishMessage);
+ }
+ });
channel.close();
}
diff --git a/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java b/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java
index f79fe47..fb5acd2 100644
--- a/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java
+++ b/mmq-core/src/main/java/org/monkey/mmq/core/common/Constants.java
@@ -50,4 +50,8 @@ public class Constants {
public static final String RULE_ENGINE = SYSTOPIC + "/RULE";
public static final String MODULES = SYSTOPIC + "/MODULES";
+
+ public static final String CLIENT_CONNECT = SYSTOPIC + "/client/connected";
+
+ public static final String CLIENT_DISCONNECT = SYSTOPIC + "/client/disconnected";
}