From 1f106d77c95e0f36e8b276514886109ebd449eda Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Tue, 7 Jan 2020 20:50:13 +0800 Subject: [PATCH] feat(pull_consumer) refactor the consumer offset update logic --- .../consumer/DefaultLitePullConsumer.java | 2 +- .../impl/consumer/AssignedMessageQueue.java | 4 ++ .../consumer/DefaultLitePullConsumerImpl.java | 38 +++++-------------- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 782d29b6..bfef7618 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void commitSync() { - this.defaultLitePullConsumerImpl.commitSync(); + this.defaultLitePullConsumerImpl.commitAll(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index 0b090e3d..fad0b4f1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -177,6 +177,10 @@ public class AssignedMessageQueue { } } + public Set getAssignedMessageQueues() { + return this.assignedMessageQueueState.keySet(); + } + private class MessageQueueState { private MessageQueue messageQueue; private ProcessQueue processQueue; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index c3eb7fb1..8483da6e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -590,37 +590,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - public synchronized void commitSync() { + public synchronized void commitAll() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); if (consumerOffset != -1) { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { + if (processQueue != null && !processQueue.isDropped()) { updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, false); - } - } - } - if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { - offsetStore.persistAll(assignedMessageQueue.messageQueues()); - } - } catch (Exception e) { - log.error("An error occurred when update consume offset synchronously.", e); - } - } - - private synchronized void commitAll() { - try { - for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); - if (consumerOffset != -1) { - ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { - updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, true); } } } @@ -927,11 +904,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { checkServiceState(); Set mqs = new HashSet(); - Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - mqs.addAll(allocateMq); + if (this.subscriptionType == SubscriptionType.SUBSCRIBE) { + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + } else if (this.subscriptionType == SubscriptionType.ASSIGN) { + Set assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues(); + mqs.addAll(assignedMessageQueue); + } this.offsetStore.persistAll(mqs); } catch (Exception e) { - log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); + log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e); } } -- GitLab