提交 4f96f72c 编写于 作者: D dongeforever

Fix the max offset logic for logicOffset = -1

上级 2dd65949
...@@ -633,12 +633,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -633,12 +633,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
long offset = -1; long offset = -1;
for (int i = 0; i < mappingItems.size(); i++) { for (int i = 0; i < mappingItems.size(); i++) {
LogicQueueMappingItem item = mappingItems.get(i); LogicQueueMappingItem item = mappingItems.get(i);
if (!item.checkIfLogicoffsetDecided()) {
continue;
}
if (mappingDetail.getBname().equals(item.getBname())) { if (mappingDetail.getBname().equals(item.getBname())) {
//means the leader
assert i == mappingItems.size() - 1;
offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp); offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
if (offset > 0) { if (offset > 0) {
offset = item.computeStaticQueueOffsetUpToEnd(offset); offset = item.computeStaticQueueOffsetStrictly(offset);
break;
} }
} else { } else {
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
...@@ -650,12 +652,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -650,12 +652,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader(); SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader) rpcResponse.getHeader();
if (offsetResponseHeader.getOffset() < 0 if (offsetResponseHeader.getOffset() < 0
|| (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) { || (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
continue; continue;
} else { } else {
offset = item.computeStaticQueueOffsetUpToEnd(offsetResponseHeader.getOffset()); offset = item.computeStaticQueueOffsetStrictly(offsetResponseHeader.getOffset());
} }
} }
...@@ -705,17 +707,38 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -705,17 +707,38 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (!mappingContext.isLeader()) { if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset); try {
LogicQueueMappingItem maxItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), Long.MAX_VALUE, true);
assert maxItem != null;
assert maxItem.getLogicOffset() >= 0;
requestHeader.setBname(maxItem.getBname());
requestHeader.setPhysical(true);
requestHeader.setQueueId(mappingItem.getQueueId());
long maxPhysicalOffset = Long.MAX_VALUE;
if (maxItem.getBname().equals(mappingDetail.getBname())) {
//current broker
maxPhysicalOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
} else {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MAX_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
GetMaxOffsetResponseHeader offsetResponseHeader = (GetMaxOffsetResponseHeader) rpcResponse.getHeader();
maxPhysicalOffset = offsetResponseHeader.getOffset();
}
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset); responseHeader.setOffset(maxItem.computeStaticQueueOffsetStrictly(maxPhysicalOffset));
response.setCode(ResponseCode.SUCCESS); response.setCode(ResponseCode.SUCCESS);
response.setRemark(null); response.setRemark(null);
return response; return response;
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
} }
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
...@@ -770,7 +793,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -770,7 +793,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader(); GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
physicalOffset = offsetResponseHeader.getOffset(); physicalOffset = offsetResponseHeader.getOffset();
} }
long offset = mappingItem.computeStaticQueueOffsetUpToEnd(physicalOffset); long offset = mappingItem.computeStaticQueueOffsetLoosely(physicalOffset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
...@@ -820,7 +843,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -820,7 +843,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
//TODO check if it is in current broker //TODO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
......
...@@ -274,11 +274,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -274,11 +274,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& nextBeginOffset >= currentItem.getEndOffset()) { && nextBeginOffset >= currentItem.getEndOffset()) {
nextBeginOffset = currentItem.getEndOffset(); nextBeginOffset = currentItem.getEndOffset();
} }
responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset));
//handle min offset //handle min offset
responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset))); responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset)));
//handle max offset //handle max offset
responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset), responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset),
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
//set the offsetDelta //set the offsetDelta
responseHeader.setOffsetDelta(currentItem.computeOffsetDelta()); responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
......
...@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -135,7 +135,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
//no need to care the broker name //no need to care the broker name
long staticLogicOffset = mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getQueueOffset()); long staticLogicOffset = mappingItem.computeStaticQueueOffsetLoosely(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) { if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
......
...@@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -7,6 +7,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
...@@ -70,6 +71,9 @@ public class RpcClientImpl implements RpcClient { ...@@ -70,6 +71,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.GET_MIN_OFFSET: case RequestCode.GET_MIN_OFFSET:
rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs); rpcResponsePromise = handleGetMinOffset(addr, request, timeoutMs);
break; break;
case RequestCode.GET_MAX_OFFSET:
rpcResponsePromise = handleGetMaxOffset(addr, request, timeoutMs);
break;
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs); rpcResponsePromise = handleSearchOffset(addr, request, timeoutMs);
break; break;
...@@ -226,6 +230,27 @@ public class RpcClientImpl implements RpcClient { ...@@ -226,6 +230,27 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise; return rpcResponsePromise;
} }
public Promise<RpcResponse> handleGetMaxOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
}
default:{
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
return rpcResponsePromise;
}
public Promise<RpcResponse> handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception { public Promise<RpcResponse> handleGetEarliestMsgStoretime(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture(); final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
......
...@@ -31,7 +31,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { ...@@ -31,7 +31,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
this.timeOfEnd = timeOfEnd; this.timeOfEnd = timeOfEnd;
} }
public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
//should only be user in sendMessage and getMinOffset
public long computeStaticQueueOffsetLoosely(long physicalQueueOffset) {
//consider the newly mapped item //consider the newly mapped item
if (logicOffset < 0) { if (logicOffset < 0) {
return -1; return -1;
...@@ -46,10 +48,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { ...@@ -46,10 +48,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
return logicOffset + (physicalQueueOffset - startOffset); return logicOffset + (physicalQueueOffset - startOffset);
} }
public long computeStaticQueueOffset(long physicalQueueOffset) { public long computeStaticQueueOffsetStrictly(long physicalQueueOffset) {
if (logicOffset < 0) { assert logicOffset >= 0;
return logicOffset;
}
if (physicalQueueOffset < startOffset) { if (physicalQueueOffset < startOffset) {
return logicOffset; return logicOffset;
} }
...@@ -67,15 +68,15 @@ public class LogicQueueMappingItem extends RemotingSerializable { ...@@ -67,15 +68,15 @@ public class LogicQueueMappingItem extends RemotingSerializable {
return logicOffset; return logicOffset;
} }
} }
public boolean checkIfShouldDeleted() {
return endOffset == startOffset;
}
public boolean checkIfEndOffsetDecided() { public boolean checkIfEndOffsetDecided() {
//if the endOffset == startOffset, then the item should be deleted //if the endOffset == startOffset, then the item should be deleted
return endOffset > startOffset; return endOffset > startOffset;
} }
public boolean checkIfLogicoffsetDecided() {
return logicOffset >= 0;
}
public long computeOffsetDelta() { public long computeOffsetDelta() {
return logicOffset - startOffset; return logicOffset - startOffset;
} }
......
...@@ -632,9 +632,6 @@ public class TopicQueueMappingUtils { ...@@ -632,9 +632,6 @@ public class TopicQueueMappingUtils {
if (ignoreNegative && item.getLogicOffset() < 0) { if (ignoreNegative && item.getLogicOffset() < 0) {
continue; continue;
} }
if (!item.checkIfShouldDeleted()) {
return mappingItems.get(i);
}
} }
return null; return null;
} }
......
...@@ -16,7 +16,6 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; ...@@ -16,7 +16,6 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
...@@ -165,7 +164,7 @@ public class MQAdminUtils { ...@@ -165,7 +164,7 @@ public class MQAdminUtils {
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) { if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset()); throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
} }
newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize)); newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffsetStrictly(topicOffset.getMaxOffset()), blockSeqSize));
TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname());
//fresh the new leader //fresh the new leader
TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items); TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册