From f868d9925eb6a4d9fbace50ceb419ac7e6459485 Mon Sep 17 00:00:00 2001 From: ayanamist Date: Tue, 30 Mar 2021 20:07:09 +0800 Subject: [PATCH] Init pop retry consumer offset when create Fix issue that when messages pop out but not ack so redeliver to retry topic, and next pop start from max offset so skip these messages incorrectly. --- .../broker/processor/PopReviveService.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index b9c309e1..6d94a75d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -105,7 +105,7 @@ public class PopReviveService extends ServiceThread { msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime())); } msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - addRetryTopicIfNoExit(msgInner.getTopic()); + addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId()); PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", @@ -127,10 +127,18 @@ public class PopReviveService extends ServiceThread { } } - private boolean addRetryTopicIfNoExit(String topic) { + private void initPopRetryOffset(String topic, String consumerGroup) { + long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, 0); + if (offset < 0) { + this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, topic, + 0, 0); + } + } + + private void addRetryTopicIfNoExit(String topic, String consumerGroup) { TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); if (topicConfig != null) { - return true; + return; } topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum); @@ -139,7 +147,8 @@ public class PopReviveService extends ServiceThread { topicConfig.setPerm(6); topicConfig.setTopicSysFlag(0); brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - return true; + + initPopRetryOffset(topic, consumerGroup); } private List getReviveMessage(long offset, int queueId) { -- GitLab