提交 4dbf4160 编写于 作者: S ShannonDing

Polish Push consumer

上级 e2c697bf
...@@ -224,13 +224,13 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -224,13 +224,13 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
try { try {
this.makeSureStateOK(); this.makeSureStateOK();
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e); log.warn("pullMessage exception, real push consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return; return;
} }
if (this.isPause()) { if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); log.warn("push consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return; return;
} }
...@@ -242,7 +242,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -242,7 +242,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) { if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn( log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
} }
return; return;
...@@ -252,7 +252,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -252,7 +252,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) { if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn( log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
} }
return; return;
...@@ -263,7 +263,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -263,7 +263,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn( log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes); pullRequest, queueMaxSpanFlowControlTimes);
} }
...@@ -274,7 +274,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -274,7 +274,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
if (!pullRequest.isLockedFirst()) { if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset(); boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", log.info("the first time to pull message, so fix offset from snode. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy); pullRequest, offset, brokerBusy);
if (brokerBusy) { if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
...@@ -582,7 +582,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -582,7 +582,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.persistConsumerOffset(); this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown(); this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); log.info("the real push consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.destroy(); this.rebalanceImpl.destroy();
this.serviceState = ServiceState.SHUTDOWN_ALREADY; this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break; break;
...@@ -596,6 +596,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -596,6 +596,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
public synchronized void start() throws MQClientException { public synchronized void start() throws MQClientException {
switch (this.serviceState) { switch (this.serviceState) {
case CREATE_JUST: case CREATE_JUST:
if (this.defaultMQPushConsumer.isRealPush()) {
log.info("============================================");
log.info(" Notice: Start Real Push Consumer Model.");
log.info("============================================\n");
}
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED; this.serviceState = ServiceState.START_FAILED;
...@@ -1207,6 +1212,16 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -1207,6 +1212,16 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false; return false;
} }
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(false));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
}
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null) {
processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
if (localOffset.get() + 1 < offset) { if (localOffset.get() + 1 < offset) {
//should start pull message process //should start pull message process
log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
...@@ -1214,11 +1229,6 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -1214,11 +1229,6 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
} else { } else {
//Stop pull request //Stop pull request
log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey);
}
pullStop = this.pullStopped.get(localOffsetKey); pullStop = this.pullStopped.get(localOffsetKey);
if (!pullStop.get()) { if (!pullStop.get()) {
pullStop.set(true); pullStop.set(true);
...@@ -1229,13 +1239,17 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -1229,13 +1239,17 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
//submit to process queue //submit to process queue
List<MessageExt> messageExtList = new ArrayList<MessageExt>(); List<MessageExt> messageExtList = new ArrayList<MessageExt>();
messageExtList.add(msg); messageExtList.add(msg);
ProcessQueue processQueue = processQueues.get(localOffsetKey);
if (processQueue == null) {
processQueues.put(localOffsetKey, new ProcessQueue());
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList); processQueue.putMessage(messageExtList);
if (this.consumeOrderly) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID); MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID);
//update process Queue pull time
if (this.rebalanceImpl.processQueueTable.get(messageQueue) != null) {
this.rebalanceImpl.processQueueTable.get(messageQueue).setLastPullTimestamp(System.currentTimeMillis());
}
this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true); this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true);
} }
return true; return true;
......
...@@ -360,6 +360,9 @@ public abstract class RebalanceImpl { ...@@ -360,6 +360,9 @@ public abstract class RebalanceImpl {
} }
break; break;
case CONSUME_PUSH: case CONSUME_PUSH:
if (isOrder) {
break;
}
pq.setDropped(true); pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) { if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove(); it.remove();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册