提交 df190793 编写于 作者: 如梦技术's avatar 如梦技术 🐛

mica-mqtt-broker 调整默认的序列化方式。

上级 f666adbd
......@@ -18,6 +18,7 @@ package net.dreamlu.iot.mqtt.broker.cluster;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer;
import net.dreamlu.mica.core.utils.JsonUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.data.redis.core.RedisCallback;
......@@ -33,17 +34,21 @@ import java.util.Objects;
*/
public class RedisMqttMessageDispatcher implements IMqttMessageDispatcher {
private final RedisTemplate<String, Object> redisTemplate;
private final IMessageSerializer messageSerializer;
private final byte[] channelBytes;
public RedisMqttMessageDispatcher(MicaRedisCache redisCache, String channel) {
public RedisMqttMessageDispatcher(MicaRedisCache redisCache,
IMessageSerializer messageSerializer,
String channel) {
this.redisTemplate = redisCache.getRedisTemplate();
this.messageSerializer = messageSerializer;
this.channelBytes = RedisSerializer.string().serialize(Objects.requireNonNull(channel, "Redis pub/sub channel is null."));
}
@Override
public boolean send(Message message) {
// 手动序列化和反序列化,避免 redis 序列化不一致问题
final byte[] messageBytes = JsonUtil.toJsonAsBytes(message);
final byte[] messageBytes = messageSerializer.serialize(message);
redisTemplate.execute((RedisCallback<Long>) connection -> connection.publish(channelBytes, messageBytes));
return true;
}
......
......@@ -20,6 +20,7 @@ import net.dreamlu.iot.mqtt.broker.service.IMqttMessageService;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.mica.core.utils.JsonUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
......@@ -39,16 +40,19 @@ import java.util.Objects;
*/
public class RedisMqttMessageReceiver implements MessageListener, InitializingBean {
private final RedisTemplate<String, Object> redisTemplate;
private final IMessageSerializer messageSerializer;
private final String channel;
private final MqttServer mqttServer;
private final IMqttSessionManager sessionManager;
private final IMqttMessageService messageService;
public RedisMqttMessageReceiver(MicaRedisCache redisCache,
IMessageSerializer messageSerializer,
String channel,
MqttServer mqttServer,
IMqttMessageService messageService) {
this.redisTemplate = redisCache.getRedisTemplate();
this.messageSerializer = messageSerializer;
this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null.");
this.mqttServer = mqttServer;
this.sessionManager = mqttServer.getServerCreator().getSessionManager();
......@@ -59,7 +63,7 @@ public class RedisMqttMessageReceiver implements MessageListener, InitializingBe
public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
byte[] messageBody = message.getBody();
// 手动序列化和反序列化,避免 redis 序列化不一致问题
Message mqttMessage = JsonUtil.readValue(messageBody, Message.class);
Message mqttMessage = messageSerializer.deserialize(messageBody);
if (mqttMessage == null) {
return;
}
......
......@@ -20,9 +20,13 @@ import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.broker.enums.RedisKeys;
import net.dreamlu.iot.mqtt.broker.util.RedisUtil;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.util.MqttTopicUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.util.ArrayList;
import java.util.List;
......@@ -36,10 +40,12 @@ import java.util.regex.Pattern;
@RequiredArgsConstructor
public class RedisMqttMessageStore implements IMqttMessageStore {
private final MicaRedisCache redisCache;
private final IMessageSerializer messageSerializer;
@Override
public boolean addWillMessage(String clientId, Message message) {
redisCache.set(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId), message);
byte[] value = messageSerializer.serialize(message);
redis((redis) -> redis.set(keySerialize(RedisKeys.MESSAGE_STORE_WILL, clientId), value));
return true;
}
......@@ -51,12 +57,14 @@ public class RedisMqttMessageStore implements IMqttMessageStore {
@Override
public Message getWillMessage(String clientId) {
return redisCache.get(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId));
byte[] value = redis((redis) -> redis.get(keySerialize(RedisKeys.MESSAGE_STORE_WILL, clientId)));
return messageSerializer.deserialize(value);
}
@Override
public boolean addRetainMessage(String topic, Message message) {
redisCache.set(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topic), message);
byte[] value = messageSerializer.serialize(message);
redis((redis) -> redis.set(keySerialize(RedisKeys.MESSAGE_STORE_RETAIN, topic), value));
return true;
}
......@@ -77,10 +85,27 @@ public class RedisMqttMessageStore implements IMqttMessageStore {
redisCache.scan(redisKeyPattern, (key) -> {
String keySuffix = key.substring(keyPrefixLength);
if (topicPattern.matcher(keySuffix).matches()) {
retainMessageList.add(redisCache.get(key));
byte[] value = redis((redis) -> redis.get(keySerialize(key)));
Message message = messageSerializer.deserialize(value);
if (message != null) {
retainMessageList.add(message);
}
}
});
return retainMessageList;
}
private byte[] keySerialize(String redisKey) {
return RedisSerializer.string().serialize(redisKey);
}
private byte[] keySerialize(RedisKeys suffix, String clientId) {
return RedisSerializer.string().serialize(suffix.getKey(clientId));
}
private <T> T redis(RedisCallback<T> callback) {
RedisTemplate<String, Object> redisTemplate = redisCache.getRedisTemplate();
return redisTemplate.execute(callback);
}
}
......@@ -26,6 +26,8 @@ import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.broker.MqttBrokerMessageListener;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.serializer.DefaultMessageSerializer;
import net.dreamlu.iot.mqtt.core.server.serializer.IMessageSerializer;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.context.ApplicationContext;
......@@ -40,6 +42,11 @@ import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MqttBrokerConfiguration {
@Bean
public IMessageSerializer messageSerializer() {
return DefaultMessageSerializer.INSTANCE;
}
@Bean
public IMqttConnectStatusListener mqttBrokerConnectListener(ApplicationContext context,
MicaRedisCache redisCache) {
......@@ -47,20 +54,22 @@ public class MqttBrokerConfiguration {
}
@Bean
public IMqttMessageStore mqttMessageStore(MicaRedisCache redisCache) {
return new RedisMqttMessageStore(redisCache);
public IMqttMessageStore mqttMessageStore(MicaRedisCache redisCache, IMessageSerializer messageSerializer) {
return new RedisMqttMessageStore(redisCache, messageSerializer);
}
@Bean
public RedisMqttMessageReceiver mqttMessageReceiver(MicaRedisCache redisCache,
IMessageSerializer messageSerializer,
MqttServer mqttServer,
IMqttMessageService mqttMessageService) {
return new RedisMqttMessageReceiver(redisCache, RedisKeys.REDIS_CHANNEL.getKey(), mqttServer, mqttMessageService);
return new RedisMqttMessageReceiver(redisCache, messageSerializer, RedisKeys.REDIS_CHANNEL.getKey(), mqttServer, mqttMessageService);
}
@Bean
public IMqttMessageDispatcher mqttMessageDispatcher(MicaRedisCache redisCache) {
return new RedisMqttMessageDispatcher(redisCache, RedisKeys.REDIS_CHANNEL.getKey());
public IMqttMessageDispatcher mqttMessageDispatcher(MicaRedisCache redisCache,
IMessageSerializer messageSerializer) {
return new RedisMqttMessageDispatcher(redisCache, messageSerializer, RedisKeys.REDIS_CHANNEL.getKey());
}
@Bean
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册