diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index b9c309e189878ac942bc2ee24fb7a655b6c4fe26..6d94a75d3e7708202dfed67b23307eb9d0a5fa4d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -105,7 +105,7 @@ public class PopReviveService extends ServiceThread { msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime())); } msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - addRetryTopicIfNoExit(msgInner.getTopic()); + addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId()); PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", @@ -127,10 +127,18 @@ public class PopReviveService extends ServiceThread { } } - private boolean addRetryTopicIfNoExit(String topic) { + private void initPopRetryOffset(String topic, String consumerGroup) { + long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, 0); + if (offset < 0) { + this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, topic, + 0, 0); + } + } + + private void addRetryTopicIfNoExit(String topic, String consumerGroup) { TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); if (topicConfig != null) { - return true; + return; } topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum); @@ -139,7 +147,8 @@ public class PopReviveService extends ServiceThread { topicConfig.setPerm(6); topicConfig.setTopicSysFlag(0); brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - return true; + + initPopRetryOffset(topic, consumerGroup); } private List getReviveMessage(long offset, int queueId) {