提交 35475f65 编写于 作者: xiaonannet's avatar xiaonannet

Link新增消费rocketmq消息

上级 6f717b7b
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
<artifactId>mqtts-common-core</artifactId> <artifactId>mqtts-common-core</artifactId>
<version>${mqtts.version}</version> <version>${mqtts.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>net.mqtts</groupId>
<artifactId>mqtts-common</artifactId>
<version>${mqtts.version}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>mqtts-common-rocketmq</artifactId>
<description>
mqtts-common-rocketmq消息服务
</description>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
<module>mqtts-common-security</module> <module>mqtts-common-security</module>
<module>mqtts-common-datascope</module> <module>mqtts-common-datascope</module>
<module>mqtts-common-datasource</module> <module>mqtts-common-datasource</module>
<module>mqtts-common-rocketmq</module>
</modules> </modules>
<artifactId>mqtts-common</artifactId> <artifactId>mqtts-common</artifactId>
......
package net.mqtts.broker.service; //package net.mqtts.broker.service;
//
import cn.hutool.core.date.LocalDateTimeUtil; //import cn.hutool.core.date.LocalDateTimeUtil;
import io.github.quickmsg.common.channel.MqttChannel; //import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.config.Configuration; //import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.context.ReceiveContext; //import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.interceptor.Interceptor; //import io.github.quickmsg.common.interceptor.Interceptor;
import io.github.quickmsg.common.interceptor.Invocation; //import io.github.quickmsg.common.interceptor.Invocation;
import io.github.quickmsg.common.message.HeapMqttMessage; //import io.github.quickmsg.common.message.HeapMqttMessage;
import io.github.quickmsg.common.message.SmqttMessage; //import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.rule.DslExecutor; //import io.github.quickmsg.common.rule.DslExecutor;
import io.github.quickmsg.common.utils.MessageUtils; //import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.handler.codec.mqtt.*; //import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import net.mqtts.link.api.RemoteMqttsDeviceActionService; //import net.mqtts.link.api.RemoteMqttsDeviceActionService;
import net.mqtts.link.api.RemoteMqttsDeviceService; //import net.mqtts.link.api.RemoteMqttsDeviceService;
import net.mqtts.link.api.domain.MqttsDevice; //import net.mqtts.link.api.domain.MqttsDevice;
import net.mqtts.link.api.domain.MqttsDeviceAction; //import net.mqtts.link.api.domain.MqttsDeviceAction;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; //import org.springframework.stereotype.Service;
//
import javax.annotation.PostConstruct; //import javax.annotation.PostConstruct;
import java.util.Arrays; //import java.util.Arrays;
import java.util.List; //import java.util.List;
//
/** ///**
* @Description: Mqtt 设备动作拦截处理 // * @Description: Mqtt 设备动作拦截处理
* @Author: ShiHuan Sun // * @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com // * @E-mail: 13733918655@163.com
* @Website: http://mqtts.net // * @Website: http://mqtts.net
* @CreateDate: 2021/11/16$ 10:33$ // * @CreateDate: 2021/11/16$ 10:33$
* @UpdateUser: ShiHuan Sun // * @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/16$ 10:33$ // * @UpdateDate: 2021/11/16$ 10:33$
* @UpdateRemark: 修改内容 // * @UpdateRemark: 修改内容
* @Version: 1.0 // * @Version: 1.0
*/ // */
@Service //@Service
@Slf4j //@Slf4j
@Component //@Component
public class DeviceActionInterceptor implements Interceptor { //public class DeviceActionInterceptor implements Interceptor {
//
private static DeviceActionInterceptor DeviceActionInterceptor; // private static DeviceActionInterceptor DeviceActionInterceptor;
//
@Autowired // @Autowired
private RemoteMqttsDeviceService mqttsDeviceService; // private RemoteMqttsDeviceService mqttsDeviceService;
//
@Autowired // @Autowired
private RemoteMqttsDeviceActionService mqttsDeviceActionService; // private RemoteMqttsDeviceActionService mqttsDeviceActionService;
//
//
@PostConstruct // @PostConstruct
public void init() { // public void init() {
DeviceActionInterceptor = this; // DeviceActionInterceptor = this;
DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService; // DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService; // DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService;
} // }
//
/** // /**
* 拦截目标参数 // * 拦截目标参数
* // *
* @param invocation {@link Invocation} // * @param invocation {@link Invocation}
* @return Object // * @return Object
*/ // */
@Override // @Override
public Object intercept(Invocation invocation) { // public Object intercept(Invocation invocation) {
try { // try {
MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0]; // MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1]; // SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1];
ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2]; // ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2];
DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor(); // DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
MqttMessage message = smqttMessage.getMessage(); // MqttMessage message = smqttMessage.getMessage();
//TODO MQTT动作数据处理 // //TODO MQTT动作数据处理
List<MqttMessageType> mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE); // List<MqttMessageType> mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) { // if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
MqttPublishMessage publishMessage = (MqttPublishMessage) message; // MqttPublishMessage publishMessage = (MqttPublishMessage) message;
HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp()); // HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction(); // MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier()); // mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier());
mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString()); // mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
mqttsDeviceAction.setStatus(message.decoderResult().toString()); // mqttsDeviceAction.setStatus(message.decoderResult().toString());
mqttsDeviceAction.setMessage(heapMqttMessage.getTopic()); // mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now()); // mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction); // DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction);
DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier())); // DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier()));
} // }
// 拦截业务 // // 拦截业务
return invocation.proceed(); // 放行 // return invocation.proceed(); // 放行
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
return null; // return null;
} // }
//
/** // /**
* 构建消息体 // * 构建消息体
* // *
* @param message {@link MqttPublishMessage} // * @param message {@link MqttPublishMessage}
* @param timestamp // * @param timestamp
* @return {@link HeapMqttMessage} // * @return {@link HeapMqttMessage}
*/ // */
private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) { // private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
MqttPublishVariableHeader header = message.variableHeader(); // MqttPublishVariableHeader header = message.variableHeader();
MqttFixedHeader fixedHeader = message.fixedHeader(); // MqttFixedHeader fixedHeader = message.fixedHeader();
return HeapMqttMessage.builder() // return HeapMqttMessage.builder()
.timestamp(timestamp) // .timestamp(timestamp)
.clientIdentifier(channel.getClientIdentifier()) // .clientIdentifier(channel.getClientIdentifier())
.message(MessageUtils.copyReleaseByteBuf(message.payload())) // .message(MessageUtils.copyReleaseByteBuf(message.payload()))
.topic(header.topicName()) // .topic(header.topicName())
.retain(fixedHeader.isRetain()) // .retain(fixedHeader.isRetain())
.qos(fixedHeader.qosLevel().value()) // .qos(fixedHeader.qosLevel().value())
.build(); // .build();
} // }
//
/** // /**
* 排序 // * 排序
* 值越大权重越高 // * 值越大权重越高
* // *
* @return 排序 // * @return 排序
*/ // */
@Override // @Override
public int sort() { // public int sort() {
return 1; // return 1;
} // }
} //}
package net.mqtts.broker.service; //package net.mqtts.broker.service;
//
import cn.hutool.core.date.LocalDateTimeUtil; //import cn.hutool.core.date.LocalDateTimeUtil;
import io.github.quickmsg.common.channel.MqttChannel; //import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.config.Configuration; //import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.context.ReceiveContext; //import io.github.quickmsg.common.context.ReceiveContext;
import io.github.quickmsg.common.interceptor.Interceptor; //import io.github.quickmsg.common.interceptor.Interceptor;
import io.github.quickmsg.common.interceptor.Invocation; //import io.github.quickmsg.common.interceptor.Invocation;
import io.github.quickmsg.common.message.HeapMqttMessage; //import io.github.quickmsg.common.message.HeapMqttMessage;
import io.github.quickmsg.common.message.SmqttMessage; //import io.github.quickmsg.common.message.SmqttMessage;
import io.github.quickmsg.common.rule.DslExecutor; //import io.github.quickmsg.common.rule.DslExecutor;
import io.github.quickmsg.common.utils.MessageUtils; //import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.handler.codec.mqtt.*; //import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import net.mqtts.link.api.RemoteMqttsDeviceDatasService; //import net.mqtts.link.api.RemoteMqttsDeviceDatasService;
import net.mqtts.link.api.domain.MqttsDeviceDatas; //import net.mqtts.link.api.domain.MqttsDeviceDatas;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; //import org.springframework.stereotype.Service;
//
import javax.annotation.PostConstruct; //import javax.annotation.PostConstruct;
//
/** ///**
* @Description: mqtt消息拦截处理 // * @Description: mqtt消息拦截处理
* @Author: ShiHuan Sun // * @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com // * @E-mail: 13733918655@163.com
* @Website: http://mqtts.net // * @Website: http://mqtts.net
* @CreateDate: 2021/11/19$ 21:25$ // * @CreateDate: 2021/11/19$ 21:25$
* @UpdateUser: ShiHuan Sun // * @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/19$ 21:25$ // * @UpdateDate: 2021/11/19$ 21:25$
* @UpdateRemark: 修改内容 // * @UpdateRemark: 修改内容
* @Version: 1.0 // * @Version: 1.0
*/ // */
@Service //@Service
@Slf4j //@Slf4j
@Component //@Component
public class DeviceDatasInterceptor implements Interceptor { //public class DeviceDatasInterceptor implements Interceptor {
//
private static DeviceDatasInterceptor DeviceDatasInterceptor; // private static DeviceDatasInterceptor DeviceDatasInterceptor;
//
@Autowired // @Autowired
private RemoteMqttsDeviceDatasService remoteMqttsDeviceDatasService; // private RemoteMqttsDeviceDatasService remoteMqttsDeviceDatasService;
//
//
@PostConstruct // @PostConstruct
public void init() { // public void init() {
DeviceDatasInterceptor = this; // DeviceDatasInterceptor = this;
DeviceDatasInterceptor.remoteMqttsDeviceDatasService = this.remoteMqttsDeviceDatasService; // DeviceDatasInterceptor.remoteMqttsDeviceDatasService = this.remoteMqttsDeviceDatasService;
} // }
//
/** // /**
* 拦截目标参数 // * 拦截目标参数
* // *
* @param invocation {@link Invocation} // * @param invocation {@link Invocation}
* @return Object // * @return Object
*/ // */
@Override // @Override
public Object intercept(Invocation invocation) { // public Object intercept(Invocation invocation) {
try { // try {
MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0]; // MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1]; // SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1];
ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2]; // ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2];
DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor(); // DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
MqttMessage message = smqttMessage.getMessage(); // MqttMessage message = smqttMessage.getMessage();
//TODO 发布消息类型处理(业务数据) // //TODO 发布消息类型处理(业务数据)
if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage && message.fixedHeader().messageType() == MqttMessageType.PUBLISH) { // if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage && message.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
MqttPublishMessage publishMessage = (MqttPublishMessage) message; // MqttPublishMessage publishMessage = (MqttPublishMessage) message;
HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp()); // HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
log.info("Topic->{}" + heapMqttMessage.getTopic() + "Message->{}" + new String(heapMqttMessage.getMessage())); // log.info("Topic->{}" + heapMqttMessage.getTopic() + "Message->{}" + new String(heapMqttMessage.getMessage()));
MqttsDeviceDatas mqttsDeviceDatas = new MqttsDeviceDatas(); // MqttsDeviceDatas mqttsDeviceDatas = new MqttsDeviceDatas();
mqttsDeviceDatas.setDevice_id(heapMqttMessage.getClientIdentifier()); // mqttsDeviceDatas.setDevice_id(heapMqttMessage.getClientIdentifier());
mqttsDeviceDatas.setTopic(heapMqttMessage.getTopic()); // mqttsDeviceDatas.setTopic(heapMqttMessage.getTopic());
mqttsDeviceDatas.setMessage_id(String.valueOf(heapMqttMessage.getTimestamp())); // mqttsDeviceDatas.setMessage_id(String.valueOf(heapMqttMessage.getTimestamp()));
mqttsDeviceDatas.setMessage(new String(heapMqttMessage.getMessage(), "UTF-8").trim()); // mqttsDeviceDatas.setMessage(new String(heapMqttMessage.getMessage(), "UTF-8").trim());
mqttsDeviceDatas.setStatus(message.decoderResult().toString()); // mqttsDeviceDatas.setStatus(message.decoderResult().toString());
mqttsDeviceDatas.setCreate_time(LocalDateTimeUtil.now()); // mqttsDeviceDatas.setCreate_time(LocalDateTimeUtil.now());
DeviceDatasInterceptor.remoteMqttsDeviceDatasService.add(mqttsDeviceDatas); // DeviceDatasInterceptor.remoteMqttsDeviceDatasService.add(mqttsDeviceDatas);
/* if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) { // /* if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) {
mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe(); // mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
} // }
if (dslExecutor.isExecute()) { // if (dslExecutor.isExecute()) {
dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext); // dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext);
}*/ // }*/
} // }
return invocation.proceed(); // 放行 // return invocation.proceed(); // 放行
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} // }
return null; // return null;
} // }
//
//
/** // /**
* 构建消息体 // * 构建消息体
* // *
* @param message {@link MqttPublishMessage} // * @param message {@link MqttPublishMessage}
* @param timestamp // * @param timestamp
* @return {@link HeapMqttMessage} // * @return {@link HeapMqttMessage}
*/ // */
private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) { // private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
MqttPublishVariableHeader header = message.variableHeader(); // MqttPublishVariableHeader header = message.variableHeader();
MqttFixedHeader fixedHeader = message.fixedHeader(); // MqttFixedHeader fixedHeader = message.fixedHeader();
return HeapMqttMessage.builder() // return HeapMqttMessage.builder()
.timestamp(timestamp) // .timestamp(timestamp)
.clientIdentifier(channel.getClientIdentifier()) // .clientIdentifier(channel.getClientIdentifier())
.message(MessageUtils.copyReleaseByteBuf(message.payload())) // .message(MessageUtils.copyReleaseByteBuf(message.payload()))
.topic(header.topicName()) // .topic(header.topicName())
.retain(fixedHeader.isRetain()) // .retain(fixedHeader.isRetain())
.qos(fixedHeader.qosLevel().value()) // .qos(fixedHeader.qosLevel().value())
.build(); // .build();
} // }
//
/** // /**
* 排序 // * 排序
* 值越大权重越高 // * 值越大权重越高
* // *
* @return 排序 // * @return 排序
*/ // */
@Override // @Override
public int sort() { // public int sort() {
return 0; // return 0;
} // }
} //}
...@@ -89,6 +89,13 @@ ...@@ -89,6 +89,13 @@
<version>${mqtts.version}</version> <version>${mqtts.version}</version>
</dependency> </dependency>
<!-- mqtts Common Rocketmq -->
<dependency>
<groupId>net.mqtts</groupId>
<artifactId>mqtts-common-rocketmq</artifactId>
<version>${mqtts.version}</version>
</dependency>
<dependency> <dependency>
<groupId>net.mqtts</groupId> <groupId>net.mqtts</groupId>
<artifactId>mqtts-api-link</artifactId> <artifactId>mqtts-api-link</artifactId>
......
package net.mqtts.link.common.enums.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @Description: Mqtt动作消息消费(Rocketmq模式)
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://mqtts.net
* @CreateDate: 2021/11/22$ 16:11$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/22$ 16:11$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Component
@RocketMQMessageListener(consumerGroup = "mqtts", topic = "mqtts")
public class MqttsDeviceActionMessageConsumer implements RocketMQListener {
@Override
public void onMessage(Object message) {
assert message!=null;
System.out.println("Link消费消息"+message);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册