From 4dbf4160d952b3d2ee8b91c7c6fa26e662f5c496 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Thu, 28 Feb 2019 17:41:07 +0800 Subject: [PATCH] Polish Push consumer --- .../DefaultMQRealPushConsumerImpl.java | 48 ++++++++++++------- .../client/impl/consumer/RebalanceImpl.java | 3 ++ 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java index 0c323ca4..755ed589 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java @@ -224,13 +224,13 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { try { this.makeSureStateOK(); } catch (MQClientException e) { - log.warn("pullMessage exception, consumer state not ok", e); + log.warn("pullMessage exception, real push consumer state not ok", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); return; } if (this.isPause()) { - log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); + log.warn("push consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } @@ -242,7 +242,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( - "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; @@ -252,7 +252,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( - "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", + "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; @@ -263,7 +263,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( - "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", + "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } @@ -274,7 +274,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); - log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", + log.info("the first time to pull message, so fix offset from snode. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", @@ -582,7 +582,7 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.persistConsumerOffset(); this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); - log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); + log.info("the real push consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.destroy(); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; @@ -596,6 +596,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: + if (this.defaultMQPushConsumer.isRealPush()) { + log.info("============================================"); + log.info(" Notice: Start Real Push Consumer Model."); + log.info("============================================\n"); + } log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; @@ -1207,6 +1212,16 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); return false; } + AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); + if (pullStop == null) { + this.pullStopped.put(localOffsetKey, new AtomicBoolean(false)); + log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); + } + ProcessQueue processQueue = processQueues.get(localOffsetKey); + if (processQueue == null) { + processQueues.put(localOffsetKey, new ProcessQueue()); + processQueue = processQueues.get(localOffsetKey); + } if (localOffset.get() + 1 < offset) { //should start pull message process log.debug("#####Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); @@ -1214,11 +1229,6 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { } else { //Stop pull request log.debug("#####Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); - AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); - if (pullStop == null) { - this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); - log.info("Pull stop flag of {} is not set, initialize to TRUE", localOffsetKey); - } pullStop = this.pullStopped.get(localOffsetKey); if (!pullStop.get()) { pullStop.set(true); @@ -1229,13 +1239,17 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { //submit to process queue List messageExtList = new ArrayList(); messageExtList.add(msg); - ProcessQueue processQueue = processQueues.get(localOffsetKey); - if (processQueue == null) { - processQueues.put(localOffsetKey, new ProcessQueue()); - processQueue = processQueues.get(localOffsetKey); - } + processQueue.putMessage(messageExtList); + if (this.consumeOrderly) { + processQueue.setLocked(true); + processQueue.setLastLockTimestamp(System.currentTimeMillis()); + } MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID); + //update process Queue pull time + if (this.rebalanceImpl.processQueueTable.get(messageQueue) != null) { + this.rebalanceImpl.processQueueTable.get(messageQueue).setLastPullTimestamp(System.currentTimeMillis()); + } this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true); } return true; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index ba90f093..763472a6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -360,6 +360,9 @@ public abstract class RebalanceImpl { } break; case CONSUME_PUSH: + if (isOrder) { + break; + } pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); -- GitLab