提交 7446233d 编写于 作者: 很久是多久's avatar 很久是多久

拦截设备上下线事件

更新设备状态
上级 f63cecd3
package net.mqtts.link.mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import net.mqtts.link.domain.MqttsDevice;
/**
* 设备管理Mapper接口
*
*
* @author mqtts
* @date 2021-10-22
*/
public interface MqttsDeviceMapper
{
public interface MqttsDeviceMapper {
/**
* 查询设备管理
*
*
* @param id 设备管理主键
* @return 设备管理
*/
......@@ -22,7 +23,7 @@ public interface MqttsDeviceMapper
/**
* 查询设备管理列表
*
*
* @param mqttsDevice 设备管理
* @return 设备管理集合
*/
......@@ -30,7 +31,7 @@ public interface MqttsDeviceMapper
/**
* 新增设备管理
*
*
* @param mqttsDevice 设备管理
* @return 结果
*/
......@@ -38,7 +39,7 @@ public interface MqttsDeviceMapper
/**
* 修改设备管理
*
*
* @param mqttsDevice 设备管理
* @return 结果
*/
......@@ -46,7 +47,7 @@ public interface MqttsDeviceMapper
/**
* 删除设备管理
*
*
* @param id 设备管理主键
* @return 结果
*/
......@@ -54,18 +55,23 @@ public interface MqttsDeviceMapper
/**
* 批量删除设备管理
*
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteMqttsDeviceByIds(Long[] ids);
MqttsDevice findOneByClientIdAndUserNameAndPassword(@Param("clientId")String clientId,@Param("userName")String userName,@Param("password")String password);
MqttsDevice findOneByClientIdAndUserNameAndPasswordAndDeviceStatusAndProtocolType(@Param("clientId")String clientId,@Param("userName")String userName,@Param("password")String password,@Param("deviceStatus")String deviceStatus,@Param("protocolType")String protocolType);
int updateConnectStatusByClientId(@Param("updatedConnectStatus")String updatedConnectStatus,@Param("clientId")String clientId);
MqttsDevice findOneByClientIdAndUserNameAndPassword(@Param("clientId") String clientId, @Param("userName") String userName, @Param("password") String password);
MqttsDevice findOneByClientIdAndUserNameAndPasswordAndDeviceStatusAndProtocolType(@Param("clientId") String clientId, @Param("userName") String userName, @Param("password") String password, @Param("deviceStatus") String deviceStatus, @Param("protocolType") String protocolType);
/**
* 更新设备在线状态
*
* @param updatedConnectStatus 设备状态值
* @param clientId 客户端ID
* @return 返回结果
*/
int updateConnectStatusByClientId(@Param("updatedConnectStatus") String updatedConnectStatus, @Param("clientId") String clientId);
}
package net.mqtts.link.service;
import java.util.List;
import net.mqtts.link.domain.MqttsDevice;
import org.springframework.stereotype.Component;
/**
* 设备管理Service接口
*
*
* @author mqtts
* @date 2021-10-22
*/
public interface IMqttsDeviceService
{
public interface IMqttsDeviceService<updatateDeviceStacus> {
/**
* 查询设备管理
*
*
* @param id 设备管理主键
* @return 设备管理
*/
......@@ -22,7 +21,7 @@ public interface IMqttsDeviceService
/**
* 查询设备管理列表
*
*
* @param mqttsDevice 设备管理
* @return 设备管理集合
*/
......@@ -30,7 +29,7 @@ public interface IMqttsDeviceService
/**
* 新增设备管理
*
*
* @param mqttsDevice 设备管理
* @return 结果
*/
......@@ -38,7 +37,7 @@ public interface IMqttsDeviceService
/**
* 修改设备管理
*
*
* @param mqttsDevice 设备管理
* @return 结果
*/
......@@ -46,7 +45,7 @@ public interface IMqttsDeviceService
/**
* 批量删除设备管理
*
*
* @param ids 需要删除的设备管理主键集合
* @return 结果
*/
......@@ -54,26 +53,24 @@ public interface IMqttsDeviceService
/**
* 删除设备管理信息
*
*
* @param id 设备管理主键
* @return 结果
*/
public int deleteMqttsDeviceById(Long id);
MqttsDevice findOneByClientIdAndUserNameAndPassword(String clientId, String userName, String password);
MqttsDevice findOneByClientIdAndUserNameAndPassword(String clientId,String userName,String password);
MqttsDevice findOneByClientIdAndUserNameAndPasswordAndDeviceStatusAndProtocolType(String clientId,String userName,String password,String deviceStatus,String protocolType);
int updateConnectStatusByClientId(String updatedConnectStatus,String clientId);
MqttsDevice findOneByClientIdAndUserNameAndPasswordAndDeviceStatusAndProtocolType(String clientId, String userName, String password, String deviceStatus, String protocolType);
/**
* 更新设备在线状态
*
* @param updatedConnectStatus 设备状态
* @param clientId 客户端ID
* @return
*/
int updateConnectStatusByClientId(String updatedConnectStatus, String clientId);
}
......@@ -10,9 +10,16 @@ import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import net.mqtts.common.core.utils.StringUtils;
import net.mqtts.link.common.enums.DeviceConnectStatus;
import net.mqtts.link.service.IMqttsDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
/**
* @Description: Mqtt 设备动作拦截处理
* @Author: ShiHuan Sun
......@@ -28,6 +35,19 @@ import org.springframework.stereotype.Service;
@Slf4j
@Component
public class DeviceActionInterceptor implements Interceptor {
private static DeviceActionInterceptor DeviceActionInterceptor;
@Autowired
private IMqttsDeviceService mqttsDeviceService;
@PostConstruct
public void init() {
DeviceActionInterceptor = this;
DeviceActionInterceptor.mqttsDeviceService = this.mqttsDeviceService;
}
/**
* 拦截目标参数
*
......@@ -40,17 +60,24 @@ public class DeviceActionInterceptor implements Interceptor {
MqttChannel mqttChannel = (MqttChannel) invocation.getArgs()[0];
SmqttMessage<MqttMessage> smqttMessage = (SmqttMessage<MqttMessage>) invocation.getArgs()[1];
ReceiveContext<Configuration> mqttReceiveContext = (ReceiveContext<Configuration>) invocation.getArgs()[2];
Object variableHeader = smqttMessage.getMessage().variableHeader();
final MqttConnectPayload payload = (MqttConnectPayload) smqttMessage.getMessage().payload();
Object variableHeader = smqttMessage.getMessage().variableHeader();
MqttConnectPayload payload = null;
try {
payload = (MqttConnectPayload) smqttMessage.getMessage().payload();
} catch (Exception e) {
log.error("MqttConnectPayload转换异常:{}", e.getMessage());
}
log.info(variableHeader.getClass().getName());
//TODO 设备上下线日志写入处理,更新设备在线状态信息
/* if("$event/connect".equals(parseObject.get("topicName"))){
//设备连接事件
log.info(parseObject.get("topicName").toString());
}else if("$event/close".equals(parseObject.get("topicName"))){
//设备断开事件
log.info(parseObject.get("topicName").toString());
}*/
// 设备上下线日志写入处理,更新设备在线状态信息
String clientId = "";
if (StringUtils.isNotEmpty(mqttChannel.getClientIdentifier())) {
clientId = mqttChannel.getClientIdentifier();
}
if (null != payload && StringUtils.isNotEmpty(payload.clientIdentifier())) {
clientId = payload.clientIdentifier();
}
log.info("设备ClientID:{},设备状态:{}", clientId, mqttChannel.getStatus().toString());
DeviceActionInterceptor.mqttsDeviceService.updateConnectStatusByClientId(mqttChannel.getStatus().toString(), clientId);
//QoS = PUBLISH报文的服务质量等级
MqttFixedHeader mqttFixedHeader = smqttMessage.getMessage().fixedHeader();//固定报头
// 拦截业务
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册