Skip to content

Commit

Permalink
✨ http api 添加 stats、clients 列表和 client 详情接口 MqttServer 和 MqttServerTe…
Browse files Browse the repository at this point in the history
…mplate 添加 getSubscriptions 接口
  • Loading branch information
li-xunhuan committed Dec 2, 2024
1 parent f189dce commit 2a62fae
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 26 deletions.
100 changes: 86 additions & 14 deletions docs/http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,14 @@ $ curl -i --basic -u mica:mica "http://localhost:8083/api/v1/endpoints"

**Parameters (json):**

| Name | Type | Required | Default | Description |
| -------- | ------- | -------- | ------- | ----------------------------------------------------------- |
| topic | String | Required | | 主题 |
| clientId | String | Required | | 客户端标识符 |
| payload | String | Required | | 消息正文 |
| Name | Type | Required | Default | Description |
| -------- | ------- | -------- | ------- |------------------------------------------------|
| topic | String | Required | | 主题,消息会按 topic 订阅投递 |
| clientId | String | Required | | 客户端标识符,不为空参数即可,无实际意义,建议可以取名 httpApi |
| payload | String | Required | | 消息正文 |
| encoding | String | Optional | plain | 消息正文使用的编码方式,目前仅支持 目前仅支持 `plain``hex``base64` |
| qos | Integer | Optional | 0 | QoS 等级 |
| retain | Boolean | Optional | false | 是否为保留消息 |
| qos | Integer | Optional | 0 | QoS 等级 |
| retain | Boolean | Optional | false | 是否为保留消息 |

**Success Response Body (JSON):**

Expand Down Expand Up @@ -187,14 +187,14 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs

**Parameters (json):**

| Name | Type | Required | Default | Description |
| ------------ | ------- | -------- | ------- | ----------------------------------------------------------- |
| [0].topic | String | Required | | 主题 |
| [0].clientId | String | Required | | 客户端标识符 |
| [0].payload | String | Required | | 消息正文 |
| Name | Type | Required | Default | Description |
| ------------ | ------- | -------- | ------- |------------------------------------------|
| [0].topic | String | Required | | 主题,消息按订阅投递 |
| [0].clientId | String | Required | | 客户端标识符,不为空参数即可,无实际意义,建议可以取名 httpApi |
| [0].payload | String | Required | | 消息正文 |
| [0].encoding | String | Optional | plain | 消息正文使用的编码方式,目前仅支持 `plain``hex``base64` |
| [0].qos | Integer | Optional | 0 | QoS 等级 |
| [0].retain | Boolean | Optional | false | 是否为保留消息 |
| [0].qos | Integer | Optional | 0 | QoS 等级 |
| [0].retain | Boolean | Optional | false | 是否为保留消息 |

**Success Response Body (JSON):**

Expand Down Expand Up @@ -267,6 +267,78 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs
{"code":1}
```

## 获取客户端详情

### GET /api/v1/clients/info

**Query Parameters:**

| Name | Type | Required | Description |
| -------- | ------ | -------- | ----------- |
| clientId | String | True | ClientID |

**Success Response Body (JSON):**

| Name | Type | Description |
|-----------|---------|-------------|
| code | Integer | 0 |
| clientId | String | clientId |
| username | String | 用户名 |
| connected | Boolen | 是否已经连接 |
| createdAt | Long | 连接的时间 |
| connectedAt | Long | 连接成功时间 |

**Examples:**

踢除指定客户端

```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/clients/info?clientId=mqttx_5fe4cfcf"

{"code":1,"data":{"clientId":"mqttx_5fe4cfcf","connected":true,"connectedAt":1681792417835,"createdAt":1681792417835,"ipAddress":"127.0.0.1","port":11852,"protoName":"MQTT","protoVer":5}}
```

## 分页获取客户端

### GET /api/v1/clients

**Query Parameters:**

| Name | Type | Required | Description |
|--------|------|----------|--------------|
| _page | int | False | Page 默认1 |
| _limit | int | False | 分页大小 默认10000 |

**Success Response Body (JSON):**

| Name | Type | Description |
| ---- | ------- | ----------- |
| code | Integer | 0 |

**Success Response Body (JSON):**

| Name | Type | Description |
| ---- | ------- |-------------|
| code | Integer | 0 |
| pageNumber | Integer | 当前页码 |
| pageSize | Integer | 分页大小 |
| totalRow | Integer | 分页数 |
| clientId | String | clientId |
| username | String | 用户名 |
| connected | Boolen | 是否已经连接 |
| createdAt | Long | 连接的时间 |
| connectedAt | Long | 连接成功时间 |

**Examples:**

踢除指定客户端

```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/clients?_page=1&_limit=100"

