提交 74565fdd 编写于 作者: D dongeforever

Fix the computePullFromWhereWithException

上级 fad17e1e
...@@ -200,6 +200,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -200,6 +200,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//actually, we need do nothing, but keep the code structure here //actually, we need do nothing, but keep the code structure here
if (code == ResponseCode.PULL_OFFSET_MOVED) { if (code == ResponseCode.PULL_OFFSET_MOVED) {
responseCode = ResponseCode.PULL_OFFSET_MOVED; responseCode = ResponseCode.PULL_OFFSET_MOVED;
nextBeginOffset = maxOffset;
} else { } else {
//maybe current broker is the slave //maybe current broker is the slave
responseCode = code; responseCode = code;
...@@ -217,11 +218,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -217,11 +218,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if (requestOffset < minOffset) { if (requestOffset < minOffset) {
if (code == ResponseCode.PULL_OFFSET_MOVED) { if (code == ResponseCode.PULL_OFFSET_MOVED) {
responseCode = ResponseCode.PULL_OFFSET_MOVED; responseCode = ResponseCode.PULL_OFFSET_MOVED;
nextBeginOffset = minOffset;
} else { } else {
//maybe read from slave, but we still set it to moved //maybe read from slave, but we still set it to moved
responseCode = ResponseCode.PULL_OFFSET_MOVED; responseCode = ResponseCode.PULL_OFFSET_MOVED;
nextBeginOffset = minOffset;
} }
nextBeginOffset = minOffset;
} else if (requestOffset >= maxOffset) { } else if (requestOffset >= maxOffset) {
//just move to another item //just move to another item
LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
......
...@@ -72,6 +72,7 @@ import org.apache.rocketmq.common.message.MessageConst; ...@@ -72,6 +72,7 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo;
...@@ -297,6 +298,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -297,6 +298,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L; long offset = -1L;
try { try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
if (offset < 0) {
throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
}
} catch (Exception e) { } catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
......
...@@ -189,7 +189,8 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -189,7 +189,8 @@ public class RebalancePushImpl extends RebalanceImpl {
} }
} }
} else { } else {
result = -1; throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query consume offset from " +
"offset store");
} }
break; break;
} }
...@@ -198,9 +199,11 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -198,9 +199,11 @@ public class RebalancePushImpl extends RebalanceImpl {
if (lastOffset >= 0) { if (lastOffset >= 0) {
result = lastOffset; result = lastOffset;
} else if (-1 == lastOffset) { } else if (-1 == lastOffset) {
//the offset will be fixed by the OFFSET_ILLEGAL process
result = 0L; result = 0L;
} else { } else {
result = -1; throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " +
"store");
} }
break; break;
} }
...@@ -227,7 +230,8 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -227,7 +230,8 @@ public class RebalancePushImpl extends RebalanceImpl {
} }
} }
} else { } else {
result = -1; throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " +
"store");
} }
break; break;
} }
...@@ -236,6 +240,10 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -236,6 +240,10 @@ public class RebalancePushImpl extends RebalanceImpl {
break; break;
} }
if (result < 0) {
throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Found unexpected result " + result);
}
return result; return result;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册