From e4c0a4c4ed6d84dcc297ff878d84b32f78b162c7 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Wed, 16 Jan 2019 16:44:01 +0800 Subject: [PATCH] polish and adjust codes for remoting module. --- distribution/conf/flowControl.yml | 19 +++++++++++++++++++ .../netty/NettyChannelHandlerContextImpl.java | 3 +-- .../remoting/netty/NettyChannelImpl.java | 4 +--- .../remoting/netty/NettyRemotingAbstract.java | 9 ++++++--- .../remoting/util/ServiceProvider.java | 8 -------- 5 files changed, 27 insertions(+), 16 deletions(-) create mode 100644 distribution/conf/flowControl.yml diff --git a/distribution/conf/flowControl.yml b/distribution/conf/flowControl.yml new file mode 100644 index 00000000..cf71d2dd --- /dev/null +++ b/distribution/conf/flowControl.yml @@ -0,0 +1,19 @@ +snode: + countLimit: # flow control type, only requestCount & requestSize support + - flowControlResourceName: 310 + flowControlGrade: 1 + flowControlBehavior: 1 + flowControlResourceCount: 500.00 #QPS + + - flowControlResourceName: overall + flowControlGrade: 1 + flowControlBehavior: 1 + flowControlResourceCount: 10000.00 #QPS + + sizeLimit: + - flowControlResourceName: 310 + flowControlGrade: 1 + flowControlBehavior: 1 + flowControlResourceCount: 5.00 #KB/S + + topicLimit: \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java index f1746fab..dde3419d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java @@ -23,9 +23,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyChannelHandlerContextImpl implements RemotingChannel { - public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; - private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING); + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private final ChannelHandlerContext channelHandlerContext; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java index 90f2b31d..cc76e8eb 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java @@ -28,9 +28,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyChannelImpl implements RemotingChannel { - public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; - - private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING); + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private final Channel channel; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index f2ff24bb..3f791705 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -176,7 +176,7 @@ public abstract class NettyRemotingAbstract { processRequestCommand(remotingChannel, cmd); break; case RESPONSE_COMMAND: - processResponseCommand(ctx, cmd); + processResponseCommand(remotingChannel, cmd); break; default: break; @@ -279,10 +279,13 @@ public abstract class NettyRemotingAbstract { /** * Process response from remote peer to the previous issued requests. * - * @param ctx channel handler context. + * @param remotingChannel channel handler context. * @param cmd response command instance. */ - public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { + public void processResponseCommand(final RemotingChannel remotingChannel, RemotingCommand cmd) { + NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; + final ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); + final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java index cef31f24..472a8de2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java @@ -32,14 +32,6 @@ public class ServiceProvider { */ private static ClassLoader thisClassLoader; - /** - * JDK1.3+ 'Service Provider' - * specification. - */ - public static final String TRANSACTION_SERVICE_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService"; - - public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"; - static { thisClassLoader = getClassLoader(ServiceProvider.class); } -- GitLab