Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
mqttsnet
thinglinks
提交
2394643c
thinglinks
项目概览
mqttsnet
/
thinglinks
通知
1
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
thinglinks
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
2394643c
编写于
12月 26, 2021
作者:
xiaonannet
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
表字段调整
上级
5aadde57
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
135 addition
and
124 deletion
+135
-124
thinglinks-api/thinglinks-api-link/src/main/java/com/mqttsnet/thinglinks/link/api/RemoteDeviceService.java
...com/mqttsnet/thinglinks/link/api/RemoteDeviceService.java
+2
-2
thinglinks-api/thinglinks-api-link/src/main/java/com/mqttsnet/thinglinks/link/api/factory/RemoteDeviceFallbackFactory.java
...nglinks/link/api/factory/RemoteDeviceFallbackFactory.java
+1
-1
thinglinks-modules/thinglinks-modules-broker/src/main/java/com/mqttsnet/thinglinks/broker/service/DeviceActionInterceptor.java
...et/thinglinks/broker/service/DeviceActionInterceptor.java
+128
-117
thinglinks-modules/thinglinks-modules-broker/src/main/java/com/mqttsnet/thinglinks/broker/service/PasswordAuthenticationImpl.java
...thinglinks/broker/service/PasswordAuthenticationImpl.java
+4
-1
thinglinks-modules/thinglinks-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
...ervices/io.github.quickmsg.common.interceptor.Interceptor
+0
-3
未找到文件。
thinglinks-api/thinglinks-api-link/src/main/java/com/mqttsnet/thinglinks/link/api/RemoteDeviceService.java
浏览文件 @
2394643c
...
...
@@ -35,10 +35,10 @@ public interface RemoteDeviceService {
/**
* 更新设备在线状态
*
* @param
mqttsD
evice
* @param
d
evice
* @return
*/
@PutMapping
(
"/device/updateConnectStatusByClientId"
)
public
R
updateConnectStatusByClientId
(
@RequestBody
Device
mqttsD
evice
);
public
R
updateConnectStatusByClientId
(
@RequestBody
Device
d
evice
);
}
thinglinks-api/thinglinks-api-link/src/main/java/com/mqttsnet/thinglinks/link/api/factory/RemoteDeviceFallbackFactory.java
浏览文件 @
2394643c
...
...
@@ -27,7 +27,7 @@ public class RemoteDeviceFallbackFactory implements FallbackFactory<RemoteDevice
}
@Override
public
R
updateConnectStatusByClientId
(
Device
mqttsD
evice
)
{
public
R
updateConnectStatusByClientId
(
Device
d
evice
)
{
return
R
.
fail
(
"更新设备在线状态失败:"
+
throwable
.
getMessage
());
}
};
...
...
thinglinks-modules/thinglinks-modules-broker/src/main/java/com/mqttsnet/thinglinks/broker/service/DeviceActionInterceptor.java
浏览文件 @
2394643c
package
com.mqttsnet.thinglinks.broker.service
;
import
cn.hutool.core.date.LocalDateTimeUtil
;
import
com.mqttsnet.thinglinks.link.api.domain.device.entity.Device
;
import
com.mqttsnet.thinglinks.link.api.domain.device.entity.DeviceAction
;
import
io.netty.handler.codec.mqtt.*
;
import
lombok.extern.slf4j.Slf4j
;
import
com.mqttsnet.thinglinks.link.api.RemoteDeviceActionService
;
import
com.mqttsnet.thinglinks.link.api.RemoteDeviceService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Service
;
import
sun.misc.MessageUtils
;
import
javax.annotation.PostConstruct
;
import
java.util.Arrays
;
import
java.util.List
;
/**
* @Description: Mqtt 设备动作拦截处理
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2021/11/16$ 10:33$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/16$ 10:33$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Service
@Slf4j
@Component
public
class
DeviceActionInterceptor
implements
Interceptor
{
private
static
DeviceActionInterceptor
DeviceActionInterceptor
;
@Autowired
private
RemoteDeviceService
deviceService
;
@Autowired
private
RemoteDeviceActionService
deviceActionService
;
@PostConstruct
public
void
init
()
{
DeviceActionInterceptor
=
this
;
DeviceActionInterceptor
.
deviceService
=
this
.
deviceService
;
DeviceActionInterceptor
.
deviceActionService
=
this
.
deviceActionService
;
}
/**
* 拦截目标参数
*
* @param invocation {@link Invocation}
* @return Object
*/
@Override
public
Object
intercept
(
Invocation
invocation
)
{
try
{
MqttChannel
mqttChannel
=
(
MqttChannel
)
invocation
.
getArgs
()[
0
];
SmqttMessage
<
MqttMessage
>
smqttMessage
=
(
SmqttMessage
<
MqttMessage
>)
invocation
.
getArgs
()[
1
];
ReceiveContext
<
Configuration
>
mqttReceiveContext
=
(
ReceiveContext
<
Configuration
>)
invocation
.
getArgs
()[
2
];
DslExecutor
dslExecutor
=
mqttReceiveContext
.
getDslExecutor
();
MqttMessage
message
=
smqttMessage
.
getMessage
();
//TODO MQTT动作数据处理
List
<
MqttMessageType
>
mqttMessageType
=
Arrays
.
asList
(
MqttMessageType
.
PUBLISH
,
MqttMessageType
.
DISCONNECT
,
MqttMessageType
.
PINGRESP
,
MqttMessageType
.
SUBSCRIBE
,
MqttMessageType
.
UNSUBSCRIBE
);
if
(!
smqttMessage
.
getIsCluster
()
&&
mqttMessageType
.
contains
(
message
.
fixedHeader
().
messageType
()))
{
MqttPublishMessage
publishMessage
=
(
MqttPublishMessage
)
message
;
HeapMqttMessage
heapMqttMessage
=
this
.
clusterMessage
(
publishMessage
,
mqttChannel
,
smqttMessage
.
getTimestamp
());
DeviceAction
deviceAction
=
new
DeviceAction
();
deviceAction
.
setDeviceIdentification
(
mqttChannel
.
getClientIdentifier
());
deviceAction
.
setActionType
(
message
.
fixedHeader
().
messageType
().
toString
());
deviceAction
.
setStatus
(
message
.
decoderResult
().
toString
());
deviceAction
.
setMessage
(
heapMqttMessage
.
getTopic
());
deviceAction
.
setCreateTime
(
LocalDateTimeUtil
.
now
());
DeviceActionInterceptor
.
deviceActionService
.
add
(
deviceAction
);
DeviceActionInterceptor
.
deviceService
.
updateConnectStatusByClientId
(
new
Device
(
mqttChannel
.
getStatus
().
toString
(),
mqttChannel
.
getClientIdentifier
()));
}
// 拦截业务
return
invocation
.
proceed
();
// 放行
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
null
;
}
/**
* 构建消息体
*
* @param message {@link MqttPublishMessage}
* @param timestamp
* @return {@link HeapMqttMessage}
*/
private
HeapMqttMessage
clusterMessage
(
MqttPublishMessage
message
,
MqttChannel
channel
,
long
timestamp
)
{
MqttPublishVariableHeader
header
=
message
.
variableHeader
();
MqttFixedHeader
fixedHeader
=
message
.
fixedHeader
();
return
HeapMqttMessage
.
builder
()
.
timestamp
(
timestamp
)
.
clientIdentifier
(
channel
.
getClientIdentifier
())
.
message
(
MessageUtils
.
copyReleaseByteBuf
(
message
.
payload
()))
.
topic
(
header
.
topicName
())
.
retain
(
fixedHeader
.
isRetain
())
.
qos
(
fixedHeader
.
qosLevel
().
value
())
.
build
();
}
/**
* 排序
* 值越大权重越高
*
* @return 排序
*/
@Override
public
int
sort
()
{
return
0
;
}
}
//package com.mqttsnet.thinglinks.broker.service;
//
//import cn.hutool.core.date.LocalDateTimeUtil;
//import com.mqttsnet.thinglinks.link.api.domain.device.entity.Device;
//import com.mqttsnet.thinglinks.link.api.domain.device.entity.DeviceAction;
//import io.github.quickmsg.common.channel.MqttChannel;
//import io.github.quickmsg.common.config.Configuration;
//import io.github.quickmsg.common.context.ReceiveContext;
//import io.github.quickmsg.common.interceptor.Interceptor;
//import io.github.quickmsg.common.interceptor.Invocation;
//import io.github.quickmsg.common.message.HeapMqttMessage;
//import io.github.quickmsg.common.message.SmqttMessage;
//import io.github.quickmsg.common.rule.DslExecutor;
//import io.github.quickmsg.common.utils.MessageUtils;
//import io.netty.handler.codec.mqtt.*;
//import lombok.extern.slf4j.Slf4j;
//import com.mqttsnet.thinglinks.link.api.RemoteDeviceActionService;
//import com.mqttsnet.thinglinks.link.api.RemoteDeviceService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.PostConstruct;
//import java.util.Arrays;
//import java.util.List;
//
///**
// * @Description: Mqtt 设备动作拦截处理
// * @Author: ShiHuan Sun
// * @E-mail: 13733918655@163.com
// * @Website: http://thinglinks.mqttsnet.com
// * @CreateDate: 2021/11/16$ 10:33$
// * @UpdateUser: ShiHuan Sun
// * @UpdateDate: 2021/11/16$ 10:33$
// * @UpdateRemark: 修改内容
// * @Version: 1.0
// */
//@Service
//@Slf4j
//@Component
//public class DeviceActionInterceptor implements Interceptor {
//
// private static DeviceActionInterceptor DeviceActionInterceptor;
//
// @Autowired
// private RemoteDeviceService deviceService;
//
// @Autowired
// private RemoteDeviceActionService deviceActionService;
//
//
// @PostConstruct
// public void init() {
// DeviceActionInterceptor = this;
// DeviceActionInterceptor.deviceService = this.deviceService;
// DeviceActionInterceptor.deviceActionService = this.deviceActionService;
// }
//
// /**
// * 拦截目标参数
// *
// * @param invocation {@link Invocation}
// * @return Object
// */
// @Override
// public Object intercept(Invocation invocation) {
// try {
// MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
// SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1];
// ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2];
// DslExecutor dslExecutor = mqttReceiveContext.getDslExecutor();
// MqttMessage message = smqttMessage.getMessage();
// //TODO MQTT动作数据处理
// List<MqttMessageType> mqttMessageType = Arrays.asList(MqttMessageType.PUBLISH, MqttMessageType.DISCONNECT, MqttMessageType.PINGRESP, MqttMessageType.SUBSCRIBE, MqttMessageType.UNSUBSCRIBE);
// if (!smqttMessage.getIsCluster() && mqttMessageType.contains(message.fixedHeader().messageType())) {
// MqttPublishMessage publishMessage = (MqttPublishMessage) message;
// HeapMqttMessage heapMqttMessage = this.clusterMessage(publishMessage, mqttChannel, smqttMessage.getTimestamp());
// DeviceAction deviceAction = new DeviceAction();
// deviceAction.setDeviceIdentification(mqttChannel.getClientIdentifier());
// deviceAction.setActionType(message.fixedHeader().messageType().toString());
// deviceAction.setStatus(message.decoderResult().toString());
// deviceAction.setMessage(heapMqttMessage.getTopic());
// deviceAction.setCreateTime(LocalDateTimeUtil.now());
// DeviceActionInterceptor.deviceActionService.add(deviceAction);
// Device device = new Device();
// device.setConnectStatus((mqttChannel.getStatus().toString());
// device.setDeviceIdentification(mqttChannel.getClientIdentifier());
// DeviceActionInterceptor.deviceService.updateConnectStatusByClientId(device);
// }
// // 拦截业务
// return invocation.proceed(); // 放行
// } catch (Exception e) {
// e.printStackTrace();
// }
// return null;
// }
//
// /**
// * 构建消息体
// *
// * @param message {@link MqttPublishMessage}
// * @param timestamp
// * @return {@link HeapMqttMessage}
// */
// private HeapMqttMessage clusterMessage(MqttPublishMessage message, MqttChannel channel, long timestamp) {
// MqttPublishVariableHeader header = message.variableHeader();
// MqttFixedHeader fixedHeader = message.fixedHeader();
// return HeapMqttMessage.builder()
// .timestamp(timestamp)
// .clientIdentifier(channel.getClientIdentifier())
// .message(MessageUtils.copyReleaseByteBuf(message.payload()))
// .topic(header.topicName())
// .retain(fixedHeader.isRetain())
// .qos(fixedHeader.qosLevel().value())
// .build();
// }
//
// /**
// * 排序
// * 值越大权重越高
// *
// * @return 排序
// */
// @Override
// public int sort() {
// return 0;
// }
//}
thinglinks-modules/thinglinks-modules-broker/src/main/java/com/mqttsnet/thinglinks/broker/service/PasswordAuthenticationImpl.java
浏览文件 @
2394643c
...
...
@@ -52,7 +52,10 @@ public class PasswordAuthenticationImpl implements PasswordAuthentication {
Device
mqttsDevice
=
PasswordAuthenticationImpl
.
deviceService
.
findOneByClientIdAndUserNameAndPasswordAndDeviceStatusAndProtocolType
(
clientIdentifier
,
userName
,
new
String
(
passwordInBytes
),
"ENABLE"
,
"MQTT"
).
getData
();
if
(
Optional
.
ofNullable
(
mqttsDevice
).
isPresent
())
{
//更改设备在线状态为在线
PasswordAuthenticationImpl
.
deviceService
.
updateConnectStatusByClientId
(
new
Device
(
"ONLINE"
,
clientIdentifier
));
Device
device
=
new
Device
();
device
.
setConnectStatus
(
"ONLINE"
);
device
.
setDeviceIdentification
(
clientIdentifier
);
PasswordAuthenticationImpl
.
deviceService
.
updateConnectStatusByClientId
(
device
);
return
true
;
}
return
false
;
...
...
thinglinks-modules/thinglinks-modules-broker/src/main/resources/META-INF/services/io.github.quickmsg.common.interceptor.Interceptor
浏览文件 @
2394643c
#net.mqtts.broker.service.DemoMessageInterceptor
#DeviceActionInterceptor
#net.mqtts.broker.service.DeviceDatasInterceptor
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录