提交 d8bf95c1 编写于 作者: 浅梦2013's avatar 浅梦2013

mica-mqtt-core TopicUtil 改名,mica-mqtt-spring-boot-starter...

 mica-mqtt-core TopicUtil 改名,mica-mqtt-spring-boot-starter @MqttClientSubscribe 添加 IMqttClientMessageListener bean 支持。
上级 13119f44
......@@ -22,7 +22,7 @@ import net.dreamlu.iot.mqtt.broker.util.RedisUtil;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import java.util.ArrayList;
......@@ -71,7 +71,7 @@ public class RedisMqttMessageStore implements IMqttMessageStore {
@Override
public List<Message> getRetainMessage(String topicFilter) {
List<Message> retainMessageList = new ArrayList<>();
Pattern topicPattern = MqttTopicUtil.getTopicPattern(topicFilter);
Pattern topicPattern = TopicUtil.getTopicPattern(topicFilter);
RedisKeys redisKey = RedisKeys.MESSAGE_STORE_RETAIN;
String redisKeyPrefix = redisKey.getKey();
String redisKeyPattern = redisKeyPrefix.concat(RedisUtil.getTopicPattern(topicFilter));
......
......@@ -18,7 +18,7 @@ package net.dreamlu.iot.mqtt.core.client;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import java.io.Serializable;
import java.util.Objects;
......@@ -40,7 +40,7 @@ public final class MqttClientSubscription implements Serializable {
IMqttClientMessageListener listener) {
this.mqttQoS = Objects.requireNonNull(mqttQoS, "MQTT subscribe mqttQoS is null.");
this.topicFilter = Objects.requireNonNull(topicFilter, "MQTT subscribe topicFilter is null.");
this.topicRegex = MqttTopicUtil.getTopicPattern(topicFilter);
this.topicRegex = TopicUtil.getTopicPattern(topicFilter);
this.listener = Objects.requireNonNull(listener, "MQTT subscribe listener is null.");
}
......
......@@ -139,7 +139,7 @@ public final class MqttWillMessage {
@Override
public int hashCode() {
return Objects.hash(topic, message, retain, qos);
return Objects.hash(topic, Arrays.hashCode(message), retain, qos);
}
@Override
......
......@@ -19,7 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.session;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.model.Subscribe;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
......@@ -86,7 +86,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
Integer qosValue = null;
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
if (TopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
Integer mqttQoS = data.get(clientId);
......@@ -109,7 +109,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager {
Map<String, Integer> subscribeMap = new HashMap<>(32);
Set<String> topicFilterSet = subscribeStore.keySet();
for (String topicFilter : topicFilterSet) {
if (MqttTopicUtil.match(topicFilter, topicName)) {
if (TopicUtil.match(topicFilter, topicName)) {
ConcurrentMap<String, Integer> data = subscribeStore.get(topicFilter);
if (data != null && !data.isEmpty()) {
data.forEach((clientId, qos) -> {
......
......@@ -18,7 +18,7 @@ package net.dreamlu.iot.mqtt.core.server.store;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import java.util.ArrayList;
import java.util.List;
......@@ -72,7 +72,7 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore {
@Override
public List<Message> getRetainMessage(String topicFilter) {
Pattern topicPattern = MqttTopicUtil.getTopicPattern(topicFilter);
Pattern topicPattern = TopicUtil.getTopicPattern(topicFilter);
List<Message> retainMessageList = new ArrayList<>();
retainStore.forEach((topic, message) -> {
if (topicPattern.matcher(topic).matches()) {
......
......@@ -27,7 +27,7 @@ import java.util.regex.Pattern;
*
* @author L.cm
*/
public final class MqttTopicUtil {
public final class TopicUtil {
private static final Map<String, Pattern> TOPIC_FILTER_PATTERN_CACHE = new ConcurrentHashMap<>(32);
/**
......@@ -52,7 +52,7 @@ public final class MqttTopicUtil {
* @return Pattern
*/
public static Pattern getTopicPattern(String topicFilter) {
return TOPIC_FILTER_PATTERN_CACHE.computeIfAbsent(topicFilter, MqttTopicUtil::getTopicFilterPattern);
return TOPIC_FILTER_PATTERN_CACHE.computeIfAbsent(topicFilter, TopicUtil::getTopicFilterPattern);
}
/**
......@@ -71,4 +71,16 @@ public final class MqttTopicUtil {
);
}
/**
* 获取处理完成之后的 topic
*
* @param topicTemplate topic 模板
* @return 获取处理完成之后的 topic
*/
public static String getTopicFilter(String topicTemplate) {
// 替换 ${name} 为 + 替换 #{name} 为 #
return topicTemplate.replaceAll("\\$\\{[\\s\\w.]+}", "+")
.replaceAll("#\\{[\\s\\w.]+}", "#");
}
}
package net.dreamlu.iot.mqtt.mica.listener;
import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
/**
* 客户端消息监听的另一种方式
*
* @author L.cm
*/
@Service
@MqttClientSubscribe("/test/#")
public class MqttClientMessageListener implements IMqttClientMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientMessageListener.class);
@Override
public void onMessage(String topic, ByteBuffer payload) {
logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));
}
}
......@@ -9,6 +9,11 @@ import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
/**
* 客户端消息监听
*
* @author L.cm
*/
@Service
public class MqttClientSubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);
......
......@@ -5,6 +5,9 @@ spring:
name: mica-mqtt-broker
profiles:
active: dev
mvc:
pathmatch:
matching-strategy: ant_path_matcher
# actuator management
management:
info:
......
......@@ -27,7 +27,7 @@ import java.lang.annotation.*;
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface MqttClientSubscribe {
/**
......
......@@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.spring.client;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.core.client.MqttClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
......@@ -45,35 +46,44 @@ public class MqttClientSubscribeDetector implements BeanPostProcessor {
MqttClient mqttClient = clientTemplate.getMqttClient();
Objects.requireNonNull(mqttClient, "MqttClient is null.");
Class<?> userClass = ClassUtils.getUserClass(bean);
ReflectionUtils.doWithMethods(userClass, method -> {
MqttClientSubscribe subscribe = AnnotationUtils.findAnnotation(method, MqttClientSubscribe.class);
// 1. 查找类
if (bean instanceof IMqttClientMessageListener) {
MqttClientSubscribe subscribe = AnnotationUtils.findAnnotation(userClass, MqttClientSubscribe.class);
if (subscribe != null) {
// 1. 校验必须为 public 和非 static 的方法
int modifiers = method.getModifiers();
if (Modifier.isStatic(modifiers)) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must not static.");
}
if (!Modifier.isPublic(modifiers)) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must public.");
}
// 2. 校验 method 入参数必须等于2
int paramCount = method.getParameterCount();
if (paramCount != 2) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter count must equal to 2.");
}
// 3. 校验 method 入参类型必须为 String、ByteBuffer
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> topicParamType = parameterTypes[0];
Class<?> payloadParamType = parameterTypes[1];
if (String.class != topicParamType || ByteBuffer.class != payloadParamType) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter type must String and ByteBuffer.");
}
// 4. 订阅
mqttClient.subscribe(subscribe.value(), subscribe.qos(), (topic, payload) ->
ReflectionUtils.invokeMethod(method, bean, topic, payload)
);
clientTemplate.subscribe(subscribe.value(), subscribe.qos(), (IMqttClientMessageListener) bean);
}
}, ReflectionUtils.USER_DECLARED_METHODS);
} else {
// 2. 查找方法
ReflectionUtils.doWithMethods(userClass, method -> {
MqttClientSubscribe subscribe = AnnotationUtils.findAnnotation(method, MqttClientSubscribe.class);
if (subscribe != null) {
// 1. 校验必须为 public 和非 static 的方法
int modifiers = method.getModifiers();
if (Modifier.isStatic(modifiers)) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must not static.");
}
if (!Modifier.isPublic(modifiers)) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must public.");
}
// 2. 校验 method 入参数必须等于2
int paramCount = method.getParameterCount();
if (paramCount != 2) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter count must equal to 2.");
}
// 3. 校验 method 入参类型必须为 String、ByteBuffer
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> topicParamType = parameterTypes[0];
Class<?> payloadParamType = parameterTypes[1];
if (String.class != topicParamType || ByteBuffer.class != payloadParamType) {
throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter type must String and ByteBuffer.");
}
// 4. 订阅
mqttClient.subscribe(subscribe.value(), subscribe.qos(), (topic, payload) ->
ReflectionUtils.invokeMethod(method, bean, topic, payload)
);
}
}, ReflectionUtils.USER_DECLARED_METHODS);
}
return bean;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册