From a0955a43fad5305dacc18df8e624cbea26b5309a Mon Sep 17 00:00:00 2001 From: "shenhui.backend" Date: Mon, 14 Oct 2019 23:50:27 +0800 Subject: [PATCH] compatible with JDK1.6 in remoting module --- .../rocketmq/broker/processor/ClientManageProcessor.java | 3 ++- .../broker/processor/ConsumerManageProcessor.java | 3 ++- .../broker/processor/EndTransactionProcessor.java | 3 ++- .../broker/processor/ForwardRequestProcessor.java | 3 ++- .../rocketmq/broker/processor/PullMessageProcessor.java | 3 ++- .../rocketmq/broker/processor/QueryMessageProcessor.java | 3 ++- .../rocketmq/client/impl/ClientRemotingProcessor.java | 3 ++- .../namesrv/processor/DefaultRequestProcessor.java | 3 ++- .../org/apache/rocketmq/remoting/RemotingServerTest.java | 9 ++------- 9 files changed, 18 insertions(+), 15 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java index 97123792..aa7d0a3a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java @@ -40,9 +40,10 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ClientManageProcessor implements NettyRequestProcessor { +public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index 028d21bf..77317a6f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -34,10 +34,11 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ConsumerManageProcessor implements NettyRequestProcessor { +public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index c9e85ed1..eb7982cf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -41,7 +42,7 @@ import org.apache.rocketmq.store.config.BrokerRole; /** * EndTransaction processor: process commit and rollback message */ -public class EndTransactionProcessor implements NettyRequestProcessor { +public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java index b0f0a054..cd935983 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java @@ -21,10 +21,11 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ForwardRequestProcessor implements NettyRequestProcessor { +public class ForwardRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 10c0112f..73511987 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -66,7 +67,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.stats.BrokerStatsManager; -public class PullMessageProcessor implements NettyRequestProcessor { +public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; private List consumeMessageHookList; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index a5ca872a..0f8d64c8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -33,12 +33,13 @@ import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class QueryMessageProcessor implements NettyRequestProcessor { +public class QueryMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index 0bd810a1..d45c5c22 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -46,11 +46,12 @@ import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class ClientRemotingProcessor implements NettyRequestProcessor { +public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 467078c4..7210246e 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -54,10 +54,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class DefaultRequestProcessor implements NettyRequestProcessor { +public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); protected final NamesrvController namesrvController; diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java index 0ecfaaa5..e378a7bf 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java @@ -26,12 +26,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; -import org.apache.rocketmq.remoting.netty.NettyRemotingServer; -import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.netty.ResponseFuture; +import org.apache.rocketmq.remoting.netty.*; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.AfterClass; @@ -48,7 +43,7 @@ public class RemotingServerTest { public static RemotingServer createRemotingServer() throws InterruptedException { NettyServerConfig config = new NettyServerConfig(); RemotingServer remotingServer = new NettyRemotingServer(config); - remotingServer.registerProcessor(0, new NettyRequestProcessor() { + remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { request.setRemark("Hi " + ctx.channel().remoteAddress()); -- GitLab