diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index aad9454f4f7d50bd236f415682ff5986cefa1d4d..04433964c42b5564e8a9a35e4a331191535e719e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -141,8 +141,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen if (!mappingContext.isLeader()) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); } - if (mappingContext.checkIfAsPhysical()) { - //let it go + List mappingItemList = mappingContext.getMappingItemList(); + if (mappingItemList.size() == 1 + && mappingItemList.get(0).getLogicOffset() == 0) { + //as physical, just let it go requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId()); return null; } @@ -169,6 +171,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen requestHeader.setBname(mappingItem.getBname()); requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setPhysical(true); + requestHeader.setSetZeroIfNotFound(false); RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); if (rpcResponse.getException() != null) { @@ -230,7 +233,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); - if (minOffset <= 0 + if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) { + response.setCode(ResponseCode.QUERY_NOT_FOUND); + response.setRemark("Not found, do not set to zero, maybe this group boot first"); + }else if (minOffset <= 0 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset( requestHeader.getTopic(), requestHeader.getQueueId(), 0)) { responseHeader.setOffset(0L); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java index b7714da5b8289cbd313851aac97c479e212c29ff..ebcbe0db2ab2b78730a4e22dd4a5a11149899c6c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java @@ -32,6 +32,8 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader { @CFNotNull private Integer queueId; + private Boolean setZeroIfNotFound; + @Override public void checkFields() throws RemotingCommandException { } @@ -63,4 +65,12 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader { public void setQueueId(Integer queueId) { this.queueId = queueId; } + + public Boolean getSetZeroIfNotFound() { + return setZeroIfNotFound; + } + + public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) { + this.setZeroIfNotFound = setZeroIfNotFound; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java index f705c43932d9856ffb40227538c8cdeb256bf0e9..d6a6fd97648b80e6f47f0e0ce4e10a468f13538a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java @@ -38,11 +38,6 @@ public class TopicQueueMappingContext { } - public boolean checkIfAsPhysical() { - return mappingDetail == null - || mappingItemList == null - || (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0); - } public boolean isLeader() { return leaderItem != null && leaderItem.getBname().equals(mappingDetail.getBname());