From 94dc0e0d259ca283e4728dd13e9a80d8213cd227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=8A=E5=90=8D?= Date: Tue, 7 Jan 2020 20:50:13 +0800 Subject: [PATCH] feat(pull_consumer) refactor the consumer offset update logic --- .../consumer/DefaultLitePullConsumer.java | 2 +- .../impl/consumer/AssignedMessageQueue.java | 4 +++ .../consumer/DefaultLitePullConsumerImpl.java | 33 +++++-------------- 3 files changed, 13 insertions(+), 26 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 782d29b6..bfef7618 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void commitSync() { - this.defaultLitePullConsumerImpl.commitSync(); + this.defaultLitePullConsumerImpl.commitAll(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index 0b090e3d..4f6c3d54 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -177,6 +177,10 @@ public class AssignedMessageQueue { } } + public Set getAssignedMessageQueue() { + return this.assignedMessageQueueState.keySet(); + } + private class MessageQueueState { private MessageQueue messageQueue; private ProcessQueue processQueue; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index c3eb7fb1..d78e20b7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -590,7 +590,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - public synchronized void commitSync() { + public synchronized void commitAll() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); @@ -599,28 +599,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, false); - } - } - } - if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { - offsetStore.persistAll(assignedMessageQueue.messageQueues()); - } - } catch (Exception e) { - log.error("An error occurred when update consume offset synchronously.", e); - } - } - - private synchronized void commitAll() { - try { - for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); - if (consumerOffset != -1) { - ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { - updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, true); } } } @@ -927,8 +905,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { checkServiceState(); Set mqs = new HashSet(); - Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - mqs.addAll(allocateMq); + if (this.subscriptionType == SubscriptionType.SUBSCRIBE) { + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + } else if (this.subscriptionType == SubscriptionType.ASSIGN) { + Set assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueue(); + mqs.addAll(assignedMessageQueue); + } this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); -- GitLab