提交 507553bd 编写于 作者: D dongeforever

Finish the rewrite logic for SEARCH_OFFSET_BY_TIMESTAMP

上级 8d49c16d
......@@ -119,6 +119,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......@@ -625,15 +627,46 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
ImmutableList<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
//TODO should make sure the offset timestamp is equal or bigger than the searched timestamp
for (int i = mappingItems.size() - 1; i >=0; i--) {
//TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
Long timestamp = requestHeader.getTimestamp();
long offset = -1;
for (int i = 0; i < mappingItems.size(); i++) {
LogicQueueMappingItem item = mappingItems.get(i);
if (mappingDetail.getBname().equals(item.getBname())) {
//means the leader
assert i == mappingItems.size() - 1;
offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(), item.getQueueId(), timestamp);
if (offset > 0) {
offset = item.computeStaticQueueOffset(offset);
}
} else {
requestHeader.setPhysical(true);
requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId());
RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().pullMessage(item.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
SearchOffsetResponseHeader offsetResponseHeader = (SearchOffsetResponseHeader)rpcResponse.getHeader();
if (offsetResponseHeader.getOffset() < 0
|| (item.checkIfEndOffsetDecided() && offsetResponseHeader.getOffset() >= item.getEndOffset())) {
continue;
} else {
offset = item.computeStaticQueueOffset(offsetResponseHeader.getOffset());
}
}
}
requestHeader.setQueueId(mappingItem.getQueueId());
return null;
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
......@@ -646,12 +679,11 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
{
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (mappingDetail != null) {
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
}
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
......
......@@ -6,8 +6,8 @@ public class LogicQueueMappingItem {
private int queueId;
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset
private long endOffset; // the end of the physical offset
private long startOffset; // the start of the physical offset, included
private long endOffset; // the end of the physical offset, excluded
private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable
......@@ -23,6 +23,13 @@ public class LogicQueueMappingItem {
}
public long computeStaticQueueOffset(long physicalQueueOffset) {
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
if (endOffset >= startOffset
&& endOffset < physicalQueueOffset) {
return logicOffset + (endOffset - startOffset);
}
return logicOffset + (physicalQueueOffset - startOffset);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册