diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 782d29b6c023a595d8ae0f2d85995b40476b0c11..bfef761846ca41271abac0da7da767756fc6d279 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void commitSync() { - this.defaultLitePullConsumerImpl.commitSync(); + this.defaultLitePullConsumerImpl.commitAll(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index 0b090e3df30099f9b59f7655216a2041a630ea15..fad0b4f1b584d301c7683aac3be36531627e2721 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -177,6 +177,10 @@ public class AssignedMessageQueue { } } + public Set getAssignedMessageQueues() { + return this.assignedMessageQueueState.keySet(); + } + private class MessageQueueState { private MessageQueue messageQueue; private ProcessQueue processQueue; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index c3eb7fb1fa6135a08e32d52f810e6e5075eefe9b..8483da6ed845be0f2911c38a3794b89be7a08cca 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -590,37 +590,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - public synchronized void commitSync() { + public synchronized void commitAll() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); if (consumerOffset != -1) { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { + if (processQueue != null && !processQueue.isDropped()) { updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, false); - } - } - } - if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { - offsetStore.persistAll(assignedMessageQueue.messageQueues()); - } - } catch (Exception e) { - log.error("An error occurred when update consume offset synchronously.", e); - } - } - - private synchronized void commitAll() { - try { - for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); - if (consumerOffset != -1) { - ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { - updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, true); } } } @@ -927,11 +904,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { checkServiceState(); Set mqs = new HashSet(); - Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - mqs.addAll(allocateMq); + if (this.subscriptionType == SubscriptionType.SUBSCRIBE) { + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + } else if (this.subscriptionType == SubscriptionType.ASSIGN) { + Set assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues(); + mqs.addAll(assignedMessageQueue); + } this.offsetStore.persistAll(mqs); } catch (Exception e) { - log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); + log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e); } }