提交 fa2f6114 编写于 作者: D duhengforever

Refactor remoting module

上级 1bedba8c
......@@ -84,13 +84,14 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
......@@ -245,10 +246,15 @@ public class BrokerController {
result = result && this.messageStore.load();
if (result) {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
this.remotingServer = RemotingServerFactory.getRemotingServer();
this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// this.fastRemotingServer = RemotingServerFactory.getRemotingServer();
// this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class ClientChannelInfo {
private final Channel channel;
......
......@@ -29,7 +29,7 @@ import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.filter.util.BloomFilter;
import org.apache.rocketmq.filter.util.BloomFilterData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import java.util.Collection;
import java.util.HashSet;
......
......@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerOffsetManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......
......@@ -31,8 +31,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
......@@ -47,15 +45,17 @@ import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRespon
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class BrokerOuterAPI {
......@@ -71,7 +71,9 @@ public class BrokerOuterAPI {
}
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient = RemotingClientFactory.getClient();
this.remotingClient.init(nettyClientConfig, null);
// this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
}
......@@ -147,7 +149,7 @@ public class BrokerOuterAPI {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
......
......@@ -84,4 +84,34 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
protected void deallocate() {
this.getMessageResult.release();
}
@Override
public long transferred() {
return transferred;
}
@Override
public FileRegion retain() {
super.retain();
return this;
}
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FileRegion touch() {
return this;
}
@Override
public FileRegion touch(Object hint) {
return this;
}
}
......@@ -73,4 +73,33 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
protected void deallocate() {
this.selectMappedBufferResult.release();
}
@Override
public long transferred() {
return transferred;
}
@Override
public FileRegion retain() {
super.retain();
return this;
}
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FileRegion touch() {
return this;
}
@Override
public FileRegion touch(Object hint) {
return this;
}
}
......@@ -84,4 +84,33 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
protected void deallocate() {
this.queryMessageResult.release();
}
@Override
public long transferred() {
return transferred;
}
@Override
public FileRegion retain() {
super.retain();
return this;
}
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FileRegion touch() {
return this;
}
@Override
public FileRegion touch(Object hint) {
return this;
}
}
......@@ -107,9 +107,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
......
......@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class SubscriptionGroupManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
......
......@@ -32,9 +32,9 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
......
......@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
......@@ -115,7 +115,7 @@ public class ClientManageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA);
request.setVersion(100);
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
......@@ -126,7 +126,7 @@ public class ClientManageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA);
request.setVersion(100);
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
}
\ No newline at end of file
......@@ -146,7 +146,7 @@ public class EndTransactionProcessorTest {
private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) {
EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header);
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
}
......@@ -204,7 +204,7 @@ public class PullMessageProcessorTest {
requestHeader.setSysFlag(0);
requestHeader.setSubVersion(100L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
......
......@@ -217,7 +217,7 @@ public class SendMessageProcessorTest {
header.setSysFlag(sysFlag);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header);
request.setBody(new byte[] {'a'});
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
......@@ -240,7 +240,7 @@ public class SendMessageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.setBody(new byte[] {'a'});
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
......@@ -253,7 +253,7 @@ public class SendMessageProcessorTest {
requestHeader.setOffset(123L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.makeCustomHeaderToNet();
// request.makeCustomHeaderToNet();
return request;
}
......
......@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
/**
* Client Common configuration
......
......@@ -20,7 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Wrapper class for offset serialization
......
......@@ -146,11 +146,11 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
public class MQClientAPIImpl {
......@@ -1210,6 +1210,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
log.info("getTopicRouteInfoFromNameServer response: " + response);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
......
......@@ -35,7 +35,6 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
/**
* This class is the entry point for applications intending to send messages.
......
......@@ -45,7 +45,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class DataVersion extends RemotingSerializable {
private long timestamp = System.currentTimeMillis();
......
......@@ -20,7 +20,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeStats extends RemotingSerializable {
private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
......
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.admin;
import java.util.HashMap;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicStatsTable extends RemotingSerializable {
private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class BrokerStatsData extends RemotingSerializable {
......
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class CheckClientRequestBody extends RemotingSerializable {
......
......@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ClusterInfo extends RemotingSerializable {
private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class Connection {
private String clientId;
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeByWho extends RemotingSerializable {
private HashSet<String> consumedGroup = new HashSet<String>();
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeMessageDirectlyResult extends RemotingSerializable {
private boolean order = false;
......
......@@ -20,7 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeStatsList extends RemotingSerializable {
private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>();
......
......@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerConnection extends RemotingSerializable {
private HashSet<Connection> connectionSet = new HashSet<Connection>();
......
......@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
......
......@@ -25,7 +25,7 @@ import java.util.TreeSet;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerRunningInfo extends RemotingSerializable {
public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
......
......@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
@Deprecated
public class GetConsumerStatusBody extends RemotingSerializable {
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class GroupList extends RemotingSerializable {
private HashSet<String> groupList = new HashSet<String>();
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class KVTable extends RemotingSerializable {
private HashMap<String, String> table = new HashMap<String, String>();
......
......@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class LockBatchRequestBody extends RemotingSerializable {
private String consumerGroup;
......
......@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class LockBatchResponseBody extends RemotingSerializable {
......
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ProducerConnection extends RemotingSerializable {
private HashSet<Connection> connectionSet = new HashSet<Connection>();
......
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import java.util.List;
......
......@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class QueryConsumeTimeSpanBody extends RemotingSerializable {
List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>();
......
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class QueryCorrectionOffsetBody extends RemotingSerializable {
private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>();
......
......@@ -36,7 +36,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class RegisterBrokerBody extends RemotingSerializable {
......
......@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ResetOffsetBody extends RemotingSerializable {
private Map<MessageQueue, Long> offsetTable;
......
......@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ResetOffsetBodyForC extends RemotingSerializable {
......
......@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class SubscriptionGroupWrapper extends RemotingSerializable {
private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
......
......@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
......
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicList extends RemotingSerializable {
private Set<String> topicList = new HashSet<String>();
......
......@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class UnlockBatchRequestBody extends RemotingSerializable {
private String consumerGroup;
......
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.header;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class GetConsumerListByGroupResponseBody extends RemotingSerializable {
private List<String> consumerIdList;
......
......@@ -22,7 +22,7 @@ package org.apache.rocketmq.common.protocol.heartbeat;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class HeartbeatData extends RemotingSerializable {
private String clientID;
......
......@@ -23,7 +23,7 @@ package org.apache.rocketmq.common.protocol.route;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
......
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.topic;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class OffsetMovedEvent extends RemotingSerializable {
private String consumerGroup;
......
......@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
......@@ -32,13 +32,13 @@ import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.apache.rocketmq.srvutil.FileWatchService;
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
......@@ -77,7 +77,9 @@ public class NamesrvController {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingServer = RemotingServerFactory.getRemotingServer();
this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
......@@ -111,6 +113,7 @@ public class NamesrvController {
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
......@@ -129,6 +132,7 @@ public class NamesrvController {
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
......
......@@ -17,7 +17,7 @@
package org.apache.rocketmq.namesrv.kvconfig;
import java.util.HashMap;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class KVConfigSerializeWrapper extends RemotingSerializable {
private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
......
......@@ -69,7 +69,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
log.info("receive remoting command: " + request);
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
......@@ -280,7 +280,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
log.info("requestHeader: " + requestHeader );
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
......
......@@ -37,7 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
......
......@@ -39,7 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class PushConsumerImpl implements PushConsumer {
private final DefaultMQPushConsumer rocketmqPushConsumer;
......
......@@ -37,7 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
......
......@@ -544,13 +544,18 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.42.Final</version>
<version>4.1.32.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
......
......@@ -37,6 +37,10 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
......
......@@ -22,6 +22,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -51,4 +52,6 @@ public interface RemotingClient extends RemotingService {
ExecutorService getCallbackExecutor();
boolean isChannelWritable(final String addr);
void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
}
package org.apache.rocketmq.remoting;
import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.util.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingClientFactory {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private RemotingClientFactory() {
}
private static Map<String, RemotingClient> clients;
private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
static {
log.info("begin load client");
clients = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class);
log.info("end load client, size:{}", clients.size());
}
public static RemotingClient getClient(String protocolType) {
return clients.get(protocolType);
}
public static RemotingClient getClient() {
return clients.get(RemotingUtil.DEFAULT_PROTOCOL);
}
}
......@@ -23,6 +23,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingServer extends RemotingService {
......@@ -36,6 +37,8 @@ public interface RemotingServer extends RemotingService {
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
void push(final String addr, final String sessionId, RemotingCommand remotingCommand);
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
......@@ -48,4 +51,6 @@ public interface RemotingServer extends RemotingService {
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
}
package org.apache.rocketmq.remoting;
import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.util.RemotingUtil;
import org.apache.rocketmq.remoting.util.ServiceProvider;
public class RemotingServerFactory {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private RemotingServerFactory() {
}
private static Map<String, RemotingServer> servers;
// private static Map<String/*protocolType*/, String/*path*/ >
private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
static {
log.info("begin load server");
servers = ServiceProvider.load(SERVER_LOCATION, RemotingClient.class);
log.info("end load server, size:{}", servers.size());
}
public static RemotingServer getRemotingServer() {
return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL);
}
public static RemotingServer getRemotingServer(String protocolType) {
return servers.get(protocolType);
}
// public static RemotingServer createNewInstance(String protocolType){
// return ServiceProvider.load()
// }
}
......@@ -58,98 +58,6 @@ public class RemotingHelper {
return isa;
}
public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
long beginTime = System.currentTimeMillis();
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
if (socketChannel != null) {
boolean sendRequestOK = false;
try {
socketChannel.configureBlocking(true);
//bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
socketChannel.socket().setSoTimeout((int) timeoutMillis);
ByteBuffer byteBufferRequest = request.encode();
while (byteBufferRequest.hasRemaining()) {
int length = socketChannel.write(byteBufferRequest);
if (length > 0) {
if (byteBufferRequest.hasRemaining()) {
if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
throw new RemotingSendRequestException(addr);
}
}
} else {
throw new RemotingSendRequestException(addr);
}
Thread.sleep(1);
}
sendRequestOK = true;
ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
while (byteBufferSize.hasRemaining()) {
int length = socketChannel.read(byteBufferSize);
if (length > 0) {
if (byteBufferSize.hasRemaining()) {
if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
throw new RemotingTimeoutException(addr, timeoutMillis);
}
}
} else {
throw new RemotingTimeoutException(addr, timeoutMillis);
}
Thread.sleep(1);
}
int size = byteBufferSize.getInt(0);
ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
while (byteBufferBody.hasRemaining()) {
int length = socketChannel.read(byteBufferBody);
if (length > 0) {
if (byteBufferBody.hasRemaining()) {
if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
throw new RemotingTimeoutException(addr, timeoutMillis);
}
}
} else {
throw new RemotingTimeoutException(addr, timeoutMillis);
}
Thread.sleep(1);
}
byteBufferBody.flip();
return RemotingCommand.decode(byteBufferBody);
} catch (IOException e) {
log.error("invokeSync failure", e);
if (sendRequestOK) {
throw new RemotingTimeoutException(addr, timeoutMillis);
} else {
throw new RemotingSendRequestException(addr);
}
} finally {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
throw new RemotingConnectException(addr);
}
}
public static String parseChannelRemoteAddr(final Channel channel) {
if (null == channel) {
return "";
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.group.ChannelGroup;
public interface ChannelMetrics {
Integer getChannelCount();
ChannelGroup getChannels();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import java.util.concurrent.atomic.AtomicInteger;
public class ChannelStatisticsHandler extends ChannelDuplexHandler implements ChannelMetrics {
public static final String NAME = ChannelStatisticsHandler.class.getSimpleName();
private final AtomicInteger channelCount = new AtomicInteger(0);
private final ChannelGroup allChannels;
public ChannelStatisticsHandler(ChannelGroup allChannels) {
this.allChannels = allChannels;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// connect
channelCount.incrementAndGet();
allChannels.add(ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// disconnect
channelCount.decrementAndGet();
allChannels.remove(ctx.channel());
super.channelInactive(ctx);
}
@Override
public Integer getChannelCount() {
return channelCount.get();
}
@Override
public ChannelGroup getChannels() {
return allChannels;
}
}
package org.apache.rocketmq.remoting.netty;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.remoting.serialize.RocketMQSerializable;
import org.apache.rocketmq.remoting.serialize.SerializeType;
import org.apache.rocketmq.remoting.serialize.Serializer;
import org.apache.rocketmq.remoting.serialize.SerializerFactory;
public class CodecHelper {
public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
protected static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
protected static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
public static RemotingCommand decode(final byte[] array) {
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
return decode(byteBuffer);
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
System.out.println("cmd: " + cmd);
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.setBody(bodyData);
return cmd;
}
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
Serializer serializer = SerializerFactory.get(type);
if (serializer != null) {
RemotingCommand remotingCommand = serializer.deserializer(headerData);
return remotingCommand;
}
return null;
}
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
public static CommandCustomHeader decodeCommandCustomHeader(RemotingCommand remotingCommand,
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
if (remotingCommand.getExtFields() != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = remotingCommand.getExtFields().get(fieldName);
if (null == value) {
if (!isFieldNullable(field)) {
throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(STRING_CANONICAL_NAME)) {
valueParsed = value;
} else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
field.set(objectHeader, valueParsed);
} catch (Throwable e) {
}
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
private static Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
Field[] field = CLASS_HASH_MAP.get(classHeader);
if (field == null) {
field = classHeader.getDeclaredFields();
synchronized (CLASS_HASH_MAP) {
CLASS_HASH_MAP.put(classHeader, field);
}
}
return field;
}
private static boolean isFieldNullable(Field field) {
if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
Annotation annotation = field.getAnnotation(CFNotNull.class);
synchronized (NULLABLE_FIELD_CACHE) {
NULLABLE_FIELD_CACHE.put(field, annotation == null);
}
}
return NULLABLE_FIELD_CACHE.get(field);
}
private static String getCanonicalName(Class clazz) {
String name = CANONICAL_NAME_CACHE.get(clazz);
if (name == null) {
name = clazz.getCanonicalName();
synchronized (CANONICAL_NAME_CACHE) {
CANONICAL_NAME_CACHE.put(clazz, name);
}
}
return name;
}
public static ByteBuffer encode(RemotingCommand remotingCommand) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = headerEncode(remotingCommand);
length += headerData.length;
// 3> body data length
if (remotingCommand.getBody() != null) {
length += remotingCommand.getBody().length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, remotingCommand.getSerializeTypeCurrentRPC()));
// header data
result.put(headerData);
// body data;
if (remotingCommand.getBody() != null) {
result.put(remotingCommand.getBody());
}
result.flip();
return result;
}
private static byte[] headerEncode(RemotingCommand remotingCommand) {
makeCustomHeaderToNet(remotingCommand);
Serializer serializer = SerializerFactory.get(remotingCommand.getSerializeTypeCurrentRPC());
if (serializer == null) {
serializer = SerializerFactory.get(SerializeType.JSON);
}
return serializer.serializer(remotingCommand);
}
public static void makeCustomHeaderToNet(RemotingCommand remotingCommand) {
if (remotingCommand.getCustomHeader() != null) {
Field[] fields = getClazzFields(remotingCommand.getCustomHeader().getClass());
HashMap extFields = remotingCommand.getExtFields();
if (null == extFields) {
remotingCommand.setExtFields(new HashMap<String, String>());
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(remotingCommand.getCustomHeader());
} catch (Exception e) {
}
if (value != null) {
remotingCommand.getExtFields().put(name, value.toString());
}
}
}
}
}
}
public static ByteBuffer encodeHeader(RemotingCommand remotingCommand) {
int bodyLength = remotingCommand.getBody() != null ? remotingCommand.getBody().length : 0;
return encodeHeader(bodyLength, remotingCommand);
}
public static ByteBuffer encodeHeader(final int bodyLength, RemotingCommand remotingCommand) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = headerEncode(remotingCommand);
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, remotingCommand.getSerializeTypeCurrentRPC()));
// header data
result.put(headerData);
result.flip();
return result;
}
}
......@@ -68,7 +68,7 @@ public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
long toTransfer = msg.count();
while (true) {
long transferred = msg.transfered();
long transferred = msg.transferred();
if (toTransfer - transferred <= 0) {
break;
}
......
......@@ -17,13 +17,11 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.util.internal.logging.InternalLogLevel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class NettyLogger {
private static AtomicBoolean nettyLoggerSeted = new AtomicBoolean(false);
......@@ -52,6 +50,30 @@ public class NettyLogger {
private InternalLogger logger = null;
@Override public void trace(Throwable throwable) {
}
@Override public void debug(Throwable throwable) {
}
@Override public void info(Throwable throwable) {
}
@Override public void warn(Throwable throwable) {
}
@Override public void error(Throwable throwable) {
}
@Override public void log(InternalLogLevel level, Throwable throwable) {
}
public NettyBridgeLogger(String name) {
logger = InternalLoggerFactory.getLogger(name);
}
......
......@@ -17,11 +17,15 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
......@@ -33,13 +37,16 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.common.ServiceThread;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
......@@ -49,6 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public abstract class NettyRemotingAbstract {
......@@ -60,12 +68,12 @@ public abstract class NettyRemotingAbstract {
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreOneway;
protected Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreAsync;
protected Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
......@@ -83,10 +91,11 @@ public abstract class NettyRemotingAbstract {
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
protected NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
* The default request processor to use in case there is no exact match in {@link #processorTable} per request
* code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
......@@ -99,6 +108,13 @@ public abstract class NettyRemotingAbstract {
NettyLogger.initNettyLogger();
}
protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
public NettyRemotingAbstract() {
this.semaphoreOneway = new Semaphore(65535, true);
this.semaphoreAsync = new Semaphore(65535, true);
}
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
......@@ -110,6 +126,11 @@ public abstract class NettyRemotingAbstract {
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
public void init(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
/**
* Custom channel event listener.
*
......@@ -329,6 +350,15 @@ public abstract class NettyRemotingAbstract {
*/
public abstract ExecutorService getCallbackExecutor();
protected void startUpHouseKeepingService() {
this.houseKeepingService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
scanResponseTable();
}
}, 3000, 1000, TimeUnit.MICROSECONDS);
}
/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
......@@ -358,6 +388,21 @@ public abstract class NettyRemotingAbstract {
}
}
public void start() {
if (getChannelEventListener() != null) {
nettyEventExecutor.start();
}
}
public void shutdown() {
if (this.nettyEventExecutor != null) {
this.nettyEventExecutor.shutdown();
}
if (this.houseKeepingService != null) {
this.houseKeepingService.shutdown();
}
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
......@@ -410,7 +455,7 @@ public abstract class NettyRemotingAbstract {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
......@@ -465,6 +510,7 @@ public abstract class NettyRemotingAbstract {
/**
* mark the request of the specified channel as fail and to invoke fail callback immediately
*
* @param channel the channel which is close already
*/
protected void failFast(final Channel channel) {
......@@ -570,4 +616,15 @@ public abstract class NettyRemotingAbstract {
return NettyEventExecutor.class.getSimpleName();
}
}
public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
}
......@@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting.netty;
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverCallbackExecutorThreads = 8;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
......@@ -28,6 +28,33 @@ public class NettyServerConfig implements Cloneable {
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private int serverAcceptorThreads = 1;
private int connectionChannelReaderIdleSeconds = 0;
private int connectionChannelWriterIdleSeconds = 0;
public int getConnectionChannelReaderIdleSeconds() {
return connectionChannelReaderIdleSeconds;
}
public void setConnectionChannelReaderIdleSeconds(int connectionChannelReaderIdleSeconds) {
this.connectionChannelReaderIdleSeconds = connectionChannelReaderIdleSeconds;
}
public int getConnectionChannelWriterIdleSeconds() {
return connectionChannelWriterIdleSeconds;
}
public void setConnectionChannelWriterIdleSeconds(int connectionChannelWriterIdleSeconds) {
this.connectionChannelWriterIdleSeconds = connectionChannelWriterIdleSeconds;
}
public int getServerAcceptorThreads() {
return serverAcceptorThreads;
}
public void setServerAcceptorThreads(int serverAcceptorThreads) {
this.serverAcceptorThreads = serverAcceptorThreads;
}
/**
* make make install
......
......@@ -17,43 +17,41 @@
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.serialize.SerializeType;
import org.msgpack.annotation.Message;
@Message
public class RemotingCommand {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
private static final int RPC_ONEWAY = 1; // 0, RPC
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
// 1, Oneway
// 1, RESPONSE_COMMAND
private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
/**
* REQUEST_COMMAND
*/
private static final int RPC_TYPE = 0;
/**
* One way
*/
private static final int RPC_ONEWAY = 1;
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
......@@ -74,15 +72,16 @@ public class RemotingCommand {
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
protected RemotingCommand() {
public RemotingCommand() {
}
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
......@@ -132,59 +131,12 @@ public class RemotingCommand {
return cmd;
}
public static RemotingCommand createResponseCommand(int code, String remark) {
return createResponseCommand(code, remark, null);
}
public static RemotingCommand decode(final byte[] array) {
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
return decode(byteBuffer);
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
public CommandCustomHeader getCustomHeader() {
return customHeader;
}
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
public static RemotingCommand createResponseCommand(int code, String remark) {
return createResponseCommand(code, remark, null);
}
public static int createNewRequestId() {
......@@ -208,16 +160,6 @@ public class RemotingCommand {
return true;
}
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
public void markResponseType() {
int bits = 1 << RPC_TYPE;
this.flag |= bits;
......@@ -227,206 +169,13 @@ public class RemotingCommand {
return customHeader;
}
public void writeCustomHeader(CommandCustomHeader customHeader) {
this.customHeader = customHeader;
}
public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
} catch (InstantiationException e) {
return null;
} catch (IllegalAccessException e) {
return null;
}
if (this.extFields != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = this.extFields.get(fieldName);
if (null == value) {
if (!isFieldNullable(field)) {
throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(STRING_CANONICAL_NAME)) {
valueParsed = value;
} else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
} else {
throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
}
field.set(objectHeader, valueParsed);
} catch (Throwable e) {
log.error("Failed field [{}] decoding", fieldName, e);
}
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
Field[] field = CLASS_HASH_MAP.get(classHeader);
if (field == null) {
field = classHeader.getDeclaredFields();
synchronized (CLASS_HASH_MAP) {
CLASS_HASH_MAP.put(classHeader, field);
}
}
return field;
}
private boolean isFieldNullable(Field field) {
if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
Annotation annotation = field.getAnnotation(CFNotNull.class);
synchronized (NULLABLE_FIELD_CACHE) {
NULLABLE_FIELD_CACHE.put(field, annotation == null);
}
}
return NULLABLE_FIELD_CACHE.get(field);
}
private String getCanonicalName(Class clazz) {
String name = CANONICAL_NAME_CACHE.get(clazz);
if (name == null) {
name = clazz.getCanonicalName();
synchronized (CANONICAL_NAME_CACHE) {
CANONICAL_NAME_CACHE.put(clazz, name);
}
}
return name;
}
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
return CodecHelper.decodeCommandCustomHeader(this, classHeader);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
return CodecHelper.encodeHeader(bodyLength, this);
}
public void markOnewayRPC() {
......@@ -440,14 +189,6 @@ public class RemotingCommand {
return (this.flag & bits) == bits;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
@JSONField(serialize = false)
public RemotingCommandType getType() {
if (this.isResponseType()) {
......@@ -463,6 +204,14 @@ public class RemotingCommand {
return (this.flag & bits) == bits;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public LanguageCode getLanguage() {
return language;
}
......@@ -526,11 +275,8 @@ public class RemotingCommand {
extFields.put(key, value);
}
@Override
public String toString() {
return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+ Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ serializeTypeCurrentRPC + "]";
public void setCustomHeader(CommandCustomHeader customHeader) {
this.customHeader = customHeader;
}
public SerializeType getSerializeTypeCurrentRPC() {
......@@ -540,4 +286,12 @@ public class RemotingCommand {
public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
@Override
public String toString() {
return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+ Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ serializeTypeCurrentRPC + "]";
}
}
\ No newline at end of file
......@@ -18,5 +18,5 @@ package org.apache.rocketmq.remoting.protocol;
public enum RemotingCommandType {
REQUEST_COMMAND,
RESPONSE_COMMAND;
RESPONSE_COMMAND
}
......@@ -15,8 +15,11 @@
* limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
package org.apache.rocketmq.remoting.serialize;
import org.msgpack.annotation.MessagePackOrdinalEnum;
@MessagePackOrdinalEnum
public enum LanguageCode {
JAVA((byte) 0),
CPP((byte) 1),
......
package org.apache.rocketmq.remoting.serialize;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.msgpack.MessagePack;
public class MsgPackSerializable implements Serializer {
private final MessagePack messagePack = new MessagePack();
// public MsgPackSerializable(){
// messagePack.register(LanguageCode.class);
// messagePack.register(SerializeType.class);
// }
@Override
public SerializeType type() {
return SerializeType.MSGPACK;
}
@Override
public <T> T deserializer(byte[] content, Class<T> c) {
try {
return messagePack.read(content, c);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public RemotingCommand deserializer(byte[] content) {
return deserializer(content, RemotingCommand.class);
}
@Override
public byte[] serializer(Object object) {
try {
return messagePack.write(object);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
......@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
package org.apache.rocketmq.remoting.serialize;
import com.alibaba.fastjson.JSON;
import java.nio.charset.Charset;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public abstract class RemotingSerializable {
public class RemotingSerializable implements Serializer {
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] encode(final Object obj) {
......@@ -58,4 +59,24 @@ public abstract class RemotingSerializable {
public String toJson(final boolean prettyFormat) {
return toJson(this, prettyFormat);
}
@Override
public SerializeType type() {
return SerializeType.JSON;
}
@Override
public <T> T deserializer(byte[] content, Class<T> c) {
return decode(content, c);
}
@Override
public byte[] serializer(Object object) {
return encode(object);
}
@Override
public RemotingCommand deserializer(byte[] content) {
return decode(content, RemotingCommand.class);
}
}
......@@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
package org.apache.rocketmq.remoting.serialize;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class RocketMQSerializable {
public class RocketMQSerializable implements Serializer {
private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
......@@ -201,4 +202,25 @@ public class RocketMQSerializable {
}
return true;
}
@Override
public SerializeType type() {
return SerializeType.ROCKETMQ;
}
@Override
public <T> T deserializer(byte[] content, Class<T> c) {
//Fixme
return null;
}
@Override
public byte[] serializer(Object object) {
return rocketMQProtocolEncode((RemotingCommand) object);
}
@Override
public RemotingCommand deserializer(byte[] content) {
return rocketMQProtocolDecode(content);
}
}
......@@ -15,11 +15,15 @@
* limitations under the License.
*/
package org.apache.rocketmq.remoting.protocol;
package org.apache.rocketmq.remoting.serialize;
import org.msgpack.annotation.MessagePackOrdinalEnum;
@MessagePackOrdinalEnum
public enum SerializeType {
JSON((byte) 0),
ROCKETMQ((byte) 1);
ROCKETMQ((byte) 1),
MSGPACK((byte) 2);
private byte code;
......
......@@ -15,35 +15,16 @@
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
package org.apache.rocketmq.remoting.serialize;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MQProtosHelper {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public interface Serializer {
SerializeType type();
public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
final long timeoutMillis) {
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
<T> T deserializer(final byte[] content, final Class<T> c);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
byte[] serializer(final Object object);
try {
RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis);
if (response != null) {
return ResponseCode.SUCCESS == response.getCode();
}
} catch (Exception e) {
log.error("Failed to register broker", e);
}
return false;
}
RemotingCommand deserializer(final byte[] content);
}
package org.apache.rocketmq.remoting.serialize;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SerializerFactory {
private static Map<Enum<SerializeType>, Serializer> serializerMap = new ConcurrentHashMap<Enum<SerializeType>, Serializer>();
private SerializerFactory() {
}
static {
register(SerializeType.JSON, new RemotingSerializable());
register(SerializeType.ROCKETMQ, new RocketMQSerializable());
register(SerializeType.MSGPACK, new MsgPackSerializable());
}
public static void register(SerializeType type, Serializer serialization) {
serializerMap.putIfAbsent(type, serialization);
}
public static Serializer get(SerializeType type) {
return serializerMap.get(type);
}
}
package org.apache.rocketmq.remoting.transport;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyEvent;
import org.apache.rocketmq.remoting.netty.NettyEventType;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
public NettyRemotingServerAbstract() {
super();
}
public NettyRemotingServerAbstract(final int permitsOneway, final int permitsAsync) {
super(permitsOneway, permitsAsync);
}
public class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (getChannelEventListener() != null) {
putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (getChannelEventListener() != null) {
putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (getChannelEventListener() != null) {
putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (getChannelEventListener() != null) {
putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
package org.apache.rocketmq.remoting.transport.http2;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class Http2ClientImpl extends NettyRemotingClientAbstract implements RemotingClient {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup ioGroup;
private ExecutorService publicExecutor;
private ExecutorService callbackExecutor;
private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
public Http2ClientImpl(final NettyClientConfig clientConfig,
final ChannelEventListener channelEventListener) {
super(clientConfig.getClientOnewaySemaphoreValue(), clientConfig.getClientAsyncSemaphoreValue());
init(clientConfig, channelEventListener);
}
public Http2ClientImpl(){
super();
}
@Override
public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = clientConfig;
this.channelEventListener = channelEventListener;
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
clientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
buildSslContext();
}
private void buildSslContext() {
SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
try {
sslContext = SslContextBuilder.forClient()
.sslProvider(provider)
/* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
* Please refer to the HTTP/2 specification for cipher requirements. */
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} catch (SSLException e) {
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
this.bootstrap.group(this.ioGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
defaultEventExecutorGroup,
Http2Handler.newHandler(false),
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
startUpHouseKeepingService();
}
@Override
public void shutdown() {
super.shutdown();
try {
for (ChannelWrapper cw : this.channelTables.values()) {
this.closeChannel(null, cw.getChannel());
}
this.channelTables.clear();
if (this.ioGroup != null) {
this.ioGroup.shutdownGracefully();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("Http2ClientImpl shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
}
}
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
}
@Override
public void updateNameServerAddressList(List<String> addrs) {
super.updateNameServerAddressList(addrs);
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public Bootstrap getBootstrap() {
return this.bootstrap;
}
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public boolean isChannelWritable(String addr) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.isWritable();
}
return true;
}
@Override
public List<String> getNameServerAddressList() {
return this.namesrvAddrList.get();
}
@Override
public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
return callbackExecutor != null ? callbackExecutor : publicExecutor;
}
@Override
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersDecoder;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.StreamBufferingEncoder;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
public class Http2Handler extends Http2ConnectionHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private boolean isServer;
private int lastStreamId;
private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
final Http2Settings initialSettings, final boolean isServer) {
super(decoder, encoder, initialSettings);
decoder.frameListener(new FrameListener());
this.isServer = isServer;
}
public static Http2Handler newHandler(final boolean isServer) {
log.info("isServer: " + isServer);
Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2Connection connection = new DefaultHttp2Connection(isServer);
Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter));
connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
DEFAULT_WINDOW_UPDATE_RATIO, true));
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader);
Http2Settings settings = new Http2Settings();
if (!isServer) {
settings.pushEnabled(true);
}
settings.initialWindowSize(1048576 * 10); //10MiB
settings.maxConcurrentStreams(Integer.MAX_VALUE);
return newHandler(decoder, encoder, settings, isServer);
}
private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
final Http2Settings settings, boolean isServer) {
return new Http2Handler(decoder, encoder, settings, isServer);
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg,
final ChannelPromise promise) throws Exception {
if (isServer) {
assert msg instanceof ByteBuf;
sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg);
} else {
final Http2Headers headers = new DefaultHttp2Headers();
try {
long threadId = Thread.currentThread().getId();
long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, promise);
encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, ctx.newPromise());
ctx.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
ByteBuf payload) throws Http2Exception {
encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
Http2Headers headers = new DefaultHttp2Headers();
headers.status(OK.codeAsText());
encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, ctx.newPromise());
}
private class FrameListener extends Http2FrameAdapter {
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
//Http2Handler.this.onDataRead(ctx, streamId, data, endOfStream);
data.retain();
Http2Handler.this.lastStreamId = streamId;
ctx.fireChannelRead(data);
return data.readableBytes() + padding;
}
}
}
package org.apache.rocketmq.remoting.transport.http2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
import org.apache.rocketmq.remoting.util.JvmUtils;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class Http2ServerImpl extends NettyRemotingServerAbstract implements RemotingServer {
private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private ServerBootstrap serverBootstrap;
private EventLoopGroup bossGroup;
private EventLoopGroup ioGroup;
private EventExecutorGroup workerGroup;
private Class<? extends ServerSocketChannel> socketChannelClass;
private NettyServerConfig serverConfig;
private ChannelEventListener channelEventListener;
private ExecutorService publicExecutor;
private int port;
private RPCHook rpcHook;
public Http2ServerImpl(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
init(nettyServerConfig, channelEventListener);
}
public Http2ServerImpl() {
super();
}
@Override
public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.serverConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
this.publicExecutor = ThreadUtils.newFixedThreadPool(
serverConfig.getServerCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
if (JvmUtils.isLinux() && this.serverConfig.isUseEpollNativeSelector()) {
this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
this.port = nettyServerConfig.getListenPort();
buildHttp2SslContext();
}
private void buildHttp2SslContext() {
try {
SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
SelfSignedCertificate ssc;
//NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
//Please refer to the HTTP/2 specification for cipher requirements.
ssc = new SelfSignedCertificate();
sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
.sslProvider(provider)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
} catch (Exception e) {
LOG.error("Can not build SSL context !", e);
}
}
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
@Override
public int localListenPort() {
return this.port;
}
@Override
public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) {
return processorTable.get(requestCode);
}
@Override
public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
}
@Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
private void applyOptions(ServerBootstrap bootstrap) {
if (null != serverConfig) {
bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getServerSocketRcvBufSize());
}
if (serverConfig.isServerPooledByteBufAllocatorEnable()) {
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
}
@Override
public void start() {
super.start();
final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
this.serverBootstrap.group(this.bossGroup, this.ioGroup).
channel(socketChannelClass).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
channels.add(ch);
ChannelPipeline cp = ch.pipeline();
cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels));
cp.addLast(workerGroup,
Http2Handler.newHandler(true),
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
serverConfig.getConnectionChannelWriterIdleSeconds(),
serverConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler());
}
});
applyOptions(serverBootstrap);
ChannelFuture channelFuture = this.serverBootstrap.bind(this.port).syncUninterruptibly();
this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
startUpHouseKeepingService();
}
@Override
public void shutdown() {
super.shutdown();
ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
}
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
return this.publicExecutor;
}
@Override
public ChannelEventListener getChannelEventListener() {
return this.channelEventListener;
}
}
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
......@@ -46,8 +47,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
return CodecHelper.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
......
......@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
......@@ -24,17 +24,17 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
ByteBuffer byteBuffer = CodecHelper.encodeHeader(remotingCommand);
out.writeBytes(byteBuffer);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.xml.ws.AsyncHandler;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class NettyRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private NettyClientConfig nettyClientConfig;
private Bootstrap bootstrap = new Bootstrap();
private EventLoopGroup eventLoopGroupWorker;
private ExecutorService publicExecutor;
private ExecutorService asyncExecutor;
/**
* Invoke the callback methods in this executor when process response.
*/
private ExecutorService callbackExecutor;
private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
public NettyRemotingClient() {
super();
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
init(nettyClientConfig, channelEventListener);
}
@Override
public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = nettyClientConfig;
this.channelEventListener = channelEventListener;
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyClientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
nettyClientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
nettyClientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", nettyClientConfig.getClientWorkerThreads()));
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
@Override
public Bootstrap getBootstrap() {
return this.bootstrap;
}
@Override
public void start() {
bootstrap = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
startUpHouseKeepingService();
}
@Override
public void shutdown() {
super.shutdown();
try {
clearChannels();
if (this.eventLoopGroupWorker != null) {
this.eventLoopGroupWorker.shutdownGracefully();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
if (this.publicExecutor != null) {
this.publicExecutor.shutdown();
}
} catch (Exception e) {
log.error("NettyRemotingClient shutdown exception, ", e);
}
}
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
}
@Override
public void updateNameServerAddressList(List<String> addrs) {
super.updateNameServerAddressList(addrs);
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public boolean isChannelWritable(String addr) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.isWritable();
}
return true;
}
@Override
public List<String> getNameServerAddressList() {
return this.namesrvAddrList.get();
}
@Override
public ChannelEventListener getChannelEventListener() {
return channelEventListener;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
return callbackExecutor != null ? callbackExecutor : publicExecutor;
}
@Override
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
}
......@@ -14,66 +14,64 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.netty;
package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
import org.apache.rocketmq.remoting.util.JvmUtils;
import org.apache.rocketmq.remoting.util.ThreadUtils;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
public class NettyRemotingServer extends NettyRemotingServerAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;
private final ExecutorService publicExecutor;
private final ChannelEventListener channelEventListener;
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupSelector;
private EventLoopGroup eventLoopGroupBoss;
private NettyServerConfig nettyServerConfig;
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private ExecutorService publicExecutor;
private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private Class<? extends ServerSocketChannel> socketChannelClass;
private RPCHook rpcHook;
......@@ -83,69 +81,56 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
public NettyRemotingServer() {
super();
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
init(nettyServerConfig, channelEventListener);
}
@Override
public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
this.nettyServerConfig = serverConfig;
super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.nettyServerConfig = serverConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
if (useEpoll()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
this.publicExecutor = ThreadUtils.newFixedThreadPool(
publicThreadNums,
10000, "Remoting-PublicExecutor", true);
if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.eventLoopGroupBoss = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.eventLoopGroupSelector = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.port = nettyServerConfig.getListenPort();
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
loadSslContext();
}
public void loadSslContext() {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
......@@ -158,36 +143,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
&& nettyServerConfig.isUseEpollNativeSelector()
&& Epoll.isAvailable();
}
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
super.start();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.channel(socketChannelClass)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
......@@ -197,7 +165,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new IdleStateHandler(nettyServerConfig.getConnectionChannelReaderIdleSeconds(),
nettyServerConfig.getConnectionChannelWriterIdleSeconds(),
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
......@@ -215,46 +185,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
startUpHouseKeepingService();
}
@Override
public void shutdown() {
try {
if (this.timer != null) {
this.timer.cancel();
super.shutdown();
if (this.eventLoopGroupBoss != null) {
this.eventLoopGroupBoss.shutdownGracefully();
}
this.eventLoopGroupBoss.shutdownGracefully();
this.eventLoopGroupSelector.shutdownGracefully();
if (this.nettyEventExecutor != null) {
this.nettyEventExecutor.shutdown();
if (this.eventLoopGroupSelector != null) {
this.eventLoopGroupSelector.shutdownGracefully();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
......@@ -390,80 +339,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
@Override
public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
RemotingUtil.closeChannel(ctx.channel());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.util;
import io.netty.channel.epoll.Epoll;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Random;
public final class JvmUtils {
public static final String OS_NAME = System.getProperty("os.name").toLowerCase();
//public static final String OS_VERSION = System.getProperty("os.version").toLowerCase();
/**
* A constructor to stop this class being constructed.
*/
private JvmUtils() {
// Unused
}
public static boolean isWindows() {
return OS_NAME.startsWith("win");
}
public static boolean isWindows10() {
return OS_NAME.startsWith("win") && OS_NAME.endsWith("10");
}
public static boolean isMacOSX() {
return OS_NAME.contains("mac");
}
public static boolean isLinux() {
return OS_NAME.startsWith("linux");
}
public static boolean isUseEpoll() {
return isLinux() && Epoll.isAvailable();
}
public static boolean isUnix() {
return OS_NAME.contains("nix") ||
OS_NAME.contains("nux") ||
OS_NAME.contains("aix") ||
OS_NAME.contains("bsd") ||
OS_NAME.contains("sun") ||
OS_NAME.contains("hpux");
}
public static boolean isSolaris() {
return OS_NAME.startsWith("sun");
}
public static int getProcessId() {
String pid = null;
final File self = new File("/proc/self");
try {
if (self.exists()) {
pid = self.getCanonicalFile().getName();
}
} catch (IOException ignored) {
//Ignore it
}
if (pid == null) {
pid = ManagementFactory.getRuntimeMXBean().getName().split("@", 0)[0];
}
if (pid == null) {
int rpid = new Random().nextInt(1 << 16);
return rpid;
} else {
return Integer.parseInt(pid);
}
}
}
package org.apache.rocketmq.remoting.util;
public class RemotingUtil {
public static final String REMOTING_CHARSET = "UTF-8";
public static final String DEFAULT_PROTOCOL = "rocketmq";
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.rocketmq.remoting.util;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ServiceProvider {
private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
/**
* A reference to the classloader that loaded this class. It's more efficient to compute it once and cache it here.
*/
private static ClassLoader thisClassLoader;
/**
* JDK1.3+ <a href= "http://java.sun.com/j2se/1.3/docs/guide/jar/jar.html#Service%20Provider" > 'Service Provider' specification</a>.
*/
static {
thisClassLoader = getClassLoader(ServiceProvider.class);
}
/**
* Returns a string that uniquely identifies the specified object, including its class.
* <p>
* The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString()
* method, but works even when the specified object's class has overidden the toString method.
*
* @param o may be null.
* @return a string of form classname@hashcode, or "null" if param o is null.
*/
protected static String objectId(Object o) {
if (o == null) {
return "null";
} else {
return o.getClass().getName() + "@" + System.identityHashCode(o);
}
}
protected static ClassLoader getClassLoader(Class<?> clazz) {
try {
return clazz.getClassLoader();
} catch (SecurityException e) {
LOG.error("Unable to get classloader for class {} due to security restrictions !",
clazz, e.getMessage());
throw e;
}
}
public static ClassLoader getContextClassLoader() {
ClassLoader classLoader = null;
try {
classLoader = Thread.currentThread().getContextClassLoader();
} catch (SecurityException ex) {
/**
* The getContextClassLoader() method throws SecurityException when the context
* class loader isn't an ancestor of the calling class's class
* loader, or if security permissions are restricted.
*/
}
return classLoader;
}
public static InputStream getResourceAsStream(ClassLoader loader, String name) {
if (loader != null) {
return loader.getResourceAsStream(name);
} else {
return ClassLoader.getSystemResourceAsStream(name);
}
}
public static <T> Map<String, T> load(String path, Class<?> clazz) {
LOG.info("Looking for a resource file of name [{}] ...", path);
Map<String, T> services = new ConcurrentHashMap<String, T>();
try {
final InputStream is = getResourceAsStream(getContextClassLoader(), path);
if (is != null) {
BufferedReader reader;
try {
reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
reader = new BufferedReader(new InputStreamReader(is));
}
String serviceName = reader.readLine();
while (serviceName != null && !"".equals(serviceName)) {
LOG.info(
"Creating an instance as specified by file {} which was present in the path of the context classloader.",
path);
String[] service = serviceName.split("=");
if (service.length != 2) {
continue;
} else {
if (services.containsKey(service[0])) {
continue;
} else {
LOG.info("Begin to load protocol: " + service[0]);
services.put(service[0], (T) initService(getContextClassLoader(), service[1], clazz));
}
}
serviceName = reader.readLine();
}
reader.close();
} else {
// is == null
LOG.warn("No resource file with name [{}] found.", path);
}
} catch (Exception e) {
LOG.error("Error occured when looking for resource file " + path, e);
}
return services;
}
public static <T> T loadClass(String name, String path, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(), name);
if (is != null) {
BufferedReader reader;
try {
try {
reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
reader = new BufferedReader(new InputStreamReader(is));
}
String serviceName = reader.readLine();
reader.close();
if (serviceName != null && !"".equals(serviceName)) {
return initService(getContextClassLoader(), serviceName, clazz);
} else {
LOG.warn("ServiceName is empty!");
return null;
}
} catch (Exception e) {
LOG.warn("Error occurred when looking for resource file " + name, e);
}
}
return null;
}
protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
Class<?> serviceClazz = null;
try {
if (classLoader != null) {
try {
// Warning: must typecast here & allow exception to be generated/caught & recast properly
serviceClazz = classLoader.loadClass(serviceName);
if (clazz.isAssignableFrom(serviceClazz)) {
LOG.info("Loaded class {} from classloader {}", serviceClazz.getName(),
objectId(classLoader));
} else {
// This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation.
LOG.error(
"Class {} loaded from classloader {} does not extend {} as loaded by this classloader.",
new Object[] {
serviceClazz.getName(),
objectId(serviceClazz.getClassLoader()), clazz.getName()});
}
return (T) serviceClazz.newInstance();
} catch (ClassNotFoundException ex) {
if (classLoader == thisClassLoader) {
// Nothing more to try, onwards.
LOG.warn("Unable to locate any class {} via classloader", serviceName,
objectId(classLoader));
throw ex;
}
// Ignore exception, continue
} catch (NoClassDefFoundError e) {
if (classLoader == thisClassLoader) {
// Nothing more to try, onwards.
LOG.warn(
"Class {} cannot be loaded via classloader {}.it depends on some other class that cannot be found.",
serviceClazz, objectId(classLoader));
throw e;
}
// Ignore exception, continue
}
}
} catch (Exception e) {
LOG.error("Unable to init service.", e);
}
return (T) serviceClazz;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class ThreadUtils {
/**
* A constructor to stop this class being constructed.
*/
private ThreadUtils() {
// Unused
}
public static ExecutorService newThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
String processName, boolean isDaemon) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
}
public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(workQueueCapacity),
newGenericThreadFactory(processName, isDaemon));
}
public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
}
public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
boolean isDaemon) {
return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon));
}
public static ThreadFactory newGenericThreadFactory(String processName) {
return newGenericThreadFactory(processName, false);
}
public static ThreadFactory newGenericThreadFactory(String processName, int threads) {
return newGenericThreadFactory(processName, threads, false);
}
public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
return new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
thread.setDaemon(isDaemon);
return thread;
}
};
}
public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
final boolean isDaemon) {
return new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
thread.setDaemon(isDaemon);
return thread;
}
};
}
/**
* Create a new thread
*
* @param name The name of the thread
* @param runnable The work for the thread to do
* @param daemon Should the thread block JVM stop?
* @return The unstarted thread
*/
public static Thread newThread(String name, Runnable runnable, boolean daemon) {
Thread thread = new Thread(runnable, name);
thread.setDaemon(daemon);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
}
});
return thread;
}
/**
* Shutdown passed thread using isAlive and join.
*
* @param t Thread to stop
*/
public static void shutdownGracefully(final Thread t) {
shutdownGracefully(t, 0);
}
/**
* Shutdown passed thread using isAlive and join.
*
* @param millis Pass 0 if we're to wait forever.
* @param t Thread to stop
*/
public static void shutdownGracefully(final Thread t, final long millis) {
if (t == null)
return;
while (t.isAlive()) {
try {
t.interrupt();
t.join(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* An implementation of the graceful stop sequence recommended by
* {@link ExecutorService}.
*
* @param executor executor
* @param timeout timeout
* @param timeUnit timeUnit
*/
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
// Disable new tasks from being submitted.
executor.shutdown();
try {
// Wait a while for existing tasks to terminate.
if (!executor
.awaitTermination(timeout, timeUnit)) {
executor.shutdownNow();
// Wait a while for tasks to respond to being cancelled.
if (!executor.awaitTermination(timeout, timeUnit)) {
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted.
executor.shutdownNow();
// Preserve interrupt status.
Thread.currentThread().interrupt();
}
}
}
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient
http2=org.apache.rocketmq.remoting.transport.http2.Http2ClientImpl
\ No newline at end of file
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
\ No newline at end of file
......@@ -27,13 +27,13 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......
......@@ -25,10 +25,10 @@ import java.io.PrintWriter;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册