提交 912613d2 编写于 作者: D dongeforever

Abstract the rpc layer

上级 fdbc7ffd
......@@ -26,9 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
......@@ -45,15 +43,10 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
......@@ -66,8 +59,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -474,95 +467,6 @@ public class BrokerOuterAPI {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
private String getBrokerAddrByNameOrException(String bname) throws MQBrokerException {
String addr = this.brokerController.getBrokerAddrByName(bname);
if (addr == null) {
throw new MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname, addr);
}
return addr;
}
public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS:
case ResponseCode.PULL_NOT_FOUND:
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
default:
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
}
public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
}
public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RemotingCommand.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
return rpcResponse;
}
}
}
}
......@@ -119,8 +119,8 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
......
......@@ -36,7 +36,6 @@ import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
......@@ -60,14 +59,15 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -157,7 +157,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return rewriteResult;
}
}
return RemotingCommand.createCommandForRpcResponse(rpcResponse);
return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
......@@ -54,7 +53,6 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
......
......@@ -28,14 +28,11 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
......
......@@ -87,6 +87,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import static org.apache.rocketmq.common.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final static InternalLogger log = ClientLogger.getLog();
......@@ -162,37 +164,7 @@ public class MQClientInstance {
}
public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
if (route.getTopicQueueMappingByBroker() == null
|| route.getTopicQueueMappingByBroker().isEmpty()) {
return new ConcurrentHashMap<>();
}
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<>();
int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
String brokerName = entry.getKey();
if (entry.getValue().getTotalQueues() > totalNums) {
if (totalNums != 0) {
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
}
totalNums = entry.getValue().getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
String oldBrokerName = mqEndPoints.put(mq, brokerName);
log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
}
}
//accomplish the static logic queues
for (int i = 0; i < totalNums; i++) {
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
if (!mqEndPoints.containsKey(mq)) {
mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
}
}
return mqEndPoints;
}
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
......
......@@ -86,4 +86,11 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int POLLING_TIMEOUT = 210;
public static final int NOT_LEADER_FOR_QUEUE = 501;
public static final int RPC_UNKNOWN = -1000;
public static final int RPC_ADDR_IS_NULL = -1002;
public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
public static final int RPC_TIME_OUT = -1006;
}
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
......@@ -20,7 +20,7 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicQueueMappingInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ClientMetadata {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<String, ConcurrentMap<MessageQueue, String>>();
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
if (topicEndPointsTable.get(mq.getTopic()) != null
&& !topicEndPointsTable.get(mq.getTopic()).isEmpty()) {
return topicEndPointsTable.get(mq.getTopic()).get(mq);
}
return mq.getBrokerName();
}
public void refreshClusterInfo(ClusterInfo clusterInfo) {
if (clusterInfo == null
|| clusterInfo.getBrokerAddrTable() == null) {
return;
}
for (Map.Entry<String, BrokerData> entry : clusterInfo.getBrokerAddrTable().entrySet()) {
brokerAddrTable.put(entry.getKey(), entry.getValue().getBrokerAddrs());
}
}
public String findMasterBrokerAddr(String brokerName) {
if (!brokerAddrTable.containsKey(brokerName)) {
return null;
}
return brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID);
}
public static ConcurrentMap<MessageQueue, String> topicRouteData2EndpointsForStaticTopic(final String topic, final TopicRouteData route) {
if (route.getTopicQueueMappingByBroker() == null
|| route.getTopicQueueMappingByBroker().isEmpty()) {
return new ConcurrentHashMap<MessageQueue, String>();
}
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
String brokerName = entry.getKey();
if (entry.getValue().getTotalQueues() > totalNums) {
if (totalNums != 0) {
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
}
totalNums = entry.getValue().getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
String oldBrokerName = mqEndPoints.put(mq, brokerName);
log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
}
}
//accomplish the static logic queues
for (int i = 0; i < totalNums; i++) {
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
if (!mqEndPoints.containsKey(mq)) {
mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
}
}
return mqEndPoints;
}
}
......@@ -14,12 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
package org.apache.rocketmq.common.rpc;
public abstract class TopicQueueRequestHeader extends RequestHeader {
public abstract String getTopic();
public abstract void setTopic(String topic);
public abstract Integer getQueueId();
public abstract void setQueueId(Integer queueId);
import org.apache.rocketmq.remoting.CommandCustomHeader;
public abstract class CommonRpcHeader implements CommandCustomHeader {
//the namespace name
protected String namespace;
//if the data has been namespaced
protected Boolean namespaced;
//the abstract remote addr name, usually the physical broker name
protected String bname;
protected Boolean oneway;
public String getBname() {
return bname;
}
public void setBname(String bname) {
this.bname = bname;
}
public Boolean getOneway() {
return oneway;
}
public void setOneway(Boolean oneway) {
this.oneway = oneway;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public Boolean getNamespaced() {
return namespaced;
}
public void setNamespaced(Boolean namespaced) {
this.namespaced = namespaced;
}
}
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.concurrent.Future;
public interface RpcClient {
//common invoke paradigm, the logic remote addr is defined in "bname" field of request
//For oneway request, the sign is labeled in request, and do not need an another method named "invokeOneway"
//For one
Future<RpcResponse> invoke(RpcRequest request, long timeoutMs) throws RpcException;
//For rocketmq, most requests are corresponded to MessageQueue
//And for LogicQueue, the broker name is mocked, the physical addr could only be defined by MessageQueue
Future<RpcResponse> invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException;
}
package org.apache.rocketmq.common.rpc;
import java.util.concurrent.Future;
public abstract class RpcClientHook {
//if the return is not null, return it
public abstract RpcResponse beforeRequest(RpcRequest rpcRequest) throws RpcException;
//if the return is not null, return it
public abstract RpcResponse afterResponse(RpcResponse rpcResponse) throws RpcException;
}
package org.apache.rocketmq.common.rpc;
import com.google.common.util.concurrent.Futures;
import com.sun.org.apache.xpath.internal.functions.FuncPosition;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
public class RpcClientImpl implements RpcClient {
private ClientMetadata clientMetadata;
private RemotingClient remotingClient;
private List<RpcClientHook> clientHookList = new ArrayList<RpcClientHook>();
public RpcClientImpl(ClientMetadata clientMetadata, RemotingClient remotingClient) {
this.clientMetadata = clientMetadata;
this.remotingClient = remotingClient;
}
public void registerHook(RpcClientHook hook) {
clientHookList.add(hook);
}
@Override
public Future<RpcResponse> invoke(MessageQueue mq, RpcRequest request, long timeoutMs) throws RpcException {
String bname = clientMetadata.getBrokerNameFromMessageQueue(mq);
request.getHeader().setBname(bname);
return invoke(request, timeoutMs);
}
public Promise<RpcResponse> createResponseFuture() {
return ImmediateEventExecutor.INSTANCE.newPromise();
}
@Override
public Future<RpcResponse> invoke(RpcRequest request, long timeoutMs) throws RpcException {
if (clientHookList.size() > 0) {
for (RpcClientHook rpcClientHook: clientHookList) {
RpcResponse response = rpcClientHook.beforeRequest(request);
if (response != null) {
//For 1.6, there is not easy-to-use future impl
return createResponseFuture().setSuccess(response);
}
}
}
String addr = getBrokerAddrByNameOrException(request.getHeader().bname);
Promise<RpcResponse> rpcResponsePromise = null;
try {
switch (request.getCode()) {
case RequestCode.PULL_MESSAGE:
rpcResponsePromise = handlePullMessage(addr, request, timeoutMs);
break;
default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
}
} catch (RpcException rpcException) {
throw rpcException;
} catch (Exception e) {
throw new RpcException(ResponseCode.RPC_UNKNOWN, "error from remoting layer", e);
}
return rpcResponsePromise;
}
private String getBrokerAddrByNameOrException(String bname) throws RpcException {
String addr = this.clientMetadata.findMasterBrokerAddr(bname);
if (addr == null) {
throw new RpcException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + bname);
}
return addr;
}
private void processFailedResponse(String addr, RemotingCommand requestCommand, ResponseFuture responseFuture, Promise<RpcResponse> rpcResponsePromise) {
RemotingCommand responseCommand = responseFuture.getResponseCommand();
if (responseCommand != null) {
//this should not happen
return;
}
int errorCode = ResponseCode.RPC_UNKNOWN;
String errorMessage = null;
if (!responseFuture.isSendRequestOK()) {
errorCode = ResponseCode.RPC_SEND_TO_CHANNEL_FAILED;
errorMessage = "send request failed to " + addr + ". Request: " + requestCommand;
} else if (responseFuture.isTimeout()) {
errorCode = ResponseCode.RPC_TIME_OUT;
errorMessage = "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + requestCommand;
} else {
errorMessage = "unknown reason. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand;
}
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(errorCode, errorMessage)));
}
public Promise<RpcResponse> handlePullMessage(final String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
InvokeCallback callback = new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand responseCommand = responseFuture.getResponseCommand();
if (responseCommand == null) {
processFailedResponse(addr, requestCommand, responseFuture, rpcResponsePromise);
return;
}
try {
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS:
case ResponseCode.PULL_NOT_FOUND:
case ResponseCode.PULL_RETRY_IMMEDIATELY:
case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
default:
RpcResponse rpcResponse = new RpcResponse(new RpcException(responseCommand.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse);
}
} catch (Exception e) {
String errorMessage = "process failed. addr: " + addr + ", timeoutMillis: " + responseFuture.getTimeoutMillis() + ". Request: " + requestCommand;
RpcResponse rpcResponse = new RpcResponse(new RpcException(ResponseCode.RPC_UNKNOWN, errorMessage, e));
rpcResponsePromise.setSuccess(rpcResponse);
}
}
};
this.remotingClient.invokeAsync(addr, requestCommand, timeoutMillis, callback);
return rpcResponsePromise;
}
public RpcResponse handleSearchOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse;
}
}
}
public RpcResponse handleGetMinOffset(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse;
}
}
}
public RpcResponse handleGetEarliestMsgStoretime(String bname, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
String addr = getBrokerAddrByNameOrException(bname);
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
assert responseCommand != null;
switch (responseCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader) responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(responseCommand.getCode(), null, null);
rpcResponse.setException(new RpcException(responseCommand.getCode(), "unknown remote error"));
return rpcResponse;
}
}
}
}
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import java.nio.ByteBuffer;
public class RpcClientUtils {
public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
RemotingCommand cmd = RemotingCommand.createRequestCommand(rpcRequest.getCode(), rpcRequest.getHeader());
cmd.setBody(encodeBody(rpcRequest.getBody()));
return cmd;
}
public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
RemotingCommand cmd = RemotingCommand.createResponseCommand(rpcResponse.getCode(), rpcResponse.getHeader());
cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
cmd.setBody(encodeBody(rpcResponse.getBody()));
return cmd;
}
public static byte[] encodeBody(Object body) {
if (body instanceof byte[]) {
return (byte[])body;
} else if (body instanceof RemotingSerializable) {
return ((RemotingSerializable) body).encode();
} else if (body instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer)body;
buffer.mark();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
buffer.reset();
return data;
} else {
throw new RuntimeException("Unsupported body type " + body.getClass());
}
}
}
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class RpcException extends RemotingException {
private int errorCode;
public RpcException(int errorCode, String message) {
super(message);
this.errorCode = errorCode;
}
public RpcException(int errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}
public int getErrorCode() {
return errorCode;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
}
......@@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
package org.apache.rocketmq.common.rpc;
public class RpcRequest {
private int code;
private CommandCustomHeader header;
private byte[] body;
private CommonRpcHeader header;
private Object body;
public RpcRequest(int code, CommandCustomHeader header, byte[] body) {
public RpcRequest(int code, CommonRpcHeader header, Object body) {
this.code = code;
this.header = header;
this.body = body;
......@@ -31,11 +31,11 @@ public class RpcRequest {
return code;
}
public CommandCustomHeader getHeader() {
public CommonRpcHeader getHeader() {
return header;
}
public byte[] getBody() {
public Object getBody() {
return body;
}
}
......@@ -14,13 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
public class RpcResponse {
private int code;
private CommandCustomHeader header;
private byte[] body;
public Exception exception;
private Object body;
public RpcException exception;
public RpcResponse() {
}
public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
this.code = code;
......@@ -28,6 +34,11 @@ public class RpcResponse {
this.body = body;
}
public RpcResponse(RpcException rpcException) {
this.code = rpcException.getErrorCode();
this.exception = rpcException;
}
public int getCode() {
return code;
}
......@@ -36,15 +47,16 @@ public class RpcResponse {
return header;
}
public byte[] getBody() {
public Object getBody() {
return body;
}
public Exception getException() {
public RpcException getException() {
return exception;
}
public void setException(Exception exception) {
public void setException(RpcException exception) {
this.exception = exception;
}
}
......@@ -14,16 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
package org.apache.rocketmq.common.rpc;
public abstract class RequestHeader implements CommandCustomHeader {
public abstract class TopicQueueRequestHeader extends CommonRpcHeader {
//Physical or Logical
protected Boolean physical;
@Override
public Boolean getPhysical() {
return physical;
}
@Override
public void setPhysical(Boolean physical) {
this.physical = physical;
}
public abstract String getTopic();
public abstract void setTopic(String topic);
public abstract Integer getQueueId();
public abstract void setQueueId(Integer queueId);
}
......@@ -20,8 +20,6 @@ import com.alibaba.fastjson.annotation.JSONField;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RpcRequest;
import org.apache.rocketmq.remoting.RpcResponse;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
......@@ -88,6 +86,7 @@ public class RemotingCommand {
protected RemotingCommand() {
}
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
......@@ -96,20 +95,11 @@ public class RemotingCommand {
return cmd;
}
public static RemotingCommand createCommandForRpcRequest(RpcRequest rpcRequest) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(rpcRequest.getCode());
cmd.customHeader = rpcRequest.getHeader();
setCmdVersion(cmd);
cmd.setBody(rpcRequest.getBody());
return cmd;
}
public static RemotingCommand createCommandForRpcResponse(RpcResponse rpcResponse) {
public static RemotingCommand createResponseCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.markResponseType();
cmd.setCode(rpcResponse.getCode());
cmd.setRemark(rpcResponse.getException() == null ? "" : rpcResponse.getException().getMessage());
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册