diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java index fbc071b28f7d6ab390f56dae0ad44697840814bf..5ba3534cb8af195571246b75e311f3c9ae3f1704 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java @@ -20,23 +20,23 @@ public class NettyClientConfig { /** * Worker thread number */ - private int clientWorkerThreads = 4; + private int clientWorkerThreads = NettySystemConfig.clientWorkerSize; private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors(); private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE; private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE; - private int connectTimeoutMillis = 3000; + private int connectTimeoutMillis = NettySystemConfig.connectTimeoutMillis; private long channelNotActiveInterval = 1000 * 60; /** * IdleStateEvent will be triggered when neither read nor write was performed for * the specified period of this time. Specify {@code 0} to disable */ - private int clientChannelMaxIdleTimeSeconds = 120; + private int clientChannelMaxIdleTimeSeconds = NettySystemConfig.clientChannelMaxIdleTimeSeconds; private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean clientPooledByteBufAllocatorEnable = false; - private boolean clientCloseSocketIfTimeout = false; + private boolean clientCloseSocketIfTimeout = NettySystemConfig.clientCloseSocketIfTimeout; private boolean useTLS; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index 6357c03ba8b4203a83db3df16601d7358cf77728..ef767a36edf13df8d1d3e498419a62202e22085c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -28,8 +28,17 @@ public class NettySystemConfig { "com.rocketmq.remoting.clientAsyncSemaphoreValue"; public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = "com.rocketmq.remoting.clientOnewaySemaphoreValue"; + public static final String COM_ROCKETMQ_REMOTING_CLIENT_WORKER_SIZE = + "com.rocketmq.remoting.client.worker.size"; + public static final String COM_ROCKETMQ_REMOTING_CLIENT_CONNECT_TIMEOUT = + "com.rocketmq.remoting.client.connect.timeout"; + public static final String COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS = + "com.rocketmq.remoting.client.channel.maxIdleTimeSeconds"; + public static final String COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT = + "com.rocketmq.remoting.client.closeSocketIfTimeout"; - public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // + + public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); @@ -39,4 +48,12 @@ public class NettySystemConfig { Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); public static int socketRcvbufSize = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); + public static int clientWorkerSize = + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_WORKER_SIZE, "4")); + public static int connectTimeoutMillis = + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CONNECT_TIMEOUT, "3000")); + public static int clientChannelMaxIdleTimeSeconds = + Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS, "120")); + public static boolean clientCloseSocketIfTimeout = + Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT, "true")); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyClientConfigTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyClientConfigTest.java new file mode 100644 index 0000000000000000000000000000000000000000..15cf0338649339b8d4bc4d886c1223c7432da65a --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyClientConfigTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.netty; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class NettyClientConfigTest { + + @Test + public void testChangeConfigBySystemProperty() throws NoSuchFieldException, IllegalAccessException { + + + System.setProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_WORKER_SIZE, "1"); + System.setProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_CONNECT_TIMEOUT, "2000"); + System.setProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS, "60"); + System.setProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "16383"); + System.setProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "16384"); + System.setProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT, "false"); + + + NettySystemConfig.socketSndbufSize = + Integer.parseInt(System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); + NettySystemConfig.socketRcvbufSize = + Integer.parseInt(System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); + NettySystemConfig.clientWorkerSize = + Integer.parseInt(System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_WORKER_SIZE, "4")); + NettySystemConfig.connectTimeoutMillis = + Integer.parseInt(System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_CONNECT_TIMEOUT, "3000")); + NettySystemConfig.clientChannelMaxIdleTimeSeconds = + Integer.parseInt(System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS, "120")); + NettySystemConfig.clientCloseSocketIfTimeout = + Boolean.parseBoolean(System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT, "true")); + + NettyClientConfig changedConfig = new NettyClientConfig(); + assertThat(changedConfig.getClientWorkerThreads()).isEqualTo(1); + assertThat(changedConfig.getClientOnewaySemaphoreValue()).isEqualTo(65535); + assertThat(changedConfig.getClientAsyncSemaphoreValue()).isEqualTo(65535); + assertThat(changedConfig.getConnectTimeoutMillis()).isEqualTo(2000); + assertThat(changedConfig.getClientChannelMaxIdleTimeSeconds()).isEqualTo(60); + assertThat(changedConfig.getClientSocketSndBufSize()).isEqualTo(16383); + assertThat(changedConfig.getClientSocketRcvBufSize()).isEqualTo(16384); + assertThat(changedConfig.isClientCloseSocketIfTimeout()).isEqualTo(false); + } +}