diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java index 8c3c56a08cc3b19ed4e89f7fc06372688493f18b..4506e71e97a9ae1935365bb745fadb44907319d5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.netty; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.nio.ByteBuffer; @@ -26,6 +27,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +@ChannelHandler.Sharable public class NettyEncoder extends MessageToByteEncoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 7f6284ea0b0ce8c220445cf310ab843eaac2723e..32d169b2656016ebdf565891d3d10c16b1890c40 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -22,6 +22,7 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -82,6 +83,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private static final String TLS_HANDLER_NAME = "sslHandler"; private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + // sharable handlers + private HandshakeHandler handshakeHandler; + private NettyEncoder encoder; + private NettyConnectManageHandler connectionManageHandler; + private NettyServerHandler serverHandler; + public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } @@ -186,6 +193,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } }); + prepareSharableHandlers(); + ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) @@ -200,14 +209,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() - .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, - new HandshakeHandler(TlsSystemConfig.tlsMode)) + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, - new NettyEncoder(), + encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnectManageHandler(), - new NettyServerHandler() + connectionManageHandler, + serverHandler ); } }); @@ -334,6 +342,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return this.publicExecutor; } + private void prepareSharableHandlers() { + handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode); + encoder = new NettyEncoder(); + connectionManageHandler = new NettyConnectManageHandler(); + serverHandler = new NettyServerHandler(); + } + + @ChannelHandler.Sharable class HandshakeHandler extends SimpleChannelInboundHandler { private final TlsMode tlsMode; @@ -396,6 +412,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } + @ChannelHandler.Sharable class NettyServerHandler extends SimpleChannelInboundHandler { @Override @@ -404,6 +421,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } + @ChannelHandler.Sharable class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java index c3da3e9af5982aa54dd6a728c80a3c803b206450..5330c90060da4304d3bba550cdfe2f5415474153 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java @@ -26,6 +26,9 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -90,4 +93,18 @@ public class NettyRemotingAbstractTest { semaphore.acquire(1); assertThat(semaphore.availablePermits()).isEqualTo(0); } + + @Test + public void testScanResponseTable() { + int dummyId = 1; + // mock timeout + ResponseFuture responseFuture = new ResponseFuture(null,dummyId, -1000, new InvokeCallback() { + @Override + public void operationComplete(final ResponseFuture responseFuture) { + } + }, null); + remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture); + remotingAbstract.scanResponseTable(); + assertNull(remotingAbstract.responseTable.get(dummyId)); + } } \ No newline at end of file