提交 311d5f4a 编写于 作者: D dongeforever

Try to abstract the rpc layer for BrokerOuterAPI

上级 32f58c79
......@@ -45,8 +45,15 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
......@@ -59,6 +66,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -75,6 +84,7 @@ public class BrokerOuterAPI {
private final BrokerController brokerController;
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private final String currBrokerName;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
......@@ -86,6 +96,7 @@ public class BrokerOuterAPI {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
this.brokerController = brokerController;
this.currBrokerName = brokerController.getBrokerConfig().getBrokerName();
}
public void start() {
......@@ -463,14 +474,13 @@ public class BrokerOuterAPI {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
public RemotingCommand pullMessage(String brokerName, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
public RemotingCommand pullMessage(String bname, PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(brokerName);
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), brokerName, this.brokerController.getBrokerConfig().getBrokerName()));
return response;
return RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR,
String.format("%s-%d cannot find addr when forward to broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), bname, currBrokerName),
PullMessageResponseHeader.class);
}
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
......@@ -478,4 +488,108 @@ public class BrokerOuterAPI {
return response;
}
public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest );
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS:
case ResponseCode.PULL_NOT_FOUND:
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
default:
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
}
public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
}
public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
RpcResponse rpcResponse = new RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
rpcResponse.setException(new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr));
return rpcResponse;
}
RemotingCommand requestCommand = RemotingCommand.createRequestCommand(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
}
}
......@@ -139,6 +139,8 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
......@@ -614,6 +616,29 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
private RemotingCommand rewriteRequestForStaticTopic(SearchOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
}
ImmutableList<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
//TODO should make sure the offset timestamp is equal or bigger than the searched timestamp
for (int i = mappingItems.size() - 1; i >=0; i--) {
}
requestHeader.setQueueId(mappingItem.getQueueId());
return null;
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
......@@ -621,6 +646,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
{
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (mappingDetail != null) {
}
}
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp());
......
......@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class SearchOffsetRequestHeader implements CommandCustomHeader {
public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
......@@ -37,18 +37,22 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
}
@Override
public String getTopic() {
return topic;
}
@Override
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public Integer getQueueId() {
return queueId;
}
@Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
public class RpcRequest {
private int code;
private CommandCustomHeader header;
private byte[] body;
public RpcRequest(int code, CommandCustomHeader header, byte[] body) {
this.code = code;
this.header = header;
this.body = body;
}
public int getCode() {
return code;
}
public CommandCustomHeader getHeader() {
return header;
}
public byte[] getBody() {
return body;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
public class RpcResponse {
private int code;
private CommandCustomHeader header;
private byte[] body;
public Exception exception;
public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
this.code = code;
this.header = header;
this.body = body;
}
public int getCode() {
return code;
}
public CommandCustomHeader getHeader() {
return header;
}
public byte[] getBody() {
return body;
}
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
}
......@@ -20,11 +20,13 @@ import com.alibaba.fastjson.annotation.JSONField;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -85,6 +87,15 @@ public class RemotingCommand {
protected RemotingCommand() {
}
public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(rpcRequest.getCode());
cmd.customHeader = rpcRequest.getHeader();
setCmdVersion(cmd);
cmd.setBody(rpcRequest.getBody());
return cmd;
}
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
......@@ -110,6 +121,13 @@ public class RemotingCommand {
return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
}
public static RemotingCommand buildErrorResponse(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
final RemotingCommand response = RemotingCommand.createResponseCommand(classHeader);
response.setCode(code);
response.setRemark(remark);
return response;
}
public static RemotingCommand buildErrorResponse(int code, String remark) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(code);
......@@ -117,6 +135,7 @@ public class RemotingCommand {
return response;
}
public static RemotingCommand createResponseCommand(int code, String remark,
Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册