提交 86b3ff7b 编写于 作者: D dongeforever

Finish the processor

上级 da12c9ed
...@@ -646,8 +646,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -646,8 +646,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
requestHeader.setTimestamp(timestamp); requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId()); requestHeader.setQueueId(item.getQueueId());
RpcRequest rpcRequest = new RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null); requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().searchOffset(item.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); requestHeader.setBname(item.getBname());
RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
...@@ -751,8 +753,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -751,8 +753,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
try { try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
//TODO check if it is leader
RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
...@@ -802,8 +808,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -802,8 +808,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
try { try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
//TODO check if it is leader
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册