diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 83eeb02da4e3a5abbd82528055af8b9456afda6d..cddab3d738761172f99921fa5fcec1050470789c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -48,32 +48,84 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class NettyRemotingAbstract {
+
+ /**
+ * Remoting logger instance.
+ */
private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ /**
+ * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
+ */
protected final Semaphore semaphoreOneway;
+ /**
+ * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
+ */
protected final Semaphore semaphoreAsync;
+ /**
+ * This map caches all on-going requests.
+ */
protected final ConcurrentHashMap responseTable =
new ConcurrentHashMap(256);
+ /**
+ * This container holds all processors per request code, aka, for each incoming request, we may look up the
+ * responding processor in this map to handle the request.
+ */
protected final HashMap> processorTable =
new HashMap>(64);
- protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();
+ /**
+ * Executor to feed netty events to user defined {@link ChannelEventListener}.
+ */
+ protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
+
+ /**
+ * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+ */
protected Pair defaultRequestProcessor;
+ /**
+ * Constructor, specifying capacity of one-way and asynchronous semaphores.
+ * @param permitsOneway Number of permits for one-way requests.
+ * @param permitsAsync Number of permits for asynchronous requests.
+ */
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
+ /**
+ * Custom channel event listener.
+ * @return custom channel event listener if defined; null otherwise.
+ */
public abstract ChannelEventListener getChannelEventListener();
+ /**
+ * Put a netty event to the executor.
+ * @param event Netty event instance.
+ */
public void putNettyEvent(final NettyEvent event) {
- this.nettyEventExecuter.putNettyEvent(event);
+ this.nettyEventExecutor.putNettyEvent(event);
}
+ /**
+ * Entry of incoming command processing.
+ *
+ *
+ * Note:
+ * The incoming remoting command may be
+ *
+ * - An inquiry request from a remote peer component;
+ * - A response to a previous request issued by this very participant.
+ *
+ *
+ * @param ctx Channel handler context.
+ * @param msg incoming remoting command.
+ * @throws Exception if there were any error while processing the incoming command.
+ */
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
@@ -90,6 +142,11 @@ public abstract class NettyRemotingAbstract {
}
}
+ /**
+ * Process incoming request command issued by remote peer.
+ * @param ctx channel handler context.
+ * @param cmd request command.
+ */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair matched = this.processorTable.get(cmd.getCode());
final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
@@ -175,6 +232,11 @@ public abstract class NettyRemotingAbstract {
}
}
+ /**
+ * Process response from remote peer to the previous issued requests.
+ * @param ctx channel handler context.
+ * @param cmd response command instance.
+ */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
@@ -196,7 +258,10 @@ public abstract class NettyRemotingAbstract {
}
}
- //execute callback in callback executor. If callback executor is null, run directly in current thread
+ /**
+ * Execute callback in callback executor. If callback executor is null, run directly in current thread
+ * @param responseFuture
+ */
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
@@ -229,10 +294,24 @@ public abstract class NettyRemotingAbstract {
}
}
+ /**
+ * Custom RPC hook.
+ * @return RPC hook if specified; null otherwise.
+ */
public abstract RPCHook getRPCHook();
- abstract public ExecutorService getCallbackExecutor();
-
+ /**
+ * This method specifies thread pool to use while invoking callback methods.
+ * @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the
+ * netty event-loop thread.
+ */
+ public abstract ExecutorService getCallbackExecutor();
+
+ /**
+ *
+ * This method is periodically invoked to scan and expire deprecated request.
+ *
+ */
public void scanResponseTable() {
final List rfList = new LinkedList();
Iterator> it = this.responseTable.entrySet().iterator();
@@ -386,7 +465,7 @@ public abstract class NettyRemotingAbstract {
}
}
- class NettyEventExecuter extends ServiceThread {
+ class NettyEventExecutor extends ServiceThread {
private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue();
private final int maxSize = 10000;
@@ -436,7 +515,7 @@ public abstract class NettyRemotingAbstract {
@Override
public String getServiceName() {
- return NettyEventExecuter.class.getSimpleName();
+ return NettyEventExecutor.class.getSimpleName();
}
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 26088aa5a5b98aad899e4215e7e342e6037c41a6..52ca47e6c2e5fccc1d7ffb9cc7a39ad12a23f60c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -172,7 +172,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
- this.nettyEventExecuter.start();
+ this.nettyEventExecutor.start();
}
}
@@ -189,8 +189,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.eventLoopGroupWorker.shutdownGracefully();
- if (this.nettyEventExecuter != null) {
- this.nettyEventExecuter.shutdown();
+ if (this.nettyEventExecutor != null) {
+ this.nettyEventExecutor.shutdown();
}
if (this.defaultEventExecutorGroup != null) {
@@ -586,7 +586,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
-
}
}
@@ -594,8 +593,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
- final String local = localAddress == null ? "UNKNOWN" : localAddress.toString();
- final String remote = remoteAddress == null ? "UNKNOWN" : remoteAddress.toString();
+ final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
+ final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise);
@@ -613,7 +612,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
super.disconnect(ctx, promise);
if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+ NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@@ -625,7 +624,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
super.close(ctx, promise);
if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+ NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@@ -639,7 +638,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+ .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
@@ -654,7 +653,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
+ NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 6a6df374b2ed7b09338d7b43e21ce4d470d9a5a8..d8d9b6518b996041be258c32b74b98e85747470a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -160,7 +160,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- new NettyConnetManageHandler(),
+ new NettyConnectManageHandler(),
new NettyServerHandler());
}
});
@@ -178,7 +178,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
if (this.channelEventListener != null) {
- this.nettyEventExecuter.start();
+ this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@@ -205,8 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
this.eventLoopGroupSelector.shutdownGracefully();
- if (this.nettyEventExecuter != null) {
- this.nettyEventExecuter.shutdown();
+ if (this.nettyEventExecutor != null) {
+ this.nettyEventExecutor.shutdown();
}
if (this.defaultEventExecutorGroup != null) {
@@ -297,7 +297,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
- class NettyConnetManageHandler extends ChannelDuplexHandler {
+ class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -319,7 +319,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel()));
+ NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@@ -330,21 +330,21 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+ NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
- IdleStateEvent evnet = (IdleStateEvent) evt;
- if (evnet.state().equals(IdleState.ALL_IDLE)) {
+ IdleStateEvent event = (IdleStateEvent) evt;
+ if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+ .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
@@ -359,7 +359,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
+ NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());