提交 4915871c 编写于 作者: Z zander

Expose the rpc hook

上级 7a6164d4
......@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
......@@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract {
*/
protected volatile SslContext sslContext;
/**
* custom rpc hooks
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
static {
NettyLogger.initNettyLogger();
}
......@@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract {
}
}
protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
}
protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doAfterResponse(addr, request, response);
}
}
}
/**
* Process incoming request command issued by remote peer.
*
......@@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
......@@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract {
}
}
/**
* Custom RPC hook.
* Just be compatible with the previous version, use getRPCHooks instead.
*/
@Deprecated
protected RPCHook getRPCHook() {
if (rpcHooks.size() > 0) {
return rpcHooks.get(0);
}
return null;
}
/**
* Custom RPC hooks.
*
* @return RPC hook if specified; null otherwise.
* @return RPC hooks if specified; null otherwise.
*/
public abstract RPCHook getRPCHook();
public List<RPCHook> getRPCHooks() {
return rpcHooks;
}
/**
* This method specifies thread pool to use while invoking callback methods.
......
......@@ -34,6 +34,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
......@@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
......@@ -64,8 +67,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
......@@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private ExecutorService callbackExecutor;
private final ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
......@@ -283,7 +283,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
if (!rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
public void closeChannel(final Channel channel) {
......@@ -357,6 +359,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
......@@ -364,17 +368,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
......@@ -522,9 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
......@@ -547,9 +545,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
doBeforeRpcHooks(addr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
......@@ -592,10 +588,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
......
......@@ -40,6 +40,8 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
......@@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
private int port = 0;
......@@ -266,7 +267,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void registerRPCHook(RPCHook rpcHook) {
this.rpcHook = rpcHook;
if (!rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
@Override
......@@ -318,10 +321,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
@Override
public RPCHook getRPCHook() {
return this.rpcHook;
}
@Override
public ExecutorService getCallbackExecutor() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册