diff --git a/docs/http-api.md b/docs/http-api.md index ae441581..99e4ea8c 100644 --- a/docs/http-api.md +++ b/docs/http-api.md @@ -99,14 +99,14 @@ $ curl -i --basic -u mica:mica "http://localhost:8083/api/v1/endpoints" **Parameters (json):** -| Name | Type | Required | Default | Description | -| -------- | ------- | -------- | ------- | ----------------------------------------------------------- | -| topic | String | Required | | 主题 | -| clientId | String | Required | | 客户端标识符 | -| payload | String | Required | | 消息正文 | +| Name | Type | Required | Default | Description | +| -------- | ------- | -------- | ------- |------------------------------------------------| +| topic | String | Required | | 主题,消息会按 topic 订阅投递 | +| clientId | String | Required | | 客户端标识符,不为空参数即可,无实际意义,建议可以取名 httpApi | +| payload | String | Required | | 消息正文 | | encoding | String | Optional | plain | 消息正文使用的编码方式,目前仅支持 目前仅支持 `plain`、`hex`、`base64` | -| qos | Integer | Optional | 0 | QoS 等级 | -| retain | Boolean | Optional | false | 是否为保留消息 | +| qos | Integer | Optional | 0 | QoS 等级 | +| retain | Boolean | Optional | false | 是否为保留消息 | **Success Response Body (JSON):** @@ -187,14 +187,14 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs **Parameters (json):** -| Name | Type | Required | Default | Description | -| ------------ | ------- | -------- | ------- | ----------------------------------------------------------- | -| [0].topic | String | Required | | 主题 | -| [0].clientId | String | Required | | 客户端标识符 | -| [0].payload | String | Required | | 消息正文 | +| Name | Type | Required | Default | Description | +| ------------ | ------- | -------- | ------- |------------------------------------------| +| [0].topic | String | Required | | 主题,消息按订阅投递 | +| [0].clientId | String | Required | | 客户端标识符,不为空参数即可,无实际意义,建议可以取名 httpApi | +| [0].payload | String | Required | | 消息正文 | | [0].encoding | String | Optional | plain | 消息正文使用的编码方式,目前仅支持 `plain`、`hex`、`base64` | -| [0].qos | Integer | Optional | 0 | QoS 等级 | -| [0].retain | Boolean | Optional | false | 是否为保留消息 | +| [0].qos | Integer | Optional | 0 | QoS 等级 | +| [0].retain | Boolean | Optional | false | 是否为保留消息 | **Success Response Body (JSON):** @@ -267,6 +267,78 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs {"code":1} ``` +## 获取客户端详情 + +### GET /api/v1/clients/info + +**Query Parameters:** + +| Name | Type | Required | Description | +| -------- | ------ | -------- | ----------- | +| clientId | String | True | ClientID | + +**Success Response Body (JSON):** + +| Name | Type | Description | +|-----------|---------|-------------| +| code | Integer | 0 | +| clientId | String | clientId | +| username | String | 用户名 | +| connected | Boolen | 是否已经连接 | +| createdAt | Long | 连接的时间 | +| connectedAt | Long | 连接成功时间 | + +**Examples:** + +踢除指定客户端 + +```bash +$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/clients/info?clientId=mqttx_5fe4cfcf" + +{"code":1,"data":{"clientId":"mqttx_5fe4cfcf","connected":true,"connectedAt":1681792417835,"createdAt":1681792417835,"ipAddress":"127.0.0.1","port":11852,"protoName":"MQTT","protoVer":5}} +``` + +## 分页获取客户端 + +### GET /api/v1/clients + +**Query Parameters:** + +| Name | Type | Required | Description | +|--------|------|----------|--------------| +| _page | int | False | Page 默认1 | +| _limit | int | False | 分页大小 默认10000 | + +**Success Response Body (JSON):** + +| Name | Type | Description | +| ---- | ------- | ----------- | +| code | Integer | 0 | + +**Success Response Body (JSON):** + +| Name | Type | Description | +| ---- | ------- |-------------| +| code | Integer | 0 | +| pageNumber | Integer | 当前页码 | +| pageSize | Integer | 分页大小 | +| totalRow | Integer | 分页数 | +| clientId | String | clientId | +| username | String | 用户名 | +| connected | Boolen | 是否已经连接 | +| createdAt | Long | 连接的时间 | +| connectedAt | Long | 连接成功时间 | + +**Examples:** + +踢除指定客户端 + +```bash +$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/clients?_page=1&_limit=100" + +{"data":{"list":[{"clientId":"mqttx_5fe4cfcf","connected":true,"protoName":"MQTT","protoVer":5,"ipAddress":"127.0.0.1","port":11852,"connectedAt":1681792417835,"createdAt":1681792417835}],"pageNumber":1,"pageSize":1,"totalRow":1},"code":1} +``` + ## 踢除指定客户端 ### POST /api/v1/clients/delete diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java index 5e53b8b4..4206d867 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java @@ -21,6 +21,7 @@ import org.dromara.mica.mqtt.codec.MqttQoS; import org.dromara.mica.mqtt.core.common.MqttPendingPublish; import org.dromara.mica.mqtt.core.server.model.ClientInfo; +import org.dromara.mica.mqtt.core.server.model.Subscribe; import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager; import org.dromara.mica.mqtt.core.util.TopicUtil; import org.slf4j.Logger; @@ -411,6 +412,16 @@ public StatVo getStat() { return tioServer.getServerConfig().getStat(); } + /** + * 获取客户端订阅情况 + * + * @param clientId clientId + * @return 订阅集合 + */ + public List getSubscriptions(String clientId) { + return serverCreator.getSessionManager().getSubscriptions(clientId); + } + /** * 判断是否 mqtt 连接 * diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/api/MqttHttpApi.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/api/MqttHttpApi.java index 25ddff76..113a68b3 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/api/MqttHttpApi.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/api/MqttHttpApi.java @@ -16,8 +16,8 @@ package org.dromara.mica.mqtt.core.server.http.api; +import org.dromara.mica.mqtt.core.server.MqttServer; import org.dromara.mica.mqtt.core.server.MqttServerCreator; -import org.dromara.mica.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import org.dromara.mica.mqtt.core.server.enums.MessageType; import org.dromara.mica.mqtt.core.server.http.api.code.ResultCode; import org.dromara.mica.mqtt.core.server.http.api.form.BaseForm; @@ -25,15 +25,18 @@ import org.dromara.mica.mqtt.core.server.http.api.form.SubscribeForm; import org.dromara.mica.mqtt.core.server.http.api.result.Result; import org.dromara.mica.mqtt.core.server.http.handler.MqttHttpRoutes; +import org.dromara.mica.mqtt.core.server.model.ClientInfo; import org.dromara.mica.mqtt.core.server.model.Message; import org.dromara.mica.mqtt.core.server.model.Subscribe; -import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager; import org.dromara.mica.mqtt.core.util.PayloadEncode; import org.dromara.mica.mqtt.core.util.TopicUtil; +import org.tio.core.ChannelContext; +import org.tio.core.Tio; import org.tio.http.common.HttpConst; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; import org.tio.http.common.Method; +import org.tio.server.TioServerConfig; import org.tio.utils.hutool.StrUtil; import org.tio.utils.json.JsonUtil; @@ -46,12 +49,12 @@ * @author L.cm */ public class MqttHttpApi { - private final IMqttMessageDispatcher messageDispatcher; - private final IMqttSessionManager sessionManager; + private final MqttServerCreator serverCreator; + private final TioServerConfig mqttServerConfig; - public MqttHttpApi(MqttServerCreator serverCreator) { - this.messageDispatcher = serverCreator.getMessageDispatcher(); - this.sessionManager = serverCreator.getSessionManager(); + public MqttHttpApi(MqttServerCreator serverCreator, TioServerConfig mqttServerConfig) { + this.serverCreator = serverCreator; + this.mqttServerConfig = mqttServerConfig; } /** @@ -66,6 +69,18 @@ public HttpResponse endpoints(HttpRequest request) { return Result.ok(request, MqttHttpRoutes.getRouts().keySet()); } + /** + * 获取 api 列表 + *

+ * GET /api/v1/stats + * + * @param request HttpRequest + * @return HttpResponse + */ + public HttpResponse stats(HttpRequest request) { + return Result.ok(request, this.mqttServerConfig.getStat()); + } + /** * 消息发布 *

@@ -132,7 +147,7 @@ private void sendPublish(PublishForm form) { if (StrUtil.isNotBlank(payload)) { message.setPayload(PayloadEncode.decode(payload, form.getEncoding())); } - messageDispatcher.send(message); + serverCreator.getMessageDispatcher().send(message); } /** @@ -255,6 +270,43 @@ public HttpResponse unsubscribeBatch(HttpRequest request) { return Result.ok(); } + /** + * 获取取客户端信息 + * + *

+ * GET /api/v1/clients/info + * + * @param request HttpRequest + * @return HttpResponse + */ + public HttpResponse getClientInfo(HttpRequest request) { + String clientId = request.getParam("clientId"); + if (StrUtil.isBlank(clientId)) { + return Result.fail(request, ResultCode.E101); + } + ChannelContext context = Tio.getByBsId(this.mqttServerConfig, clientId); + if (context == null) { + return Result.fail(request, ResultCode.E101); + } + ClientInfo clientInfo = ClientInfo.form(serverCreator, context); + return Result.ok(request, clientInfo); + } + + /** + * 分页拉取客户端列表 + * + *

+ * GET /api/v1/clients?_page=1&_limit=10 + * + * @param request HttpRequest + * @return HttpResponse + */ + public HttpResponse getClients(HttpRequest request) { + int page = request.getInt("_page", 1); + int limit = request.getInt("_limit", 10000); + return Result.ok(request, MqttServer.getClients(serverCreator, mqttServerConfig, page, limit)); + } + /** * 踢除指定客户端。注意踢除客户端操作会将连接与会话一并终结。 *

@@ -271,7 +323,7 @@ public HttpResponse deleteClients(HttpRequest request) { Message message = new Message(); message.setClientId(clientId); message.setMessageType(MessageType.DISCONNECT); - messageDispatcher.send(message); + serverCreator.getMessageDispatcher().send(message); return Result.ok(); } @@ -288,7 +340,7 @@ public HttpResponse getClientSubscriptions(HttpRequest request) { if (StrUtil.isBlank(clientId)) { return Result.fail(request, ResultCode.E101); } - List subscribeList = sessionManager.getSubscriptions(clientId); + List subscribeList = serverCreator.getSessionManager().getSubscriptions(clientId); return Result.ok(new HttpResponse(request), subscribeList); } @@ -302,7 +354,7 @@ private void sendSubOrUnSubscribe(BaseForm form) { } else { message.setMessageType(MessageType.UNSUBSCRIBE); } - messageDispatcher.send(message); + serverCreator.getMessageDispatcher().send(message); } /** @@ -357,12 +409,15 @@ private static HttpResponse validForm(boolean isTopicFilter, BaseForm form, Http public void register() { // @formatter:off MqttHttpRoutes.register(Method.GET, "/api/v1/endpoints", this::endpoints); + MqttHttpRoutes.register(Method.GET, "/api/v1/stats", this::stats); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish", this::publish); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish/batch", this::publishBatch); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe", this::subscribe); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe/batch", this::subscribeBatch); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch); + MqttHttpRoutes.register(Method.GET, "/api/v1/clients/info", this::getClientInfo); + MqttHttpRoutes.register(Method.GET, "/api/v1/clients", this::getClients); MqttHttpRoutes.register(Method.POST, "/api/v1/clients/delete", this::deleteClients); MqttHttpRoutes.register(Method.GET, "/api/v1/client/subscriptions", this::getClientSubscriptions); // @formatter:on diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/core/MqttWebServer.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/core/MqttWebServer.java index 47ea9852..f64c6eff 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/core/MqttWebServer.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/http/core/MqttWebServer.java @@ -289,7 +289,7 @@ public static MqttWebServer config(MqttServerCreator serverCreator, TioServerCon // 1.1 http-api 用到 json serverCreator.jsonAdapter(JsonUtil.getJsonAdapter(serverCreator.getJsonAdapter())); // 1.2 http 路由配置 - MqttHttpApi httpApi = new MqttHttpApi(serverCreator); + MqttHttpApi httpApi = new MqttHttpApi(serverCreator, mqttServerConfig); httpApi.register(); // 1.3 认证配置 String username = serverCreator.getHttpBasicUsername(); diff --git a/starter/mica-mqtt-server-solon-plugin/src/main/java/org/dromara/mica/mqtt/server/solon/MqttServerTemplate.java b/starter/mica-mqtt-server-solon-plugin/src/main/java/org/dromara/mica/mqtt/server/solon/MqttServerTemplate.java index d40e0afa..1cb85243 100644 --- a/starter/mica-mqtt-server-solon-plugin/src/main/java/org/dromara/mica/mqtt/server/solon/MqttServerTemplate.java +++ b/starter/mica-mqtt-server-solon-plugin/src/main/java/org/dromara/mica/mqtt/server/solon/MqttServerTemplate.java @@ -21,6 +21,7 @@ import org.dromara.mica.mqtt.codec.MqttQoS; import org.dromara.mica.mqtt.core.server.MqttServer; import org.dromara.mica.mqtt.core.server.model.ClientInfo; +import org.dromara.mica.mqtt.core.server.model.Subscribe; import org.tio.core.ChannelContext; import org.tio.core.stat.vo.StatVo; import org.tio.utils.page.Page; @@ -187,6 +188,16 @@ public StatVo getStat() { return mqttServer.getStat(); } + /** + * 获取客户端订阅情况 + * + * @param clientId clientId + * @return 订阅集合 + */ + public List getSubscriptions(String clientId) { + return mqttServer.getSubscriptions(clientId); + } + /** * 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常 * diff --git a/starter/mica-mqtt-server-spring-boot-starter/src/main/java/org/dromara/mica/mqtt/spring/server/MqttServerTemplate.java b/starter/mica-mqtt-server-spring-boot-starter/src/main/java/org/dromara/mica/mqtt/spring/server/MqttServerTemplate.java index 113cd1da..41e559a5 100644 --- a/starter/mica-mqtt-server-spring-boot-starter/src/main/java/org/dromara/mica/mqtt/spring/server/MqttServerTemplate.java +++ b/starter/mica-mqtt-server-spring-boot-starter/src/main/java/org/dromara/mica/mqtt/spring/server/MqttServerTemplate.java @@ -20,6 +20,7 @@ import org.dromara.mica.mqtt.codec.MqttQoS; import org.dromara.mica.mqtt.core.server.MqttServer; import org.dromara.mica.mqtt.core.server.model.ClientInfo; +import org.dromara.mica.mqtt.core.server.model.Subscribe; import org.tio.core.ChannelContext; import org.tio.core.stat.vo.StatVo; import org.tio.utils.page.Page; @@ -185,6 +186,16 @@ public StatVo getStat() { return mqttServer.getStat(); } + /** + * 获取客户端订阅情况 + * + * @param clientId clientId + * @return 订阅集合 + */ + public List getSubscriptions(String clientId) { + return mqttServer.getSubscriptions(clientId); + } + /** * 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常 *