diff --git a/mqtts-common/mqtts-common-core/pom.xml b/mqtts-common/mqtts-common-core/pom.xml
index 5218f758f8b62e4827216dd640957bc74548a3b0..884bc009a463c4cdb4cdae466d85b7914a53e2ad 100644
--- a/mqtts-common/mqtts-common-core/pom.xml
+++ b/mqtts-common/mqtts-common-core/pom.xml
@@ -114,6 +114,13 @@
5.7.14
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
+
diff --git a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DemoMessageInterceptor.java b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DemoMessageInterceptor.java
index 56e8755c4f87cbd7e47692fd87bdd01541cc5017..e12bb6817a874836288a37fd257aeeed0d65d0f6 100644
--- a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DemoMessageInterceptor.java
+++ b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DemoMessageInterceptor.java
@@ -1,97 +1,97 @@
-package net.mqtts.broker.service;
-
-import io.github.quickmsg.common.channel.MqttChannel;
-import io.github.quickmsg.common.config.Configuration;
-import io.github.quickmsg.common.context.ReceiveContext;
-import io.github.quickmsg.common.interceptor.Interceptor;
-import io.github.quickmsg.common.interceptor.Invocation;
-import io.github.quickmsg.common.message.HeapMqttMessage;
-import io.github.quickmsg.common.message.SmqttMessage;
-import io.github.quickmsg.common.rule.DslExecutor;
-import io.github.quickmsg.common.utils.MessageUtils;
-import io.netty.handler.codec.mqtt.MqttFixedHeader;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.handler.codec.mqtt.MqttPublishMessage;
-import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
-import reactor.core.scheduler.Schedulers;
-
-
-/**
- * @Description: mqtt消息拦截器示例
- * @Author: ShiHuan Sun
- * @E-mail: 13733918655@163.com
- * @CreateDate: 2021/11/3$ 18:47$
- * @UpdateUser: ShiHuan Sun
- * @UpdateDate: 2021/11/3$ 18:47$
- * @UpdateRemark: 修改内容
- * @Version: 1.0
- */
-@Service
-@Slf4j
-@Component
-public class DemoMessageInterceptor implements Interceptor {
- /**
- * 拦截目标参数
- *
- * @param invocation {@link Invocation}
- * @return Object
- */
- @Override
- public Object intercept(Invocation invocation) {
- try {
- MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
- SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
- ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
- DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
- MqttMessage message = smqttMessage.getMessage();
- if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage) {
- MqttPublishMessage publishMessage = (MqttPublishMessage) message;
- HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
- log.info("TOPIC-"+heapMqttMessage.getTopic()+"------Message:"+new String(heapMqttMessage.getMessage()));
- if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) {
- mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
- }
- if (dslExecutor.isExecute()) {
- dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext);
- }
- }
- return invocation.proceed(); // 放行
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * 构建消息体
- *
- * @param message {@link MqttPublishMessage}
- * @param timestamp
- * @return {@link HeapMqttMessage}
- */
- private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
- MqttPublishVariableHeader header = message.variableHeader();
- MqttFixedHeader fixedHeader = message.fixedHeader();
- return HeapMqttMessage.builder()
- .timestamp(timestamp)
- .clientIdentifier(channel.getClientIdentifier())
- .message(MessageUtils.copyReleaseByteBuf(message.payload()))
- .topic(header.topicName())
- .retain(fixedHeader.isRetain())
- .qos(fixedHeader.qosLevel().value())
- .build();
- }
-
- /**
- * 排序
- *
- * @return 排序
- */
- @Override
- public int sort() {
- return 0;
- }
-}
+//package net.mqtts.broker.service;
+//
+//import io.github.quickmsg.common.channel.MqttChannel;
+//import io.github.quickmsg.common.config.Configuration;
+//import io.github.quickmsg.common.context.ReceiveContext;
+//import io.github.quickmsg.common.interceptor.Interceptor;
+//import io.github.quickmsg.common.interceptor.Invocation;
+//import io.github.quickmsg.common.message.HeapMqttMessage;
+//import io.github.quickmsg.common.message.SmqttMessage;
+//import io.github.quickmsg.common.rule.DslExecutor;
+//import io.github.quickmsg.common.utils.MessageUtils;
+//import io.netty.handler.codec.mqtt.MqttFixedHeader;
+//import io.netty.handler.codec.mqtt.MqttMessage;
+//import io.netty.handler.codec.mqtt.MqttPublishMessage;
+//import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.stereotype.Component;
+//import org.springframework.stereotype.Service;
+//import reactor.core.scheduler.Schedulers;
+//
+//
+///**
+// * @Description: mqtt消息拦截器示例
+// * @Author: ShiHuan Sun
+// * @E-mail: 13733918655@163.com
+// * @CreateDate: 2021/11/3$ 18:47$
+// * @UpdateUser: ShiHuan Sun
+// * @UpdateDate: 2021/11/3$ 18:47$
+// * @UpdateRemark: 修改内容
+// * @Version: 1.0
+// */
+//@Service
+//@Slf4j
+//@Component
+//public class DemoMessageInterceptor implements Interceptor {
+// /**
+// * 拦截目标参数
+// *
+// * @param invocation {@link Invocation}
+// * @return Object
+// */
+// @Override
+// public Object intercept(Invocation invocation) {
+// try {
+// MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
+// SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
+// ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
+// DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
+// MqttMessage message = smqttMessage.getMessage();
+// if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage) {
+// MqttPublishMessage publishMessage = (MqttPublishMessage) message;
+// HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
+// log.info("TOPIC-"+heapMqttMessage.getTopic()+"------Message:"+new String(heapMqttMessage.getMessage()));
+// if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) {
+// mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
+// }
+// if (dslExecutor.isExecute()) {
+// dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext);
+// }
+// }
+// return invocation.proceed(); // 放行
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// return null;
+// }
+//
+// /**
+// * 构建消息体
+// *
+// * @param message {@link MqttPublishMessage}
+// * @param timestamp
+// * @return {@link HeapMqttMessage}
+// */
+// private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
+// MqttPublishVariableHeader header = message.variableHeader();
+// MqttFixedHeader fixedHeader = message.fixedHeader();
+// return HeapMqttMessage.builder()
+// .timestamp(timestamp)
+// .clientIdentifier(channel.getClientIdentifier())
+// .message(MessageUtils.copyReleaseByteBuf(message.payload()))
+// .topic(header.topicName())
+// .retain(fixedHeader.isRetain())
+// .qos(fixedHeader.qosLevel().value())
+// .build();
+// }
+//
+// /**
+// * 排序
+// *
+// * @return 排序
+// */
+// @Override
+// public int sort() {
+// return 0;
+// }
+//}
diff --git a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
index 515880ee4603c4fd0f6a8e4f9e5672c91519828e..7d0fea4fbf7b17e8db3b4610ad68971597101ceb 100644
--- a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
+++ b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
@@ -1,125 +1,125 @@
-//package net.mqtts.broker.service;
-//
-//import cn.hutool.core.date.LocalDateTimeUtil;
-//import io.github.quickmsg.common.channel.MqttChannel;
-//import io.github.quickmsg.common.config.Configuration;
-//import io.github.quickmsg.common.context.ReceiveContext;
-//import io.github.quickmsg.common.interceptor.Interceptor;
-//import io.github.quickmsg.common.interceptor.Invocation;
-//import io.github.quickmsg.common.message.HeapMqttMessage;
-//import io.github.quickmsg.common.message.SmqttMessage;
-//import io.github.quickmsg.common.rule.DslExecutor;
-//import io.github.quickmsg.common.utils.MessageUtils;
-//import io.netty.handler.codec.mqtt.*;
-//import lombok.extern.slf4j.Slf4j;
-//import net.mqtts.link.api.RemoteMqttsDeviceActionService;
-//import net.mqtts.link.api.RemoteMqttsDeviceService;
-//import net.mqtts.link.api.domain.MqttsDevice;
-//import net.mqtts.link.api.domain.MqttsDeviceAction;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.stereotype.Component;
-//import org.springframework.stereotype.Service;
-//
-//import javax.annotation.PostConstruct;
-//import java.util.Arrays;
-//import java.util.List;
-//
-///**
-// * @Description: Mqtt 设备动作拦截处理
-// * @Author: ShiHuan Sun
-// * @E-mail: 13733918655@163.com
-// * @Website: http://mqtts.net
-// * @CreateDate: 2021/11/16$ 10:33$
-// * @UpdateUser: ShiHuan Sun
-// * @UpdateDate: 2021/11/16$ 10:33$
-// * @UpdateRemark: 修改内容
-// * @Version: 1.0
-// */
-//@Service
-//@Slf4j
-//@Component
-//public class DeviceActionInterceptor implements Interceptor {
-//
-// private static DeviceActionInterceptor DeviceActionInterceptor;
-//
-// @Autowired
-// private RemoteMqttsDeviceService mqttsDeviceService;
-//
-// @Autowired
-// private RemoteMqttsDeviceActionService mqttsDeviceActionService;
-//
-//
-// @PostConstruct
-// public void init() {
-// DeviceActionInterceptor = this;
-// DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
-// DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService;
-// }
-//
-// /**
-// * 拦截目标参数
-// *
-// * @param invocation {@link Invocation}
-// * @return Object
-// */
-// @Override
-// public Object intercept(Invocation invocation) {
-// try {
-// MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
-// SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
-// ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
-// DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
-// MqttMessage message = smqttMessage.getMessage();
-// //TODO MQTT动作数据处理
-// List mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
-// if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
-// MqttPublishMessage publishMessage = (MqttPublishMessage) message;
-// HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
-// MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
-// mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier());
-// mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
-// mqttsDeviceAction.setStatus(message.decoderResult().toString());
-// mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
-// mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
-// DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction);
-// DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier()));
-// }
-// // 拦截业务
-// return invocation.proceed(); // 放行
-// } catch (Exception e) {
-// e.printStackTrace();
-// }
-// return null;
-// }
-//
-// /**
-// * 构建消息体
-// *
-// * @param message {@link MqttPublishMessage}
-// * @param timestamp
-// * @return {@link HeapMqttMessage}
-// */
-// private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
-// MqttPublishVariableHeader header = message.variableHeader();
-// MqttFixedHeader fixedHeader = message.fixedHeader();
-// return HeapMqttMessage.builder()
-// .timestamp(timestamp)
-// .clientIdentifier(channel.getClientIdentifier())
-// .message(MessageUtils.copyReleaseByteBuf(message.payload()))
-// .topic(header.topicName())
-// .retain(fixedHeader.isRetain())
-// .qos(fixedHeader.qosLevel().value())
-// .build();
-// }
-//
-// /**
-// * 排序
-// * 值越大权重越高
-// *
-// * @return 排序
-// */
-// @Override
-// public int sort() {
-// return 1;
-// }
-//}
+package net.mqtts.broker.service;
+
+import cn.hutool.core.date.LocalDateTimeUtil;
+import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.config.Configuration;
+import io.github.quickmsg.common.context.ReceiveContext;
+import io.github.quickmsg.common.interceptor.Interceptor;
+import io.github.quickmsg.common.interceptor.Invocation;
+import io.github.quickmsg.common.message.HeapMqttMessage;
+import io.github.quickmsg.common.message.SmqttMessage;
+import io.github.quickmsg.common.rule.DslExecutor;
+import io.github.quickmsg.common.utils.MessageUtils;
+import io.netty.handler.codec.mqtt.*;
+import lombok.extern.slf4j.Slf4j;
+import net.mqtts.link.api.RemoteMqttsDeviceActionService;
+import net.mqtts.link.api.RemoteMqttsDeviceService;
+import net.mqtts.link.api.domain.MqttsDevice;
+import net.mqtts.link.api.domain.MqttsDeviceAction;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @Description: Mqtt 设备动作拦截处理
+ * @Author: ShiHuan Sun
+ * @E-mail: 13733918655@163.com
+ * @Website: http://mqtts.net
+ * @CreateDate: 2021/11/16$ 10:33$
+ * @UpdateUser: ShiHuan Sun
+ * @UpdateDate: 2021/11/16$ 10:33$
+ * @UpdateRemark: 修改内容
+ * @Version: 1.0
+ */
+@Service
+@Slf4j
+@Component
+public class DeviceActionInterceptor implements Interceptor {
+
+ private static DeviceActionInterceptor DeviceActionInterceptor;
+
+ @Autowired
+ private RemoteMqttsDeviceService mqttsDeviceService;
+
+ @Autowired
+ private RemoteMqttsDeviceActionService mqttsDeviceActionService;
+
+
+ @PostConstruct
+ public void init() {
+ DeviceActionInterceptor = this;
+ DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
+ DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService;
+ }
+
+ /**
+ * 拦截目标参数
+ *
+ * @param invocation {@link Invocation}
+ * @return Object
+ */
+ @Override
+ public Object intercept(Invocation invocation) {
+ try {
+ MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
+ SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
+ ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
+ DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
+ MqttMessage message = smqttMessage.getMessage();
+ //TODO MQTT动作数据处理
+ List mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
+ if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
+ MqttPublishMessage publishMessage = (MqttPublishMessage) message;
+ HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
+ MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
+ mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier());
+ mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
+ mqttsDeviceAction.setStatus(message.decoderResult().toString());
+ mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
+ mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
+ DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction);
+ DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier()));
+ }
+ // 拦截业务
+ return invocation.proceed(); // 放行
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * 构建消息体
+ *
+ * @param message {@link MqttPublishMessage}
+ * @param timestamp
+ * @return {@link HeapMqttMessage}
+ */
+ private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
+ MqttPublishVariableHeader header = message.variableHeader();
+ MqttFixedHeader fixedHeader = message.fixedHeader();
+ return HeapMqttMessage.builder()
+ .timestamp(timestamp)
+ .clientIdentifier(channel.getClientIdentifier())
+ .message(MessageUtils.copyReleaseByteBuf(message.payload()))
+ .topic(header.topicName())
+ .retain(fixedHeader.isRetain())
+ .qos(fixedHeader.qosLevel().value())
+ .build();
+ }
+
+ /**
+ * 排序
+ * 值越大权重越高
+ *
+ * @return 排序
+ */
+ @Override
+ public int sort() {
+ return 0;
+ }
+}
diff --git a/mqtts-modules/mqtts-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor b/mqtts-modules/mqtts-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
index 51fcc1f92552a5022193ae6921013f14c5c0b2d1..dc3111147ad96dc82bf24803abcff50f564749cc 100644
--- a/mqtts-modules/mqtts-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
+++ b/mqtts-modules/mqtts-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
@@ -1,3 +1,3 @@
-net.mqtts.broker.service.DemoMessageInterceptor
-net.mqtts.broker.service.DeviceActionInterceptor
-net.mqtts.broker.service.DeviceDatasInterceptor
\ No newline at end of file
+#net.mqtts.broker.service.DemoMessageInterceptor
+#net.mqtts.broker.service.DeviceActionInterceptor
+#net.mqtts.broker.service.DeviceDatasInterceptor
\ No newline at end of file
diff --git a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
index 6a7d59243fdaf2ef0c0b4f114ac75626c445bf8a..342522a97d63481c1f1b1dbc051008b585629549 100644
--- a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
+++ b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
@@ -1,7 +1,17 @@
package net.mqtts.link.common.enums.consumer;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.sun.xml.internal.bind.v2.TODO;
+import lombok.extern.slf4j.Slf4j;
+import net.mqtts.common.log.annotation.Log;
+import net.mqtts.common.log.enums.BusinessType;
+import net.mqtts.link.service.device.MqttsDeviceActionService;
+import net.mqtts.link.service.device.MqttsDeviceDatasService;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
@@ -15,14 +25,32 @@ import org.springframework.stereotype.Component;
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
+@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "mqtts", topic = "mqtts")
public class MqttsDeviceActionMessageConsumer implements RocketMQListener {
+ @Autowired
+ private MqttsDeviceActionService mqttsDeviceActionService;
+ @Autowired
+ private MqttsDeviceDatasService mqttsDeviceDatasService;
@Override
public void onMessage(Object message) {
assert message!=null;
-
System.out.println("Link消费消息"+message);
+ JSONObject mqttsMessage = JSONObject.parseObject((String) message);
+ /**
+ * TODO 设备上下线处理
+ * $event/close 设备断开事件
+ * $event/connect 设备连接事件
+ * ${topic} 其他为业务数据自行处理
+ */
+ if("$event/connect".equals(mqttsMessage.get("topic"))){
+ mqttsDeviceActionService.connectEvent(mqttsMessage.get("msg").toString());
+ }else if("$event/close".equals(mqttsMessage.get("topic"))){
+ mqttsDeviceActionService.closeEvent(mqttsMessage.get("msg").toString());
+ }else {
+ mqttsDeviceDatasService.insertBaseDatas(mqttsMessage.get("msg").toString());
+ }
}
}
diff --git a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceActionService.java b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceActionService.java
index 4f9da220021a92ab28acfab81bbf1f3991a71153..a12be3cff648626f3b6c9523828d98957415fc20 100644
--- a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceActionService.java
+++ b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceActionService.java
@@ -1,11 +1,12 @@
package net.mqtts.link.service.device;
+import com.alibaba.fastjson.JSONObject;
import net.mqtts.link.api.domain.MqttsDeviceAction;
import java.util.List;
/**
- * @Description: java类作用描述
+ * @Description: 设备动作处理接口
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://mqtts.net
@@ -47,4 +48,16 @@ public interface MqttsDeviceActionService {
int batchInsert(List list);
int deleteMqttsDeviceActionByIds(Long[] ids);
+
+ /**
+ * 设备连接事件
+ * @param mqttsMessage
+ */
+ void connectEvent(String mqttsMessage);
+
+ /**
+ * 设备断开事件
+ * @param mqttsMessage
+ */
+ void closeEvent(String mqttsMessage);
}
diff --git a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceDatasService.java b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceDatasService.java
index c32ed17c47d981f14b01024eaf40fb9098515043..e3b1b069a7f9855453b9220c6dddeb196c066bad 100644
--- a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceDatasService.java
+++ b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceDatasService.java
@@ -1,5 +1,6 @@
package net.mqtts.link.service.device;
+import com.alibaba.fastjson.JSONObject;
import net.mqtts.link.api.domain.MqttsDeviceDatas;
import java.util.List;
@@ -47,4 +48,10 @@ public interface MqttsDeviceDatasService {
int batchInsert(List list);
int deleteMqttsDeviceDatasByIds(Long[] ids);
+
+ /**
+ * mqtt基础数据处理
+ * @param mqttsMessage
+ */
+ void insertBaseDatas(String mqttsMessage);
}
diff --git a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceActionServiceImpl.java b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceActionServiceImpl.java
index 1fb0895b28e897384b2b2255487e658710e79ef8..3d39373b44cb4ab4f2775450d156e2a8e6d348e2 100644
--- a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceActionServiceImpl.java
+++ b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceActionServiceImpl.java
@@ -1,15 +1,22 @@
package net.mqtts.link.service.device.impl;
+import cn.hutool.core.date.LocalDateTimeUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.nacos.shaded.com.google.gson.Gson;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import lombok.extern.slf4j.Slf4j;
import net.mqtts.link.api.domain.MqttsDeviceAction;
import net.mqtts.link.mapper.device.MqttsDeviceActionMapper;
import net.mqtts.link.service.device.MqttsDeviceActionService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
- * @Description: java类作用描述
+ * @Description: mqtt上下线动作数据处理
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://mqtts.net
@@ -19,6 +26,7 @@ import java.util.List;
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
+@Slf4j
@Service
public class MqttsDeviceActionServiceImpl implements MqttsDeviceActionService {
@@ -100,4 +108,37 @@ public class MqttsDeviceActionServiceImpl implements MqttsDeviceActionService {
return mqttsDeviceActionMapper.deleteMqttsDeviceActionByIds(ids);
}
+ /**
+ * 设备连接事件
+ *
+ * @param mqttsMessage
+ */
+ @Override
+ public void connectEvent(String mqttsMessage) {
+ Gson gson = new Gson();
+ Map map = new HashMap();
+ map = gson.fromJson(mqttsMessage, map.getClass());
+ log.info(map.toString());
+ /*MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
+ mqttsDeviceAction.setDevice_id(mqttsMessage.get("clientIdentifier").toString());
+ mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
+ mqttsDeviceAction.setStatus(message.decoderResult().toString());
+ mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
+ mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
+ mqttsDeviceActionMapper.insert(mqttsDeviceAction);*/
+ }
+
+ /**
+ * 设备断开事件
+ *
+ * @param mqttsMessage
+ */
+ @Override
+ public void closeEvent(String mqttsMessage) {
+ Gson gson = new Gson();
+ Map map = new HashMap();
+ map = gson.fromJson(mqttsMessage, map.getClass());
+ log.info(map.toString());
+ }
+
}
diff --git a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceDatasServiceImpl.java b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceDatasServiceImpl.java
index 1ec75b1a5fb61a217a04e0a53b7a589c56416377..77394ded546cc593c31db800742eace3ee38ed60 100644
--- a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceDatasServiceImpl.java
+++ b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceDatasServiceImpl.java
@@ -1,16 +1,22 @@
package net.mqtts.link.service.device.impl;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.nacos.shaded.com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
import net.mqtts.link.api.domain.MqttsDeviceDatas;
import net.mqtts.link.mapper.device.MqttsDeviceDatasMapper;
import net.mqtts.link.service.device.MqttsDeviceDatasService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
-* @Description: java类作用描述
+* @Description: mqtt基础业务处理
* @Author: ShiHuan Sun
@@ -29,6 +35,7 @@ import java.util.List;
* @Version: 1.0
*/
+@Slf4j
@Service
public class MqttsDeviceDatasServiceImpl implements MqttsDeviceDatasService{
@@ -110,4 +117,17 @@ public class MqttsDeviceDatasServiceImpl implements MqttsDeviceDatasService{
return mqttsDeviceDatasMapper.deleteMqttsDeviceDatasByIds(ids);
}
+ /**
+ * mqtt基础数据处理
+ *
+ * @param mqttsMessage
+ */
+ @Override
+ public void insertBaseDatas(String mqttsMessage) {
+ Gson gson = new Gson();
+ Map map = new HashMap();
+ map = gson.fromJson(mqttsMessage, map.getClass());
+ log.info(map.toString());
+ }
+
}