未验证 提交 cca4d923 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2948 from lizhimins/fix-offset

[ISSUE #2708] Client may submit wrong offset when network instability
...@@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -115,6 +115,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
*/ */
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000;
private DefaultLitePullConsumer defaultLitePullConsumer; private DefaultLitePullConsumer defaultLitePullConsumer;
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
...@@ -626,9 +628,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -626,9 +628,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
private long fetchConsumeOffset(MessageQueue messageQueue) { private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
long offset = this.rebalanceImpl.computePullFromWhere(messageQueue); long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue);
return offset; return offset;
} }
...@@ -652,7 +654,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -652,7 +654,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
} }
private long nextPullOffset(MessageQueue messageQueue) { private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
long offset = -1; long offset = -1;
long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue); long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) { if (seekOffset != -1) {
...@@ -739,7 +741,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -739,7 +741,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return; return;
} }
long offset = nextPullOffset(messageQueue); long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
} catch (MQClientException e) {
log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
}
if (this.isCancelled() || processQueue.isDropped()) { if (this.isCancelled() || processQueue.isDropped()) {
return; return;
} }
......
...@@ -269,8 +269,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -269,8 +269,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
} }
} else { } else {
if (processQueue.isLocked()) { if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) { if (!pullRequest.isPreviouslyLocked()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset(); boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy); pullRequest, offset, brokerBusy);
...@@ -279,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -279,7 +286,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest, offset); pullRequest, offset);
} }
pullRequest.setLockedFirst(true); pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset); pullRequest.setNextOffset(offset);
} }
} else { } else {
......
...@@ -23,14 +23,14 @@ public class PullRequest { ...@@ -23,14 +23,14 @@ public class PullRequest {
private MessageQueue messageQueue; private MessageQueue messageQueue;
private ProcessQueue processQueue; private ProcessQueue processQueue;
private long nextOffset; private long nextOffset;
private boolean lockedFirst = false; private boolean previouslyLocked = false;
public boolean isLockedFirst() { public boolean isPreviouslyLocked() {
return lockedFirst; return previouslyLocked;
} }
public void setLockedFirst(boolean lockedFirst) { public void setPreviouslyLocked(boolean previouslyLocked) {
this.lockedFirst = lockedFirst; this.previouslyLocked = previouslyLocked;
} }
public String getConsumerGroup() { public String getConsumerGroup() {
......
...@@ -28,6 +28,7 @@ import java.util.Set; ...@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
...@@ -373,7 +374,15 @@ public abstract class RebalanceImpl { ...@@ -373,7 +374,15 @@ public abstract class RebalanceImpl {
this.removeDirtyOffset(mq); this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue(); ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
if (nextOffset >= 0) { if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) { if (pre != null) {
...@@ -408,8 +417,17 @@ public abstract class RebalanceImpl { ...@@ -408,8 +417,17 @@ public abstract class RebalanceImpl {
public abstract void removeDirtyOffset(final MessageQueue mq); public abstract void removeDirtyOffset(final MessageQueue mq);
/**
* When the network is unstable, using this interface may return wrong offset.
* It is recommended to use computePullFromWhereWithException instead.
* @param mq
* @return offset
*/
@Deprecated
public abstract long computePullFromWhere(final MessageQueue mq); public abstract long computePullFromWhere(final MessageQueue mq);
public abstract long computePullFromWhereWithException(final MessageQueue mq) throws MQClientException;
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList); public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
public void removeProcessQueue(final MessageQueue mq) { public void removeProcessQueue(final MessageQueue mq) {
......
...@@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl { ...@@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq); this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
} }
@Deprecated
@Override @Override
public long computePullFromWhere(MessageQueue mq) { public long computePullFromWhere(MessageQueue mq) {
long result = -1L;
try {
result = computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.warn("Compute consume offset exception, mq={}", mq);
}
return result;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere(); ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
long result = -1; long result = -1;
switch (consumeFromWhere) { switch (consumeFromWhere) {
...@@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { ...@@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try { try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} else { } else {
try { try {
...@@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { ...@@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
UtilAll.YYYYMMDDHHMMSS).getTime(); UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} }
} else { } else {
......
...@@ -20,6 +20,7 @@ import java.util.List; ...@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
...@@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl { ...@@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl {
this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
} }
@Deprecated
@Override @Override
public long computePullFromWhere(MessageQueue mq) { public long computePullFromWhere(MessageQueue mq) {
return 0; return 0;
} }
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
return 0;
}
@Override @Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) { public void dispatchPullRequest(List<PullRequest> pullRequestList) {
} }
......
...@@ -137,8 +137,20 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -137,8 +137,20 @@ public class RebalancePushImpl extends RebalanceImpl {
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
} }
@Deprecated
@Override @Override
public long computePullFromWhere(MessageQueue mq) { public long computePullFromWhere(MessageQueue mq) {
long result = -1L;
try {
result = computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.warn("Compute consume offset exception, mq={}", mq);
}
return result;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
long result = -1; long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
...@@ -159,7 +171,8 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -159,7 +171,8 @@ public class RebalancePushImpl extends RebalanceImpl {
try { try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} }
} else { } else {
...@@ -187,7 +200,8 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -187,7 +200,8 @@ public class RebalancePushImpl extends RebalanceImpl {
try { try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} else { } else {
try { try {
...@@ -195,7 +209,8 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -195,7 +209,8 @@ public class RebalancePushImpl extends RebalanceImpl {
UtilAll.YYYYMMDDHHMMSS).getTime(); UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} }
} else { } else {
......
...@@ -48,6 +48,7 @@ import org.apache.rocketmq.client.impl.consumer.PullMessageService; ...@@ -48,6 +48,7 @@ import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
...@@ -74,6 +75,7 @@ import static org.mockito.ArgumentMatchers.anyLong; ...@@ -74,6 +75,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
...@@ -87,6 +89,7 @@ public class DefaultMQPushConsumerTest { ...@@ -87,6 +89,7 @@ public class DefaultMQPushConsumerTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer; private DefaultMQPushConsumer pushConsumer;
@Before @Before
...@@ -130,6 +133,7 @@ public class DefaultMQPushConsumerTest { ...@@ -130,6 +133,7 @@ public class DefaultMQPushConsumerTest {
}); });
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
// suppress updateTopicRouteInfoFromNameServer // suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID(); pushConsumer.changeInstanceNameToPID();
...@@ -142,7 +146,7 @@ public class DefaultMQPushConsumerTest { ...@@ -142,7 +146,7 @@ public class DefaultMQPushConsumerTest {
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl()); rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl());
doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class)); doReturn(123L).when(rebalanceImpl).computePullFromWhereWithException(any(MessageQueue.class));
FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true); FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true);
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
...@@ -262,7 +266,7 @@ public class DefaultMQPushConsumerTest { ...@@ -262,7 +266,7 @@ public class DefaultMQPushConsumerTest {
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) { ConsumeConcurrentlyContext context) {
countDownLatch.countDown(); countDownLatch.countDown();
try { try {
Thread.sleep(1000); Thread.sleep(1000);
...@@ -320,4 +324,21 @@ public class DefaultMQPushConsumerTest { ...@@ -320,4 +324,21 @@ public class DefaultMQPushConsumerTest {
} }
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
} }
@Test
public void testPullMessage_ExceptionOccursWhenComputePullFromWhere() throws MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(
new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(),
(msgs, context) -> {
messageExts[0] = msgs.get(0);
return null;
}));
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(messageExts[0]).isNull();
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册