未验证 提交 d7a2603c 编写于 作者: D dinglei 提交者: GitHub

[RIP-11] Recover pull request for old message queue that is allocated on rebalance.(#827)

[RIP-11] Recover pull request for old message queue that is allocated on rebalance.
上级 1c26991d
......@@ -1212,11 +1212,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
log.debug("Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
......@@ -1247,4 +1247,36 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) {
return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
}
public boolean resumePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue != null) {
log.info("Clear local expire message for {} in processQueue.", localOffsetKey);
processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
}
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop != null) {
if (pullStop.get()) {
pullStop.set(false);
log.info("Resume Pull Request of {} is set to TRUE, and then the pull request will start by rebalance again...", localOffsetKey);
}
}
return true;
}
public boolean pausePullRequest(String consumerGroup, String topic, String brokerName, int queueID) {
String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
return true;
}
if (!pullStop.get()) {
pullStop.set(true);
log.info("Pull stop of {} is set to TRUE, and then the pull request will stop...", localOffsetKey);
}
return true;
}
}
......@@ -359,6 +359,15 @@ public abstract class RebalanceImpl {
consumerGroup, mq);
}
break;
case CONSUME_PUSH:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}, because pull is pause by Push model, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
......@@ -375,6 +384,7 @@ public abstract class RebalanceImpl {
}
this.removeDirtyOffset(mq);
this.removeLocalDirtyPushOffset(topic, mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
......@@ -411,6 +421,8 @@ public abstract class RebalanceImpl {
public abstract void removeDirtyOffset(final MessageQueue mq);
public abstract void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq);
public abstract long computePullFromWhere(final MessageQueue mq);
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
......
......@@ -68,6 +68,10 @@ public class RebalancePullImpl extends RebalanceImpl {
this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Override
public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) {
}
@Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
......
......@@ -140,6 +140,12 @@ public class RebalancePushImpl extends RebalanceImpl {
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Override
public void removeLocalDirtyPushOffset(final String topic, final MessageQueue mq) {
log.info("removeLocalDirtyPushOffset:consumergroup={},topic={},mq={}", consumerGroup, topic, mq);
this.defaultMQPushConsumerImpl.resumePullRequest(consumerGroup, topic, mq.getBrokerName(), mq.getQueueId());
}
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册