diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java index 8ca68e78925d4012f66e9dd9dc5e1e24af74268f..97af7d068fbbb981655ed4dad957e09a8978dddd 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java @@ -22,7 +22,7 @@ import net.dreamlu.iot.mqtt.broker.util.RedisUtil; import net.dreamlu.iot.mqtt.core.server.model.Message; import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer; import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore; -import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil; +import net.dreamlu.iot.mqtt.core.util.TopicUtil; import net.dreamlu.mica.redis.cache.MicaRedisCache; import java.util.ArrayList; @@ -71,7 +71,7 @@ public class RedisMqttMessageStore implements IMqttMessageStore { @Override public List getRetainMessage(String topicFilter) { List retainMessageList = new ArrayList<>(); - Pattern topicPattern = MqttTopicUtil.getTopicPattern(topicFilter); + Pattern topicPattern = TopicUtil.getTopicPattern(topicFilter); RedisKeys redisKey = RedisKeys.MESSAGE_STORE_RETAIN; String redisKeyPrefix = redisKey.getKey(); String redisKeyPattern = redisKeyPrefix.concat(RedisUtil.getTopicPattern(topicFilter)); diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientSubscription.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientSubscription.java index f490ac03d6e8e9d0382f64d6df8fb57e4377017d..e16beb312b2d2925d0e556cf5187ef2ec0b8d242 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientSubscription.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttClientSubscription.java @@ -18,7 +18,7 @@ package net.dreamlu.iot.mqtt.core.client; import net.dreamlu.iot.mqtt.codec.MqttQoS; import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription; -import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil; +import net.dreamlu.iot.mqtt.core.util.TopicUtil; import java.io.Serializable; import java.util.Objects; @@ -40,7 +40,7 @@ public final class MqttClientSubscription implements Serializable { IMqttClientMessageListener listener) { this.mqttQoS = Objects.requireNonNull(mqttQoS, "MQTT subscribe mqttQoS is null."); this.topicFilter = Objects.requireNonNull(topicFilter, "MQTT subscribe topicFilter is null."); - this.topicRegex = MqttTopicUtil.getTopicPattern(topicFilter); + this.topicRegex = TopicUtil.getTopicPattern(topicFilter); this.listener = Objects.requireNonNull(listener, "MQTT subscribe listener is null."); } diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java index 5edaa9642ec80cfc4882ef1bcf1b83e91f63dda3..a9d14bc5fd7069b5b0f89f12233df5165b2fe147 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/client/MqttWillMessage.java @@ -139,7 +139,7 @@ public final class MqttWillMessage { @Override public int hashCode() { - return Objects.hash(topic, message, retain, qos); + return Objects.hash(topic, Arrays.hashCode(message), retain, qos); } @Override diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java index 57e9c2b288dd43a3cc36a1c1d25a489db1041a9d..e583239965eee127308deb9a5f468adf324f3f67 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/session/InMemoryMqttSessionManager.java @@ -19,7 +19,7 @@ package net.dreamlu.iot.mqtt.core.server.session; import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish; import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish; import net.dreamlu.iot.mqtt.core.server.model.Subscribe; -import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil; +import net.dreamlu.iot.mqtt.core.util.TopicUtil; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -86,7 +86,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { Integer qosValue = null; Set topicFilterSet = subscribeStore.keySet(); for (String topicFilter : topicFilterSet) { - if (MqttTopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) { + if (TopicUtil.getTopicPattern(topicFilter).matcher(topicName).matches()) { ConcurrentMap data = subscribeStore.get(topicFilter); if (data != null && !data.isEmpty()) { Integer mqttQoS = data.get(clientId); @@ -109,7 +109,7 @@ public class InMemoryMqttSessionManager implements IMqttSessionManager { Map subscribeMap = new HashMap<>(32); Set topicFilterSet = subscribeStore.keySet(); for (String topicFilter : topicFilterSet) { - if (MqttTopicUtil.match(topicFilter, topicName)) { + if (TopicUtil.match(topicFilter, topicName)) { ConcurrentMap data = subscribeStore.get(topicFilter); if (data != null && !data.isEmpty()) { data.forEach((clientId, qos) -> { diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/store/InMemoryMqttMessageStore.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/store/InMemoryMqttMessageStore.java index 111c4d1f45341133805e30d519b7b154f9c03c94..05f00a66487d12fd234eba89b37a7c303d93c87a 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/store/InMemoryMqttMessageStore.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/server/store/InMemoryMqttMessageStore.java @@ -18,7 +18,7 @@ package net.dreamlu.iot.mqtt.core.server.store; import net.dreamlu.iot.mqtt.core.server.model.Message; -import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil; +import net.dreamlu.iot.mqtt.core.util.TopicUtil; import java.util.ArrayList; import java.util.List; @@ -72,7 +72,7 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore { @Override public List getRetainMessage(String topicFilter) { - Pattern topicPattern = MqttTopicUtil.getTopicPattern(topicFilter); + Pattern topicPattern = TopicUtil.getTopicPattern(topicFilter); List retainMessageList = new ArrayList<>(); retainStore.forEach((topic, message) -> { if (topicPattern.matcher(topic).matches()) { diff --git a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/MqttTopicUtil.java b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/TopicUtil.java similarity index 83% rename from mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/MqttTopicUtil.java rename to mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/TopicUtil.java index 9d54279948a2b8898542e4a18eaaa9b9c766013f..bc8308796bfe574433ded0f3484b61194f794a57 100644 --- a/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/MqttTopicUtil.java +++ b/mica-mqtt-core/src/main/java/net/dreamlu/iot/mqtt/core/util/TopicUtil.java @@ -27,7 +27,7 @@ import java.util.regex.Pattern; * * @author L.cm */ -public final class MqttTopicUtil { +public final class TopicUtil { private static final Map TOPIC_FILTER_PATTERN_CACHE = new ConcurrentHashMap<>(32); /** @@ -52,7 +52,7 @@ public final class MqttTopicUtil { * @return Pattern */ public static Pattern getTopicPattern(String topicFilter) { - return TOPIC_FILTER_PATTERN_CACHE.computeIfAbsent(topicFilter, MqttTopicUtil::getTopicFilterPattern); + return TOPIC_FILTER_PATTERN_CACHE.computeIfAbsent(topicFilter, TopicUtil::getTopicFilterPattern); } /** @@ -71,4 +71,16 @@ public final class MqttTopicUtil { ); } + /** + * 获取处理完成之后的 topic + * + * @param topicTemplate topic 模板 + * @return 获取处理完成之后的 topic + */ + public static String getTopicFilter(String topicTemplate) { + // 替换 ${name} 为 + 替换 #{name} 为 # + return topicTemplate.replaceAll("\\$\\{[\\s\\w.]+}", "+") + .replaceAll("#\\{[\\s\\w.]+}", "#"); + } + } diff --git a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientMessageListener.java b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientMessageListener.java new file mode 100644 index 0000000000000000000000000000000000000000..519e2fddc59dbaaac9b4c1a436ddf2398a847d97 --- /dev/null +++ b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientMessageListener.java @@ -0,0 +1,27 @@ +package net.dreamlu.iot.mqtt.mica.listener; + +import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; +import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener; +import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.nio.ByteBuffer; + +/** + * 客户端消息监听的另一种方式 + * + * @author L.cm + */ +@Service +@MqttClientSubscribe("/test/#") +public class MqttClientMessageListener implements IMqttClientMessageListener { + private static final Logger logger = LoggerFactory.getLogger(MqttClientMessageListener.class); + + @Override + public void onMessage(String topic, ByteBuffer payload) { + logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload)); + } +} + diff --git a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientSubscribeListener.java b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientSubscribeListener.java index 0fab52db798f11eb3116a862076d8c938cccd0ba..b0320b155348a7c5b18f34c6c1f82189fbc694d2 100644 --- a/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientSubscribeListener.java +++ b/mica-mqtt-spring-boot-example/src/main/java/net/dreamlu/iot/mqtt/mica/listener/MqttClientSubscribeListener.java @@ -9,6 +9,11 @@ import org.springframework.stereotype.Service; import java.nio.ByteBuffer; +/** + * 客户端消息监听 + * + * @author L.cm + */ @Service public class MqttClientSubscribeListener { private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class); diff --git a/mica-mqtt-spring-boot-example/src/main/resources/application.yml b/mica-mqtt-spring-boot-example/src/main/resources/application.yml index 01810673c520c31fd94efaf1feb2d957f4146184..dc39a1b87d0bc98f7a62eaa3fc3207a965b1ae83 100644 --- a/mica-mqtt-spring-boot-example/src/main/resources/application.yml +++ b/mica-mqtt-spring-boot-example/src/main/resources/application.yml @@ -5,6 +5,9 @@ spring: name: mica-mqtt-broker profiles: active: dev + mvc: + pathmatch: + matching-strategy: ant_path_matcher # actuator management management: info: diff --git a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribe.java b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribe.java index be403bbcc770a3ee47c8f63c1e2dc8940f938d53..83253a3caa8f7295ddf6ba00f78c7814878d3001 100644 --- a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribe.java +++ b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribe.java @@ -27,7 +27,7 @@ import java.lang.annotation.*; */ @Documented @Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) +@Target({ElementType.TYPE, ElementType.METHOD}) public @interface MqttClientSubscribe { /** diff --git a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribeDetector.java b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribeDetector.java index d39881b9a908697b198da6a50575567583facdfd..15b3ff0950805ac52f73a2c873ac83c9aee162e0 100644 --- a/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribeDetector.java +++ b/mica-mqtt-spring-boot-starter/src/main/java/net/dreamlu/iot/mqtt/spring/client/MqttClientSubscribeDetector.java @@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.spring.client; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener; import net.dreamlu.iot.mqtt.core.client.MqttClient; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -45,35 +46,44 @@ public class MqttClientSubscribeDetector implements BeanPostProcessor { MqttClient mqttClient = clientTemplate.getMqttClient(); Objects.requireNonNull(mqttClient, "MqttClient is null."); Class userClass = ClassUtils.getUserClass(bean); - ReflectionUtils.doWithMethods(userClass, method -> { - MqttClientSubscribe subscribe = AnnotationUtils.findAnnotation(method, MqttClientSubscribe.class); + // 1. 查找类 + if (bean instanceof IMqttClientMessageListener) { + MqttClientSubscribe subscribe = AnnotationUtils.findAnnotation(userClass, MqttClientSubscribe.class); if (subscribe != null) { - // 1. 校验必须为 public 和非 static 的方法 - int modifiers = method.getModifiers(); - if (Modifier.isStatic(modifiers)) { - throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must not static."); - } - if (!Modifier.isPublic(modifiers)) { - throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must public."); - } - // 2. 校验 method 入参数必须等于2 - int paramCount = method.getParameterCount(); - if (paramCount != 2) { - throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter count must equal to 2."); - } - // 3. 校验 method 入参类型必须为 String、ByteBuffer - Class[] parameterTypes = method.getParameterTypes(); - Class topicParamType = parameterTypes[0]; - Class payloadParamType = parameterTypes[1]; - if (String.class != topicParamType || ByteBuffer.class != payloadParamType) { - throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter type must String and ByteBuffer."); - } - // 4. 订阅 - mqttClient.subscribe(subscribe.value(), subscribe.qos(), (topic, payload) -> - ReflectionUtils.invokeMethod(method, bean, topic, payload) - ); + clientTemplate.subscribe(subscribe.value(), subscribe.qos(), (IMqttClientMessageListener) bean); } - }, ReflectionUtils.USER_DECLARED_METHODS); + } else { + // 2. 查找方法 + ReflectionUtils.doWithMethods(userClass, method -> { + MqttClientSubscribe subscribe = AnnotationUtils.findAnnotation(method, MqttClientSubscribe.class); + if (subscribe != null) { + // 1. 校验必须为 public 和非 static 的方法 + int modifiers = method.getModifiers(); + if (Modifier.isStatic(modifiers)) { + throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must not static."); + } + if (!Modifier.isPublic(modifiers)) { + throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " must public."); + } + // 2. 校验 method 入参数必须等于2 + int paramCount = method.getParameterCount(); + if (paramCount != 2) { + throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter count must equal to 2."); + } + // 3. 校验 method 入参类型必须为 String、ByteBuffer + Class[] parameterTypes = method.getParameterTypes(); + Class topicParamType = parameterTypes[0]; + Class payloadParamType = parameterTypes[1]; + if (String.class != topicParamType || ByteBuffer.class != payloadParamType) { + throw new IllegalArgumentException("@MqttClientSubscribe on method " + method + " parameter type must String and ByteBuffer."); + } + // 4. 订阅 + mqttClient.subscribe(subscribe.value(), subscribe.qos(), (topic, payload) -> + ReflectionUtils.invokeMethod(method, bean, topic, payload) + ); + } + }, ReflectionUtils.USER_DECLARED_METHODS); + } return bean; }