diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 8dccebc04575db85ddc70e8c41de1a2cdf8a93c2..206b96ad1d7694ae54a2d6d8868e5eacfb814ae5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract { */ protected volatile SslContext sslContext; + /** + * custom rpc hooks + */ + protected List rpcHooks = new ArrayList(); + + + static { NettyLogger.initNettyLogger(); } @@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract { } } + protected void doBeforeRpcHooks(String addr, RemotingCommand request) { + if (rpcHooks.size() > 0) { + for (RPCHook rpcHook: rpcHooks) { + rpcHook.doBeforeRequest(addr, request); + } + } + } + + protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) { + if (rpcHooks.size() > 0) { + for (RPCHook rpcHook: rpcHooks) { + rpcHook.doAfterResponse(addr, request, response); + } + } + } + + /** * Process incoming request command issued by remote peer. * @@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract { @Override public void run() { try { - RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); - if (rpcHook != null) { - rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - } - + doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); - if (rpcHook != null) { - rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); - } + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { @@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract { } } + + /** * Custom RPC hook. + * Just be compatible with the previous version, use getRPCHooks instead. + */ + @Deprecated + protected RPCHook getRPCHook() { + if (rpcHooks.size() > 0) { + return rpcHooks.get(0); + } + return null; + } + + /** + * Custom RPC hooks. * - * @return RPC hook if specified; null otherwise. + * @return RPC hooks if specified; null otherwise. */ - public abstract RPCHook getRPCHook(); + public List getRPCHooks() { + return rpcHooks; + } + /** * This method specifies thread pool to use while invoking callback methods. diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 33c2eed8de188e0b21a88d21aad68494460d3ccc..e891ad7299a268b4178a540b52c246abfda9ab88 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -34,6 +34,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; import java.net.SocketAddress; import java.security.cert.CertificateException; @@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; @@ -64,8 +67,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { @@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private ExecutorService callbackExecutor; private final ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; - private RPCHook rpcHook; public NettyRemotingClient(final NettyClientConfig nettyClientConfig) { this(nettyClientConfig, null); @@ -283,7 +283,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + if (!rpcHooks.contains(rpcHook)) { + rpcHooks.add(rpcHook); + } } public void closeChannel(final Channel channel) { @@ -357,6 +359,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } + + @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { @@ -364,17 +368,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } + doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); - if (this.rpcHook != null) { - this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); - } + doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); @@ -522,9 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } + doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTooMuchRequestException("invokeAsync call timeout"); @@ -547,9 +545,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } + doBeforeRpcHooks(addr, request); this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); @@ -592,10 +588,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelEventListener; } - @Override - public RPCHook getRPCHook() { - return this.rpcHook; - } @Override public ExecutorService getCallbackExecutor() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 198484251c0365eaf3bd2cebd97b1456e0dc7de2..90386f37e2658c9acf630bb221b39385511e3af9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -40,6 +40,8 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup; import java.io.IOException; import java.net.InetSocketAddress; import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.List; import java.util.NoSuchElementException; import java.util.Timer; import java.util.TimerTask; @@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private final Timer timer = new Timer("ServerHouseKeepingService", true); private DefaultEventExecutorGroup defaultEventExecutorGroup; - private RPCHook rpcHook; private int port = 0; @@ -266,7 +267,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + if (!rpcHooks.contains(rpcHook)) { + rpcHooks.add(rpcHook); + } } @Override @@ -318,10 +321,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return channelEventListener; } - @Override - public RPCHook getRPCHook() { - return this.rpcHook; - } @Override public ExecutorService getCallbackExecutor() {