From d7a2603c17eb04f49b936beea0900f04cc7bb96a Mon Sep 17 00:00:00 2001 From: dinglei Date: Wed, 20 Feb 2019 16:41:54 +0800 Subject: [PATCH] [RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827) [RIP-11] Recover pull request for old message queue that is allocated on rebalance. --- .../consumer/DefaultMQPushConsumerImpl.java | 36 +++++++++++++++++-- .../client/impl/consumer/RebalanceImpl.java | 12 +++++++ .../impl/consumer/RebalancePullImpl.java | 4 +++ .../impl/consumer/RebalancePushImpl.java | 6 ++++ 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index f5251f53..ff5f418a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -1212,11 +1212,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } if (localOffset.get() + 1 < offset) { //should start pull message process - log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); + log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); return false; } else { //Stop pull request - log.debug("Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); + log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); if (pullStop == null) { this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); @@ -1247,4 +1247,36 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) { return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID; } + + public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { + String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); + ProcessQueue processQueue = processQueues.get(localOffsetKey); + if (processQueue != null) { + log.info("Clear local expire message for {} in processQueue.", localOffsetKey); + processQueue.cleanExpiredMsg(this.defaultMQPushConsumer); + } + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop != null) { + if (pullStop.get()) { + pullStop.set(false); + log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey); + } + } + return true; + } + + public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) { + String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop == null) { + this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); + log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); + return true; + } + if (!pullStop.get()) { + pullStop.set(true); + log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey); + } + return true; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 9ad07c7e..ba90f093 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -359,6 +359,15 @@ public abstract class RebalanceImpl { consumerGroup, mq); } break; + case CONSUME_PUSH: + pq.setDropped(true); + if (this.removeUnnecessaryMessageQueue(mq, pq)) { + it.remove(); + changed = true; + log.info("doRebalance, {}, remove unnecessary mq, {}, because pull is pause by Push model, so try to fixed it", + consumerGroup, mq); + } + break; default: break; } @@ -375,6 +384,7 @@ public abstract class RebalanceImpl { } this.removeDirtyOffset(mq); + this.removeLocalDirtyPushOffset(topic, mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { @@ -411,6 +421,8 @@ public abstract class RebalanceImpl { public abstract void removeDirtyOffset(final MessageQueue mq); + public abstract void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq); + public abstract long computePullFromWhere(final MessageQueue mq); public abstract void dispatchPullRequest(final List pullRequestList); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index 9dd408c1..9dc8f4ae 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -68,6 +68,10 @@ public class RebalancePullImpl extends RebalanceImpl { this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); } + @Override + public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) { + } + @Override public long computePullFromWhere(MessageQueue mq) { return 0; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 2c2f014a..2e622be8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -140,6 +140,12 @@ public class RebalancePushImpl extends RebalanceImpl { this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); } + @Override + public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) { + log.info("removeLocalDirtyPushOffset:consumergroup={},topic={},mq={}", consumerGroup, topic, mq); + this.defaultMQPushConsumerImpl.resumePullRequest(consumerGroup, topic, mq.getBrokerName(), mq.getQueueId()); + } + @Override public long computePullFromWhere(MessageQueue mq) { long result = -1; -- GitLab