Skip to content

Commit

Permalink
修复同一客户端id重复链接bug
Browse files Browse the repository at this point in the history
  • Loading branch information
solley committed Jan 8, 2023
1 parent d27305f commit 3582430
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 16 deletions.
39 changes: 28 additions & 11 deletions mmq-broker/src/main/java/org/monkey/mmq/protocol/Connect.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
Expand All @@ -36,6 +38,7 @@
import org.monkey.mmq.service.SubscribeStoreService;

import java.util.List;
import java.util.concurrent.ExecutorService;

/**
* CONNECT连接处理
Expand All @@ -53,12 +56,22 @@ public class Connect {

private IMqttAuthService authService;

public Connect(SessionStoreService sessionStoreService, SubscribeStoreService subscribeStoreService, DupPublishMessageStoreService dupPublishMessageStoreService, DupPubRelMessageStoreService dupPubRelMessageStoreService, IMqttAuthService authService) {
private ExecutorService connectExecutor;

private int total = 100;

public Connect(SessionStoreService sessionStoreService,
SubscribeStoreService subscribeStoreService,
DupPublishMessageStoreService dupPublishMessageStoreService,
DupPubRelMessageStoreService dupPubRelMessageStoreService,
IMqttAuthService authService,
ExecutorService connectExecutor) {
this.sessionStoreService = sessionStoreService;
this.subscribeStoreService = subscribeStoreService;
this.dupPublishMessageStoreService = dupPublishMessageStoreService;
this.dupPubRelMessageStoreService = dupPubRelMessageStoreService;
this.authService = authService;
this.connectExecutor = connectExecutor;
}

public void processConnect(Channel channel, MqttConnectMessage msg) throws MmqException {
Expand Down Expand Up @@ -118,15 +131,20 @@ public void processConnect(Channel channel, MqttConnectMessage msg) throws MmqEx
dupPublishMessageStoreService.deleteForClient(msg.payload().clientIdentifier());
dupPubRelMessageStoreService.deleteForClient(msg.payload().clientIdentifier());
}
previous.close();
// previous.close();

}
handle(channel, msg, username);
}

private void handle(Channel channel, MqttConnectMessage msg, String username) throws MmqException {
// 处理遗嘱信息
SessionMateData sessionStore = new SessionMateData(msg.payload().clientIdentifier(),
channel, msg.variableHeader().isCleanSession(), null, username);
if (msg.variableHeader().isWillFlag()) {
MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(msg.payload().willTopic(), 0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()));
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(msg.payload().willTopic(), 0), Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()));
sessionStore.setWillMessage(willMessage);
}
// 处理连接心跳包
Expand All @@ -142,8 +160,8 @@ public void processConnect(Channel channel, MqttConnectMessage msg) throws MmqEx
channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier());
Boolean sessionPresent = sessionStoreService.containsKey(msg.payload().clientIdentifier()) && !msg.variableHeader().isCleanSession();
MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent), null);
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent), null);
channel.writeAndFlush(okResp);
Loggers.BROKER_PROTOCOL.info("CONNECT - clientId: {}, cleanSession: {}", msg.payload().clientIdentifier(), msg.variableHeader().isCleanSession());
// 如果cleanSession为0, 需要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
Expand All @@ -152,17 +170,16 @@ public void processConnect(Channel channel, MqttConnectMessage msg) throws MmqEx
List<DupPubRelMessageMateData> dupPubRelMessageStoreList = dupPubRelMessageStoreService.get(msg.payload().clientIdentifier());
dupPublishMessageStoreList.forEach(dupPublishMessageStore -> {
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()));
new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0),
new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()));
channel.writeAndFlush(publishMessage);
});
dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null);
new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), null);
channel.writeAndFlush(pubRelMessage);
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void processDisConnect(Channel channel, MqttMessage msg) throws MmqExcept
dupPubRelMessageStoreService.deleteForClient(clientId);
}
LoggerUtils.printIfDebugEnabled(Loggers.BROKER_PROTOCOL,"DISCONNECT - clientId: {}, cleanSession: {}", clientId, sessionStore.isCleanSession());
sessionStoreService.delete(clientId);
sessionStoreService.delete(clientId, channel.id().asLongText());
channel.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public ExecutorService getPingExecutor() {

public Connect connect() {
if (connect == null) {
connect = new Connect(sessionStoreService, subscribeStoreService, dupPublishMessageStoreService, dupPubRelMessageStoreService, authService);
connect = new Connect(sessionStoreService, subscribeStoreService, dupPublishMessageStoreService, dupPubRelMessageStoreService, authService, this.connectExecutor);
}
return connect;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,14 @@ public Collection<ClientMateData> getClients() {
}

public void put(String clientId, SessionMateData sessionStore) throws MmqException {
SessionMateData sessionMateData = storage.get(clientId);
if (sessionMateData != null) {
sessionMateData.getChannel().close();
storage.remove(clientId);
}

storage.put(clientId, sessionStore);

InetSocketAddress clientIpSocket = (InetSocketAddress)sessionStore.getChannel().remoteAddress();
String clientIp = clientIpSocket.getAddress().getHostAddress();

Expand Down Expand Up @@ -192,8 +199,25 @@ public boolean containsKey(String clientId) {
return storage.containsKey(clientId);
}

public void delete(String clientId, String channelId) {
SessionMateData sessionMateData = storage.get(clientId);
if (sessionMateData != null && sessionMateData.getChannel().id().asLongText().equals(channelId)) {
storage.remove(clientId);
ClientRemoveMessage clientRemoveMessage = new ClientRemoveMessage();
ClientMateData clientMateData = new ClientMateData();
clientMateData.setClientId(clientId);
clientRemoveMessage.setClientMateData(clientMateData);
ActorSelection actorRef = actorSystem.actorSelection("/user/" + clientId);
actorRef.tell(clientRemoveMessage, ActorRef.noSender());
}
}

public void delete(String clientId) throws MmqException {
storage.remove(clientId);
SessionMateData sessionMateData = storage.get(clientId);
if (sessionMateData != null) {
sessionMateData.getChannel().close();
storage.remove(clientId);
}
ClientRemoveMessage clientRemoveMessage = new ClientRemoveMessage();
ClientMateData clientMateData = new ClientMateData();
clientMateData.setClientId(clientId);
Expand Down
4 changes: 2 additions & 2 deletions mmq-web/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ server.servlet.contextPath=/
server.port=8888

#*************** mqtt broker Configurations ***************#
mmq.broker.websocketPort=18888
mmq.broker.port=28888
mmq.broker.websocketPort=1888
mmq.broker.port=2888
mmq.broker.ssl.password=mmq
mmq.broker.ssl.certPath=cert/mmq.pfx
mmq.broker.ssl.port=38888
Expand Down

0 comments on commit 3582430

Please sign in to comment.