From 3c264c8f2cf9c9672a26e5b9967dcd10f2f23fba Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 19 Feb 2019 17:30:24 +0800 Subject: [PATCH] Polish push message process in client(add broker name) --- .../client/impl/ClientRemotingProcessor.java | 2 +- .../consumer/DefaultMQPushConsumerImpl.java | 36 +++++++++++-------- .../client/impl/factory/MQClientInstance.java | 6 +++- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index c8c39199..f3cfff7a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -218,12 +218,12 @@ public class ClientRemotingProcessor implements RequestProcessor { final PushMessageHeader requestHeader = (PushMessageHeader) request .decodeCommandCustomHeader(PushMessageHeader.class); - final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody())); boolean result = this.mqClientFactory.processSnodePushMessage(msg, requestHeader.getConsumerGroup(), requestHeader.getTopic(), + requestHeader.getEnodeName(), requestHeader.getQueueId(), requestHeader.getQueueOffset()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 000d5692..f5251f53 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -308,12 +308,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void onSuccess(PullResult pullResult) { if (pullResult != null) { //Update local offset according remote offset - String localOffsetKey = pullRequest.getConsumerGroup() - + "@" + pullRequest.getMessageQueue().getTopic() - + "@" + pullRequest.getMessageQueue().getQueueId(); + String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), + pullRequest.getMessageQueue().getBrokerName(), + pullRequest.getMessageQueue().getQueueId()); AtomicLong localOffset = localConsumerOffset.get(localOffsetKey); if (localOffset == null) { - localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); + localConsumerOffset.putIfAbsent(localOffsetKey, new AtomicLong(-1)); } localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset()); @@ -474,9 +475,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { - String localOffsetKey = pullRequest.getConsumerGroup() - + "@" + pullRequest.getMessageQueue().getTopic() - + "@" + pullRequest.getMessageQueue().getQueueId(); + String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), + pullRequest.getMessageQueue().getBrokerName(), + pullRequest.getMessageQueue().getQueueId()); if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { //Stop pull request log.info("Stop pull request, {}", localOffsetKey); @@ -498,9 +500,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public void executePullRequestImmediately(final PullRequest pullRequest) { - String localOffsetKey = pullRequest.getConsumerGroup() - + "@" + pullRequest.getMessageQueue().getTopic() - + "@" + pullRequest.getMessageQueue().getQueueId(); + String localOffsetKey = genLocalOffsetKey(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), + pullRequest.getMessageQueue().getBrokerName(), + pullRequest.getMessageQueue().getQueueId()); if (pullStopped.get(localOffsetKey) != null && pullStopped.get(localOffsetKey).get()) { //Stop pull request log.info("Stop pull request, {}", localOffsetKey); @@ -1197,21 +1200,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public boolean processPushMessage(final MessageExt msg, final String consumerGroup, final String topic, + final String brokerName, final int queueID, final long offset) { - String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID; + String localOffsetKey = genLocalOffsetKey(consumerGroup, topic, brokerName, queueID); AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey); if (localOffset == null) { log.info("Current Local offset have not set, initiallized to -1."); this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1)); return false; } - if (localOffset.get() < offset) { + if (localOffset.get() + 1 < offset) { //should start pull message process log.debug("Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); return false; } else { //Stop pull request + log.debug("Process Push : Current Local key:{} and offset:{} and push offset:{}", localOffsetKey, localOffset.get(), offset); AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey); if (pullStop == null) { this.pullStopped.put(localOffsetKey, new AtomicBoolean(true)); @@ -1233,10 +1238,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { processQueue = processQueues.get(localOffsetKey); } processQueue.putMessage(messageExtList); - MessageQueue messageQueue = new MessageQueue(topic, "", queueID); + MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueID); this.consumeMessageService.submitConsumeRequest(messageExtList, processQueue, messageQueue, true); - log.info(".......submitConsumeRequest:{},Offset:{}...", localOffsetKey, offset); } return true; } + + private String genLocalOffsetKey(String consumerGroup, String topic, String brokerName, int queueID) { + return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 4a81c3b7..0953fdc1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1376,15 +1376,19 @@ public class MQClientInstance { public ClientConfig getNettyClientConfig() { return nettyClientConfig; } + public boolean processSnodePushMessage(final MessageExt msg, final String consumerGroup, final String topic, + final String brokerName, final int queueID, final long offset) { + log.debug("Recieve:processSnodePushMessage :{}-{}-{}-{}-{}", + consumerGroup, topic, brokerName, queueID, offset); MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); if (null != mqConsumerInner) { DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner; - consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset); + consumer.processPushMessage(msg, consumerGroup, topic, brokerName, queueID, offset); return true; } -- GitLab