提交 5b777932 编写于 作者: D dongeforever

Polish the resetZero logic for logic queue

上级 74565fdd
......@@ -141,8 +141,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
}
if (mappingContext.checkIfAsPhysical()) {
//let it go
List<LogicQueueMappingItem> mappingItemList = mappingContext.getMappingItemList();
if (mappingItemList.size() == 1
&& mappingItemList.get(0).getLogicOffset() == 0) {
//as physical, just let it go
requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
return null;
}
......@@ -169,6 +171,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
requestHeader.setBname(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId());
requestHeader.setPhysical(true);
requestHeader.setSetZeroIfNotFound(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
......@@ -230,7 +233,10 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, do not set to zero, maybe this group boot first");
}else if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
......
......@@ -32,6 +32,8 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private Integer queueId;
private Boolean setZeroIfNotFound;
@Override
public void checkFields() throws RemotingCommandException {
}
......@@ -63,4 +65,12 @@ public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
public Boolean getSetZeroIfNotFound() {
return setZeroIfNotFound;
}
public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) {
this.setZeroIfNotFound = setZeroIfNotFound;
}
}
......@@ -38,11 +38,6 @@ public class TopicQueueMappingContext {
}
public boolean checkIfAsPhysical() {
return mappingDetail == null
|| mappingItemList == null
|| (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0);
}
public boolean isLeader() {
return leaderItem != null && leaderItem.getBname().equals(mappingDetail.getBname());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册