提交 3ac93bcc 编写于 作者: L luxurong

cancel relay update

上级 ae1b72d2
......@@ -2,7 +2,7 @@
## SMQTT是一款开源的MQTT消息代理Broker,
SMQTT基于Netty开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,
SMQTT基于Netty开发,底层采用Reactor3反应堆模型,支持插拔式接口,支持单机部署,支持容器化部署,
### smqtt目前拥有的功能如下:
1. 消息质量等级实现(支持qos0,qos1,qos2)
......@@ -32,6 +32,65 @@ SMQTT基于Netty开发,底层采用Reactor3反应堆模型,支持单机部署
## 快速开始
- main方式启动
引入依赖
```markdown
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
.build()
.startAwait();
```
阻塞式启动服务:
```markdown
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
.build()
.startAwait();
```
非阻塞式启动服务:
```markdown
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
.build()
.startAwait();
```
-- jar方式
- docker 方式
......
......@@ -24,7 +24,6 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
......
package com.github.quickmsg;
import com.github.quickmsg.common.bootstrap.BootstrapKey;
import com.github.quickmsg.common.config.SslContext;
import com.github.quickmsg.common.utils.PropertiesLoader;
import com.github.quickmsg.core.Bootstrap;
import io.netty.channel.WriteBufferWaterMark;
import java.util.Map;
......
package com.github.quickmsg;
import com.github.quickmsg.common.config.SslContext;
import com.github.quickmsg.core.Bootstrap;
import org.junit.Test;
import reactor.core.publisher.Sinks;
/**
* Unit test for simple App.
......@@ -11,18 +12,35 @@ public class BootstrapTest {
* Rigorous Test :-)
*/
@Test
public void TestBootstrap() {
public void TestBootstrap() throws InterruptedException {
Bootstrap.builder()
// Bootstrap.builder()
// .port(8555)
// .websocketPort(8999)
// .options(channelOptionMap -> {})
// .ssl(false)
// .sslContext(new SslContext("crt","key"))
// .isWebsocket(true)
// .wiretap(false)
// .httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
// .build()
// .startAwait();
// 启动服务
Bootstrap bootstrap = Bootstrap.builder()
.port(8555)
.websocketPort(8999)
// .options(channelOptionMap -> channelOptionMap.put())
.options(channelOptionMap -> {})
.ssl(false)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().httpPort(62212).accessLog(true).build())
.wiretap(true)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
.build()
.startAwait();
.start().block();
assert bootstrap != null;
// 关闭服务
// bootstrap.shutdown();
Thread.sleep(1000000);
}
}
......@@ -12,11 +12,11 @@ import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
......@@ -44,7 +44,7 @@ public class MqttChannel {
private long keepalive;
private List<String> topics;
private Set<String> topics;
private Boolean isMock = false;
......@@ -55,12 +55,12 @@ public class MqttChannel {
private Map<Integer, MqttPublishMessage> qos2MsgCache;
private Map<Integer, Disposable> replyMqttMessageMap;
private Map<MqttMessageType, Map<Integer, Disposable>> replyMqttMessageMap;
public static MqttChannel init(Connection connection) {
MqttChannel mqttChannel = new MqttChannel();
mqttChannel.setTopics(new CopyOnWriteArrayList<>());
mqttChannel.setTopics(new CopyOnWriteArraySet<>());
mqttChannel.setAtomicInteger(new AtomicInteger(0));
mqttChannel.setReplyMqttMessageMap(new ConcurrentHashMap<>());
mqttChannel.setMqttMessageSink(new MqttMessageSink());
......@@ -158,23 +158,25 @@ public class MqttChannel {
/**
* 取消重发
*
* @param type type
* @param messageId 消息Id
* @return boolean状态
*/
public Mono<Void> cancelRetry(Integer messageId) {
return Mono.fromRunnable(() -> this.removeReply(messageId));
public Mono<Void> cancelRetry(MqttMessageType type, Integer messageId) {
return Mono.fromRunnable(() -> this.removeReply(type, messageId));
}
/**
* remove resend action
*
* @param type type
* @param messageId messageId
* @return void
*/
private void removeReply(Integer messageId) {
Optional.ofNullable(replyMqttMessageMap.get(messageId))
private void removeReply(MqttMessageType type, Integer messageId) {
Optional.ofNullable(replyMqttMessageMap.get(type))
.map(messageIds -> messageIds.remove(messageId))
.ifPresent(Disposable::dispose);
replyMqttMessageMap.remove(messageId);
}
......@@ -194,7 +196,7 @@ public class MqttChannel {
private void clear() {
replyMqttMessageMap.values().forEach(Disposable::dispose);
replyMqttMessageMap.values().forEach(maps -> maps.values().forEach(Disposable::dispose));
replyMqttMessageMap.clear();
}
......@@ -209,7 +211,7 @@ public class MqttChannel {
public static MqttMessageSink MQTT_SINK = new MqttMessageSink();
public Mono<Void> sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry, Map<Integer, Disposable> replyMqttMessageMap) {
public Mono<Void> sendMessage(MqttMessage mqttMessage, MqttChannel mqttChannel, boolean retry, Map<MqttMessageType, Map<Integer, Disposable>> replyMqttMessageMap) {
log.info("write channel {} message {}", mqttChannel.getConnection(), mqttMessage);
if (retry) {
/*
......@@ -266,9 +268,10 @@ public class MqttChannel {
* @param replyMqttMessageMap
* @return Mono
*/
public Mono<Void> offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map<Integer, Disposable> replyMqttMessageMap) {
public Mono<Void> offerReply(MqttMessage message, final MqttChannel mqttChannel, final int messageId, Map<MqttMessageType, Map<Integer, Disposable>> replyMqttMessageMap) {
return Mono.fromRunnable(() ->
replyMqttMessageMap.put(messageId,
replyMqttMessageMap.computeIfAbsent(message.fixedHeader().messageType(), mqttMessageType -> new ConcurrentHashMap<>(8)).put(messageId,
mqttChannel.write(Mono.just(message))
.delaySubscription(Duration.ofSeconds(5))
.repeat()
......
......@@ -43,7 +43,7 @@ public interface TopicRegistry {
* @param mqttChannel 通道信息
* @return Void
*/
void clear(List<String> topics, MqttChannel mqttChannel);
void clear(Set<String> topics, MqttChannel mqttChannel);
/**
* 获取topic的channels
......
package com.github.quickmsg;
package com.github.quickmsg.core;
import com.github.quickmsg.core.http.HttpTransportFactory;
import com.github.quickmsg.common.auth.PasswordAuthentication;
......
......@@ -61,7 +61,7 @@ public class DefaultMessageRegistry implements MessageRegistry {
retainMessage.getMqttQoS(),
retainMessage.getMqttQoS() == MqttQoS.AT_MOST_ONCE ? 0 : mqttChannel.generateMessageId(),
key,
retainMessage.getByteBuf());
retainMessage.getByteBuf().copy());
}).collect(Collectors.toList()));
} else {
return Optional.empty();
......
......@@ -30,12 +30,12 @@ public class DefaultTopicRegistry implements TopicRegistry {
@Override
public void clear(MqttChannel mqttChannel) {
List<String> topics = mqttChannel.getTopics();
Set<String> topics = mqttChannel.getTopics();
this.clear(topics, mqttChannel);
}
@Override
public void clear(List<String> topics, MqttChannel mqttChannel) {
public void clear(Set<String> topics, MqttChannel mqttChannel) {
for (String topic : topics) {
topicChannels.get(TopicRegexUtils.regexTopic(topic)).remove(mqttChannel);
}
......
......@@ -51,7 +51,7 @@ public class CommonProtocol implements Protocol<MqttMessage> {
case PUBREC:
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int messageId = messageIdVariableHeader.messageId();
return mqttChannel.cancelRetry(messageId)
return mqttChannel.cancelRetry(MqttMessageType.PUBLISH,messageId)
.then(mqttChannel.write(MqttMessageBuilder.buildPublishRel(messageId), true));
case PUBREL:
MqttMessageIdVariableHeader relMessageIdVariableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
......@@ -65,14 +65,14 @@ public class CommonProtocol implements Protocol<MqttMessage> {
mqttChannels.stream()
.map(channel -> channel.write(MessageUtils.wrapPublishMessage(msg, channel.generateMessageId()), true))
.collect(Collectors.toList()))
.then(mqttChannel.cancelRetry(id))
.then(mqttChannel.cancelRetry(MqttMessageType.PUBREC,id))
.then(mqttChannel.write(MqttMessageBuilder.buildPublishComp(id), false));
}).orElse(Mono.empty());
case PUBCOMP:
MqttMessageIdVariableHeader messageIdVariableHeader1 = (MqttMessageIdVariableHeader) message.variableHeader();
int compId = messageIdVariableHeader1.messageId();
return mqttChannel.cancelRetry(compId);
return mqttChannel.cancelRetry(MqttMessageType.PUBREL,compId);
case PINGRESP:
default:
return Mono.empty();
......
......@@ -27,7 +27,7 @@ public class ConnectAckProtocol implements Protocol<MqttConnAckMessage> {
@Override
public Mono<Void> parseProtocol(MqttConnAckMessage message, MqttChannel mqttChannel, ContextView contextView) {
return mqttChannel.cancelRetry(-1);
return mqttChannel.cancelRetry(MqttMessageType.CONNECT,-1);
}
@Override
......
package com.github.quickmsg.core.protocol;
import com.github.quickmsg.core.mqtt.MqttReceiveContext;
import com.github.quickmsg.common.auth.PasswordAuthentication;
import com.github.quickmsg.common.channel.ChannelRegistry;
import com.github.quickmsg.common.channel.MqttChannel;
......@@ -10,6 +9,7 @@ import com.github.quickmsg.common.message.MessageRegistry;
import com.github.quickmsg.common.message.MqttMessageBuilder;
import com.github.quickmsg.common.protocol.Protocol;
import com.github.quickmsg.common.topic.TopicRegistry;
import com.github.quickmsg.core.mqtt.MqttReceiveContext;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
......@@ -20,6 +20,7 @@ import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* @author luxurong
......@@ -131,7 +132,7 @@ public class ConnectProtocol implements Protocol<MqttConnectMessage> {
ChannelRegistry channelRegistry,
TopicRegistry topicRegistry,
MessageRegistry messageRegistry) {
List<String> topics = sessionChannel.getTopics();
Set<String> topics = sessionChannel.getTopics();
mqttChannel.setTopics(topics);
topicRegistry.clear(sessionChannel);
topics.forEach(topic -> topicRegistry.registryTopicConnection(topic, mqttChannel));
......
......@@ -30,7 +30,7 @@ public class PublishAckProtocol implements Protocol<MqttPubAckMessage> {
public Mono<Void> parseProtocol(MqttPubAckMessage message, MqttChannel mqttChannel, ContextView contextView) {
MqttMessageIdVariableHeader idVariableHeader = message.variableHeader();
int messageId = idVariableHeader.messageId();
return mqttChannel.cancelRetry(messageId);
return mqttChannel.cancelRetry(MqttMessageType.PUBLISH,messageId);
}
@Override
......
......@@ -84,7 +84,6 @@ public class PublishProtocol implements Protocol<MqttPublishMessage> {
* @param other 其他操作
* @return Mono
*/
// todo 实时写入的可以使用duplicate 避免使用copy
private Mono<Void> send(Set<MqttChannel> mqttChannels, MqttPublishMessage message, MessageRegistry messageRegistry, Mono<Void> other) {
return Mono.when(
mqttChannels.stream()
......
......@@ -26,7 +26,7 @@ public class SubscribeAckProtocol implements Protocol<MqttSubAckMessage> {
@Override
public Mono<Void> parseProtocol(MqttSubAckMessage message, MqttChannel mqttChannel, ContextView contextView) {
return mqttChannel.cancelRetry(message.variableHeader().messageId());
return mqttChannel.cancelRetry(MqttMessageType.SUBSCRIBE,message.variableHeader().messageId());
}
@Override
......
......@@ -25,7 +25,7 @@ public class UnSubscribeAckProtocol implements Protocol<MqttUnsubAckMessage> {
@Override
public Mono<Void> parseProtocol(MqttUnsubAckMessage message, MqttChannel mqttChannel, ContextView contextView) {
return mqttChannel.cancelRetry(message.variableHeader().messageId());
return mqttChannel.cancelRetry(MqttMessageType.UNSUBSCRIBE,message.variableHeader().messageId());
}
@Override
......
......@@ -11,7 +11,9 @@ import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author luxurong
......@@ -32,7 +34,7 @@ public class UnSubscribeProtocol implements Protocol<MqttUnsubscribeMessage> {
return Mono.fromRunnable(() -> {
ReceiveContext<?> receiveContext = contextView.get(ReceiveContext.class);
TopicRegistry topicRegistry = receiveContext.getTopicRegistry();
topicRegistry.clear(message.payload().topics(), mqttChannel);
topicRegistry.clear(new HashSet<>(message.payload().topics()), mqttChannel);
}).then(mqttChannel.write(MqttMessageBuilder.buildUnsubAck(message.variableHeader().messageId()), false));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册