From f0ad359ca15f2f7671a53c35f0550825e7404b0f Mon Sep 17 00:00:00 2001 From: chengxiangwang Date: Mon, 3 Jun 2019 10:15:50 +0800 Subject: [PATCH] add logic of resending msg when acktimeout --- .../apache/rocketmq/common/MqttConfig.java | 10 +++ .../rocketmq/common/service/NnodeService.java | 2 +- .../mqtt/client/IOTClientManagerImpl.java | 10 ++- .../rocketmq/mqtt/client/InFlightMessage.java | 9 +- .../rocketmq/mqtt/client/InFlightPacket.java | 89 +++++++++++++++++++ .../rocketmq/mqtt/client/MQTTSession.java | 49 ++-------- .../rocketmq/mqtt/constant/MqttConstant.java | 1 + .../impl/MqttScheduledServiceImpl.java | 38 ++++++++ .../rocketmq/mqtt/task/MqttPushTask.java | 4 +- .../mqtt/MqttPubackMessageHandlerTest.java | 2 +- 10 files changed, 158 insertions(+), 56 deletions(-) create mode 100644 mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightPacket.java diff --git a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java index 186f22cd..5a344c5b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/MqttConfig.java @@ -54,6 +54,8 @@ public class MqttConfig { private long persistOffsetInterval = 2 * 1000; + private long scanAckTimeoutInterval = 1000; + public int getListenPort() { return listenPort; } @@ -149,4 +151,12 @@ public class MqttConfig { public void setPersistOffsetInterval(long persistOffsetInterval) { this.persistOffsetInterval = persistOffsetInterval; } + + public long getScanAckTimeoutInterval() { + return scanAckTimeoutInterval; + } + + public void setScanAckTimeoutInterval(long scanAckTimeoutInterval) { + this.scanAckTimeoutInterval = scanAckTimeoutInterval; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/service/NnodeService.java b/common/src/main/java/org/apache/rocketmq/common/service/NnodeService.java index 10bb6e4d..edb79926 100644 --- a/common/src/main/java/org/apache/rocketmq/common/service/NnodeService.java +++ b/common/src/main/java/org/apache/rocketmq/common/service/NnodeService.java @@ -31,7 +31,7 @@ public interface NnodeService { * * @param snodeConfig {@link SnodeConfig} */ - void registerSnode(SnodeConfig snodeConfig) throws Exception; + void registerSnode(SnodeConfig snodeConfig) throws Exception; /** * Update Nnode server address list. diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java index 8caec92e..90d7b065 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/IOTClientManagerImpl.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.ClientManagerImpl; import org.apache.rocketmq.common.client.Subscription; @@ -43,8 +44,9 @@ public class IOTClientManagerImpl extends ClientManagerImpl { 1024); private final ConcurrentHashMap clientId2Subscription = new ConcurrentHashMap<>(1024); private final Map snode2MqttClient = new HashMap<>(); - private final ConcurrentHashMap>> processTable = new ConcurrentHashMap<>(); - + private final ConcurrentHashMap>> processTable = new ConcurrentHashMap<>(); + private final ConcurrentHashMap consumeOffsetTable = new ConcurrentHashMap<>(); + private final DelayQueue inflightTimeouts = new DelayQueue<>(); public IOTClientManagerImpl() { } @@ -129,4 +131,8 @@ public class IOTClientManagerImpl extends ClientManagerImpl { public ConcurrentHashMap>> getProcessTable() { return processTable; } + + public DelayQueue getInflightTimeouts() { + return inflightTimeouts; + } } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java index 1ba191fc..24906744 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightMessage.java @@ -23,16 +23,13 @@ public class InFlightMessage { private final Integer pushQos; private final BrokerData brokerData; private final byte[] body; - private final String messageId; private final long queueOffset; - public InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, String messageId, - long queueOffset) { + public InFlightMessage(String topic, Integer pushQos, byte[] body, BrokerData brokerData, long queueOffset) { this.topic = topic; this.pushQos = pushQos; this.body = body; this.brokerData = brokerData; - this.messageId = messageId; this.queueOffset = queueOffset; } @@ -44,10 +41,6 @@ public class InFlightMessage { return brokerData; } - public String getMessageId() { - return messageId; - } - public long getQueueOffset() { return queueOffset; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightPacket.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightPacket.java new file mode 100644 index 00000000..daca6298 --- /dev/null +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/InFlightPacket.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.mqtt.client; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public class InFlightPacket implements Delayed { + + private final MQTTSession client; + private final int packetId; + private long startTime; + private int resendTime = 0; + + InFlightPacket(MQTTSession client, int packetId, long delayInMilliseconds) { + this.client = client; + this.packetId = packetId; + this.startTime = System.currentTimeMillis() + delayInMilliseconds; + } + + @Override + public long getDelay(TimeUnit unit) { + long diff = startTime - System.currentTimeMillis(); + return unit.convert(diff, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + if ((this.startTime - ((InFlightPacket) o).startTime) == 0) { + return 0; + } + if ((this.startTime - ((InFlightPacket) o).startTime) > 0) { + return 1; + } else { + return -1; + } + } + + public MQTTSession getClient() { + return client; + } + + public int getPacketId() { + return packetId; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public int getResendTime() { + return resendTime; + } + + public void setResendTime(int resendTime) { + this.resendTime = resendTime; + } + + @Override public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof InFlightPacket)) { + return false; + } + InFlightPacket packet = (InFlightPacket) obj; + return packet.getClient().equals(this.getClient()) && + packet.getPacketId() == this.getPacketId(); + } +} \ No newline at end of file diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java index 32990a4b..b5ed9c35 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/client/MQTTSession.java @@ -23,9 +23,6 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.client.Client; import org.apache.rocketmq.common.client.ClientRole; @@ -45,6 +42,7 @@ import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; +import static org.apache.rocketmq.mqtt.constant.MqttConstant.FLIGHT_BEFORE_RESEND_MS; import static org.apache.rocketmq.mqtt.constant.MqttConstant.TOPIC_CLIENTID_SEPARATOR; public class MQTTSession extends Client { @@ -57,40 +55,9 @@ public class MQTTSession extends Client { private final DefaultMqttMessageProcessor defaultMqttMessageProcessor; private final AtomicInteger inflightSlots = new AtomicInteger(10); private final Map inflightWindow = new HashMap<>(); - private final DelayQueue inflightTimeouts = new DelayQueue<>(); - private static final int FLIGHT_BEFORE_RESEND_MS = 5_000; private Hashtable inUsePacketIds = new Hashtable(); private int nextPacketId = 0; - static class InFlightPacket implements Delayed { - - final int packetId; - private long startTime; - - InFlightPacket(int packetId, long delayInMilliseconds) { - this.packetId = packetId; - this.startTime = System.currentTimeMillis() + delayInMilliseconds; - } - - @Override - public long getDelay(TimeUnit unit) { - long diff = startTime - System.currentTimeMillis(); - return unit.convert(diff, TimeUnit.MILLISECONDS); - } - - @Override - public int compareTo(Delayed o) { - if ((this.startTime - ((InFlightPacket) o).startTime) == 0) { - return 0; - } - if ((this.startTime - ((InFlightPacket) o).startTime) > 0) { - return 1; - } else { - return -1; - } - } - } - public MQTTSession(String clientId, ClientRole clientRole, Set groups, boolean isConnected, boolean cleanSession, RemotingChannel remotingChannel, long lastUpdateTimestamp, DefaultMqttMessageProcessor defaultMqttMessageProcessor) { @@ -149,9 +116,10 @@ public class MQTTSession extends Client { if (inflightSlots.get() > 0) { inflightSlots.decrementAndGet(); mqttHeader.setPacketId(getNextPacketId()); - inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), messageExt.getBody(), brokerData, messageExt.getMsgId(), messageExt.getQueueOffset())); -// inflightTimeouts.add(new InFlightPacket(mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS)); - put2processTable(((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getProcessTable(), brokerData.getBrokerName(), MqttUtil.getRootTopic(mqttHeader.getTopicName()), messageExt); + inflightWindow.put(mqttHeader.getPacketId(), new InFlightMessage(mqttHeader.getTopicName(), mqttHeader.getQosLevel(), messageExt.getBody(), brokerData, messageExt.getQueueOffset())); + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager(); + iotClientManager.getInflightTimeouts().add(new InFlightPacket(this, mqttHeader.getPacketId(), FLIGHT_BEFORE_RESEND_MS)); + put2processTable(iotClientManager.getProcessTable(), brokerData.getBrokerName(), MqttUtil.getRootTopic(mqttHeader.getTopicName()), messageExt); pushMessage2Client(mqttHeader, messageExt.getBody()); } } @@ -168,11 +136,12 @@ public class MQTTSession extends Client { } } inflightSlots.incrementAndGet(); + ((IOTClientManagerImpl) this.defaultMqttMessageProcessor.getIotClientManager()).getInflightTimeouts().remove(new InFlightPacket(this, ackPacketId, 0)); releasePacketId(ackPacketId); return remove; } - private void pushMessage2Client(MqttHeader mqttHeader, byte[] body) { + public void pushMessage2Client(MqttHeader mqttHeader, byte[] body) { try { //set remaining length int remainingLength = mqttHeader.getTopicName().getBytes().length + body.length; @@ -259,10 +228,6 @@ public class MQTTSession extends Client { return inflightWindow; } - public DelayQueue getInflightTimeouts() { - return inflightTimeouts; - } - public Hashtable getInUsePacketIds() { return inUsePacketIds; } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java index 5a4a4c92..3d53f5fd 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/constant/MqttConstant.java @@ -27,6 +27,7 @@ public class MqttConstant { public static final String SUBSCRIPTION_SEPARATOR = "/"; public static final String TOPIC_CLIENTID_SEPARATOR = "@"; public static final long DEFAULT_TIMEOUT_MILLS = 3000L; + public static final int FLIGHT_BEFORE_RESEND_MS = 5_000; public static final String PROPERTY_MQTT_QOS = "PROPERTY_MQTT_QOS"; public static final AttributeKey MQTT_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("mqtt.client"); } diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttScheduledServiceImpl.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttScheduledServiceImpl.java index c4e57e56..cf88dd03 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttScheduledServiceImpl.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/service/impl/MqttScheduledServiceImpl.java @@ -16,6 +16,9 @@ */ package org.apache.rocketmq.mqtt.service.impl; +import io.netty.handler.codec.mqtt.MqttMessageType; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -29,7 +32,12 @@ import org.apache.rocketmq.common.service.ScheduledService; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.mqtt.client.IOTClientManagerImpl; +import org.apache.rocketmq.mqtt.client.InFlightMessage; +import org.apache.rocketmq.mqtt.client.InFlightPacket; +import org.apache.rocketmq.mqtt.client.MQTTSession; +import org.apache.rocketmq.mqtt.constant.MqttConstant; import org.apache.rocketmq.mqtt.processor.DefaultMqttMessageProcessor; +import org.apache.rocketmq.remoting.transport.mqtt.MqttHeader; public class MqttScheduledServiceImpl implements ScheduledService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME); @@ -67,6 +75,36 @@ public class MqttScheduledServiceImpl implements ScheduledService { } } }, 0, defaultMqttMessageProcessor.getMqttConfig().getPersistOffsetInterval(), TimeUnit.MILLISECONDS); + this.mqttScheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override public void run() { + IOTClientManagerImpl iotClientManager = (IOTClientManagerImpl) defaultMqttMessageProcessor.getIotClientManager(); + Collection expired = new ArrayList<>(); + iotClientManager.getInflightTimeouts().drainTo(expired); + for (InFlightPacket notAcked : expired) { + MQTTSession client = notAcked.getClient(); + if (!client.isConnected()) { + continue; + } + if (notAcked.getResendTime() > 3) { + client.getRemotingChannel().close(); + continue; + } + if (client.getInflightWindow().containsKey(notAcked.getPacketId())) { + InFlightMessage inFlightMessage = client.getInflightWindow().get(notAcked.getPacketId()); + MqttHeader mqttHeader = new MqttHeader(); + mqttHeader.setTopicName(inFlightMessage.getTopic()); + mqttHeader.setQosLevel(inFlightMessage.getPushQos()); + mqttHeader.setRetain(false); + mqttHeader.setDup(true); + mqttHeader.setMessageType(MqttMessageType.PUBLISH.value()); + notAcked.setStartTime(System.currentTimeMillis() + MqttConstant.FLIGHT_BEFORE_RESEND_MS); + notAcked.setResendTime(notAcked.getResendTime() + 1); + iotClientManager.getInflightTimeouts().add(notAcked); + client.pushMessage2Client(mqttHeader, inFlightMessage.getBody()); + } + } + } + }, 10000, defaultMqttMessageProcessor.getMqttConfig().getScanAckTimeoutInterval(), TimeUnit.MILLISECONDS); } @Override diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java index e865aa9e..f97bbc84 100644 --- a/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java +++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/task/MqttPushTask.java @@ -68,8 +68,8 @@ public class MqttPushTask implements Runnable { private BrokerData brokerData; private String rootTopic; - public MqttPushTask(DefaultMqttMessageProcessor processor, final MqttHeader mqttHeader, String rootTopic, Client client, - BrokerData brokerData) { + public MqttPushTask(DefaultMqttMessageProcessor processor, final MqttHeader mqttHeader, String rootTopic, + Client client, BrokerData brokerData) { this.defaultMqttMessageProcessor = processor; this.mqttHeader = mqttHeader; this.rootTopic = rootTopic; diff --git a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java index 5fc7422a..b69de9c1 100644 --- a/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java +++ b/mqtt/src/test/java/org/apache/rocketmq/mqtt/MqttPubackMessageHandlerTest.java @@ -84,7 +84,7 @@ public class MqttPubackMessageHandlerTest { MQTTSession mqttSession = Mockito.spy(new MQTTSession("client1", ClientRole.IOTCLIENT, null, true, true, remotingChannel, System.currentTimeMillis(), defaultMqttMessageProcessor)); Mockito.when(iotClientManager.getClient(anyString(), any(RemotingChannel.class))).thenReturn(mqttSession); - InFlightMessage inFlightMessage = Mockito.spy(new InFlightMessage("topicTest", 0, "Hello".getBytes(), null, null, 0)); + InFlightMessage inFlightMessage = Mockito.spy(new InFlightMessage("topicTest", 0, "Hello".getBytes(), null, 0)); doReturn(inFlightMessage).when(mqttSession).pubAckReceived(anyInt()); RemotingCommand remotingCommand = mqttPubackMessageHandler.handleMessage(mqttMessage, remotingChannel); assert remotingCommand == null; -- GitLab