From b9f43e11e16c51bb066bfe7ac3817206d3fdf60a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E5=AF=BB=E6=AC=A2?= <1101766085@qq.com>
Date: Wed, 25 Aug 2021 18:02:46 +0800
Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E8=80=83=E8=99=91=E5=86=85?=
=?UTF-8?q?=E7=BD=AE=20http=20api=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../dreamlu/iot/mqtt/core/util/HexUtil.java | 172 ++++++++++++++
.../iot/mqtt/core/MqttHttpRequestHandler.java | 17 ++
.../iot/mqtt/core/api/MqttHttpApi.java | 214 ++++++++++++++++--
.../mqtt/core/api/auth/BasicAuthFilter.java | 60 +++++
.../iot/mqtt/core/api/form/PayloadEncode.java | 90 ++++++++
.../iot/mqtt/core/api/form/PublishForm.java | 12 -
.../iot/mqtt/core/api/form/SubscribeForm.java | 38 ++++
.../iot/mqtt/core/core/HttpFilter.java | 47 ++++
.../iot/mqtt/core/core/HttpHandler.java | 4 +-
.../iot/mqtt/core/core/MqttHttpRoutes.java | 32 ++-
10 files changed, 646 insertions(+), 40 deletions(-)
create mode 100644 mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/HexUtil.java
create mode 100644 mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java
create mode 100644 mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java
create mode 100644 mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java
create mode 100644 mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java
diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/HexUtil.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/HexUtil.java
new file mode 100644
index 0000000..5bd8d17
--- /dev/null
+++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/HexUtil.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & www.dreamlu.net).
+ *
+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.gnu.org/licenses/lgpl.html
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.dreamlu.iot.mqtt.core.util;
+
+import org.tio.utils.hutool.StrUtil;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * hex 工具,编解码全用 byte
+ *
+ * @author L.cm
+ */
+public final class HexUtil {
+ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+ private static final byte[] DIGITS_LOWER = new byte[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+ private static final byte[] DIGITS_UPPER = new byte[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
+ /**
+ * encode Hex
+ *
+ * @param data data to hex
+ * @return hex bytes
+ */
+ public static byte[] encode(byte[] data) {
+ return encode(data, true);
+ }
+
+ /**
+ * encode Hex
+ *
+ * @param data data to hex
+ * @param toLowerCase 是否小写
+ * @return hex bytes
+ */
+ public static byte[] encode(byte[] data, boolean toLowerCase) {
+ return encode(data, toLowerCase ? DIGITS_LOWER : DIGITS_UPPER);
+ }
+
+ /**
+ * encode Hex
+ *
+ * @param data Data to Hex
+ * @return bytes as a hex string
+ */
+ private static byte[] encode(byte[] data, byte[] digits) {
+ int len = data.length;
+ byte[] out = new byte[len << 1];
+ for (int i = 0, j = 0; i < len; i++) {
+ out[j++] = digits[(0xF0 & data[i]) >>> 4];
+ out[j++] = digits[0xF & data[i]];
+ }
+ return out;
+ }
+
+ /**
+ * encode Hex
+ *
+ * @param data Data to Hex
+ * @param toLowerCase 是否小写
+ * @return bytes as a hex string
+ */
+ public static String encodeToString(byte[] data, boolean toLowerCase) {
+ return new String(encode(data, toLowerCase), DEFAULT_CHARSET);
+ }
+
+ /**
+ * encode Hex
+ *
+ * @param data Data to Hex
+ * @return bytes as a hex string
+ */
+ public static String encodeToString(byte[] data) {
+ return new String(encode(data), DEFAULT_CHARSET);
+ }
+
+ /**
+ * encode Hex
+ *
+ * @param data Data to Hex
+ * @return bytes as a hex string
+ */
+ public static String encodeToString(String data) {
+ if (StrUtil.isBlank(data)) {
+ return null;
+ }
+ return encodeToString(data.getBytes(DEFAULT_CHARSET));
+ }
+
+ /**
+ * decode Hex
+ *
+ * @param data Hex data
+ * @return decode hex to bytes
+ */
+ public static byte[] decode(String data) {
+ if (StrUtil.isBlank(data)) {
+ return null;
+ }
+ return decode(data.getBytes(DEFAULT_CHARSET));
+ }
+
+ /**
+ * decodeToString Hex
+ *
+ * @param data Data to Hex
+ * @return bytes as a hex string
+ */
+ public static String decodeToString(byte[] data) {
+ byte[] decodeBytes = decode(data);
+ return new String(decodeBytes, DEFAULT_CHARSET);
+ }
+
+ /**
+ * decodeToString Hex
+ *
+ * @param data Data to Hex
+ * @return bytes as a hex string
+ */
+ public static String decodeToString(String data) {
+ if (StrUtil.isBlank(data)) {
+ return null;
+ }
+ return decodeToString(data.getBytes(DEFAULT_CHARSET));
+ }
+
+ /**
+ * decode Hex
+ *
+ * @param data Hex data
+ * @return decode hex to bytes
+ */
+ public static byte[] decode(byte[] data) {
+ int len = data.length;
+ if ((len & 0x01) != 0) {
+ throw new IllegalArgumentException("hexBinary needs to be even-length: " + len);
+ }
+ byte[] out = new byte[len >> 1];
+ for (int i = 0, j = 0; j < len; i++) {
+ int f = toDigit(data[j], j) << 4;
+ j++;
+ f |= toDigit(data[j], j);
+ j++;
+ out[i] = (byte) (f & 0xFF);
+ }
+ return out;
+ }
+
+ private static int toDigit(byte b, int index) {
+ int digit = Character.digit(b, 16);
+ if (digit == -1) {
+ throw new IllegalArgumentException("Illegal hexadecimal byte " + b + " at index " + index);
+ }
+ return digit;
+ }
+
+}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java
index 071443a..246091c 100644
--- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/MqttHttpRequestHandler.java
@@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.core;
import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.api.result.Result;
+import net.dreamlu.iot.mqtt.core.core.HttpFilter;
import net.dreamlu.iot.mqtt.core.core.HttpHandler;
import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes;
import org.slf4j.Logger;
@@ -25,6 +26,8 @@ import org.slf4j.LoggerFactory;
import org.tio.http.common.*;
import org.tio.http.common.handler.HttpRequestHandler;
+import java.util.List;
+
/**
* mqtt http 消息处理
*
@@ -36,10 +39,24 @@ public class MqttHttpRequestHandler implements HttpRequestHandler {
@Override
public HttpResponse handler(HttpRequest request) {
RequestLine requestLine = request.getRequestLine();
+ // 1. 处理过滤器
+ List httpFilters = MqttHttpRoutes.getFilters();
+ try {
+ for (HttpFilter filter : httpFilters) {
+ if (!filter.filter(request)) {
+ HttpResponse response = new HttpResponse(request);
+ return filter.response(request, response);
+ }
+ }
+ } catch (Exception e) {
+ return resp500(request, requestLine, e);
+ }
+ // 2. 路由处理
HttpHandler handler = MqttHttpRoutes.getHandler(requestLine);
if (handler == null) {
return resp404(request, requestLine);
}
+ logger.info("mqtt http api {} path:{}", requestLine.getMethod().name(), requestLine.getPathAndQuery());
try {
return handler.apply(request);
} catch (Exception e) {
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java
index e58f9b1..206274d 100644
--- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/MqttHttpApi.java
@@ -17,14 +17,26 @@
package net.dreamlu.iot.mqtt.core.api;
import com.alibaba.fastjson.JSON;
+import net.dreamlu.iot.mqtt.codec.MqttMessageType;
+import net.dreamlu.iot.mqtt.codec.MqttQoS;
+import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
+import net.dreamlu.iot.mqtt.core.api.form.BaseForm;
+import net.dreamlu.iot.mqtt.core.api.form.PayloadEncode;
import net.dreamlu.iot.mqtt.core.api.form.PublishForm;
+import net.dreamlu.iot.mqtt.core.api.form.SubscribeForm;
import net.dreamlu.iot.mqtt.core.api.result.Result;
import net.dreamlu.iot.mqtt.core.core.MqttHttpRoutes;
-import net.dreamlu.iot.mqtt.core.server.MqttServer;
+import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
+import net.dreamlu.iot.mqtt.core.server.model.Message;
+import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
-import org.tio.http.common.HttpResponseStatus;
import org.tio.http.common.Method;
+import org.tio.utils.hutool.StrUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.function.Function;
/**
* mqtt http api
@@ -32,10 +44,13 @@ import org.tio.http.common.Method;
* @author L.cm
*/
public class MqttHttpApi {
- private final MqttServer mqttServer;
+ private final IMqttMessageDispatcher messageDispatcher;
+ private final IMqttSessionManager sessionManager;
- public MqttHttpApi(MqttServer mqttServer) {
- this.mqttServer = mqttServer;
+ public MqttHttpApi(IMqttMessageDispatcher messageDispatcher,
+ IMqttSessionManager sessionManager) {
+ this.messageDispatcher = messageDispatcher;
+ this.sessionManager = sessionManager;
}
/**
@@ -47,17 +62,19 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse publish(HttpRequest request) throws Exception {
- byte[] requestBody = request.getBody();
+ PublishForm form = readForm(request, (requestBody) ->
+ JSON.parseObject(requestBody, PublishForm.class)
+ );
HttpResponse response = new HttpResponse();
- if (requestBody == null) {
- response.setStatus(HttpResponseStatus.C400);
- return response;
+ if (form == null) {
+ return Result.fail(response, ResultCode.E101);
}
- PublishForm form = JSON.parseObject(requestBody, PublishForm.class);
- String clientId = form.getClientId();
- String topic = form.getTopic();
- String payload = form.getPayload();
-// mqttServer.publish()
+ // 表单校验
+ HttpResponse validResponse = validForm(form, response);
+ if (validResponse != null) {
+ return validResponse;
+ }
+ send(form);
return Result.ok(response);
}
@@ -70,7 +87,42 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse publishBatch(HttpRequest request) throws Exception {
- return null;
+ List formList = readForm(request, (requestBody) -> {
+ String jsonBody = new String(requestBody, StandardCharsets.UTF_8);
+ return JSON.parseArray(jsonBody, PublishForm.class);
+ });
+ HttpResponse response = new HttpResponse();
+ if (formList == null || formList.isEmpty()) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ // 参数校验,保证一个批次同时不成功,所以先校验
+ for (PublishForm form : formList) {
+ // 表单校验
+ HttpResponse validResponse = validForm(form, response);
+ if (validResponse != null) {
+ return validResponse;
+ }
+ }
+ // 批量发送
+ for (PublishForm form : formList) {
+ send(form);
+ }
+ return Result.ok(response);
+ }
+
+ private void send(PublishForm form) {
+ String payload = form.getPayload();
+ Message message = new Message();
+ message.setMessageType(MqttMessageType.PUBLISH.value());
+ message.setClientId(form.getClientId());
+ message.setTopic(form.getTopic());
+ message.setQos(form.getQos());
+ message.setRetain(form.isRetain());
+ // payload 解码
+ if (StrUtil.isNotBlank(payload)) {
+ message.setPayload(PayloadEncode.decode(payload, form.getEncoding()));
+ }
+ messageDispatcher.send(message);
}
/**
@@ -82,7 +134,25 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse subscribe(HttpRequest request) throws Exception {
- return null;
+ SubscribeForm form = readForm(request, (requestBody) ->
+ JSON.parseObject(requestBody, SubscribeForm.class)
+ );
+ HttpResponse response = new HttpResponse();
+ if (form == null) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ // 表单校验
+ HttpResponse validResponse = validForm(form, response);
+ if (validResponse != null) {
+ return validResponse;
+ }
+ int qos = form.getQos();
+ if (qos < 0 || qos > 2) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
+ sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(qos));
+ return Result.ok(response);
}
/**
@@ -94,7 +164,32 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse subscribeBatch(HttpRequest request) throws Exception {
- return null;
+ List formList = readForm(request, (requestBody) -> {
+ String jsonBody = new String(requestBody, StandardCharsets.UTF_8);
+ return JSON.parseArray(jsonBody, SubscribeForm.class);
+ });
+ HttpResponse response = new HttpResponse();
+ if (formList == null || formList.isEmpty()) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ // 参数校验,保证一个批次同时不成功,所以先校验
+ for (SubscribeForm form : formList) {
+ // 表单校验
+ HttpResponse validResponse = validForm(form, response);
+ if (validResponse != null) {
+ return validResponse;
+ }
+ int qos = form.getQos();
+ if (qos < 0 || qos > 2) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ }
+ // 批量处理
+ for (SubscribeForm form : formList) {
+ // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
+ sessionManager.addSubscribe(form.getTopic(), form.getClientId(), MqttQoS.valueOf(form.getQos()));
+ }
+ return Result.ok(response);
}
/**
@@ -106,7 +201,21 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse unsubscribe(HttpRequest request) throws Exception {
- return null;
+ BaseForm form = readForm(request, (requestBody) ->
+ JSON.parseObject(requestBody, BaseForm.class)
+ );
+ HttpResponse response = new HttpResponse();
+ if (form == null) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ // 表单校验
+ HttpResponse validResponse = validForm(form, response);
+ if (validResponse != null) {
+ return validResponse;
+ }
+ // 接口手动取消的订阅关系,可用来调试,不建议其他场景使用
+ sessionManager.removeSubscribe(form.getTopic(), form.getClientId());
+ return Result.ok(response);
}
/**
@@ -118,6 +227,63 @@ public class MqttHttpApi {
* @return HttpResponse
*/
public HttpResponse unsubscribeBatch(HttpRequest request) throws Exception {
+ List formList = readForm(request, (requestBody) -> {
+ String jsonBody = new String(requestBody, StandardCharsets.UTF_8);
+ return JSON.parseArray(jsonBody, BaseForm.class);
+ });
+ HttpResponse response = new HttpResponse();
+ if (formList == null || formList.isEmpty()) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ // 参数校验,保证一个批次同时不成功,所以先校验
+ for (BaseForm form : formList) {
+ // 表单校验
+ HttpResponse validResponse = validForm(form, response);
+ if (validResponse != null) {
+ return validResponse;
+ }
+ }
+ // 批量处理
+ for (BaseForm form : formList) {
+ // 接口手动添加的订阅关系,可用来调试,不建议其他场景使用
+ sessionManager.removeSubscribe(form.getTopic(), form.getClientId());
+ }
+ return Result.ok(response);
+ }
+
+ /**
+ * 读取表单
+ *
+ * @param request HttpRequest
+ * @param function Function
+ * @param 泛型
+ * @return 表单
+ */
+ private static T readForm(HttpRequest request, Function function) {
+ byte[] requestBody = request.getBody();
+ if (requestBody == null) {
+ return null;
+ }
+ return function.apply(requestBody);
+ }
+
+ /**
+ * 校验表单
+ *
+ * @param form BaseForm
+ * @param response HttpResponse
+ * @return 表单
+ */
+ private static HttpResponse validForm(BaseForm form, HttpResponse response) {
+ // 必须的参数
+ String clientId = form.getClientId();
+ if (StrUtil.isBlank(clientId)) {
+ return Result.fail(response, ResultCode.E101);
+ }
+ String topic = form.getTopic();
+ if (StrUtil.isBlank(topic)) {
+ return Result.fail(response, ResultCode.E101);
+ }
return null;
}
@@ -126,12 +292,12 @@ public class MqttHttpApi {
*/
public void register() {
// @formatter:off
- MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish", this::publish);
- MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish/batch", this::publishBatch);
- MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe", this::subscribe);
- MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe/batch", this::subscribeBatch);
- MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe);
- MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch);
+ MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish", this::publish);
+ MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/publish/batch", this::publishBatch);
+ MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe", this::subscribe);
+ MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/subscribe/batch", this::subscribeBatch);
+ MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe", this::unsubscribe);
+ MqttHttpRoutes.register(Method.POST, "/api/v1/mqtt/unsubscribe/batch", this::unsubscribeBatch);
// @formatter:on
}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java
new file mode 100644
index 0000000..2c9eab3
--- /dev/null
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/auth/BasicAuthFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.dreamlu.iot.mqtt.core.api.auth;
+
+import net.dreamlu.iot.mqtt.core.api.code.ResultCode;
+import net.dreamlu.iot.mqtt.core.api.result.Result;
+import net.dreamlu.iot.mqtt.core.core.HttpFilter;
+import org.tio.http.common.HttpRequest;
+import org.tio.http.common.HttpResponse;
+import org.tio.utils.hutool.StrUtil;
+
+import java.util.Objects;
+
+/**
+ * Basic 认证
+ *
+ * @author L.cm
+ */
+public class BasicAuthFilter implements HttpFilter {
+ public static final String BASIC_AUTH_HEADER_NAME = "Authorization";
+ public static final String AUTHORIZATION_PREFIX = "Basic ";
+ private final String token;
+
+ public BasicAuthFilter(String token) {
+ this.token = Objects.requireNonNull(token, "Basic auth token is null");
+ }
+
+ @Override
+ public boolean filter(HttpRequest request) throws Exception {
+ String authorization = request.getHeader(BASIC_AUTH_HEADER_NAME);
+ if (StrUtil.isBlank(authorization)) {
+ return false;
+ }
+ int length = AUTHORIZATION_PREFIX.length();
+ if (length >= authorization.length()) {
+ return false;
+ }
+ return token.equals(authorization.substring(length));
+ }
+
+ @Override
+ public HttpResponse response(HttpRequest request, HttpResponse response) {
+ return Result.fail(response, ResultCode.E103);
+ }
+
+}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java
new file mode 100644
index 0000000..3ab15ac
--- /dev/null
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PayloadEncode.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.dreamlu.iot.mqtt.core.api.form;
+
+import net.dreamlu.iot.mqtt.core.util.HexUtil;
+import org.tio.utils.hutool.StrUtil;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * 消息正文编码
+ *
+ * @author L.cm
+ */
+public enum PayloadEncode {
+ /**
+ * 纯文本、hex、base64
+ */
+ plain {
+ @Override
+ public byte[] decode(String data) {
+ return data.getBytes(StandardCharsets.UTF_8);
+ }
+ },
+ hex {
+ @Override
+ public byte[] decode(String data) {
+ return HexUtil.decode(data);
+ }
+ },
+ base64 {
+ @Override
+ public byte[] decode(String data) {
+ return Base64.getDecoder().decode(data);
+ }
+ };
+
+ /**
+ * 解码
+ *
+ * @return byte array
+ */
+ public abstract byte[] decode(String data);
+
+ /**
+ * 解码
+ *
+ * @param data data
+ * @param encoding encoding
+ * @return byte array
+ */
+ public static byte[] decode(String data, String encoding) {
+ return PayloadEncode.getEncode(encoding).decode(data);
+ }
+
+ /**
+ * 获取解码器
+ *
+ * @param encoding encoding
+ * @return PayloadEncode
+ */
+ public static PayloadEncode getEncode(String encoding) {
+ if (StrUtil.isBlank(encoding)) {
+ return PayloadEncode.plain;
+ }
+ PayloadEncode[] values = PayloadEncode.values();
+ for (PayloadEncode encode : values) {
+ if (encode.name().equalsIgnoreCase(encoding)) {
+ return encode;
+ }
+ }
+ return PayloadEncode.plain;
+ }
+
+}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java
index 1803ee9..f2bfe5e 100644
--- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/PublishForm.java
@@ -23,10 +23,6 @@ package net.dreamlu.iot.mqtt.core.api.form;
*/
public class PublishForm extends BaseForm {
- /**
- * 以 , 分割的多个主题,使用此字段能够同时发布消息到多个主题
- */
- private String topics;
/**
* 消息正文
*/
@@ -44,14 +40,6 @@ public class PublishForm extends BaseForm {
*/
private boolean retain = false;
- public String getTopics() {
- return topics;
- }
-
- public void setTopics(String topics) {
- this.topics = topics;
- }
-
public String getPayload() {
return payload;
}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java
new file mode 100644
index 0000000..5378391
--- /dev/null
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/api/form/SubscribeForm.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.dreamlu.iot.mqtt.core.api.form;
+
+/**
+ * 订阅表单
+ *
+ * @author L.cm
+ */
+public class SubscribeForm extends BaseForm {
+
+ /**
+ * QoS 等级 0
+ */
+ private int qos = 0;
+
+ public int getQos() {
+ return qos;
+ }
+
+ public void setQos(int qos) {
+ this.qos = qos;
+ }
+}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java
new file mode 100644
index 0000000..fee2ada
--- /dev/null
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.dreamlu.iot.mqtt.core.core;
+
+import org.tio.http.common.HttpRequest;
+import org.tio.http.common.HttpResponse;
+
+/**
+ * http 过滤器
+ *
+ * @author L.cm
+ */
+public interface HttpFilter {
+
+ /**
+ * 处理请求
+ *
+ * @param request HttpRequest
+ * @return 可以为null
+ * @throws Exception Exception
+ */
+ boolean filter(HttpRequest request) throws Exception;
+
+ /**
+ * 响应
+ *
+ * @param request HttpRequest
+ * @param response HttpResponse
+ * @return HttpResponse
+ */
+ HttpResponse response(HttpRequest request, HttpResponse response);
+
+}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java
index 73ec634..af6e7a3 100644
--- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/HttpHandler.java
@@ -30,10 +30,10 @@ public interface HttpHandler {
/**
* 处理请求
*
- * @param packet HttpRequest
+ * @param request HttpRequest
* @return 可以为null
* @throws Exception Exception
*/
- HttpResponse apply(HttpRequest packet) throws Exception;
+ HttpResponse apply(HttpRequest request) throws Exception;
}
diff --git a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java
index 0cb4dd0..f132685 100644
--- a/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java
+++ b/mica-mqtt-core/src/test/java/net/dreamlu/iot/mqtt/core/core/MqttHttpRoutes.java
@@ -19,8 +19,7 @@ package net.dreamlu.iot.mqtt.core.core;
import org.tio.http.common.Method;
import org.tio.http.common.RequestLine;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
/**
* mqtt http api 路由
@@ -28,8 +27,37 @@ import java.util.Map;
* @author L.cm
*/
public final class MqttHttpRoutes {
+ private static final LinkedList FILTERS = new LinkedList<>();
private static final Map ROUTS = new HashMap<>();
+ /**
+ * 注册路由
+ *
+ * @param filter HttpFilter
+ */
+ public static void addFilter(HttpFilter filter) {
+ FILTERS.add(filter);
+ }
+
+ /**
+ * 注册路由
+ *
+ * @param index index
+ * @param filter HttpFilter
+ */
+ public static void addFilter(int index, HttpFilter filter) {
+ FILTERS.add(index, filter);
+ }
+
+ /**
+ * 读取所以的过滤器
+ *
+ * @return 过滤器集合
+ */
+ public static List getFilters() {
+ return Collections.unmodifiableList(FILTERS);
+ }
+
/**
* 注册路由
*
--
GitLab