未验证 提交 f77d7fee 编写于 作者: H huangli 提交者: GitHub

[ISSUE #2732] Fix message loss problem when rebalance with LitePullConsumer (#2832)

* [ISSUE #2732] Fix message loss problem when rebalance with LitePullConsumer

* Fix message loss problem when rebalance with LitePullConsumer, update 2
上级 c3d46410
...@@ -83,9 +83,12 @@ public class AssignedMessageQueue { ...@@ -83,9 +83,12 @@ public class AssignedMessageQueue {
return -1; return -1;
} }
public void updatePullOffset(MessageQueue messageQueue, long offset) { public void updatePullOffset(MessageQueue messageQueue, long offset, ProcessQueue processQueue) {
MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue); MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) { if (messageQueueState != null) {
if (messageQueueState.getProcessQueue() != processQueue) {
return;
}
messageQueueState.setPullOffset(offset); messageQueueState.setPullOffset(offset);
} }
} }
......
...@@ -612,9 +612,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -612,9 +612,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset) { private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) {
if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) { if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset); assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue);
} }
} }
...@@ -740,6 +740,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -740,6 +740,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
long offset = nextPullOffset(messageQueue); long offset = nextPullOffset(messageQueue);
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
long pullDelayTimeMills = 0; long pullDelayTimeMills = 0;
try { try {
SubscriptionData subscriptionData; SubscriptionData subscriptionData;
...@@ -752,7 +755,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -752,7 +755,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
switch (pullResult.getPullStatus()) { switch (pullResult.getPullStatus()) {
case FOUND: case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue); final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
...@@ -769,7 +774,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -769,7 +774,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
default: default:
break; break;
} }
updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
} catch (Throwable e) { } catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException; pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error("An error occurred in pull message process.", e); log.error("An error occurred in pull message process.", e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册