提交 94dc0e0d 编写于 作者: 翊名

feat(pull_consumer) refactor the consumer offset update logic

上级 50ba523b
...@@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override @Override
public void commitSync() { public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync(); this.defaultLitePullConsumerImpl.commitAll();
} }
@Override @Override
......
...@@ -177,6 +177,10 @@ public class AssignedMessageQueue { ...@@ -177,6 +177,10 @@ public class AssignedMessageQueue {
} }
} }
public Set<MessageQueue> getAssignedMessageQueue() {
return this.assignedMessageQueueState.keySet();
}
private class MessageQueueState { private class MessageQueueState {
private MessageQueue messageQueue; private MessageQueue messageQueue;
private ProcessQueue processQueue; private ProcessQueue processQueue;
......
...@@ -590,7 +590,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -590,7 +590,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
public synchronized void commitSync() { public synchronized void commitAll() {
try { try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
...@@ -599,28 +599,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -599,28 +599,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) {
updateConsumeOffset(messageQueue, consumerOffset); updateConsumeOffset(messageQueue, consumerOffset);
updateConsumeOffsetToBroker(messageQueue, consumerOffset, false);
}
}
}
if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offsetStore.persistAll(assignedMessageQueue.messageQueues());
}
} catch (Exception e) {
log.error("An error occurred when update consume offset synchronously.", e);
}
}
private synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) {
updateConsumeOffset(messageQueue, consumerOffset);
updateConsumeOffsetToBroker(messageQueue, consumerOffset, true);
} }
} }
} }
...@@ -927,8 +905,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -927,8 +905,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
try { try {
checkServiceState(); checkServiceState();
Set<MessageQueue> mqs = new HashSet<MessageQueue>(); Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
mqs.addAll(allocateMq); Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
} else if (this.subscriptionType == SubscriptionType.ASSIGN) {
Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueue();
mqs.addAll(assignedMessageQueue);
}
this.offsetStore.persistAll(mqs); this.offsetStore.persistAll(mqs);
} catch (Exception e) { } catch (Exception e) {
log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册