Skip to content

Commit

Permalink
⬆️ 依赖升级
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Dec 26, 2023
1 parent f34a458 commit d1d653b
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void processConAck(ChannelContext context, MqttConnAckMessage message) {
switch (returnCode) {
case CONNECTION_ACCEPTED:
// 1. 连接成功的日志
context.setAccepted(true);
if (logger.isInfoEnabled()) {
Node node = context.getServerNode();
logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", context.getId(), node.getIp(), node.getPort());
Expand Down Expand Up @@ -104,7 +105,7 @@ private void publishConnectEvent(ChannelContext context) {
// 触发客户端连接事件
executor.submit(() -> {
try {
connectListener.onConnected(context, context.isReconnect);
connectListener.onConnected(context, context.isReconnect());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public void reconnect() {
}
try {
// 判断是否 removed
if (channelContext.isRemoved) {
if (channelContext.isRemoved()) {
channelContext.setRemoved(false);
}
tioClient.reconnect(channelContext, config.getTimeout());
Expand Down Expand Up @@ -538,7 +538,7 @@ public ClientChannelContext getContext() {
*/
public boolean isConnected() {
ClientChannelContext channelContext = getContext();
return channelContext != null && !channelContext.isClosed;
return channelContext != null && !channelContext.isAccepted();
}

/**
Expand Down Expand Up @@ -570,7 +570,7 @@ private void startHeartbeatTask() {
long currTime = System.currentTimeMillis();
for (ChannelContext entry : set) {
ClientChannelContext channelContext = (ClientChannelContext) entry;
if (channelContext.isClosed || channelContext.isRemoved) {
if (channelContext.isClosed() || channelContext.isRemoved()) {
continue;
}
long interval = currTime - channelContext.stat.latestTimeOfSentPacket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,16 @@ public void onAfterConnected(ChannelContext context, boolean isConnected, boolea
}

@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
context.setAccepted(false);
// 先判断是否配置监听
if (connectListener == null) {
return;
}
// 2. 触发客户断开连接事件
executor.submit(() -> {
try {
connectListener.onDisconnect(channelContext, throwable, remark, isRemove);
connectListener.onDisconnect(context, throwable, remark, isRemove);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
*/
public interface MqttConst {

/**
* 正常断开连接
*/
String DIS_CONNECTED = "disconnected";
/**
* 是 http 协议
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qo
TopicUtil.validateTopicName(topic);
// 获取 context
ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
if (context == null || context.isClosed) {
if (context == null || context.isClosed()) {
logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
return false;
}
Expand Down Expand Up @@ -267,7 +267,7 @@ public boolean publishAll(String topic, byte[] payload, MqttQoS qos, boolean ret
for (Subscribe subscribe : subscribeList) {
String clientId = subscribe.getClientId();
ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
if (context == null || context.isClosed) {
if (context == null || context.isClosed()) {
logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ public void handler(Packet packet, ChannelContext context) {
processor.processConnect(context, (MqttConnectMessage) mqttMessage);
return;
}
// 3. 客户端 id 是创建连接之后才有的,如果客户端 id 为空,直接关闭
String clientId = context.getBsId();
if (StrUtil.isBlank(clientId)) {
// 3. 判定是否认证成功
if (!context.isAccepted()) {
Tio.remove(context, "Mqtt connected but clientId is blank.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public boolean onHeartbeatTimeout(ChannelContext context, long interval, int hea

@Override
public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
// 标记认证为 false
context.setAccepted(false);
// 1. http 请求跳过
boolean isHttpRequest = context.get(MqttConst.IS_HTTP) != null;
if (isHttpRequest) {
Expand All @@ -74,7 +76,8 @@ public void onBeforeClose(ChannelContext context, Throwable throwable, String re
// 2. 业务 id
String clientId = context.getBsId();
// 3. 判断是否正常断开
boolean isNotNormalDisconnect = context.get(MqttConst.DIS_CONNECTED) == null;
boolean isNotNormalDisconnect = context.isBizStatus();
context.setBizStatus(false);
if (isNotNormalDisconnect || throwable != null) {
// 避免网络异常时短期照成大量异常打印,会导致内存突增
if (throwable instanceof IOException) {
Expand All @@ -95,7 +98,6 @@ public void onBeforeClose(ChannelContext context, Throwable throwable, String re
}
// 6. 会话清理
cleanSession(clientId);
context.remove(MqttConst.DIS_CONNECTED);
// 7. 下线事件
String username = context.get(MqttConst.USER_NAME_KEY);
context.remove(MqttConst.USER_NAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public void processConnect(ChannelContext context, MqttConnectMessage mqttMessag
connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
// 认证成功
context.setAccepted(true);
// 4. 判断 uniqueId 是否在多个地方使用,如果在其他地方有使用,先解绑
ChannelContext otherContext = Tio.getByBsId(context.getTioConfig(), uniqueId);
if (otherContext != null) {
Expand Down Expand Up @@ -440,7 +442,7 @@ public void processDisConnect(ChannelContext context) {
String clientId = context.getBsId();
logger.info("DisConnect - clientId:{} contextId:{}", clientId, context.getId());
// 设置正常断开的标识
context.set(MqttConst.DIS_CONNECTED, (byte) 1);
context.setBizStatus(true);
Tio.remove(context, "Mqtt DisConnect");
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-flatten.version>1.2.2</maven-flatten.version>
<!-- mica-net version -->
<mica-net.version>0.1.6</mica-net.version>
<mica-net.version>0.1.8</mica-net.version>
<mica.version>2.7.18</mica.version>
<spring.boot.version>2.7.18</spring.boot.version>
<fastjson.version>1.2.83</fastjson.version>
Expand Down

0 comments on commit d1d653b

Please sign in to comment.