From df190793e848d1faddc1a385d3df695c5d00e35e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Sun, 21 Nov 2021 14:51:12 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20mica-mqtt-broker=20=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E9=BB=98=E8=AE=A4=E7=9A=84=E5=BA=8F=E5=88=97=E5=8C=96?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/RedisMqttMessageDispatcher.java | 9 +++-- .../cluster/RedisMqttMessageReceiver.java | 6 +++- .../broker/cluster/RedisMqttMessageStore.java | 33 ++++++++++++++++--- .../config/MqttBrokerConfiguration.java | 19 ++++++++--- .../mqtt/broker/serializer/package-info.java | 1 - 5 files changed, 55 insertions(+), 13 deletions(-) delete mode 100644 mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/serializer/package-info.java diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageDispatcher.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageDispatcher.java index b45a065..2c18d65 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageDispatcher.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageDispatcher.java @@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.broker.cluster; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer; import net.dreamlu.mica.core.utils.JsonUtil; import net.dreamlu.mica.redis.cache.MicaRedisCache; import org.springframework.data.redis.core.RedisCallback; @@ -33,17 +34,21 @@ import java.util.Objects; */ public class RedisMqttMessageDispatcher implements IMqttMessageDispatcher { private final RedisTemplate redisTemplate; + private final IMessageSerializer messageSerializer; private final byte[] channelBytes; - public RedisMqttMessageDispatcher(MicaRedisCache redisCache, String channel) { + public RedisMqttMessageDispatcher(MicaRedisCache redisCache, + IMessageSerializer messageSerializer, + String channel) { this.redisTemplate = redisCache.getRedisTemplate(); + this.messageSerializer = messageSerializer; this.channelBytes = RedisSerializer.string().serialize(Objects.requireNonNull(channel, "Redis pub/sub channel is null.")); } @Override public boolean send(Message message) { // 手动序列化和反序列化,避免 redis 序列化不一致问题 - final byte[] messageBytes = JsonUtil.toJsonAsBytes(message); + final byte[] messageBytes = messageSerializer.serialize(message); redisTemplate.execute((RedisCallback) connection -> connection.publish(channelBytes, messageBytes)); return true; } diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java index 97463b9..d394502 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageReceiver.java @@ -20,6 +20,7 @@ import net.dreamlu.iot.mqtt.broker.service.IMqttMessageService; import net.dreamlu.iot.mqtt.codec.MqttMessageType; import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer; import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager; import net.dreamlu.mica.core.utils.JsonUtil; import net.dreamlu.mica.redis.cache.MicaRedisCache; @@ -39,16 +40,19 @@ import java.util.Objects; */ public class RedisMqttMessageReceiver implements MessageListener, InitializingBean { private final RedisTemplate redisTemplate; + private final IMessageSerializer messageSerializer; private final String channel; private final MqttServer mqttServer; private final IMqttSessionManager sessionManager; private final IMqttMessageService messageService; public RedisMqttMessageReceiver(MicaRedisCache redisCache, + IMessageSerializer messageSerializer, String channel, MqttServer mqttServer, IMqttMessageService messageService) { this.redisTemplate = redisCache.getRedisTemplate(); + this.messageSerializer = messageSerializer; this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null."); this.mqttServer = mqttServer; this.sessionManager = mqttServer.getServerCreator().getSessionManager(); @@ -59,7 +63,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) { byte[] messageBody = message.getBody(); // 手动序列化和反序列化,避免 redis 序列化不一致问题 - Message mqttMessage = JsonUtil.readValue(messageBody, Message.class); + Message mqttMessage = messageSerializer.deserialize(messageBody); if (mqttMessage == null) { return; } diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java index 57d9a96..c443aa5 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/cluster/RedisMqttMessageStore.java @@ -20,9 +20,13 @@ import lombok.RequiredArgsConstructor; import net.dreamlu.iot.mqtt.broker.enums.RedisKeys; import net.dreamlu.iot.mqtt.broker.util.RedisUtil; import net.dreamlu.iot.mqtt.core.server.model.Message; +import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer; import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore; import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil; import net.dreamlu.mica.redis.cache.MicaRedisCache; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; import java.util.ArrayList; import java.util.List; @@ -36,10 +40,12 @@ import java.util.regex.Pattern; @RequiredArgsConstructor public class RedisMqttMessageStore implements IMqttMessageStore { private final MicaRedisCache redisCache; + private final IMessageSerializer messageSerializer; @Override public boolean addWillMessage(String clientId, Message message) { - redisCache.set(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId), message); + byte[] value = messageSerializer.serialize(message); + redis((redis) -> redis.set(keySerialize(RedisKeys.MESSAGE_STORE_WILL, clientId), value)); return true; } @@ -51,12 +57,14 @@ public class RedisMqttMessageStore implements IMqttMessageStore { @Override public Message getWillMessage(String clientId) { - return redisCache.get(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId)); + byte[] value = redis((redis) -> redis.get(keySerialize(RedisKeys.MESSAGE_STORE_WILL, clientId))); + return messageSerializer.deserialize(value); } @Override public boolean addRetainMessage(String topic, Message message) { - redisCache.set(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topic), message); + byte[] value = messageSerializer.serialize(message); + redis((redis) -> redis.set(keySerialize(RedisKeys.MESSAGE_STORE_RETAIN, topic), value)); return true; } @@ -77,10 +85,27 @@ public class RedisMqttMessageStore implements IMqttMessageStore { redisCache.scan(redisKeyPattern, (key) -> { String keySuffix = key.substring(keyPrefixLength); if (topicPattern.matcher(keySuffix).matches()) { - retainMessageList.add(redisCache.get(key)); + byte[] value = redis((redis) -> redis.get(keySerialize(key))); + Message message = messageSerializer.deserialize(value); + if (message != null) { + retainMessageList.add(message); + } } }); return retainMessageList; } + private byte[] keySerialize(String redisKey) { + return RedisSerializer.string().serialize(redisKey); + } + + private byte[] keySerialize(RedisKeys suffix, String clientId) { + return RedisSerializer.string().serialize(suffix.getKey(clientId)); + } + + private T redis(RedisCallback callback) { + RedisTemplate redisTemplate = redisCache.getRedisTemplate(); + return redisTemplate.execute(callback); + } + } diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/config/MqttBrokerConfiguration.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/config/MqttBrokerConfiguration.java index 9622924..6d30f8a 100644 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/config/MqttBrokerConfiguration.java +++ b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/config/MqttBrokerConfiguration.java @@ -26,6 +26,8 @@ import net.dreamlu.iot.mqtt.core.server.MqttServer; import net.dreamlu.iot.mqtt.core.server.broker.MqttBrokerMessageListener; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; +import net.dreamlu.iot.mqtt.core.server.serializer.DefaultMessageSerializer; +import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer; import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore; import net.dreamlu.mica.redis.cache.MicaRedisCache; import org.springframework.context.ApplicationContext; @@ -40,6 +42,11 @@ import org.springframework.context.annotation.Configuration; @Configuration(proxyBeanMethods = false) public class MqttBrokerConfiguration { + @Bean + public IMessageSerializer messageSerializer() { + return DefaultMessageSerializer.INSTANCE; + } + @Bean public IMqttConnectStatusListener mqttBrokerConnectListener(ApplicationContext context, MicaRedisCache redisCache) { @@ -47,20 +54,22 @@ public class MqttBrokerConfiguration { } @Bean - public IMqttMessageStore mqttMessageStore(MicaRedisCache redisCache) { - return new RedisMqttMessageStore(redisCache); + public IMqttMessageStore mqttMessageStore(MicaRedisCache redisCache, IMessageSerializer messageSerializer) { + return new RedisMqttMessageStore(redisCache, messageSerializer); } @Bean public RedisMqttMessageReceiver mqttMessageReceiver(MicaRedisCache redisCache, + IMessageSerializer messageSerializer, MqttServer mqttServer, IMqttMessageService mqttMessageService) { - return new RedisMqttMessageReceiver(redisCache, RedisKeys.REDIS_CHANNEL.getKey(), mqttServer, mqttMessageService); + return new RedisMqttMessageReceiver(redisCache, messageSerializer, RedisKeys.REDIS_CHANNEL.getKey(), mqttServer, mqttMessageService); } @Bean - public IMqttMessageDispatcher mqttMessageDispatcher(MicaRedisCache redisCache) { - return new RedisMqttMessageDispatcher(redisCache, RedisKeys.REDIS_CHANNEL.getKey()); + public IMqttMessageDispatcher mqttMessageDispatcher(MicaRedisCache redisCache, + IMessageSerializer messageSerializer) { + return new RedisMqttMessageDispatcher(redisCache, messageSerializer, RedisKeys.REDIS_CHANNEL.getKey()); } @Bean diff --git a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/serializer/package-info.java b/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/serializer/package-info.java deleted file mode 100644 index 3c150d4..0000000 --- a/mica-mqtt-broker/src/main/java/net/dreamlu/iot/mqtt/broker/serializer/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package net.dreamlu.iot.mqtt.broker.serializer; -- GitLab