From 35475f6578eb0282ff2ef7750b7b1db2db403b53 Mon Sep 17 00:00:00 2001
From: sunshihuan <13733918655@163.com>
Date: Mon, 22 Nov 2021 16:43:39 +0800
Subject: [PATCH] =?UTF-8?q?Link=E6=96=B0=E5=A2=9E=E6=B6=88=E8=B4=B9rocketm?=
=?UTF-8?q?q=E6=B6=88=E6=81=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
mqtts-api/mqtts-api-link/pom.xml | 1 +
mqtts-common/mqtts-common-rocketmq/pom.xml | 29 ++
mqtts-common/pom.xml | 1 +
.../service/DeviceActionInterceptor.java | 250 +++++++++---------
.../service/DeviceDatasInterceptor.java | 246 ++++++++---------
mqtts-modules/mqtts-modules-link/pom.xml | 7 +
.../MqttsDeviceActionMessageConsumer.java | 28 ++
7 files changed, 314 insertions(+), 248 deletions(-)
create mode 100644 mqtts-common/mqtts-common-rocketmq/pom.xml
create mode 100644 mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
diff --git a/mqtts-api/mqtts-api-link/pom.xml b/mqtts-api/mqtts-api-link/pom.xml
index 98af5de..346f30b 100644
--- a/mqtts-api/mqtts-api-link/pom.xml
+++ b/mqtts-api/mqtts-api-link/pom.xml
@@ -24,6 +24,7 @@
mqtts-common-core
${mqtts.version}
+
org.projectlombok
lombok
diff --git a/mqtts-common/mqtts-common-rocketmq/pom.xml b/mqtts-common/mqtts-common-rocketmq/pom.xml
new file mode 100644
index 0000000..772229a
--- /dev/null
+++ b/mqtts-common/mqtts-common-rocketmq/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+ net.mqtts
+ mqtts-common
+ ${mqtts.version}
+
+ 4.0.0
+
+ mqtts-common-rocketmq
+
+
+ mqtts-common-rocketmq消息服务
+
+
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ 2.2.1
+
+
+
+
+
\ No newline at end of file
diff --git a/mqtts-common/pom.xml b/mqtts-common/pom.xml
index c24ea40..33aa01a 100644
--- a/mqtts-common/pom.xml
+++ b/mqtts-common/pom.xml
@@ -16,6 +16,7 @@
mqtts-common-security
mqtts-common-datascope
mqtts-common-datasource
+ mqtts-common-rocketmq
mqtts-common
diff --git a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
index 0edf305..515880e 100644
--- a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
+++ b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
@@ -1,125 +1,125 @@
-package net.mqtts.broker.service;
-
-import cn.hutool.core.date.LocalDateTimeUtil;
-import io.github.quickmsg.common.channel.MqttChannel;
-import io.github.quickmsg.common.config.Configuration;
-import io.github.quickmsg.common.context.ReceiveContext;
-import io.github.quickmsg.common.interceptor.Interceptor;
-import io.github.quickmsg.common.interceptor.Invocation;
-import io.github.quickmsg.common.message.HeapMqttMessage;
-import io.github.quickmsg.common.message.SmqttMessage;
-import io.github.quickmsg.common.rule.DslExecutor;
-import io.github.quickmsg.common.utils.MessageUtils;
-import io.netty.handler.codec.mqtt.*;
-import lombok.extern.slf4j.Slf4j;
-import net.mqtts.link.api.RemoteMqttsDeviceActionService;
-import net.mqtts.link.api.RemoteMqttsDeviceService;
-import net.mqtts.link.api.domain.MqttsDevice;
-import net.mqtts.link.api.domain.MqttsDeviceAction;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * @Description: Mqtt 设备动作拦截处理
- * @Author: ShiHuan Sun
- * @E-mail: 13733918655@163.com
- * @Website: http://mqtts.net
- * @CreateDate: 2021/11/16$ 10:33$
- * @UpdateUser: ShiHuan Sun
- * @UpdateDate: 2021/11/16$ 10:33$
- * @UpdateRemark: 修改内容
- * @Version: 1.0
- */
-@Service
-@Slf4j
-@Component
-public class DeviceActionInterceptor implements Interceptor {
-
- private static DeviceActionInterceptor DeviceActionInterceptor;
-
- @Autowired
- private RemoteMqttsDeviceService mqttsDeviceService;
-
- @Autowired
- private RemoteMqttsDeviceActionService mqttsDeviceActionService;
-
-
- @PostConstruct
- public void init() {
- DeviceActionInterceptor = this;
- DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
- DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService;
- }
-
- /**
- * 拦截目标参数
- *
- * @param invocation {@link Invocation}
- * @return Object
- */
- @Override
- public Object intercept(Invocation invocation) {
- try {
- MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
- SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
- ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
- DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
- MqttMessage message = smqttMessage.getMessage();
- //TODO MQTT动作数据处理
- List mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
- if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
- MqttPublishMessage publishMessage = (MqttPublishMessage) message;
- HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
- MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
- mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier());
- mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
- mqttsDeviceAction.setStatus(message.decoderResult().toString());
- mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
- mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
- DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction);
- DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier()));
- }
- // 拦截业务
- return invocation.proceed(); // 放行
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- /**
- * 构建消息体
- *
- * @param message {@link MqttPublishMessage}
- * @param timestamp
- * @return {@link HeapMqttMessage}
- */
- private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
- MqttPublishVariableHeader header = message.variableHeader();
- MqttFixedHeader fixedHeader = message.fixedHeader();
- return HeapMqttMessage.builder()
- .timestamp(timestamp)
- .clientIdentifier(channel.getClientIdentifier())
- .message(MessageUtils.copyReleaseByteBuf(message.payload()))
- .topic(header.topicName())
- .retain(fixedHeader.isRetain())
- .qos(fixedHeader.qosLevel().value())
- .build();
- }
-
- /**
- * 排序
- * 值越大权重越高
- *
- * @return 排序
- */
- @Override
- public int sort() {
- return 1;
- }
-}
+//package net.mqtts.broker.service;
+//
+//import cn.hutool.core.date.LocalDateTimeUtil;
+//import io.github.quickmsg.common.channel.MqttChannel;
+//import io.github.quickmsg.common.config.Configuration;
+//import io.github.quickmsg.common.context.ReceiveContext;
+//import io.github.quickmsg.common.interceptor.Interceptor;
+//import io.github.quickmsg.common.interceptor.Invocation;
+//import io.github.quickmsg.common.message.HeapMqttMessage;
+//import io.github.quickmsg.common.message.SmqttMessage;
+//import io.github.quickmsg.common.rule.DslExecutor;
+//import io.github.quickmsg.common.utils.MessageUtils;
+//import io.netty.handler.codec.mqtt.*;
+//import lombok.extern.slf4j.Slf4j;
+//import net.mqtts.link.api.RemoteMqttsDeviceActionService;
+//import net.mqtts.link.api.RemoteMqttsDeviceService;
+//import net.mqtts.link.api.domain.MqttsDevice;
+//import net.mqtts.link.api.domain.MqttsDeviceAction;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//import org.springframework.stereotype.Service;
+//
+//import javax.annotation.PostConstruct;
+//import java.util.Arrays;
+//import java.util.List;
+//
+///**
+// * @Description: Mqtt 设备动作拦截处理
+// * @Author: ShiHuan Sun
+// * @E-mail: 13733918655@163.com
+// * @Website: http://mqtts.net
+// * @CreateDate: 2021/11/16$ 10:33$
+// * @UpdateUser: ShiHuan Sun
+// * @UpdateDate: 2021/11/16$ 10:33$
+// * @UpdateRemark: 修改内容
+// * @Version: 1.0
+// */
+//@Service
+//@Slf4j
+//@Component
+//public class DeviceActionInterceptor implements Interceptor {
+//
+// private static DeviceActionInterceptor DeviceActionInterceptor;
+//
+// @Autowired
+// private RemoteMqttsDeviceService mqttsDeviceService;
+//
+// @Autowired
+// private RemoteMqttsDeviceActionService mqttsDeviceActionService;
+//
+//
+// @PostConstruct
+// public void init() {
+// DeviceActionInterceptor = this;
+// DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
+// DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService;
+// }
+//
+// /**
+// * 拦截目标参数
+// *
+// * @param invocation {@link Invocation}
+// * @return Object
+// */
+// @Override
+// public Object intercept(Invocation invocation) {
+// try {
+// MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
+// SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
+// ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
+// DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
+// MqttMessage message = smqttMessage.getMessage();
+// //TODO MQTT动作数据处理
+// List mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
+// if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
+// MqttPublishMessage publishMessage = (MqttPublishMessage) message;
+// HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
+// MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
+// mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier());
+// mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
+// mqttsDeviceAction.setStatus(message.decoderResult().toString());
+// mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
+// mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
+// DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction);
+// DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier()));
+// }
+// // 拦截业务
+// return invocation.proceed(); // 放行
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// return null;
+// }
+//
+// /**
+// * 构建消息体
+// *
+// * @param message {@link MqttPublishMessage}
+// * @param timestamp
+// * @return {@link HeapMqttMessage}
+// */
+// private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
+// MqttPublishVariableHeader header = message.variableHeader();
+// MqttFixedHeader fixedHeader = message.fixedHeader();
+// return HeapMqttMessage.builder()
+// .timestamp(timestamp)
+// .clientIdentifier(channel.getClientIdentifier())
+// .message(MessageUtils.copyReleaseByteBuf(message.payload()))
+// .topic(header.topicName())
+// .retain(fixedHeader.isRetain())
+// .qos(fixedHeader.qosLevel().value())
+// .build();
+// }
+//
+// /**
+// * 排序
+// * 值越大权重越高
+// *
+// * @return 排序
+// */
+// @Override
+// public int sort() {
+// return 1;
+// }
+//}
diff --git a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceDatasInterceptor.java b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceDatasInterceptor.java
index 744540c..3fdd797 100644
--- a/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceDatasInterceptor.java
+++ b/mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceDatasInterceptor.java
@@ -1,123 +1,123 @@
-package net.mqtts.broker.service;
-
-import cn.hutool.core.date.LocalDateTimeUtil;
-import io.github.quickmsg.common.channel.MqttChannel;
-import io.github.quickmsg.common.config.Configuration;
-import io.github.quickmsg.common.context.ReceiveContext;
-import io.github.quickmsg.common.interceptor.Interceptor;
-import io.github.quickmsg.common.interceptor.Invocation;
-import io.github.quickmsg.common.message.HeapMqttMessage;
-import io.github.quickmsg.common.message.SmqttMessage;
-import io.github.quickmsg.common.rule.DslExecutor;
-import io.github.quickmsg.common.utils.MessageUtils;
-import io.netty.handler.codec.mqtt.*;
-import lombok.extern.slf4j.Slf4j;
-import net.mqtts.link.api.RemoteMqttsDeviceDatasService;
-import net.mqtts.link.api.domain.MqttsDeviceDatas;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-
-/**
- * @Description: mqtt消息拦截处理
- * @Author: ShiHuan Sun
- * @E-mail: 13733918655@163.com
- * @Website: http://mqtts.net
- * @CreateDate: 2021/11/19$ 21:25$
- * @UpdateUser: ShiHuan Sun
- * @UpdateDate: 2021/11/19$ 21:25$
- * @UpdateRemark: 修改内容
- * @Version: 1.0
- */
-@Service
-@Slf4j
-@Component
-public class DeviceDatasInterceptor implements Interceptor {
-
- private static DeviceDatasInterceptor DeviceDatasInterceptor;
-
- @Autowired
- private RemoteMqttsDeviceDatasService remoteMqttsDeviceDatasService;
-
-
- @PostConstruct
- public void init() {
- DeviceDatasInterceptor = this;
- DeviceDatasInterceptor.remoteMqttsDeviceDatasService = this.remoteMqttsDeviceDatasService;
- }
-
- /**
- * 拦截目标参数
- *
- * @param invocation {@link Invocation}
- * @return Object
- */
- @Override
- public Object intercept(Invocation invocation) {
- try {
- MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
- SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
- ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
- DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
- MqttMessage message = smqttMessage.getMessage();
- //TODO 发布消息类型处理(业务数据)
- if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage && message.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
- MqttPublishMessage publishMessage = (MqttPublishMessage) message;
- HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
- log.info("Topic->{}" + heapMqttMessage.getTopic() + "Message->{}" + new String(heapMqttMessage.getMessage()));
- MqttsDeviceDatas mqttsDeviceDatas = new MqttsDeviceDatas();
- mqttsDeviceDatas.setDevice_id(heapMqttMessage.getClientIdentifier());
- mqttsDeviceDatas.setTopic(heapMqttMessage.getTopic());
- mqttsDeviceDatas.setMessage_id(String.valueOf(heapMqttMessage.getTimestamp()));
- mqttsDeviceDatas.setMessage(new String(heapMqttMessage.getMessage(), "UTF-8").trim());
- mqttsDeviceDatas.setStatus(message.decoderResult().toString());
- mqttsDeviceDatas.setCreate_time(LocalDateTimeUtil.now());
- DeviceDatasInterceptor.remoteMqttsDeviceDatasService.add(mqttsDeviceDatas);
- /* if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) {
- mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
- }
- if (dslExecutor.isExecute()) {
- dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext);
- }*/
- }
- return invocation.proceed(); // 放行
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
-
- /**
- * 构建消息体
- *
- * @param message {@link MqttPublishMessage}
- * @param timestamp
- * @return {@link HeapMqttMessage}
- */
- private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
- MqttPublishVariableHeader header = message.variableHeader();
- MqttFixedHeader fixedHeader = message.fixedHeader();
- return HeapMqttMessage.builder()
- .timestamp(timestamp)
- .clientIdentifier(channel.getClientIdentifier())
- .message(MessageUtils.copyReleaseByteBuf(message.payload()))
- .topic(header.topicName())
- .retain(fixedHeader.isRetain())
- .qos(fixedHeader.qosLevel().value())
- .build();
- }
-
- /**
- * 排序
- * 值越大权重越高
- *
- * @return 排序
- */
- @Override
- public int sort() {
- return 0;
- }
-}
+//package net.mqtts.broker.service;
+//
+//import cn.hutool.core.date.LocalDateTimeUtil;
+//import io.github.quickmsg.common.channel.MqttChannel;
+//import io.github.quickmsg.common.config.Configuration;
+//import io.github.quickmsg.common.context.ReceiveContext;
+//import io.github.quickmsg.common.interceptor.Interceptor;
+//import io.github.quickmsg.common.interceptor.Invocation;
+//import io.github.quickmsg.common.message.HeapMqttMessage;
+//import io.github.quickmsg.common.message.SmqttMessage;
+//import io.github.quickmsg.common.rule.DslExecutor;
+//import io.github.quickmsg.common.utils.MessageUtils;
+//import io.netty.handler.codec.mqtt.*;
+//import lombok.extern.slf4j.Slf4j;
+//import net.mqtts.link.api.RemoteMqttsDeviceDatasService;
+//import net.mqtts.link.api.domain.MqttsDeviceDatas;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//import org.springframework.stereotype.Service;
+//
+//import javax.annotation.PostConstruct;
+//
+///**
+// * @Description: mqtt消息拦截处理
+// * @Author: ShiHuan Sun
+// * @E-mail: 13733918655@163.com
+// * @Website: http://mqtts.net
+// * @CreateDate: 2021/11/19$ 21:25$
+// * @UpdateUser: ShiHuan Sun
+// * @UpdateDate: 2021/11/19$ 21:25$
+// * @UpdateRemark: 修改内容
+// * @Version: 1.0
+// */
+//@Service
+//@Slf4j
+//@Component
+//public class DeviceDatasInterceptor implements Interceptor {
+//
+// private static DeviceDatasInterceptor DeviceDatasInterceptor;
+//
+// @Autowired
+// private RemoteMqttsDeviceDatasService remoteMqttsDeviceDatasService;
+//
+//
+// @PostConstruct
+// public void init() {
+// DeviceDatasInterceptor = this;
+// DeviceDatasInterceptor.remoteMqttsDeviceDatasService = this.remoteMqttsDeviceDatasService;
+// }
+//
+// /**
+// * 拦截目标参数
+// *
+// * @param invocation {@link Invocation}
+// * @return Object
+// */
+// @Override
+// public Object intercept(Invocation invocation) {
+// try {
+// MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
+// SmqttMessage smqttMessage = (SmqttMessage) invocation.getArgs()[1];
+// ReceiveContext mqttReceiveContext = (ReceiveContext) invocation.getArgs()[2];
+// DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
+// MqttMessage message = smqttMessage.getMessage();
+// //TODO 发布消息类型处理(业务数据)
+// if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage && message.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
+// MqttPublishMessage publishMessage = (MqttPublishMessage) message;
+// HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
+// log.info("Topic->{}" + heapMqttMessage.getTopic() + "Message->{}" + new String(heapMqttMessage.getMessage()));
+// MqttsDeviceDatas mqttsDeviceDatas = new MqttsDeviceDatas();
+// mqttsDeviceDatas.setDevice_id(heapMqttMessage.getClientIdentifier());
+// mqttsDeviceDatas.setTopic(heapMqttMessage.getTopic());
+// mqttsDeviceDatas.setMessage_id(String.valueOf(heapMqttMessage.getTimestamp()));
+// mqttsDeviceDatas.setMessage(new String(heapMqttMessage.getMessage(), "UTF-8").trim());
+// mqttsDeviceDatas.setStatus(message.decoderResult().toString());
+// mqttsDeviceDatas.setCreate_time(LocalDateTimeUtil.now());
+// DeviceDatasInterceptor.remoteMqttsDeviceDatasService.add(mqttsDeviceDatas);
+// /* if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) {
+// mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
+// }
+// if (dslExecutor.isExecute()) {
+// dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext);
+// }*/
+// }
+// return invocation.proceed(); // 放行
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// return null;
+// }
+//
+//
+// /**
+// * 构建消息体
+// *
+// * @param message {@link MqttPublishMessage}
+// * @param timestamp
+// * @return {@link HeapMqttMessage}
+// */
+// private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
+// MqttPublishVariableHeader header = message.variableHeader();
+// MqttFixedHeader fixedHeader = message.fixedHeader();
+// return HeapMqttMessage.builder()
+// .timestamp(timestamp)
+// .clientIdentifier(channel.getClientIdentifier())
+// .message(MessageUtils.copyReleaseByteBuf(message.payload()))
+// .topic(header.topicName())
+// .retain(fixedHeader.isRetain())
+// .qos(fixedHeader.qosLevel().value())
+// .build();
+// }
+//
+// /**
+// * 排序
+// * 值越大权重越高
+// *
+// * @return 排序
+// */
+// @Override
+// public int sort() {
+// return 0;
+// }
+//}
diff --git a/mqtts-modules/mqtts-modules-link/pom.xml b/mqtts-modules/mqtts-modules-link/pom.xml
index a8133de..e4296e1 100644
--- a/mqtts-modules/mqtts-modules-link/pom.xml
+++ b/mqtts-modules/mqtts-modules-link/pom.xml
@@ -89,6 +89,13 @@
${mqtts.version}
+
+
+ net.mqtts
+ mqtts-common-rocketmq
+ ${mqtts.version}
+
+
net.mqtts
mqtts-api-link
diff --git a/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
new file mode 100644
index 0000000..6a7d592
--- /dev/null
+++ b/mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
@@ -0,0 +1,28 @@
+package net.mqtts.link.common.enums.consumer;
+
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description: Mqtt动作消息消费(Rocketmq模式)
+ * @Author: ShiHuan Sun
+ * @E-mail: 13733918655@163.com
+ * @Website: http://mqtts.net
+ * @CreateDate: 2021/11/22$ 16:11$
+ * @UpdateUser: ShiHuan Sun
+ * @UpdateDate: 2021/11/22$ 16:11$
+ * @UpdateRemark: 修改内容
+ * @Version: 1.0
+ */
+@Component
+@RocketMQMessageListener(consumerGroup = "mqtts", topic = "mqtts")
+public class MqttsDeviceActionMessageConsumer implements RocketMQListener {
+
+ @Override
+ public void onMessage(Object message) {
+ assert message!=null;
+
+ System.out.println("Link消费消息"+message);
+ }
+}
--
GitLab