diff --git a/docs/http-api.md b/docs/http-api.md index 133d39d62e4885667f6e49ca1a65ca5e849d9862..b7e45b4c2b4194dc2c17c7fba2c3a57831ea1fb0 100644 --- a/docs/http-api.md +++ b/docs/http-api.md @@ -60,7 +60,7 @@ ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publish" -d '{"topic":"a/b/c","payload":"Hello World","qos":1,"retain":false,"clientId":"example"}' -{"code":0} +{"code":1} ``` ## 主题订阅 @@ -90,7 +90,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publis ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscribe" -d '{"topic":"a/b/c","qos":1,"clientId":"example"}' -{"code":0} +{"code":1} ``` ### POST /api/v1/mqtt/unsubscribe @@ -117,7 +117,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscr ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubscribe" -d '{"topic":"a","clientId":"example"}' -{"code":0} +{"code":1} ``` ## 消息批量发布 @@ -148,7 +148,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publish/batch" -d '[{"topic":"a/b/c","payload":"Hello World","qos":1,"retain":false,"clientId":"example"},{"topic":"a/b/c","payload":"Hello World Again","qos":0,"retain":false,"clientId":"example"}]' -{"code":0} +{"code":1} ``` ## 主题批量订阅 @@ -178,7 +178,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/publis ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscribe/batch" -d '[{"topic":"a","qos":1,"clientId":"example"},{"topic":"b","qos":1,"clientId":"example"},{"topic":"c","qos":1,"clientId":"example"}]' -{"code":0} +{"code":1} ``` ### POST /api/v1/mqtt/unsubscribe/batch @@ -205,7 +205,7 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/subscr ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubscribe/batch" -d '[{"topic":"a","clientId":"example"},{"topic":"b","clientId":"example"}]' -{"code":0} +{"code":1} ``` ## 踢除指定客户端 @@ -233,6 +233,51 @@ $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/mqtt/unsubs ```bash $ curl -i --basic -u mica:mica -X POST "http://localhost:8083/api/v1/clients/delete?clientId=123" -{"code":0} +{"code":1} ``` +## 获取客户端订阅情况 + +### GET /api/v1/client/subscriptions + +获取指定客户端订阅详情。 + +**Query Parameters:** + +| Name | Type | Required | Description | +| -------- | ------ | -------- | ----------- | +| clientId | String | True | ClientID | + +**Success Response Body (JSON):** + +| Name | Type | Description | +| ---- |---------|-------------| +| code | Integer | 0 | +| data | Array | [] | +| topicFilter | String | | +| clientId | String | | +| mqttQoS | Integer | 0 | + +**Examples:** + +踢除指定客户端 + +```bash +$ curl -i --basic -u mica:mica "http://127.0.0.1:8083/api/v1/client/subscriptions?clientId=123" + +{ + "code": 1, + "data": [ + { + "clientId": "123", + "mqttQoS": 0, + "topicFilter": "#" + }, + { + "clientId": "123", + "mqttQoS": 0, + "topicFilter": "testtopic/#" + } + ] +} +``` \ No newline at end of file diff --git a/mica-mqtt-broker/http/mica-mqtt-api.http b/mica-mqtt-broker/http/mica-mqtt-api.http index 448bba78b239cb7f8c8d989b5bb86ccc43d6410b..d5364d7122316c982aa4c43caed4f35c0d643c7a 100644 --- a/mica-mqtt-broker/http/mica-mqtt-api.http +++ b/mica-mqtt-broker/http/mica-mqtt-api.http @@ -99,3 +99,10 @@ Content-Type: application/x-www-form-urlencoded Authorization: Basic {{username}} {{password}} clientId=123 + +### mqtt client subscriptions +GET http://{{host}}/api/v1/client/subscriptions +Content-Type: application/x-www-form-urlencoded +Authorization: Basic {{username}} {{password}} + +clientId=123 diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java index 9da655ccb902338a3762800c4235c1b7984414c0..f6f4e26bf9b795c31ea9ae0e8b2ba17ca94230d5 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/MqttServer.java @@ -20,12 +20,16 @@ import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders; import net.dreamlu.iot.mqtt.codec.MqttPublishMessage; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish; +import net.dreamlu.iot.mqtt.core.server.enums.MessageType; import net.dreamlu.iot.mqtt.core.server.http.core.MqttWebServer; +import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.model.Subscribe; import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; +import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; +import org.tio.core.Node; import org.tio.core.Tio; import org.tio.server.ServerTioConfig; import org.tio.server.TioServer; @@ -46,6 +50,7 @@ public final class MqttServer { private final MqttWebServer webServer; private final MqttServerCreator serverCreator; private final IMqttSessionManager sessionManager; + private final IMqttMessageStore messageStore; private final ScheduledThreadPoolExecutor executor; MqttServer(TioServer tioServer, @@ -56,6 +61,7 @@ public final class MqttServer { this.webServer = webServer; this.serverCreator = serverCreator; this.sessionManager = serverCreator.getSessionManager(); + this.messageStore = serverCreator.getMessageStore(); this.executor = executor; } @@ -183,6 +189,9 @@ public final class MqttServer { } else { payload.rewind(); } + if (retain) { + this.saveRetainMessage(topic, qos, payload); + } MqttPublishMessage message = MqttMessageBuilders.publish() .topicName(topic) .payload(payload) @@ -255,6 +264,9 @@ public final class MqttServer { if (payload == null) { payload = ByteBuffer.allocate(0); } + if (retain) { + this.saveRetainMessage(topic, qos, payload); + } for (Subscribe subscribe : subscribeList) { String clientId = subscribe.getClientId(); ChannelContext context = Tio.getByBsId(getServerConfig(), clientId); @@ -264,11 +276,31 @@ public final class MqttServer { } int subMqttQoS = subscribe.getMqttQoS(); MqttQoS mqttQoS = qos.value() > subMqttQoS ? MqttQoS.valueOf(subMqttQoS) : qos; - publish(context, clientId, topic, payload, mqttQoS, retain); + publish(context, clientId, topic, payload, mqttQoS, false); } return true; } + /** + * 存储保留消息 + * + * @param topic topic + * @param mqttQoS MqttQoS + * @param payload ByteBuffer + */ + private void saveRetainMessage(String topic, MqttQoS mqttQoS, ByteBuffer payload) { + Message retainMessage = new Message(); + retainMessage.setTopic(topic); + retainMessage.setQos(mqttQoS.value()); + retainMessage.setPayload(payload); + retainMessage.setMessageType(MessageType.DOWN_STREAM); + retainMessage.setRetain(true); + retainMessage.setDup(false); + retainMessage.setTimestamp(System.currentTimeMillis()); + retainMessage.setNode(serverCreator.getNodeName()); + this.messageStore.addRetainMessage(topic, retainMessage); + } + /** * 获取 ChannelContext * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java index 37bb119004b49de0c167fe786cd84d9f586a096b..ee2c465e8e9a2af9e5aca2de14d74bdde72e6c74 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/MqttHttpApi.java @@ -17,6 +17,7 @@ package net.dreamlu.iot.mqtt.core.server.http.api; import com.alibaba.fastjson.JSON; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.enums.MessageType; import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; @@ -26,6 +27,8 @@ import net.dreamlu.iot.mqtt.core.server.http.api.form.SubscribeForm; import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.model.Subscribe; +import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.iot.mqtt.core.util.PayloadEncode; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; @@ -44,9 +47,11 @@ import java.util.function.Function; */ public class MqttHttpApi { private final IMqttMessageDispatcher messageDispatcher; + private final IMqttSessionManager sessionManager; - public MqttHttpApi(IMqttMessageDispatcher messageDispatcher) { - this.messageDispatcher = messageDispatcher; + public MqttHttpApi(MqttServerCreator serverCreator) { + this.messageDispatcher = serverCreator.getMessageDispatcher(); + this.sessionManager = serverCreator.getSessionManager(); } /** @@ -270,6 +275,21 @@ public class MqttHttpApi { return Result.ok(response); } + /** + * 获取客户端订阅情况 + * + * @param request HttpRequest + * @return HttpResponse + */ + public HttpResponse getClientSubscriptions(HttpRequest request) { + String clientId = request.getParam("clientId"); + if (StrUtil.isBlank(clientId)) { + return Result.fail(request, ResultCode.E101); + } + List subscribeList = sessionManager.getSubscriptions(clientId); + return Result.ok(new HttpResponse(request), subscribeList); + } + private void sendSubOrUnSubscribe(BaseForm form) { Message message = new Message(); message.setFromClientId(form.getClientId()); @@ -331,6 +351,7 @@ public class MqttHttpApi { MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe); MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch); MqttHttpRoutes.register(Method.POST, "/api/v1/clients/delete", this::deleteClients); + MqttHttpRoutes.register(Method.GET, "/api/v1/client/subscriptions", this::getClientSubscriptions); // @formatter:on } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java index d913bc12f5ae039b23cffcfe2d930cc9e523a8f7..b7428cfe4aab002ec675cacd1f2e889ee08c9e87 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/api/result/Result.java @@ -67,6 +67,17 @@ public final class Result { return ok(new HttpResponse(), data); } + /** + * 响应成功 + * + * @param request HttpRequest + * @param data Object + * @return HttpResponse + */ + public static HttpResponse ok(HttpRequest request, Object data) { + return ok(new HttpResponse(request), data); + } + /** * 响应成功 * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java index 2ebf4931f3c409fba4746c52574e6ba36abc49fb..9d71b0c8f496448efcf16497ce79c537576844ac 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/http/core/MqttWebServer.java @@ -308,7 +308,7 @@ public class MqttWebServer { System.setProperty(TIO_SYSTEM_TIMER_PERIOD, "50"); } // 1.2 http 路由配置 - MqttHttpApi httpApi = new MqttHttpApi(serverCreator.getMessageDispatcher()); + MqttHttpApi httpApi = new MqttHttpApi(serverCreator); httpApi.register(); // 1.3 认证配置 String username = serverCreator.getHttpBasicUsername(); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java index 728f22227f9e78457ffe07a26185ea3687d0dc56..12dc2faa1f78f559873fc423b5d8026b4671a82c 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/IMqttSessionManager.java @@ -63,6 +63,14 @@ public interface IMqttSessionManager { */ List searchSubscribe(String topicName); + /** + * 获取设备订阅 + * + * @param clientId clientId + * @return 订阅列表 + */ + List getSubscriptions(String clientId); + /** * 添加发布过程存储 * diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java index 5e2a4cdbb90bd60c358c10033f0a312fed9af3ab..a0a972ec7fa1e729ac34b9032fc21ef7f71b9eb2 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java @@ -129,6 +129,25 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { return subscribeList; } + @Override + public List getSubscriptions(String clientId) { + List subscribeList = new ArrayList<>(); + Set>> entrySet = subscribeStore.entrySet(); + for (Map.Entry> mapEntry : entrySet) { + ConcurrentMap mapEntryValue = mapEntry.getValue(); + if (mapEntryValue == null || mapEntryValue.isEmpty()) { + continue; + } + Integer qos = mapEntryValue.get(clientId); + if (qos == null) { + continue; + } + String topicFilter = mapEntry.getKey(); + subscribeList.add(new Subscribe(topicFilter, clientId, qos)); + } + return subscribeList; + } + @Override public void addPendingPublish(String clientId, int messageId, MqttPendingPublish pendingPublish) { Map data = pendingPublishStore.computeIfAbsent(clientId, (key) -> new IntObjectHashMap<>(16));