提交 6ab6afd0 编写于 作者: 斜阳

[ISSUE #2708] Client may submit wrong offset when network instability

上级 952772fa
......@@ -626,9 +626,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
private long fetchConsumeOffset(MessageQueue messageQueue) {
private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
long offset = this.rebalanceImpl.computePullFromWhere(messageQueue);
long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue);
return offset;
}
......@@ -652,7 +652,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
private long nextPullOffset(MessageQueue messageQueue) {
private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
long offset = -1;
long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) {
......@@ -739,7 +739,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return;
}
long offset = nextPullOffset(messageQueue);
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
} catch (MQClientException e) {
log.error("get next pull offset failed, maybe timeout exception");
return;
}
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
......
......@@ -270,7 +270,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) {
log.error("compute consume offset failed, maybe timeout exception");
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
......
......@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
......@@ -373,7 +374,15 @@ public abstract class RebalanceImpl {
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
long nextOffset = 0L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
......@@ -408,8 +417,17 @@ public abstract class RebalanceImpl {
public abstract void removeDirtyOffset(final MessageQueue mq);
/**
* When the network is unstable, using this interface may return wrong offset.
* It is recommended to use computePullFromWhereWithException instead.
* @param mq
* @return offset
*/
@Deprecated
public abstract long computePullFromWhere(final MessageQueue mq);
public abstract long computePullFromWhereWithException(final MessageQueue mq) throws MQClientException;
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
public void removeProcessQueue(final MessageQueue mq) {
......
......@@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Deprecated
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1L;
try {
result = computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.warn("Compute consume offset exception, mq={}", mq);
}
return result;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
long result = -1;
switch (consumeFromWhere) {
......@@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
} else {
try {
......@@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......
......@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
......@@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl {
this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Deprecated
@Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
return 0;
}
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
......
......@@ -137,8 +137,20 @@ public class RebalancePushImpl extends RebalanceImpl {
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Deprecated
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1L;
try {
result = computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.warn("Compute consume offset exception, mq={}", mq);
}
return result;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
......@@ -159,7 +171,8 @@ public class RebalancePushImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......@@ -187,7 +200,8 @@ public class RebalancePushImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
} else {
try {
......@@ -195,7 +209,8 @@ public class RebalancePushImpl extends RebalanceImpl {
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册