{"data":{"list":[{"clientId":"mqttx_5fe4cfcf","connected":true,"protoName":"MQTT","protoVer":5,"ipAddress":"127.0.0.1","port":11852,"connectedAt":1681792417835,"createdAt":1681792417835}],"pageNumber":1,"pageSize":1,"totalRow":1},"code":1}
```

## 踢除指定客户端

### POST /api/v1/clients/delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.server.model.ClientInfo;
import org.dromara.mica.mqtt.core.server.model.Subscribe;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -411,6 +412,16 @@ public StatVo getStat() {
return tioServer.getServerConfig().getStat();
}

/**
* 获取客户端订阅情况
*
* @param clientId clientId
* @return 订阅集合
*/
public List<Subscribe> getSubscriptions(String clientId) {
return serverCreator.getSessionManager().getSubscriptions(clientId);
}

/**
* 判断是否 mqtt 连接
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@

package org.dromara.mica.mqtt.core.server.http.api;

import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
import org.dromara.mica.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import org.dromara.mica.mqtt.core.server.enums.MessageType;
import org.dromara.mica.mqtt.core.server.http.api.code.ResultCode;
import org.dromara.mica.mqtt.core.server.http.api.form.BaseForm;
import org.dromara.mica.mqtt.core.server.http.api.form.PublishForm;
import org.dromara.mica.mqtt.core.server.http.api.form.SubscribeForm;
import org.dromara.mica.mqtt.core.server.http.api.result.Result;
import org.dromara.mica.mqtt.core.server.http.handler.MqttHttpRoutes;
import org.dromara.mica.mqtt.core.server.model.ClientInfo;
import org.dromara.mica.mqtt.core.server.model.Message;
import org.dromara.mica.mqtt.core.server.model.Subscribe;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.dromara.mica.mqtt.core.util.PayloadEncode;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.http.common.HttpConst;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.http.common.Method;
import org.tio.server.TioServerConfig;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.json.JsonUtil;

Expand All @@ -46,12 +49,12 @@
* @author L.cm
*/
public class MqttHttpApi {
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttSessionManager sessionManager;
private final MqttServerCreator serverCreator;
private final TioServerConfig mqttServerConfig;

public MqttHttpApi(MqttServerCreator serverCreator) {
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.sessionManager = serverCreator.getSessionManager();
public MqttHttpApi(MqttServerCreator serverCreator, TioServerConfig mqttServerConfig) {
this.serverCreator = serverCreator;
this.mqttServerConfig = mqttServerConfig;
}

/**
Expand All @@ -66,6 +69,18 @@ public HttpResponse endpoints(HttpRequest request) {
return Result.ok(request, MqttHttpRoutes.getRouts().keySet());
}

/**
* 获取 api 列表
* <p>
* GET /api/v1/stats
*
* @param request HttpRequest
* @return HttpResponse
*/
public HttpResponse stats(HttpRequest request) {
return Result.ok(request, this.mqttServerConfig.getStat());
}

/**
* 消息发布
* <p>
Expand Down Expand Up @@ -132,7 +147,7 @@ private void sendPublish(PublishForm form) {
if (StrUtil.isNotBlank(payload)) {
message.setPayload(PayloadEncode.decode(payload, form.getEncoding()));
}
messageDispatcher.send(message);
serverCreator.getMessageDispatcher().send(message);
}

/**
Expand Down Expand Up @@ -255,6 +270,43 @@ public HttpResponse unsubscribeBatch(HttpRequest request) {
return Result.ok();
}

/**
* 获取取客户端信息
*
* <p>
* GET /api/v1/clients/info
*
* @param request HttpRequest
* @return HttpResponse
*/
public HttpResponse getClientInfo(HttpRequest request) {
String clientId = request.getParam("clientId");
if (StrUtil.isBlank(clientId)) {
return Result.fail(request, ResultCode.E101);
}
ChannelContext context = Tio.getByBsId(this.mqttServerConfig, clientId);
if (context == null) {
return Result.fail(request, ResultCode.E101);
}
ClientInfo clientInfo = ClientInfo.form(serverCreator, context);
return Result.ok(request, clientInfo);
}

/**
* 分页拉取客户端列表
*
* <p>
* GET /api/v1/clients?_page=1&amp;_limit=10
*
* @param request HttpRequest
* @return HttpResponse
*/
public HttpResponse getClients(HttpRequest request) {
int page = request.getInt("_page", 1);
int limit = request.getInt("_limit", 10000);
return Result.ok(request, MqttServer.getClients(serverCreator, mqttServerConfig, page, limit));
}

/**
* 踢除指定客户端。注意踢除客户端操作会将连接与会话一并终结。
* <p>
Expand All @@ -271,7 +323,7 @@ public HttpResponse deleteClients(HttpRequest request) {
Message message = new Message();
message.setClientId(clientId);
message.setMessageType(MessageType.DISCONNECT);
messageDispatcher.send(message);
serverCreator.getMessageDispatcher().send(message);
return Result.ok();
}

Expand All @@ -288,7 +340,7 @@ public HttpResponse getClientSubscriptions(HttpRequest request) {
if (StrUtil.isBlank(clientId)) {
return Result.fail(request, ResultCode.E101);
}
List<Subscribe> subscribeList = sessionManager.getSubscriptions(clientId);
List<Subscribe> subscribeList = serverCreator.getSessionManager().getSubscriptions(clientId);
return Result.ok(new HttpResponse(request), subscribeList);
}

Expand All @@ -302,7 +354,7 @@ private void sendSubOrUnSubscribe(BaseForm form) {
} else {
message.setMessageType(MessageType.UNSUBSCRIBE);
}
messageDispatcher.send(message);
serverCreator.getMessageDispatcher().send(message);
}

/**
Expand Down Expand Up @@ -357,12 +409,15 @@ private static HttpResponse validForm(boolean isTopicFilter, BaseForm form, Http
public void register() {
// @formatter:off
MqttHttpRoutes.register(Method.GET, "/api/v1/endpoints", this::endpoints);
MqttHttpRoutes.register(Method.GET, "/api/v1/stats", this::stats);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish", this::publish);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish/batch", this::publishBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe", this::subscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe/batch", this::subscribeBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch);
MqttHttpRoutes.register(Method.GET, "/api/v1/clients/info", this::getClientInfo);
MqttHttpRoutes.register(Method.GET, "/api/v1/clients", this::getClients);
MqttHttpRoutes.register(Method.POST, "/api/v1/clients/delete", this::deleteClients);
MqttHttpRoutes.register(Method.GET, "/api/v1/client/subscriptions", this::getClientSubscriptions);
// @formatter:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public static MqttWebServer config(MqttServerCreator serverCreator, TioServerCon
// 1.1 http-api 用到 json
serverCreator.jsonAdapter(JsonUtil.getJsonAdapter(serverCreator.getJsonAdapter()));
// 1.2 http 路由配置
MqttHttpApi httpApi = new MqttHttpApi(serverCreator);
MqttHttpApi httpApi = new MqttHttpApi(serverCreator, mqttServerConfig);
httpApi.register();
// 1.3 认证配置
String username = serverCreator.getHttpBasicUsername();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.core.server.model.ClientInfo;
import org.dromara.mica.mqtt.core.server.model.Subscribe;
import org.tio.core.ChannelContext;
import org.tio.core.stat.vo.StatVo;
import org.tio.utils.page.Page;
Expand Down Expand Up @@ -187,6 +188,16 @@ public StatVo getStat() {
return mqttServer.getStat();
}

/**
* 获取客户端订阅情况
*
* @param clientId clientId
* @return 订阅集合
*/
public List<Subscribe> getSubscriptions(String clientId) {
return mqttServer.getSubscriptions(clientId);
}

/**
* 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.core.server.model.ClientInfo;
import org.dromara.mica.mqtt.core.server.model.Subscribe;
import org.tio.core.ChannelContext;
import org.tio.core.stat.vo.StatVo;
import org.tio.utils.page.Page;
Expand Down Expand Up @@ -185,6 +186,16 @@ public StatVo getStat() {
return mqttServer.getStat();
}

/**
* 获取客户端订阅情况
*
* @param clientId clientId
* @return 订阅集合
*/
public List<Subscribe> getSubscriptions(String clientId) {
return mqttServer.getSubscriptions(clientId);
}

/**
* 添加定时任务,注意:如果抛出异常,会终止后续任务,请自行处理异常
*
Expand Down

0 comments on commit 2a62fae

Please sign in to comment.