提交 da589b19 编写于 作者: 斜阳

[ISSUE #2708] Client may submit wrong offset when network instability

上级 6ab6afd0
...@@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
*/ */
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000;
private DefaultLitePullConsumer defaultLitePullConsumer; private DefaultLitePullConsumer defaultLitePullConsumer;
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
...@@ -743,7 +745,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -743,7 +745,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
try { try {
offset = nextPullOffset(messageQueue); offset = nextPullOffset(messageQueue);
} catch (MQClientException e) { } catch (MQClientException e) {
log.error("get next pull offset failed, maybe timeout exception"); log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return; return;
} }
......
...@@ -269,12 +269,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -269,12 +269,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
} else { } else {
if (processQueue.isLocked()) { if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) { if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L; long offset = -1L;
try { try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) { } catch (MQClientException e) {
log.error("compute consume offset failed, maybe timeout exception"); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset", e);
return;
} }
boolean brokerBusy = offset < pullRequest.getNextOffset(); boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
...@@ -284,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -284,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest, offset); pullRequest, offset);
} }
pullRequest.setLockedFirst(true); pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset); pullRequest.setNextOffset(offset);
} }
} else { } else {
......
...@@ -23,14 +23,14 @@ public class PullRequest { ...@@ -23,14 +23,14 @@ public class PullRequest {
private MessageQueue messageQueue; private MessageQueue messageQueue;
private ProcessQueue processQueue; private ProcessQueue processQueue;
private long nextOffset; private long nextOffset;
private boolean lockedFirst = false; private boolean previouslyLocked = false;
public boolean isLockedFirst() { public boolean isPreviouslyLocked() {
return lockedFirst; return previouslyLocked;
} }
public void setLockedFirst(boolean lockedFirst) { public void setPreviouslyLocked(boolean previouslyLocked) {
this.lockedFirst = lockedFirst; this.previouslyLocked = previouslyLocked;
} }
public String getConsumerGroup() { public String getConsumerGroup() {
......
...@@ -375,7 +375,7 @@ public abstract class RebalanceImpl { ...@@ -375,7 +375,7 @@ public abstract class RebalanceImpl {
this.removeDirtyOffset(mq); this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue(); ProcessQueue pq = new ProcessQueue();
long nextOffset = 0L; long nextOffset = -1L;
try { try {
nextOffset = this.computePullFromWhereWithException(mq); nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) { } catch (MQClientException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册