Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
mqttsnet
thinglinks
提交
2fa7d2c8
thinglinks
项目概览
mqttsnet
/
thinglinks
通知
1
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
thinglinks
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
2fa7d2c8
编写于
11月 22, 2021
作者:
xiaonannet
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
link、broker交互方式更改
上级
35475f65
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
345 addition
and
229 deletion
+345
-229
mqtts-common/mqtts-common-core/pom.xml
mqtts-common/mqtts-common-core/pom.xml
+7
-0
mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DemoMessageInterceptor.java
...java/net/mqtts/broker/service/DemoMessageInterceptor.java
+97
-97
mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
...ava/net/mqtts/broker/service/DeviceActionInterceptor.java
+125
-125
mqtts-modules/mqtts-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
...ervices/io.github.quickmsg.common.interceptor.Interceptor
+3
-3
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
...mmon/enums/consumer/MqttsDeviceActionMessageConsumer.java
+29
-1
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceActionService.java
...t/mqtts/link/service/device/MqttsDeviceActionService.java
+14
-1
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceDatasService.java
...et/mqtts/link/service/device/MqttsDeviceDatasService.java
+7
-0
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceActionServiceImpl.java
...ink/service/device/impl/MqttsDeviceActionServiceImpl.java
+42
-1
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceDatasServiceImpl.java
...link/service/device/impl/MqttsDeviceDatasServiceImpl.java
+21
-1
未找到文件。
mqtts-common/mqtts-common-core/pom.xml
浏览文件 @
2fa7d2c8
...
...
@@ -114,6 +114,13 @@
<version>
5.7.14
</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>
com.google.code.gson
</groupId>
<artifactId>
gson
</artifactId>
<version>
2.8.9
</version>
</dependency>
</dependencies>
</project>
mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DemoMessageInterceptor.java
浏览文件 @
2fa7d2c8
package
net.mqtts.broker.service
;
import
io.github.quickmsg.common.channel.MqttChannel
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ReceiveContext
;
import
io.github.quickmsg.common.interceptor.Interceptor
;
import
io.github.quickmsg.common.interceptor.Invocation
;
import
io.github.quickmsg.common.message.HeapMqttMessage
;
import
io.github.quickmsg.common.message.SmqttMessage
;
import
io.github.quickmsg.common.rule.DslExecutor
;
import
io.github.quickmsg.common.utils.MessageUtils
;
import
io.netty.handler.codec.mqtt.MqttFixedHeader
;
import
io.netty.handler.codec.mqtt.MqttMessage
;
import
io.netty.handler.codec.mqtt.MqttPublishMessage
;
import
io.netty.handler.codec.mqtt.MqttPublishVariableHeader
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Service
;
import
reactor.core.scheduler.Schedulers
;
/**
* @Description: mqtt消息拦截器示例
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @CreateDate: 2021/11/3$ 18:47$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/3$ 18:47$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Service
@Slf4j
@Component
public
class
DemoMessageInterceptor
implements
Interceptor
{
/**
* 拦截目标参数
*
* @param invocation {@link Invocation}
* @return Object
*/
@Override
public
Object
intercept
(
Invocation
invocation
)
{
try
{
MqttChannel
mqttChannel
=
(
MqttChannel
)
invocation
.
getArgs
()[
0
];
SmqttMessage
<
MqttMessage
>
smqttMessage
=
(
SmqttMessage
<
MqttMessage
>)
invocation
.
getArgs
()[
1
];
ReceiveContext
<
Configuration
>
mqttReceiveContext
=
(
ReceiveContext
<
Configuration
>)
invocation
.
getArgs
()[
2
];
DslExecutor
dslExecutor
=
mqttReceiveContext
.
getDslExecutor
();
MqttMessage
message
=
smqttMessage
.
getMessage
();
if
(!
smqttMessage
.
getIsCluster
()
&&
message
instanceof
MqttPublishMessage
)
{
MqttPublishMessage
publishMessage
=
(
MqttPublishMessage
)
message
;
HeapMqttMessage
heapMqttMessage
=
this
.
clusterMessage
(
publishMessage
,
mqttChannel
,
smqttMessage
.
getTimestamp
());
log
.
info
(
"TOPIC-"
+
heapMqttMessage
.
getTopic
()+
"------Message:"
+
new
String
(
heapMqttMessage
.
getMessage
()));
if
(
mqttReceiveContext
.
getConfiguration
().
getClusterConfig
().
isEnable
())
{
mqttReceiveContext
.
getClusterRegistry
().
spreadPublishMessage
(
heapMqttMessage
).
subscribeOn
(
Schedulers
.
boundedElastic
()).
subscribe
();
}
if
(
dslExecutor
.
isExecute
())
{
dslExecutor
.
executeRule
(
mqttChannel
,
heapMqttMessage
,
mqttReceiveContext
);
}
}
return
invocation
.
proceed
();
// 放行
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
null
;
}
/**
* 构建消息体
*
* @param message {@link MqttPublishMessage}
* @param timestamp
* @return {@link HeapMqttMessage}
*/
private
HeapMqttMessage
clusterMessage
(
MqttPublishMessage
message
,
MqttChannel
channel
,
long
timestamp
)
{
MqttPublishVariableHeader
header
=
message
.
variableHeader
();
MqttFixedHeader
fixedHeader
=
message
.
fixedHeader
();
return
HeapMqttMessage
.
builder
()
.
timestamp
(
timestamp
)
.
clientIdentifier
(
channel
.
getClientIdentifier
())
.
message
(
MessageUtils
.
copyReleaseByteBuf
(
message
.
payload
()))
.
topic
(
header
.
topicName
())
.
retain
(
fixedHeader
.
isRetain
())
.
qos
(
fixedHeader
.
qosLevel
().
value
())
.
build
();
}
/**
* 排序
*
* @return 排序
*/
@Override
public
int
sort
()
{
return
0
;
}
}
//
package net.mqtts.broker.service;
//
//
import io.github.quickmsg.common.channel.MqttChannel;
//
import io.github.quickmsg.common.config.Configuration;
//
import io.github.quickmsg.common.context.ReceiveContext;
//
import io.github.quickmsg.common.interceptor.Interceptor;
//
import io.github.quickmsg.common.interceptor.Invocation;
//
import io.github.quickmsg.common.message.HeapMqttMessage;
//
import io.github.quickmsg.common.message.SmqttMessage;
//
import io.github.quickmsg.common.rule.DslExecutor;
//
import io.github.quickmsg.common.utils.MessageUtils;
//
import io.netty.handler.codec.mqtt.MqttFixedHeader;
//
import io.netty.handler.codec.mqtt.MqttMessage;
//
import io.netty.handler.codec.mqtt.MqttPublishMessage;
//
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
//
import lombok.extern.slf4j.Slf4j;
//
import org.springframework.stereotype.Component;
//
import org.springframework.stereotype.Service;
//
import reactor.core.scheduler.Schedulers;
//
//
/
//
**
//
* @Description: mqtt消息拦截器示例
//
* @Author: ShiHuan Sun
//
* @E-mail: 13733918655@163.com
//
* @CreateDate: 2021/11/3$ 18:47$
//
* @UpdateUser: ShiHuan Sun
//
* @UpdateDate: 2021/11/3$ 18:47$
//
* @UpdateRemark: 修改内容
//
* @Version: 1.0
//
*/
//
@Service
//
@Slf4j
//
@Component
//
public class DemoMessageInterceptor implements Interceptor {
//
/**
//
* 拦截目标参数
//
*
//
* @param invocation {@link Invocation}
//
* @return Object
//
*/
//
@Override
//
public Object intercept(Invocation invocation) {
//
try {
//
MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
//
SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1];
//
ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2];
//
DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
//
MqttMessage message = smqttMessage.getMessage();
//
if (!smqttMessage.getIsCluster() && message instanceof MqttPublishMessage) {
//
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
//
HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
//
log.info("TOPIC-"+heapMqttMessage.getTopic()+"------Message:"+new String(heapMqttMessage.getMessage()));
//
if (mqttReceiveContext.getConfiguration().getClusterConfig().isEnable()) {
//
mqttReceiveContext.getClusterRegistry().spreadPublishMessage(heapMqttMessage).subscribeOn(Schedulers.boundedElastic()).subscribe();
//
}
//
if (dslExecutor.isExecute()) {
//
dslExecutor.executeRule(mqttChannel, heapMqttMessage, mqttReceiveContext);
//
}
//
}
//
return invocation.proceed(); // 放行
//
} catch (Exception e) {
//
e.printStackTrace();
//
}
//
return null;
//
}
//
//
/**
//
* 构建消息体
//
*
//
* @param message {@link MqttPublishMessage}
//
* @param timestamp
//
* @return {@link HeapMqttMessage}
//
*/
//
private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
//
MqttPublishVariableHeader header = message.variableHeader();
//
MqttFixedHeader fixedHeader = message.fixedHeader();
//
return HeapMqttMessage.builder()
//
.timestamp(timestamp)
//
.clientIdentifier(channel.getClientIdentifier())
//
.message(MessageUtils.copyReleaseByteBuf(message.payload()))
//
.topic(header.topicName())
//
.retain(fixedHeader.isRetain())
//
.qos(fixedHeader.qosLevel().value())
//
.build();
//
}
//
//
/**
//
* 排序
//
*
//
* @return 排序
//
*/
//
@Override
//
public int sort() {
//
return 0;
//
}
//
}
mqtts-modules/mqtts-modules-broker/src/main/java/net/mqtts/broker/service/DeviceActionInterceptor.java
浏览文件 @
2fa7d2c8
//
package net.mqtts.broker.service;
//
//
import cn.hutool.core.date.LocalDateTimeUtil;
//
import io.github.quickmsg.common.channel.MqttChannel;
//
import io.github.quickmsg.common.config.Configuration;
//
import io.github.quickmsg.common.context.ReceiveContext;
//
import io.github.quickmsg.common.interceptor.Interceptor;
//
import io.github.quickmsg.common.interceptor.Invocation;
//
import io.github.quickmsg.common.message.HeapMqttMessage;
//
import io.github.quickmsg.common.message.SmqttMessage;
//
import io.github.quickmsg.common.rule.DslExecutor;
//
import io.github.quickmsg.common.utils.MessageUtils;
//
import io.netty.handler.codec.mqtt.*;
//
import lombok.extern.slf4j.Slf4j;
//
import net.mqtts.link.api.RemoteMqttsDeviceActionService;
//
import net.mqtts.link.api.RemoteMqttsDeviceService;
//
import net.mqtts.link.api.domain.MqttsDevice;
//
import net.mqtts.link.api.domain.MqttsDeviceAction;
//
import org.springframework.beans.factory.annotation.Autowired;
//
import org.springframework.stereotype.Component;
//
import org.springframework.stereotype.Service;
//
//
import javax.annotation.PostConstruct;
//
import java.util.Arrays;
//
import java.util.List;
//
/
//
**
//
* @Description: Mqtt 设备动作拦截处理
//
* @Author: ShiHuan Sun
//
* @E-mail: 13733918655@163.com
//
* @Website: http://mqtts.net
//
* @CreateDate: 2021/11/16$ 10:33$
//
* @UpdateUser: ShiHuan Sun
//
* @UpdateDate: 2021/11/16$ 10:33$
//
* @UpdateRemark: 修改内容
//
* @Version: 1.0
//
*/
//
@Service
//
@Slf4j
//
@Component
//
public class DeviceActionInterceptor implements Interceptor {
//
//
private static DeviceActionInterceptor DeviceActionInterceptor;
//
//
@Autowired
//
private RemoteMqttsDeviceService mqttsDeviceService;
//
//
@Autowired
//
private RemoteMqttsDeviceActionService mqttsDeviceActionService;
//
//
//
@PostConstruct
//
public void init() {
//
DeviceActionInterceptor = this;
//
DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
//
DeviceActionInterceptor.mqttsDeviceActionService = this.mqttsDeviceActionService;
//
}
//
//
/**
//
* 拦截目标参数
//
*
//
* @param invocation {@link Invocation}
//
* @return Object
//
*/
//
@Override
//
public Object intercept(Invocation invocation) {
//
try {
//
MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
//
SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1];
//
ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2];
//
DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
//
MqttMessage message = smqttMessage.getMessage();
//
//TODO MQTT动作数据处理
//
List<MqttMessageType> mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
//
if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
//
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
//
HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
//
MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
//
mqttsDeviceAction.setDevice_id(mqttChannel.getClientIdentifier());
//
mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
//
mqttsDeviceAction.setStatus(message.decoderResult().toString());
//
mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
//
mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
//
DeviceActionInterceptor.mqttsDeviceActionService.add(mqttsDeviceAction);
//
DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(new MqttsDevice(mqttChannel.getStatus().toString(), mqttChannel.getClientIdentifier()));
//
}
//
// 拦截业务
//
return invocation.proceed(); // 放行
//
} catch (Exception e) {
//
e.printStackTrace();
//
}
//
return null;
//
}
//
//
/**
//
* 构建消息体
//
*
//
* @param message {@link MqttPublishMessage}
//
* @param timestamp
//
* @return {@link HeapMqttMessage}
//
*/
//
private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
//
MqttPublishVariableHeader header = message.variableHeader();
//
MqttFixedHeader fixedHeader = message.fixedHeader();
//
return HeapMqttMessage.builder()
//
.timestamp(timestamp)
//
.clientIdentifier(channel.getClientIdentifier())
//
.message(MessageUtils.copyReleaseByteBuf(message.payload()))
//
.topic(header.topicName())
//
.retain(fixedHeader.isRetain())
//
.qos(fixedHeader.qosLevel().value())
//
.build();
//
}
//
//
/**
//
* 排序
//
* 值越大权重越高
//
*
//
* @return 排序
//
*/
//
@Override
//
public int sort() {
// return 1
;
//
}
//
}
package
net.mqtts.broker.service
;
import
cn.hutool.core.date.LocalDateTimeUtil
;
import
io.github.quickmsg.common.channel.MqttChannel
;
import
io.github.quickmsg.common.config.Configuration
;
import
io.github.quickmsg.common.context.ReceiveContext
;
import
io.github.quickmsg.common.interceptor.Interceptor
;
import
io.github.quickmsg.common.interceptor.Invocation
;
import
io.github.quickmsg.common.message.HeapMqttMessage
;
import
io.github.quickmsg.common.message.SmqttMessage
;
import
io.github.quickmsg.common.rule.DslExecutor
;
import
io.github.quickmsg.common.utils.MessageUtils
;
import
io.netty.handler.codec.mqtt.*
;
import
lombok.extern.slf4j.Slf4j
;
import
net.mqtts.link.api.RemoteMqttsDeviceActionService
;
import
net.mqtts.link.api.RemoteMqttsDeviceService
;
import
net.mqtts.link.api.domain.MqttsDevice
;
import
net.mqtts.link.api.domain.MqttsDeviceAction
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.PostConstruct
;
import
java.util.Arrays
;
import
java.util.List
;
/**
* @Description: Mqtt 设备动作拦截处理
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://mqtts.net
* @CreateDate: 2021/11/16$ 10:33$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/16$ 10:33$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Service
@Slf4j
@Component
public
class
DeviceActionInterceptor
implements
Interceptor
{
private
static
DeviceActionInterceptor
DeviceActionInterceptor
;
@Autowired
private
RemoteMqttsDeviceService
mqttsDeviceService
;
@Autowired
private
RemoteMqttsDeviceActionService
mqttsDeviceActionService
;
@PostConstruct
public
void
init
()
{
DeviceActionInterceptor
=
this
;
DeviceActionInterceptor
.
mqttsDeviceService
=
this
.
mqttsDeviceService
;
DeviceActionInterceptor
.
mqttsDeviceActionService
=
this
.
mqttsDeviceActionService
;
}
/**
* 拦截目标参数
*
* @param invocation {@link Invocation}
* @return Object
*/
@Override
public
Object
intercept
(
Invocation
invocation
)
{
try
{
MqttChannel
mqttChannel
=
(
MqttChannel
)
invocation
.
getArgs
()[
0
];
SmqttMessage
<
MqttMessage
>
smqttMessage
=
(
SmqttMessage
<
MqttMessage
>)
invocation
.
getArgs
()[
1
];
ReceiveContext
<
Configuration
>
mqttReceiveContext
=
(
ReceiveContext
<
Configuration
>)
invocation
.
getArgs
()[
2
];
DslExecutor
dslExecutor
=
mqttReceiveContext
.
getDslExecutor
();
MqttMessage
message
=
smqttMessage
.
getMessage
();
//TODO MQTT动作数据处理
List
<
MqttMessageType
>
mqttMessageType
=
Arrays
.
asList
(
MqttMessageType
.
PUBLISH
,
MqttMessageType
.
DISCONNECT
,
MqttMessageType
.
PINGRESP
,
MqttMessageType
.
SUBSCRIBE
,
MqttMessageType
.
UNSUBSCRIBE
);
if
(!
smqttMessage
.
getIsCluster
()
&&
mqttMessageType
.
contains
(
message
.
fixedHeader
().
messageType
()))
{
MqttPublishMessage
publishMessage
=
(
MqttPublishMessage
)
message
;
HeapMqttMessage
heapMqttMessage
=
this
.
clusterMessage
(
publishMessage
,
mqttChannel
,
smqttMessage
.
getTimestamp
());
MqttsDeviceAction
mqttsDeviceAction
=
new
MqttsDeviceAction
();
mqttsDeviceAction
.
setDevice_id
(
mqttChannel
.
getClientIdentifier
());
mqttsDeviceAction
.
setAction_type
(
message
.
fixedHeader
().
messageType
().
toString
());
mqttsDeviceAction
.
setStatus
(
message
.
decoderResult
().
toString
());
mqttsDeviceAction
.
setMessage
(
heapMqttMessage
.
getTopic
());
mqttsDeviceAction
.
setCreate_time
(
LocalDateTimeUtil
.
now
());
DeviceActionInterceptor
.
mqttsDeviceActionService
.
add
(
mqttsDeviceAction
);
DeviceActionInterceptor
.
mqttsDeviceService
.
updateConnectStatusByClientId
(
new
MqttsDevice
(
mqttChannel
.
getStatus
().
toString
(),
mqttChannel
.
getClientIdentifier
()));
}
// 拦截业务
return
invocation
.
proceed
();
// 放行
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
null
;
}
/**
* 构建消息体
*
* @param message {@link MqttPublishMessage}
* @param timestamp
* @return {@link HeapMqttMessage}
*/
private
HeapMqttMessage
clusterMessage
(
MqttPublishMessage
message
,
MqttChannel
channel
,
long
timestamp
)
{
MqttPublishVariableHeader
header
=
message
.
variableHeader
();
MqttFixedHeader
fixedHeader
=
message
.
fixedHeader
();
return
HeapMqttMessage
.
builder
()
.
timestamp
(
timestamp
)
.
clientIdentifier
(
channel
.
getClientIdentifier
())
.
message
(
MessageUtils
.
copyReleaseByteBuf
(
message
.
payload
()))
.
topic
(
header
.
topicName
())
.
retain
(
fixedHeader
.
isRetain
())
.
qos
(
fixedHeader
.
qosLevel
().
value
())
.
build
();
}
/**
* 排序
* 值越大权重越高
*
* @return 排序
*/
@Override
public
int
sort
()
{
return
0
;
}
}
mqtts-modules/mqtts-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
浏览文件 @
2fa7d2c8
net.mqtts.broker.service.DemoMessageInterceptor
net.mqtts.broker.service.DeviceActionInterceptor
net.mqtts.broker.service.DeviceDatasInterceptor
\ No newline at end of file
#net.mqtts.broker.service.DemoMessageInterceptor
#net.mqtts.broker.service.DeviceActionInterceptor
#net.mqtts.broker.service.DeviceDatasInterceptor
\ No newline at end of file
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/common/enums/consumer/MqttsDeviceActionMessageConsumer.java
浏览文件 @
2fa7d2c8
package
net.mqtts.link.common.enums.consumer
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.sun.xml.internal.bind.v2.TODO
;
import
lombok.extern.slf4j.Slf4j
;
import
net.mqtts.common.log.annotation.Log
;
import
net.mqtts.common.log.enums.BusinessType
;
import
net.mqtts.link.service.device.MqttsDeviceActionService
;
import
net.mqtts.link.service.device.MqttsDeviceDatasService
;
import
org.apache.rocketmq.spring.annotation.RocketMQMessageListener
;
import
org.apache.rocketmq.spring.core.RocketMQListener
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
/**
...
...
@@ -15,14 +25,32 @@ import org.springframework.stereotype.Component;
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Slf4j
@Component
@RocketMQMessageListener
(
consumerGroup
=
"mqtts"
,
topic
=
"mqtts"
)
public
class
MqttsDeviceActionMessageConsumer
implements
RocketMQListener
{
@Autowired
private
MqttsDeviceActionService
mqttsDeviceActionService
;
@Autowired
private
MqttsDeviceDatasService
mqttsDeviceDatasService
;
@Override
public
void
onMessage
(
Object
message
)
{
assert
message
!=
null
;
System
.
out
.
println
(
"Link消费消息"
+
message
);
JSONObject
mqttsMessage
=
JSONObject
.
parseObject
((
String
)
message
);
/**
* TODO 设备上下线处理
* $event/close 设备断开事件
* $event/connect 设备连接事件
* ${topic} 其他为业务数据自行处理
*/
if
(
"$event/connect"
.
equals
(
mqttsMessage
.
get
(
"topic"
))){
mqttsDeviceActionService
.
connectEvent
(
mqttsMessage
.
get
(
"msg"
).
toString
());
}
else
if
(
"$event/close"
.
equals
(
mqttsMessage
.
get
(
"topic"
))){
mqttsDeviceActionService
.
closeEvent
(
mqttsMessage
.
get
(
"msg"
).
toString
());
}
else
{
mqttsDeviceDatasService
.
insertBaseDatas
(
mqttsMessage
.
get
(
"msg"
).
toString
());
}
}
}
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceActionService.java
浏览文件 @
2fa7d2c8
package
net.mqtts.link.service.device
;
import
com.alibaba.fastjson.JSONObject
;
import
net.mqtts.link.api.domain.MqttsDeviceAction
;
import
java.util.List
;
/**
* @Description:
java类作用描述
* @Description:
设备动作处理接口
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://mqtts.net
...
...
@@ -47,4 +48,16 @@ public interface MqttsDeviceActionService {
int
batchInsert
(
List
<
MqttsDeviceAction
>
list
);
int
deleteMqttsDeviceActionByIds
(
Long
[]
ids
);
/**
* 设备连接事件
* @param mqttsMessage
*/
void
connectEvent
(
String
mqttsMessage
);
/**
* 设备断开事件
* @param mqttsMessage
*/
void
closeEvent
(
String
mqttsMessage
);
}
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/MqttsDeviceDatasService.java
浏览文件 @
2fa7d2c8
package
net.mqtts.link.service.device
;
import
com.alibaba.fastjson.JSONObject
;
import
net.mqtts.link.api.domain.MqttsDeviceDatas
;
import
java.util.List
;
...
...
@@ -47,4 +48,10 @@ public interface MqttsDeviceDatasService {
int
batchInsert
(
List
<
MqttsDeviceDatas
>
list
);
int
deleteMqttsDeviceDatasByIds
(
Long
[]
ids
);
/**
* mqtt基础数据处理
* @param mqttsMessage
*/
void
insertBaseDatas
(
String
mqttsMessage
);
}
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceActionServiceImpl.java
浏览文件 @
2fa7d2c8
package
net.mqtts.link.service.device.impl
;
import
cn.hutool.core.date.LocalDateTimeUtil
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.nacos.shaded.com.google.gson.Gson
;
import
io.netty.handler.codec.mqtt.MqttMessage
;
import
lombok.extern.slf4j.Slf4j
;
import
net.mqtts.link.api.domain.MqttsDeviceAction
;
import
net.mqtts.link.mapper.device.MqttsDeviceActionMapper
;
import
net.mqtts.link.service.device.MqttsDeviceActionService
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.Resource
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* @Description:
java类作用描述
* @Description:
mqtt上下线动作数据处理
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://mqtts.net
...
...
@@ -19,6 +26,7 @@ import java.util.List;
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Slf4j
@Service
public
class
MqttsDeviceActionServiceImpl
implements
MqttsDeviceActionService
{
...
...
@@ -100,4 +108,37 @@ public class MqttsDeviceActionServiceImpl implements MqttsDeviceActionService {
return
mqttsDeviceActionMapper
.
deleteMqttsDeviceActionByIds
(
ids
);
}
/**
* 设备连接事件
*
* @param mqttsMessage
*/
@Override
public
void
connectEvent
(
String
mqttsMessage
)
{
Gson
gson
=
new
Gson
();
Map
<
String
,
Object
>
map
=
new
HashMap
<
String
,
Object
>();
map
=
gson
.
fromJson
(
mqttsMessage
,
map
.
getClass
());
log
.
info
(
map
.
toString
());
/*MqttsDeviceAction mqttsDeviceAction = new MqttsDeviceAction();
mqttsDeviceAction.setDevice_id(mqttsMessage.get("clientIdentifier").toString());
mqttsDeviceAction.setAction_type(message.fixedHeader().messageType().toString());
mqttsDeviceAction.setStatus(message.decoderResult().toString());
mqttsDeviceAction.setMessage(heapMqttMessage.getTopic());
mqttsDeviceAction.setCreate_time(LocalDateTimeUtil.now());
mqttsDeviceActionMapper.insert(mqttsDeviceAction);*/
}
/**
* 设备断开事件
*
* @param mqttsMessage
*/
@Override
public
void
closeEvent
(
String
mqttsMessage
)
{
Gson
gson
=
new
Gson
();
Map
<
String
,
Object
>
map
=
new
HashMap
<
String
,
Object
>();
map
=
gson
.
fromJson
(
mqttsMessage
,
map
.
getClass
());
log
.
info
(
map
.
toString
());
}
}
mqtts-modules/mqtts-modules-link/src/main/java/net/mqtts/link/service/device/impl/MqttsDeviceDatasServiceImpl.java
浏览文件 @
2fa7d2c8
package
net.mqtts.link.service.device.impl
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.nacos.shaded.com.google.gson.Gson
;
import
lombok.extern.slf4j.Slf4j
;
import
net.mqtts.link.api.domain.MqttsDeviceDatas
;
import
net.mqtts.link.mapper.device.MqttsDeviceDatasMapper
;
import
net.mqtts.link.service.device.MqttsDeviceDatasService
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.Resource
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* @Description:
java类作用描述
* @Description:
mqtt基础业务处理
* @Author: ShiHuan Sun
...
...
@@ -29,6 +35,7 @@ import java.util.List;
* @Version: 1.0
*/
@Slf4j
@Service
public
class
MqttsDeviceDatasServiceImpl
implements
MqttsDeviceDatasService
{
...
...
@@ -110,4 +117,17 @@ public class MqttsDeviceDatasServiceImpl implements MqttsDeviceDatasService{
return
mqttsDeviceDatasMapper
.
deleteMqttsDeviceDatasByIds
(
ids
);
}
/**
* mqtt基础数据处理
*
* @param mqttsMessage
*/
@Override
public
void
insertBaseDatas
(
String
mqttsMessage
)
{
Gson
gson
=
new
Gson
();
Map
<
String
,
Object
>
map
=
new
HashMap
<
String
,
Object
>();
map
=
gson
.
fromJson
(
mqttsMessage
,
map
.
getClass
());
log
.
info
(
map
.
toString
());
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录