Skip to content

Commit

Permalink
add:添加设备上下线系统topic
Browse files Browse the repository at this point in the history
  • Loading branch information
solley committed Jan 15, 2023
1 parent b371acd commit 404cda3
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 4 deletions.
4 changes: 2 additions & 2 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@
import org.monkey.mmq.core.exception.MmqException;
import org.monkey.mmq.core.actor.metadata.message.SessionMateData;
import org.monkey.mmq.core.actor.message.SystemMessage;
import org.monkey.mmq.core.utils.JacksonUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

import static org.monkey.mmq.core.common.Constants.MODULES;
import static org.monkey.mmq.core.common.Constants.*;

/**
* MQTT消息处理
Expand Down Expand Up @@ -94,7 +98,20 @@ public void channelRead(ChannelHandlerContext ctx, Object obj) {
case CONNECT:
//protocolProcess.getConnectExecutor().submit(() -> {
try {
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) msg;
protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage) msg);
SystemMessage systemMessage = new SystemMessage();
systemMessage.setTopic(CLIENT_CONNECT + "/" + mqttConnectMessage.payload().clientIdentifier());
Map payload = new HashMap();
payload.put("clientId", mqttConnectMessage.payload().clientIdentifier());
// payload.put("username", mqttConnectMessage.payload().userName());
InetSocketAddress clientIpSocket = (InetSocketAddress)ctx.channel().remoteAddress();
String clientIp = clientIpSocket.getAddress().getHostAddress();
payload.put("ip", clientIp);
systemMessage.setPayload(JacksonUtils.toJson(payload));
systemMessage.setMqttQoS(MqttQoS.AT_LEAST_ONCE);
ActorSelection actorRef = actorSystem.actorSelection("/user/" + ((MqttConnectMessage) msg).payload().clientIdentifier());
actorRef.tell(systemMessage, ActorRef.noSender());
} catch (MmqException e) {
Loggers.BROKER_SERVER.error(e.getErrMsg());
SystemMessage systemMessage = new SystemMessage();
Expand Down
32 changes: 31 additions & 1 deletion mmq-broker/src/main/java/org/monkey/mmq/protocol/DisConnect.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,30 @@

package org.monkey.mmq.protocol;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.AttributeKey;
import org.monkey.mmq.config.Loggers;
import org.monkey.mmq.core.actor.metadata.subscribe.SubscribeMateData;
import org.monkey.mmq.core.exception.MmqException;
import org.monkey.mmq.core.utils.JacksonUtils;
import org.monkey.mmq.core.utils.LoggerUtils;
import org.monkey.mmq.core.actor.metadata.message.SessionMateData;
import org.monkey.mmq.service.DupPubRelMessageStoreService;
import org.monkey.mmq.service.DupPublishMessageStoreService;
import org.monkey.mmq.service.SessionStoreService;
import org.monkey.mmq.service.SubscribeStoreService;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.monkey.mmq.core.common.Constants.CLIENT_DISCONNECT;


/**
* DISCONNECT连接处理
Expand Down Expand Up @@ -66,6 +78,24 @@ public void processDisConnect(Channel channel, MqttMessage msg) throws MmqExcept
}
LoggerUtils.printIfDebugEnabled(Loggers.BROKER_PROTOCOL,"DISCONNECT - clientId: {}, cleanSession: {}", clientId, sessionStore.isCleanSession());
sessionStoreService.delete(clientId, channel.id().asLongText());

// 发送设备下线通知
List<SubscribeMateData> subscribeStores = subscribeStoreService.search(CLIENT_DISCONNECT + "/" + clientId);
subscribeStores.forEach(subscribeStore -> {
Map payload = new HashMap();
payload.put("clientId", clientId);
InetSocketAddress clientIpSocket = (InetSocketAddress) channel.remoteAddress();
String clientIp = clientIpSocket.getAddress().getHostAddress();
payload.put("ip", clientIp);
MqttQoS respQoS = MqttQoS.AT_LEAST_ONCE;
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, respQoS, false, 0),
new MqttPublishVariableHeader(CLIENT_DISCONNECT + "/" + clientId, 0), Unpooled.buffer().writeBytes(JacksonUtils.toJson(payload).getBytes()));
SessionMateData subSession = sessionStoreService.get(subscribeStore.getClientId());
if (subSession != null) {
subSession.getChannel().writeAndFlush(publishMessage);
}
});
channel.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ public class Constants {
public static final String RULE_ENGINE = SYSTOPIC + "/RULE";

public static final String MODULES = SYSTOPIC + "/MODULES";

public static final String CLIENT_CONNECT = SYSTOPIC + "/client/connected";

public static final String CLIENT_DISCONNECT = SYSTOPIC + "/client/disconnected";
}

0 comments on commit 404cda3

Please sign in to comment.