提交 6609c866 编写于 作者: Z Zhanhui Li

Add javadoc to NettyRemotingAbstract class and several other trivial clean up.

上级 6a9628b3
...@@ -48,32 +48,84 @@ import org.slf4j.Logger; ...@@ -48,32 +48,84 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public abstract class NettyRemotingAbstract { public abstract class NettyRemotingAbstract {
/**
* Remoting logger instance.
*/
private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreOneway; protected final Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreAsync; protected final Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
*/
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256); new ConcurrentHashMap<Integer, ResponseFuture>(256);
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
* @param permitsOneway Number of permits for one-way requests.
* @param permitsAsync Number of permits for asynchronous requests.
*/
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true); this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true); this.semaphoreAsync = new Semaphore(permitsAsync, true);
} }
/**
* Custom channel event listener.
* @return custom channel event listener if defined; null otherwise.
*/
public abstract ChannelEventListener getChannelEventListener(); public abstract ChannelEventListener getChannelEventListener();
/**
* Put a netty event to the executor.
* @param event Netty event instance.
*/
public void putNettyEvent(final NettyEvent event) { public void putNettyEvent(final NettyEvent event) {
this.nettyEventExecuter.putNettyEvent(event); this.nettyEventExecutor.putNettyEvent(event);
} }
/**
* Entry of incoming command processing.
*
* <p>
* <strong>Note:</strong>
* The incoming remoting command may be
* <ul>
* <li>An inquiry request from a remote peer component;</li>
* <li>A response to a previous request issued by this very participant.</li>
* </ul>
* </p>
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg; final RemotingCommand cmd = msg;
if (cmd != null) { if (cmd != null) {
...@@ -90,6 +142,11 @@ public abstract class NettyRemotingAbstract { ...@@ -90,6 +142,11 @@ public abstract class NettyRemotingAbstract {
} }
} }
/**
* Process incoming request command issued by remote peer.
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
...@@ -175,6 +232,11 @@ public abstract class NettyRemotingAbstract { ...@@ -175,6 +232,11 @@ public abstract class NettyRemotingAbstract {
} }
} }
/**
* Process response from remote peer to the previous issued requests.
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque(); final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque); final ResponseFuture responseFuture = responseTable.get(opaque);
...@@ -196,7 +258,10 @@ public abstract class NettyRemotingAbstract { ...@@ -196,7 +258,10 @@ public abstract class NettyRemotingAbstract {
} }
} }
//execute callback in callback executor. If callback executor is null, run directly in current thread /**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
* @param responseFuture
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) { private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false; boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getCallbackExecutor();
...@@ -229,10 +294,24 @@ public abstract class NettyRemotingAbstract { ...@@ -229,10 +294,24 @@ public abstract class NettyRemotingAbstract {
} }
} }
/**
* Custom RPC hook.
* @return RPC hook if specified; null otherwise.
*/
public abstract RPCHook getRPCHook(); public abstract RPCHook getRPCHook();
abstract public ExecutorService getCallbackExecutor(); /**
* This method specifies thread pool to use while invoking callback methods.
* @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the
* netty event-loop thread.
*/
public abstract ExecutorService getCallbackExecutor();
/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
* </p>
*/
public void scanResponseTable() { public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>(); final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator(); Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
...@@ -386,7 +465,7 @@ public abstract class NettyRemotingAbstract { ...@@ -386,7 +465,7 @@ public abstract class NettyRemotingAbstract {
} }
} }
class NettyEventExecuter extends ServiceThread { class NettyEventExecutor extends ServiceThread {
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>(); private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000; private final int maxSize = 10000;
...@@ -436,7 +515,7 @@ public abstract class NettyRemotingAbstract { ...@@ -436,7 +515,7 @@ public abstract class NettyRemotingAbstract {
@Override @Override
public String getServiceName() { public String getServiceName() {
return NettyEventExecuter.class.getSimpleName(); return NettyEventExecutor.class.getSimpleName();
} }
} }
} }
...@@ -172,7 +172,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -172,7 +172,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}, 1000 * 3, 1000); }, 1000 * 3, 1000);
if (this.channelEventListener != null) { if (this.channelEventListener != null) {
this.nettyEventExecuter.start(); this.nettyEventExecutor.start();
} }
} }
...@@ -189,8 +189,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -189,8 +189,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.eventLoopGroupWorker.shutdownGracefully(); this.eventLoopGroupWorker.shutdownGracefully();
if (this.nettyEventExecuter != null) { if (this.nettyEventExecutor != null) {
this.nettyEventExecuter.shutdown(); this.nettyEventExecutor.shutdown();
} }
if (this.defaultEventExecutorGroup != null) { if (this.defaultEventExecutorGroup != null) {
...@@ -586,7 +586,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -586,7 +586,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg); processMessageReceived(ctx, msg);
} }
} }
...@@ -594,8 +593,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -594,8 +593,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : localAddress.toString(); final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : remoteAddress.toString(); final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise); super.connect(ctx, remoteAddress, localAddress, promise);
...@@ -613,7 +612,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -613,7 +612,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
super.disconnect(ctx, promise); super.disconnect(ctx, promise);
if (NettyRemotingClient.this.channelEventListener != null) { if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
} }
} }
...@@ -625,7 +624,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -625,7 +624,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
super.close(ctx, promise); super.close(ctx, promise);
if (NettyRemotingClient.this.channelEventListener != null) { if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
} }
} }
...@@ -639,7 +638,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -639,7 +638,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
closeChannel(ctx.channel()); closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) { if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
} }
} }
} }
...@@ -654,7 +653,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ...@@ -654,7 +653,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause); log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel()); closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) { if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel())); NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
} }
} }
} }
......
...@@ -160,7 +160,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -160,7 +160,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
new NettyEncoder(), new NettyEncoder(),
new NettyDecoder(), new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnetManageHandler(), new NettyConnectManageHandler(),
new NettyServerHandler()); new NettyServerHandler());
} }
}); });
...@@ -178,7 +178,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -178,7 +178,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
} }
if (this.channelEventListener != null) { if (this.channelEventListener != null) {
this.nettyEventExecuter.start(); this.nettyEventExecutor.start();
} }
this.timer.scheduleAtFixedRate(new TimerTask() { this.timer.scheduleAtFixedRate(new TimerTask() {
...@@ -205,8 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -205,8 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
this.eventLoopGroupSelector.shutdownGracefully(); this.eventLoopGroupSelector.shutdownGracefully();
if (this.nettyEventExecuter != null) { if (this.nettyEventExecutor != null) {
this.nettyEventExecuter.shutdown(); this.nettyEventExecutor.shutdown();
} }
if (this.defaultEventExecutorGroup != null) { if (this.defaultEventExecutorGroup != null) {
...@@ -297,7 +297,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -297,7 +297,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
} }
} }
class NettyConnetManageHandler extends ChannelDuplexHandler { class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override @Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
...@@ -319,7 +319,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -319,7 +319,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
super.channelActive(ctx); super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) { if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
} }
} }
...@@ -330,21 +330,21 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -330,21 +330,21 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
super.channelInactive(ctx); super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) { if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
} }
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleStateEvent evnet = (IdleStateEvent) evt; IdleStateEvent event = (IdleStateEvent) evt;
if (evnet.state().equals(IdleState.ALL_IDLE)) { if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel()); RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) { if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
} }
} }
} }
...@@ -359,7 +359,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -359,7 +359,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause); log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) { if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel())); NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
} }
RemotingUtil.closeChannel(ctx.channel()); RemotingUtil.closeChannel(ctx.channel());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册