diff --git a/distribution/conf/flowControl.yml b/distribution/conf/flowControl.yml new file mode 100644 index 0000000000000000000000000000000000000000..cf71d2ddca549fc0da90cdef18ff94e69756166e --- /dev/null +++ b/distribution/conf/flowControl.yml @@ -0,0 +1,19 @@ +snode: + countLimit: # flow control type, only requestCount & requestSize support + - flowControlResourceName: 310 + flowControlGrade: 1 + flowControlBehavior: 1 + flowControlResourceCount: 500.00 #QPS + + - flowControlResourceName: overall + flowControlGrade: 1 + flowControlBehavior: 1 + flowControlResourceCount: 10000.00 #QPS + + sizeLimit: + - flowControlResourceName: 310 + flowControlGrade: 1 + flowControlBehavior: 1 + flowControlResourceCount: 5.00 #KB/S + + topicLimit: \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java index f1746fab192c99a66e99c0b0f9cc4d328020fe1e..dde3419d4d91c68de5e899caf9452c577b426339 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java @@ -23,9 +23,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyChannelHandlerContextImpl implements RemotingChannel { - public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; - private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING); + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private final ChannelHandlerContext channelHandlerContext; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java index 90f2b31d55f0fb1e4ea619b506e635328d3be71a..cc76e8ebc7ff6cf68e4d9e2f50418e7f8b4199ec 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java @@ -28,9 +28,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class NettyChannelImpl implements RemotingChannel { - public static final String ROCKETMQ_REMOTING = "RocketmqRemoting"; - - private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING); + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); private final Channel channel; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index f2ff24bbf9e02ecfbd28da7346261ec4ff8cdae1..3f7917058af97e34133bf784b60d8f2e8797dc8f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -176,7 +176,7 @@ public abstract class NettyRemotingAbstract { processRequestCommand(remotingChannel, cmd); break; case RESPONSE_COMMAND: - processResponseCommand(ctx, cmd); + processResponseCommand(remotingChannel, cmd); break; default: break; @@ -279,10 +279,13 @@ public abstract class NettyRemotingAbstract { /** * Process response from remote peer to the previous issued requests. * - * @param ctx channel handler context. + * @param remotingChannel channel handler context. * @param cmd response command instance. */ - public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { + public void processResponseCommand(final RemotingChannel remotingChannel, RemotingCommand cmd) { + NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel; + final ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext(); + final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java index cef31f244d1159465133d399080954cca6f8c920..472a8de2dc1b82e3ff188b0979165a602c7eb22e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java @@ -32,14 +32,6 @@ public class ServiceProvider { */ private static ClassLoader thisClassLoader; - /** - * JDK1.3+ 'Service Provider' - * specification. - */ - public static final String TRANSACTION_SERVICE_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService"; - - public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"; - static { thisClassLoader = getClassLoader(ServiceProvider.class); }