提交 6aadba08 编写于 作者: 如梦技术's avatar 如梦技术 🐛

添加 mica-mqtt-broker,待完善。

上级 35be2f17
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
- [x] 支持 GraalVM 编译成本机可执行程序。 - [x] 支持 GraalVM 编译成本机可执行程序。
- [x] 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。 - [x] 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。
- [x] mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。 - [x] mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。
- [x] 基于 redis pub/sub 实现集群,详见 [mica-mqtt-broker 模块](mica-mqtt-broker)
## 待办 ## 待办
- [ ] 优化处理 mqtt session,以及支持 v5.0 - [ ] 优化处理 mqtt session,以及支持 v5.0
......
# mica-mqtt-broker 文档
## 功能
- 基于 redis pub/sub 实现集群。
- redis 客户端状态存储。
...@@ -53,6 +53,11 @@ ...@@ -53,6 +53,11 @@
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId> <artifactId>micrometer-registry-prometheus</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
......
...@@ -18,8 +18,6 @@ package net.dreamlu.iot.mqtt.broker; ...@@ -18,8 +18,6 @@ package net.dreamlu.iot.mqtt.broker;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.scheduling.annotation.EnableScheduling;
/** /**
* mica mqtt broker * mica mqtt broker
...@@ -27,8 +25,6 @@ import org.springframework.scheduling.annotation.EnableScheduling; ...@@ -27,8 +25,6 @@ import org.springframework.scheduling.annotation.EnableScheduling;
* @author L.cm * @author L.cm
*/ */
@SpringBootApplication @SpringBootApplication
@EnableScheduling
@EnableCaching
public class MqttBrokerApplication { public class MqttBrokerApplication {
public static void main(String[] args) { public static void main(String[] args) {
......
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.broker.cluster;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.mica.core.utils.JsonUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.util.Objects;
/**
* redis 消息转发器
*
* @author L.cm
*/
public class RedisMqttMessageDispatcher implements IMqttMessageDispatcher {
private final RedisTemplate<String, Object> redisTemplate;
private final byte[] channelBytes;
public RedisMqttMessageDispatcher(MicaRedisCache redisCache, String channel) {
this.redisTemplate = redisCache.getRedisTemplate();
this.channelBytes = RedisSerializer.string().serialize(Objects.requireNonNull(channel, "Redis pub/sub channel is null."));
}
@Override
public boolean send(Message message) {
// 手动序列化和反序列化,避免 redis 序列化不一致问题
final byte[] messageBytes = JsonUtil.toJsonAsBytes(message);
redisTemplate.execute((RedisCallback<Long>) connection -> connection.publish(channelBytes, messageBytes));
return true;
}
@Override
public boolean send(String clientId, Message message) {
message.setClientId(clientId);
return send(message);
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.broker.cluster;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.mica.core.utils.JsonUtil;
import net.dreamlu.mica.core.utils.StringUtil;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* 监听集群消息
*
* @author L.cm
*/
public class RedisMqttMessageReceiver implements MessageListener, InitializingBean {
private final RedisTemplate<String, Object> redisTemplate;
private final String channel;
private final MqttServer mqttServer;
public RedisMqttMessageReceiver(MicaRedisCache redisCache,
String channel,
MqttServer mqttServer) {
this.redisTemplate = redisCache.getRedisTemplate();
this.channel = Objects.requireNonNull(channel, "Redis pub/sub channel is null.");
this.mqttServer = mqttServer;
}
@Override
public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
byte[] messageBody = message.getBody();
// 手动序列化和反序列化,避免 redis 序列化不一致问题
Message mqttMessage = JsonUtil.readValue(messageBody, Message.class);
if (mqttMessage == null) {
return;
}
String clientId = mqttMessage.getClientId();
String topic = mqttMessage.getTopic();
MqttQoS mqttQoS = MqttQoS.valueOf(mqttMessage.getQos());
boolean retain = mqttMessage.isRetain();
if (StringUtil.isBlank(clientId)) {
mqttServer.publishAll(topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain);
} else {
mqttServer.publish(clientId, topic, ByteBuffer.wrap(mqttMessage.getPayload()), mqttQoS, retain);
}
}
@Override
public void afterPropertiesSet() throws Exception {
byte[] channelBytes = RedisSerializer.string().serialize(channel);
redisTemplate.execute((RedisCallback<Void>) connection -> {
connection.subscribe(RedisMqttMessageReceiver.this, channelBytes);
return null;
});
}
}
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.broker.cluster;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.broker.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
/**
* redis mqtt 遗嘱和保留消息存储
*
* @author L.cm
*/
@RequiredArgsConstructor
public class RedisMqttMessageStore implements IMqttMessageStore {
private final MicaRedisCache redisCache;
@Override
public boolean addWillMessage(String clientId, Message message) {
redisCache.set(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId), message);
return true;
}
@Override
public boolean clearWillMessage(String clientId) {
redisCache.del(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId));
return true;
}
@Override
public Message getWillMessage(String clientId) {
return redisCache.get(RedisKeys.MESSAGE_STORE_WILL.getKey(clientId));
}
@Override
public boolean addRetainMessage(String topic, Message message) {
redisCache.set(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topic), message);
return true;
}
@Override
public boolean clearRetainMessage(String topic) {
redisCache.del(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topic));
return true;
}
@Override
public Message getRetainMessage(String topic) {
return null;
}
}
...@@ -16,9 +16,15 @@ ...@@ -16,9 +16,15 @@
package net.dreamlu.iot.mqtt.broker.config; package net.dreamlu.iot.mqtt.broker.config;
import net.dreamlu.iot.mqtt.broker.cluster.RedisMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.broker.cluster.RedisMqttMessageReceiver;
import net.dreamlu.iot.mqtt.broker.enums.RedisKeys;
import net.dreamlu.iot.mqtt.broker.listener.MqttBrokerConnectListener;
import net.dreamlu.iot.mqtt.broker.listener.MqttBrokerMessageListener; import net.dreamlu.iot.mqtt.broker.listener.MqttBrokerMessageListener;
import net.dreamlu.iot.mqtt.core.server.MqttServer;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.support.DefaultMqttMessageDispatcher; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -31,14 +37,24 @@ import org.springframework.context.annotation.Configuration; ...@@ -31,14 +37,24 @@ import org.springframework.context.annotation.Configuration;
public class MqttBrokerConfiguration { public class MqttBrokerConfiguration {
@Bean @Bean
public IMqttMessageDispatcher messageDispatcher() { public IMqttConnectStatusListener mqttBrokerConnectListener(MicaRedisCache redisCache) {
// TODO L.cm 此处采用 redis 实现广播 return new MqttBrokerConnectListener(redisCache, RedisKeys.CONNECT_STATUS.getKey());
return new DefaultMqttMessageDispatcher();
} }
@Bean @Bean
public MqttBrokerMessageListener brokerMessageListener(IMqttMessageDispatcher dispatcher) { public RedisMqttMessageReceiver mqttMessageReceiver(MicaRedisCache redisCache,
return new MqttBrokerMessageListener(dispatcher); MqttServer mqttServer) {
return new RedisMqttMessageReceiver(redisCache, RedisKeys.REDIS_CHANNEL.getKey(), mqttServer);
}
@Bean
public IMqttMessageDispatcher mqttMessageDispatcher(MicaRedisCache redisCache) {
return new RedisMqttMessageDispatcher(redisCache, RedisKeys.REDIS_CHANNEL.getKey());
}
@Bean
public MqttBrokerMessageListener brokerMessageListener(IMqttMessageDispatcher mqttMessageDispatcher) {
return new MqttBrokerMessageListener(mqttMessageDispatcher);
} }
} }
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dreamlu.iot.mqtt.broker.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* redis key 汇总,方便统一处理
*
* @author L.cm
*/
@Getter
@RequiredArgsConstructor
public enum RedisKeys {
/**
* mqtt <-> redis pug/sub 消息交互
*/
REDIS_CHANNEL("mqtt:channel:exchange"),
/**
* 连接状态存储
*/
CONNECT_STATUS("mqtt:connect:status"),
/**
* 遗嘱消息存储
*/
MESSAGE_STORE_WILL("mqtt:messages:will:"),
/**
* 保留消息存储
*/
MESSAGE_STORE_RETAIN("mqtt:messages:retain:"),
;
private final String key;
/**
* 用于拼接后缀
*
* @param suffix 后缀
* @return 完整的 redis key
*/
public String getKey(String suffix) {
return this.key.concat(suffix);
}
}
...@@ -16,21 +16,27 @@ ...@@ -16,21 +16,27 @@
package net.dreamlu.iot.mqtt.broker.listener; package net.dreamlu.iot.mqtt.broker.listener;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.mica.redis.cache.MicaRedisCache;
/** /**
* mqtt 连接监听 * mqtt 连接监听
* *
* @author L.cm * @author L.cm
*/ */
@RequiredArgsConstructor
public class MqttBrokerConnectListener implements IMqttConnectStatusListener { public class MqttBrokerConnectListener implements IMqttConnectStatusListener {
private final MicaRedisCache redisCache;
private final String connectStatusKey;
@Override @Override
public void online(String clientId) { public void online(String clientId) {
redisCache.sAdd(connectStatusKey, clientId);
} }
@Override @Override
public void offline(String clientId) { public void offline(String clientId) {
redisCache.sRem(connectStatusKey, clientId);
} }
} }
...@@ -46,7 +46,7 @@ public class MqttBrokerMessageListener implements IMqttMessageListener { ...@@ -46,7 +46,7 @@ public class MqttBrokerMessageListener implements IMqttMessageListener {
message.setPayload(payload.array()); message.setPayload(payload.array());
} }
message.setMessageType(MqttMessageType.PUBLISH.value()); message.setMessageType(MqttMessageType.PUBLISH.value());
message.setStoreTime(System.currentTimeMillis()); message.setTimestamp(System.currentTimeMillis());
dispatcher.send(message); dispatcher.send(message);
} }
} }
...@@ -22,6 +22,7 @@ management: ...@@ -22,6 +22,7 @@ management:
web: web:
exposure: exposure:
include: '*' include: '*'
# 日志配置
logging: logging:
level: level:
root: info root: info
......
...@@ -58,7 +58,7 @@ public class Message implements Serializable { ...@@ -58,7 +58,7 @@ public class Message implements Serializable {
/** /**
* 存储时间 * 存储时间
*/ */
private long storeTime; private long timestamp;
public String getClientId() { public String getClientId() {
return clientId; return clientId;
...@@ -116,12 +116,12 @@ public class Message implements Serializable { ...@@ -116,12 +116,12 @@ public class Message implements Serializable {
this.payload = payload; this.payload = payload;
} }
public long getStoreTime() { public long getTimestamp() {
return storeTime; return timestamp;
} }
public void setStoreTime(long storeTime) { public void setTimestamp(long timestamp) {
this.storeTime = storeTime; this.timestamp = timestamp;
} }
@Override @Override
...@@ -137,7 +137,7 @@ public class Message implements Serializable { ...@@ -137,7 +137,7 @@ public class Message implements Serializable {
qos == message.qos && qos == message.qos &&
retain == message.retain && retain == message.retain &&
dup == message.dup && dup == message.dup &&
storeTime == message.storeTime && timestamp == message.timestamp &&
Objects.equals(clientId, message.clientId) && Objects.equals(clientId, message.clientId) &&
Objects.equals(topic, message.topic) && Objects.equals(topic, message.topic) &&
Arrays.equals(payload, message.payload); Arrays.equals(payload, message.payload);
...@@ -145,7 +145,7 @@ public class Message implements Serializable { ...@@ -145,7 +145,7 @@ public class Message implements Serializable {
@Override @Override
public int hashCode() { public int hashCode() {
int result = Objects.hash(clientId, messageType, topic, qos, retain, dup, storeTime); int result = Objects.hash(clientId, messageType, topic, qos, retain, dup, timestamp);
result = 31 * result + Arrays.hashCode(payload); result = 31 * result + Arrays.hashCode(payload);
return result; return result;
} }
...@@ -160,7 +160,7 @@ public class Message implements Serializable { ...@@ -160,7 +160,7 @@ public class Message implements Serializable {
", retain=" + retain + ", retain=" + retain +
", dup=" + dup + ", dup=" + dup +
", payload=" + Arrays.toString(payload) + ", payload=" + Arrays.toString(payload) +
", storeTime=" + storeTime + ", timestamp=" + timestamp +
'}'; '}';
} }
} }
...@@ -70,9 +70,9 @@ public interface IMqttMessageStore { ...@@ -70,9 +70,9 @@ public interface IMqttMessageStore {
/** /**
* 获取所有 retain 消息 * 获取所有 retain 消息
* *
* @param topic topic * @param topicFilter topicFilter
* @return Message * @return Message
*/ */
Message getRetainMessage(String topic); Message getRetainMessage(String topicFilter);
} }
...@@ -67,8 +67,9 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore { ...@@ -67,8 +67,9 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore {
} }
@Override @Override
public Message getRetainMessage(String topic) { public Message getRetainMessage(String topicFilter) {
return retainStore.get(topic); // TODO L.cm 改成匹配
return retainStore.get(topicFilter);
} }
} }
...@@ -270,7 +270,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { ...@@ -270,7 +270,7 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor {
topicList.add(topicName); topicList.add(topicName);
sessionManager.addSubscribe(topicName, clientId, mqttQoS); sessionManager.addSubscribe(topicName, clientId, mqttQoS);
} }
logger.info("Subscribe - clientId:{} Topic:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId); logger.info("Subscribe - clientId:{} TopicFilters:{} mqttQoS:{} messageId:{}", clientId, topicList, mqttQosList, messageId);
// 3. 返回 ack // 3. 返回 ack
MqttMessage subAckMessage = MqttMessageBuilders.subAck() MqttMessage subAckMessage = MqttMessageBuilders.subAck()
.addGrantedQosList(mqttQosList) .addGrantedQosList(mqttQosList)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册