Skip to content

Commit

Permalink
✨ mica-mqtt-client 添加连接测试功能 connectTest
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunMengLu committed Nov 25, 2023
1 parent 71485e9 commit b22b97c
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.iot.mqtt.client;

import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode;
import net.dreamlu.iot.mqtt.core.client.MqttClient;

/**
* 客户端测试
*
* @author L.cm
*/
public class MqttClientConnTest {

public static void main(String[] args) throws Exception {
// 初始化 mqtt 客户端
MqttConnectReasonCode reasonCode = MqttClient.create()
// .ip("127.0.0.1")
.ip("mqtt.dreamlu.net")
.port(1883)
.username("mica")
.password("mica1")
.connectTest();
System.out.println(reasonCode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.iot.mqtt.core.client;

import net.dreamlu.iot.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;

import java.util.concurrent.CompletableFuture;

/**
* 默认的 mqtt 消息处理器
*
* @author L.cm
*/
public class MqttClientConnectTestProcessor implements IMqttClientProcessor {
private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectTestProcessor.class);
private final CompletableFuture<MqttConnectReasonCode> future;

public MqttClientConnectTestProcessor(CompletableFuture<MqttConnectReasonCode> future) {
this.future = future;
}

@Override
public void processDecodeFailure(ChannelContext context, MqttMessage message, Throwable ex) {
// 客户端失败,默认记录异常日志
logger.error(ex.getMessage(), ex);
}

@Override
public void processConAck(ChannelContext context, MqttConnAckMessage message) {
MqttConnAckVariableHeader connAckVariableHeader = message.variableHeader();
Tio.remove(context, "mqtt connect tested.");
future.complete(connAckVariableHeader.connectReturnCode());
}

@Override
public void processSubAck(ChannelContext context, MqttSubAckMessage message) {

}

@Override
public void processPublish(ChannelContext context, MqttPublishMessage message) {

}

@Override
public void processUnSubAck(MqttUnsubAckMessage message) {

}

@Override
public void processPubAck(MqttPubAckMessage message) {

}

@Override
public void processPubRec(ChannelContext context, MqttMessage message) {

}

@Override
public void processPubRel(ChannelContext context, MqttMessage message) {

}

@Override
public void processPubComp(MqttMessage message) {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package net.dreamlu.iot.mqtt.core.client;

import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode;
import net.dreamlu.iot.mqtt.codec.MqttConstant;
import net.dreamlu.iot.mqtt.codec.MqttProperties;
import net.dreamlu.iot.mqtt.codec.MqttVersion;
Expand All @@ -24,6 +25,7 @@
import org.tio.client.TioClientConfig;
import org.tio.client.intf.TioClientHandler;
import org.tio.client.intf.TioClientListener;
import org.tio.core.Node;
import org.tio.core.TioConfig;
import org.tio.core.ssl.SslConfig;
import org.tio.utils.buffer.ByteBufferAllocator;
Expand All @@ -34,7 +36,7 @@
import org.tio.utils.timer.TimerTaskService;

import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -580,4 +582,51 @@ public MqttClient connectSync() {
return this.build().start(true);
}

/**
* 连接测试
*
* @return MqttConnectReasonCode
*/
public MqttConnectReasonCode connectTest() {
return connectTest(3, TimeUnit.SECONDS);
}

/**
* 连接测试
*
* @return MqttConnectReasonCode
*/
public MqttConnectReasonCode connectTest(long timeout, TimeUnit timeUnit) {
// 1. clientId 为空,生成默认的 clientId
if (StrUtil.isBlank(this.clientId)) {
// 默认为:MICA-MQTT- 前缀和 36进制的纳秒数
this.clientId("MICA-MQTT-" + Long.toString(System.nanoTime(), 36));
}
CompletableFuture<MqttConnectReasonCode> future = new CompletableFuture<>();
IMqttClientProcessor processor = new MqttClientConnectTestProcessor(future);
// 2. 初始化 mqtt 处理器
TioClientHandler clientAioHandler = new MqttClientAioHandler(this, processor);
TioClientListener clientAioListener = new MqttClientAioListener(this);
// 3. tioConfig
TioClientConfig tioConfig = new TioClientConfig(clientAioHandler, clientAioListener);
tioConfig.setName(this.name);
// 4. 心跳超时时间,关闭心跳检测
tioConfig.setHeartbeatTimeout(0);
TioClient tioClient;
try {
tioClient = new TioClient(tioConfig);
tioClient.asyncConnect(new Node(this.getIp(), this.getPort()));
} catch (Exception e) {
throw new IllegalStateException("Mica mqtt client start fail.", e);
}
try {
return future.get(timeout, timeUnit);
} catch (Exception e) {
// 超时,一般为服务器不可用
return MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
} finally {
tioClient.stop();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,140 @@
*/
public enum MqttConnectReasonCode implements MqttReasonCode {
/**
* ReturnCode
* 连接被接受
*/
CONNECTION_ACCEPTED((byte) 0x00),

//MQTT 3 codes
/**
* 连接被拒绝,不可接受的协议版本
*/
CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01),

/**
* 连接被拒绝,标识符被拒绝
*/
CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02),

/**
* 连接被拒绝,服务器不可用
*/
CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03),

/**
* 连接被拒绝,用户名或密码错误
*/
CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04),

/**
* 连接被拒绝,未经授权
*/
CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05),

//MQTT 5 codes
/**
* 连接被拒绝,未指定错误
*/
CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80),

/**
* 连接被拒绝,格式错误的数据包
*/
CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81),

/**
* 连接被拒绝,协议错误
*/
CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82),

/**
* 连接被拒绝,实现特定的错误
*/
CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83),

/**
* 连接被拒绝,不支持的协议版本
*/
CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84),

/**
* 连接被拒绝,客户端标识符无效
*/
CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85),

/**
* 连接被拒绝,用户名或密码错误
*/
CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86),

/**
* 连接被拒绝,未经授权
*/
CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87),

/**
* 连接被拒绝,服务器不可用
*/
CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88),

/**
* 连接被拒绝,服务器忙
*/
CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89),

/**
* 连接被拒绝,被禁止
*/
CONNECTION_REFUSED_BANNED((byte) 0x8A),

/**
* 连接被拒绝,身份验证方法错误
*/
CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C),

/**
* 连接被拒绝,主题名无效
*/
CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90),

/**
* 连接被拒绝,数据包过大
*/
CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95),

/**
* 连接被拒绝,超出配额限制
*/
CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97),

/**
* 连接被拒绝,有效载荷格式无效
*/
CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99),

/**
* 连接被拒绝,不支持保留消息
*/
CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A),

/**
* 连接被拒绝,不支持的QoS级别
*/
CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B),

/**
* 连接被拒绝,使用其他服务器
*/
CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C),

/**
* 连接被拒绝,服务器已移动
*/
CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D),

/**
* 连接被拒绝,连接速率超过限制
*/
CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F);

private static final MqttConnectReasonCode[] VALUES = new MqttConnectReasonCode[160];
Expand Down

0 comments on commit b22b97c

Please sign in to comment.