提交 7d24ac37 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt server 完善 http api。

上级 cac3221c
......@@ -60,7 +60,7 @@
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publish" -d '{"topic":"a/b/c","payload":"Hello World","qos":1,"retain":false,"clientId":"example"}'
{"code":0}
{"code":1}
```
## 主题订阅
......@@ -90,7 +90,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publis
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscribe" -d '{"topic":"a/b/c","qos":1,"clientId":"example"}'
{"code":0}
{"code":1}
```
### POST /api/v1/mqtt/unsubscribe
......@@ -117,7 +117,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscr
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubscribe" -d '{"topic":"a","clientId":"example"}'
{"code":0}
{"code":1}
```
## 消息批量发布
......@@ -148,7 +148,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publish/batch" -d '[{"topic":"a/b/c","payload":"Hello World","qos":1,"retain":false,"clientId":"example"},{"topic":"a/b/c","payload":"Hello World Again","qos":0,"retain":false,"clientId":"example"}]'
{"code":0}
{"code":1}
```
## 主题批量订阅
......@@ -178,7 +178,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publis
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscribe/batch" -d '[{"topic":"a","qos":1,"clientId":"example"},{"topic":"b","qos":1,"clientId":"example"},{"topic":"c","qos":1,"clientId":"example"}]'
{"code":0}
{"code":1}
```
### POST /api/v1/mqtt/unsubscribe/batch
......@@ -205,7 +205,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscr
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubscribe/batch" -d '[{"topic":"a","clientId":"example"},{"topic":"b","clientId":"example"}]'
{"code":0}
{"code":1}
```
## 踢除指定客户端
......@@ -233,6 +233,51 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs
```bash
$ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/clients/delete?clientId=123"
{"code":0}
{"code":1}
```
## 获取客户端订阅情况
### GET /api/v1/client/subscriptions
获取指定客户端订阅详情。
**Query Parameters:**
| Name | Type | Required | Description |
| -------- | ------ | -------- | ----------- |
| clientId | String | True | ClientID |
**Success Response Body (JSON):**
| Name | Type | Description |
| ---- |---------|-------------|
| code | Integer | 0 |
| data | Array | [] |
| topicFilter | String | |
| clientId | String | |
| mqttQoS | Integer | 0 |
**Examples:**
踢除指定客户端
```bash
$ curl -i --basic -u mica:mica "http://127.0.0.1:8083/api/v1/client/subscriptions?clientId=123"
{
"code": 1,
"data": [
{
"clientId": "123",
"mqttQoS": 0,
"topicFilter": "#"
},
{
"clientId": "123",
"mqttQoS": 0,
"topicFilter": "testtopic/#"
}
]
}
```
\ No newline at end of file
......@@ -99,3 +99,10 @@ Content-Type: application/x-www-form-urlencoded
Authorization: Basic {{username}} {{password}}
clientId=123
### mqtt client subscriptions
GET http://{{host}}/api/v1/client/subscriptions
Content-Type: application/x-www-form-urlencoded
Authorization: Basic {{username}} {{password}}
clientId=123
......@@ -20,12 +20,16 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.server.ServerTioConfig;
import org.tio.server.TioServer;
......@@ -46,6 +50,7 @@ public final class MqttServer {
private final MqttWebServer webServer;
private final MqttServerCreator serverCreator;
private final IMqttSessionManager sessionManager;
private final IMqttMessageStore messageStore;
private final ScheduledThreadPoolExecutor executor;
MqttServer(TioServer tioServer,
......@@ -56,6 +61,7 @@ public final class MqttServer {
this.webServer = webServer;
this.serverCreator = serverCreator;
this.sessionManager = serverCreator.getSessionManager();
this.messageStore = serverCreator.getMessageStore();
this.executor = executor;
}
......@@ -183,6 +189,9 @@ public final class MqttServer {
} else {
payload.rewind();
}
if (retain) {
this.saveRetainMessage(topic, qos, payload);
}
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.payload(payload)
......@@ -255,6 +264,9 @@ public final class MqttServer {
if (payload == null) {
payload = ByteBuffer.allocate(0);
}
if (retain) {
this.saveRetainMessage(topic, qos, payload);
}
for (Subscribe subscribe : subscribeList) {
String clientId = subscribe.getClientId();
ChannelContext context = Tio.getByBsId(getServerConfig(), clientId);
......@@ -264,11 +276,31 @@ public final class MqttServer {
}
int subMqttQoS = subscribe.getMqttQoS();
MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos;
publish(context, clientId, topic, payload, mqttQoS, retain);
publish(context, clientId, topic, payload, mqttQoS, false);
}
return true;
}
/**
* 存储保留消息
*
* @param topic topic
* @param mqttQoS MqttQoS
* @param payload ByteBuffer
*/
private void saveRetainMessage(String topic, MqttQoS mqttQoS, ByteBuffer payload) {
Message retainMessage = new Message();
retainMessage.setTopic(topic);
retainMessage.setQos(mqttQoS.value());
retainMessage.setPayload(payload);
retainMessage.setMessageType(MessageType.DOWN_STREAM);
retainMessage.setRetain(true);
retainMessage.setDup(false);
retainMessage.setTimestamp(System.currentTimeMillis());
retainMessage.setNode(serverCreator.getNodeName());
this.messageStore.addRetainMessage(topic, retainMessage);
}
/**
* 获取 ChannelContext
*
......
......@@ -17,6 +17,7 @@
package net.dreamlu.iot.mqtt.core.server.http.api;
import com.alibaba.fastjson.JSON;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
......@@ -26,6 +27,8 @@ import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.util.PayloadEncode;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
......@@ -44,9 +47,11 @@ import java.util.function.Function;
*/
public class MqttHttpApi {
private final IMqttMessageDispatcher messageDispatcher;
private final IMqttSessionManager sessionManager;
public MqttHttpApi(IMqttMessageDispatcher messageDispatcher) {
this.messageDispatcher = messageDispatcher;
public MqttHttpApi(MqttServerCreator serverCreator) {
this.messageDispatcher = serverCreator.getMessageDispatcher();
this.sessionManager = serverCreator.getSessionManager();
}
/**
......@@ -270,6 +275,21 @@ public class MqttHttpApi {
return Result.ok(response);
}
/**
* 获取客户端订阅情况
*
* @param request HttpRequest
* @return HttpResponse
*/
public HttpResponse getClientSubscriptions(HttpRequest request) {
String clientId = request.getParam("clientId");
if (StrUtil.isBlank(clientId)) {
return Result.fail(request, ResultCode.E101);
}
List<Subscribe> subscribeList = sessionManager.getSubscriptions(clientId);
return Result.ok(new HttpResponse(request), subscribeList);
}
private void sendSubOrUnSubscribe(BaseForm form) {
Message message = new Message();
message.setFromClientId(form.getClientId());
......@@ -331,6 +351,7 @@ public class MqttHttpApi {
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe);
MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch);
MqttHttpRoutes.register(Method.POST, "/api/v1/clients/delete", this::deleteClients);
MqttHttpRoutes.register(Method.GET, "/api/v1/client/subscriptions", this::getClientSubscriptions);
// @formatter:on
}
......
......@@ -67,6 +67,17 @@ public final class Result {
return ok(new HttpResponse(), data);
}
/**
* 响应成功
*
* @param request HttpRequest
* @param data Object
* @return HttpResponse
*/
public static HttpResponse ok(HttpRequest request, Object data) {
return ok(new HttpResponse(request), data);
}
/**
* 响应成功
*
......
......@@ -308,7 +308,7 @@ public class MqttWebServer {
System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50");
}
// 1.2 http 路由配置
MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher());
MqttHttpApi httpApi = new MqttHttpApi(serverCreator);
httpApi.register();
// 1.3 认证配置
String username = serverCreator.getHttpBasicUsername();
......
......@@ -63,6 +63,14 @@ public interface IMqttSessionManager {
*/
List<Subscribe> searchSubscribe(String topicName);
/**
* 获取设备订阅
*
* @param clientId clientId
* @return 订阅列表
*/
List<Subscribe> getSubscriptions(String clientId);
/**
* 添加发布过程存储
*
......
......@@ -129,6 +129,25 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
return subscribeList;
}
@Override
public List<Subscribe> getSubscriptions(String clientId) {
List<Subscribe> subscribeList = new ArrayList<>();
Set<Map.Entry<String, ConcurrentMap<String, Integer>>> entrySet = subscribeStore.entrySet();
for (Map.Entry<String, ConcurrentMap<String, Integer>> mapEntry : entrySet) {
ConcurrentMap<String, Integer> mapEntryValue = mapEntry.getValue();
if (mapEntryValue == null || mapEntryValue.isEmpty()) {
continue;
}
Integer qos = mapEntryValue.get(clientId);
if (qos == null) {
continue;
}
String topicFilter = mapEntry.getKey();
subscribeList.add(new Subscribe(topicFilter, clientId, qos));
}
return subscribeList;
}
@Override
public void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish) {
Map<Integer, MqttPendingPublish> data = pendingPublishStore.computeIfAbsent(clientId, (key) -> new IntObjectHashMap<>(16));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册