diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index aa6879b5afbf767659bad71804fc98d221c0e201..6bac707c80cd334379e46aa6ce8c3e93a129e668 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -200,6 +200,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements //actually, we need do nothing, but keep the code structure here if (code == ResponseCode.PULL_OFFSET_MOVED) { responseCode = ResponseCode.PULL_OFFSET_MOVED; + nextBeginOffset = maxOffset; } else { //maybe current broker is the slave responseCode = code; @@ -217,11 +218,12 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements if (requestOffset < minOffset) { if (code == ResponseCode.PULL_OFFSET_MOVED) { responseCode = ResponseCode.PULL_OFFSET_MOVED; + nextBeginOffset = minOffset; } else { //maybe read from slave, but we still set it to moved responseCode = ResponseCode.PULL_OFFSET_MOVED; + nextBeginOffset = minOffset; } - nextBeginOffset = minOffset; } else if (requestOffset >= maxOffset) { //just move to another item LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 9d158c66c84e593d5ce90147b2a6248b9f4825de..6365f96a5fbabecb26356cdb0b05201666e8a845 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -72,6 +72,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.PopProcessQueueInfo; @@ -297,6 +298,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { long offset = -1L; try { offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); + if (offset < 0) { + throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset); + } } catch (Exception e) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index d28a0469864edd5e4c805725fd557f4d7306558f..76803dd6aa36cd9e3575e98fe0c39c8458cca750 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -189,7 +189,8 @@ public class RebalancePushImpl extends RebalanceImpl { } } } else { - result = -1; + throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query consume offset from " + + "offset store"); } break; } @@ -198,9 +199,11 @@ public class RebalancePushImpl extends RebalanceImpl { if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { + //the offset will be fixed by the OFFSET_ILLEGAL process result = 0L; } else { - result = -1; + throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " + + "store"); } break; } @@ -227,7 +230,8 @@ public class RebalancePushImpl extends RebalanceImpl { } } } else { - result = -1; + throw new MQClientException(ResponseCode.QUERY_NOT_FOUND, "Failed to query offset from offset " + + "store"); } break; } @@ -236,6 +240,10 @@ public class RebalancePushImpl extends RebalanceImpl { break; } + if (result < 0) { + throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Found unexpected result " + result); + } + return result; }