diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 782d29b6c023a595d8ae0f2d85995b40476b0c11..bfef761846ca41271abac0da7da767756fc6d279 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void commitSync() { - this.defaultLitePullConsumerImpl.commitSync(); + this.defaultLitePullConsumerImpl.commitAll(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index 0b090e3df30099f9b59f7655216a2041a630ea15..4f6c3d545c269ea8053ced608d34ad900ff0624f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -177,6 +177,10 @@ public class AssignedMessageQueue { } } + public Set getAssignedMessageQueue() { + return this.assignedMessageQueueState.keySet(); + } + private class MessageQueueState { private MessageQueue messageQueue; private ProcessQueue processQueue; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index c3eb7fb1fa6135a08e32d52f810e6e5075eefe9b..d78e20b7e32dea9a58f9b9707c1dc9711ace3f8b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -590,7 +590,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - public synchronized void commitSync() { + public synchronized void commitAll() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); @@ -599,28 +599,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, false); - } - } - } - if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { - offsetStore.persistAll(assignedMessageQueue.messageQueues()); - } - } catch (Exception e) { - log.error("An error occurred when update consume offset synchronously.", e); - } - } - - private synchronized void commitAll() { - try { - for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); - if (consumerOffset != -1) { - ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { - updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, true); } } } @@ -927,8 +905,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { checkServiceState(); Set mqs = new HashSet(); - Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - mqs.addAll(allocateMq); + if (this.subscriptionType == SubscriptionType.SUBSCRIBE) { + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + } else if (this.subscriptionType == SubscriptionType.ASSIGN) { + Set assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueue(); + mqs.addAll(assignedMessageQueue); + } this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);