提交 f868d992 编写于 作者: A ayanamist

Init pop retry consumer offset when create

Fix issue that when messages pop out but not ack so redeliver to retry topic, and next pop start from max offset so skip these messages incorrectly.
上级 ced6b023
...@@ -105,7 +105,7 @@ public class PopReviveService extends ServiceThread { ...@@ -105,7 +105,7 @@ public class PopReviveService extends ServiceThread {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime())); msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
} }
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNoExit(msgInner.getTopic()); addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner); PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
if (brokerController.getBrokerConfig().isEnablePopLog()) { if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
...@@ -127,10 +127,18 @@ public class PopReviveService extends ServiceThread { ...@@ -127,10 +127,18 @@ public class PopReviveService extends ServiceThread {
} }
} }
private boolean addRetryTopicIfNoExit(String topic) { private void initPopRetryOffset(String topic, String consumerGroup) {
long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, 0);
if (offset < 0) {
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, topic,
0, 0);
}
}
private void addRetryTopicIfNoExit(String topic, String consumerGroup) {
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (topicConfig != null) { if (topicConfig != null) {
return true; return;
} }
topicConfig = new TopicConfig(topic); topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum); topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum);
...@@ -139,7 +147,8 @@ public class PopReviveService extends ServiceThread { ...@@ -139,7 +147,8 @@ public class PopReviveService extends ServiceThread {
topicConfig.setPerm(6); topicConfig.setPerm(6);
topicConfig.setTopicSysFlag(0); topicConfig.setTopicSysFlag(0);
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
return true;
initPopRetryOffset(topic, consumerGroup);
} }
private List<MessageExt> getReviveMessage(long offset, int queueId) { private List<MessageExt> getReviveMessage(long offset, int queueId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册