From f7c55b9fd3e0777993f84152b9127cfc07dce18e Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Tue, 15 Jan 2019 19:26:22 +0800 Subject: [PATCH] Fix unit test ensure mvn install pass --- .../broker/client/ClientChannelInfo.java | 21 ++++---- .../client/ClientHousekeepingService.java | 16 +++---- .../broker/client/ConsumerGroupInfo.java | 15 +++--- .../broker/client/ConsumerManager.java | 12 ++--- .../DefaultConsumerIdsChangeListener.java | 3 -- .../broker/client/ProducerManager.java | 26 ++++------ .../broker/client/net/Broker2Client.java | 14 +++--- .../rocketmq/broker/out/BrokerOuterAPI.java | 5 +- .../processor/AdminBrokerProcessor.java | 6 +-- ...ractTransactionalMessageCheckListener.java | 12 ++--- .../broker/client/ProducerManagerTest.java | 26 +++++----- .../processor/ClientManageProcessorTest.java | 40 +++++++++------- .../EndTransactionProcessorTest.java | 32 +++++++------ .../processor/PullMessageProcessorTest.java | 47 ++++++++---------- .../processor/SendMessageProcessorTest.java | 46 ++++++++---------- .../store/RemoteBrokerOffsetStore.java | 3 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 6 +-- .../client/impl/factory/MQClientInstance.java | 6 +-- .../consumer/DefaultMQPushConsumerTest.java | 1 - .../org/apache/rocketmq/common/UtilAll.java | 4 -- .../common/flowcontrol/FlowControlConfig.java | 5 +- .../common/flowcontrol/FlowControlRule.java | 2 - .../rocketmq/example/benchmark/Producer.java | 3 +- .../rocketmq/example/quickstart/Producer.java | 1 - .../processor/DefaultRequestProcessor.java | 3 +- .../routeinfo/BrokerHousekeepingService.java | 24 ++++++---- .../DefaultRequestProcessorTest.java | 27 +++++++---- .../rocketmq/remoting/RemotingServer.java | 1 - .../netty/NettyChannelHandlerContextImpl.java | 2 +- .../remoting/netty/NettyChannelImpl.java | 2 +- .../transport/http2/Http2ClientImpl.java | 16 ++----- .../transport/http2/Http2ServerImpl.java | 48 +++++++++---------- .../rocketmq/NettyRemotingClient.java | 10 +--- .../rocketmq/NettyRemotingServer.java | 3 +- .../netty/NettyRemotingClientTest.java | 2 +- .../protocol/RemotingCommandTest.java | 2 +- .../DefaultConsumerIdsChangeListener.java | 4 +- .../snode/constant/SnodeConstant.java | 7 +-- .../RequestSizeFlowControlServiceImpl.java | 1 - .../snode/processor/HeartbeatProcessor.java | 2 +- .../snode/processor/SendMessageProcessor.java | 1 - .../snode/service/impl/EnodeServiceImpl.java | 25 +++++----- .../snode/service/impl/NnodeServiceImpl.java | 6 +-- .../snode/service/impl/PushServiceImpl.java | 2 +- 44 files changed, 251 insertions(+), 289 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java index 192a6f8c..ef8be3e3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java @@ -16,12 +16,11 @@ */ package org.apache.rocketmq.broker.client; -import io.netty.channel.Channel; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.serialize.LanguageCode; public class ClientChannelInfo { - private final RemotingChannel channel; + private final RemotingChannel remotingChannel; private final String clientId; private final LanguageCode language; private final int version; @@ -31,15 +30,15 @@ public class ClientChannelInfo { this(channel, null, null, 0); } - public ClientChannelInfo(RemotingChannel channel, String clientId, LanguageCode language, int version) { - this.channel = channel; + public ClientChannelInfo(RemotingChannel remotingChannel, String clientId, LanguageCode language, int version) { + this.remotingChannel = remotingChannel; this.clientId = clientId; this.language = language; this.version = version; } - public RemotingChannel getChannel() { - return channel; + public RemotingChannel getRemotingChannel() { + return remotingChannel; } public String getClientId() { @@ -66,7 +65,7 @@ public class ClientChannelInfo { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((channel == null) ? 0 : channel.hashCode()); + result = prime * result + ((remotingChannel == null) ? 0 : remotingChannel.hashCode()); result = prime * result + ((clientId == null) ? 0 : clientId.hashCode()); result = prime * result + ((language == null) ? 0 : language.hashCode()); result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32)); @@ -83,10 +82,10 @@ public class ClientChannelInfo { if (getClass() != obj.getClass()) return false; ClientChannelInfo other = (ClientChannelInfo) obj; - if (channel == null) { - if (other.channel != null) + if (remotingChannel == null) { + if (other.remotingChannel != null) return false; - } else if (this.channel != other.channel) { + } else if (this.remotingChannel != other.remotingChannel) { return false; } @@ -95,7 +94,7 @@ public class ClientChannelInfo { @Override public String toString() { - return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language + return "ClientChannelInfo [remotingChannel=" + remotingChannel + ", clientId=" + clientId + ", language=" + language + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]"; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java index 5a39e4f5..8022a0f5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java @@ -74,28 +74,28 @@ public class ClientHousekeepingService implements ChannelEventListener { log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; Channel channel = nettyChannel.getChannel(); - this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, remotingChannel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, remotingChannel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); } @Override public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) { - log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; Channel channel = nettyChannel.getChannel(); - this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, remotingChannel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, remotingChannel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); } @Override public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) { - log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; Channel channel = nettyChannel.getChannel(); - this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); - this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); + log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, remotingChannel); + this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, remotingChannel); this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 36314244..1e44df20 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.client; -import io.netty.channel.Channel; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -26,11 +25,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; public class ConsumerGroupInfo { @@ -99,13 +98,13 @@ public class ConsumerGroupInfo { } public void unregisterChannel(final ClientChannelInfo clientChannelInfo) { - ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel()); + ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getRemotingChannel()); if (old != null) { log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString()); } } - public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { + public boolean doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) { final ClientChannelInfo info = this.channelInfoTable.remove(channel); if (info != null) { log.warn( @@ -124,9 +123,9 @@ public class ConsumerGroupInfo { this.messageModel = messageModel; this.consumeFromWhere = consumeFromWhere; - ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); + ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getRemotingChannel()); if (null == infoOld) { - ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); + ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getRemotingChannel(), infoNew); if (null == prev) { log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType, messageModel, infoNew.toString()); @@ -140,7 +139,7 @@ public class ConsumerGroupInfo { this.groupName, infoOld.toString(), infoNew.toString()); - this.channelInfoTable.put(infoNew.getChannel(), infoNew); + this.channelInfoTable.put(infoNew.getRemotingChannel(), infoNew); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index f621c1d1..c39d7b4d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.client; -import io.netty.channel.Channel; import java.util.HashSet; import java.util.Iterator; import java.util.Map.Entry; @@ -25,14 +24,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; public class ConsumerManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -75,7 +73,7 @@ public class ConsumerManager { return 0; } - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { + public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) { Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); @@ -159,9 +157,9 @@ public class ConsumerManager { if (diff > CHANNEL_EXPIRED_TIMEOUT) { log.warn( "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}", - RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel().remoteAddress()), group); + RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getRemotingChannel().remoteAddress()), group); - clientChannelInfo.getChannel().close(); + clientChannelInfo.getRemotingChannel().close(); itChannel.remove(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java index e2174dc4..d583453a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java @@ -16,11 +16,8 @@ */ package org.apache.rocketmq.broker.client; -import io.netty.channel.Channel; - import java.util.Collection; import java.util.List; - import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.RemotingChannel; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 1c9a557b..11b2c155 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.broker.client; -import io.netty.channel.Channel; - import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -27,14 +25,12 @@ import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.common.RemotingUtil; public class ProducerManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -86,8 +82,8 @@ public class ProducerManager { it.remove(); log.warn( "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", - RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress()), group); - info.getChannel().close(); + RemotingHelper.parseChannelRemoteAddr(info.getRemotingChannel().remoteAddress()), group); + info.getRemotingChannel().close(); } } } @@ -102,7 +98,7 @@ public class ProducerManager { } } - public void doChannelCloseEvent(final String remoteAddr, final Channel channel) { + public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) { if (channel != null) { try { if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { @@ -110,16 +106,12 @@ public class ProducerManager { for (final Map.Entry> entry : this.groupChannelTable .entrySet()) { final String group = entry.getKey(); - final HashMap clientChannelInfoTable = - entry.getValue(); - final ClientChannelInfo clientChannelInfo = - clientChannelInfoTable.remove(channel); + final HashMap clientChannelInfoTable = entry.getValue(); + final ClientChannelInfo clientChannelInfo = clientChannelInfoTable.remove(channel); if (clientChannelInfo != null) { - log.info( - "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", + log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", clientChannelInfo.toString(), remoteAddr, group); } - } } finally { this.groupChannelLock.unlock(); @@ -145,9 +137,9 @@ public class ProducerManager { this.groupChannelTable.put(group, channelTable); } - clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); + clientChannelInfoFound = channelTable.get(clientChannelInfo.getRemotingChannel()); if (null == clientChannelInfoFound) { - channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo); + channelTable.put(clientChannelInfo.getRemotingChannel(), clientChannelInfo); log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString()); } @@ -172,7 +164,7 @@ public class ProducerManager { try { HashMap channelTable = this.groupChannelTable.get(group); if (null != channelTable && !channelTable.isEmpty()) { - ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel()); + ClientChannelInfo old = channelTable.remove(clientChannelInfo.getRemotingChannel()); if (old != null) { log.info("unregister a producer[{}] from groupChannelTable {}", group, clientChannelInfo.toString()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 4eee9dbd..370fee42 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -16,7 +16,12 @@ */ package org.apache.rocketmq.broker.client.net; -import io.netty.channel.Channel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -45,13 +50,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; - public class Broker2Client { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index d8e96a81..001058c1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponse import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClientFactory; @@ -55,7 +56,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class BrokerOuterAPI { @@ -193,9 +193,6 @@ public class BrokerOuterAPI { RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; - if (response == null){ - System.out.println("ssssssssssssss"); - } switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a83c4888..914bb5b9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -603,7 +603,7 @@ public class AdminBrokerProcessor implements RequestProcessor { connection.setClientId(info.getClientId()); connection.setLanguage(info.getLanguage()); connection.setVersion(info.getVersion()); - connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress())); + connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getRemotingChannel().remoteAddress())); bodydata.getConnectionSet().add(connection); } @@ -638,7 +638,7 @@ public class AdminBrokerProcessor implements RequestProcessor { connection.setClientId(info.getClientId()); connection.setLanguage(info.getLanguage()); connection.setVersion(info.getVersion()); - connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress())); + connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getRemotingChannel().remoteAddress())); bodydata.getConnectionSet().add(connection); } @@ -1278,7 +1278,7 @@ public class AdminBrokerProcessor implements RequestProcessor { newRequest.setExtFields(request.getExtFields()); newRequest.setBody(request.getBody()); - return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); + return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getRemotingChannel(), newRequest); } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index 152e0675..1a6c4bec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -16,7 +16,11 @@ */ package org.apache.rocketmq.broker.transaction; -import io.netty.channel.Channel; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; @@ -24,12 +28,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.RemotingChannel; public abstract class AbstractTransactionalMessageCheckListener { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java index 2b01e7a3..c52ee2e4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelFuture; import java.lang.reflect.Field; import java.util.HashMap; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,35 +38,38 @@ public class ProducerManagerTest { private String group = "FooBar"; private ClientChannelInfo clientInfo; + private RemotingChannel remotingChannel; + @Mock - private RemotingChannel channel; + private Channel channel; @Before public void init() { producerManager = new ProducerManager(); - clientInfo = new ClientChannelInfo(channel); + remotingChannel = new NettyChannelImpl(channel); + clientInfo = new ClientChannelInfo(remotingChannel); } @Test public void scanNotActiveChannel() throws Exception { producerManager.registerProducer(group, clientInfo); - assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull(); - + assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNotNull(); Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT"); field.setAccessible(true); long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager); clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10); -// when(channel.close()).thenReturn(mock(ChannelFuture.class)); + when(channel.close()).thenReturn(mock(ChannelFuture.class)); producerManager.scanNotActiveChannel(); - assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull(); + assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNull(); } @Test public void doChannelCloseEvent() throws Exception { producerManager.registerProducer(group, clientInfo); - assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull(); -// producerManager.doChannelCloseEvent("127.0.0.1", channel); - assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull(); + assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNotNull(); + producerManager.doChannelCloseEvent("127.0.0.1", remotingChannel); + assertThat(producerManager.getGroupChannelTable().get(group).get(remotingChannel)).isNull(); + } @Test @@ -73,7 +77,7 @@ public class ProducerManagerTest { producerManager.registerProducer(group, clientInfo); HashMap channelMap = producerManager.getGroupChannelTable().get(group); assertThat(channelMap).isNotNull(); - assertThat(channelMap.get(channel)).isEqualTo(clientInfo); + assertThat(channelMap.get(remotingChannel)).isEqualTo(clientInfo); } @Test @@ -81,7 +85,7 @@ public class ProducerManagerTest { producerManager.registerProducer(group, clientInfo); HashMap channelMap = producerManager.getGroupChannelTable().get(group); assertThat(channelMap).isNotNull(); - assertThat(channelMap.get(channel)).isEqualTo(clientInfo); + assertThat(channelMap.get(remotingChannel)).isEqualTo(clientInfo); producerManager.unregisterProducer(group, clientInfo); channelMap = producerManager.getGroupChannelTable().get(group); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java index 1be03097..ef005981 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.util.HashMap; import java.util.UUID; @@ -27,14 +28,15 @@ import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.ServerConfig; -import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Before; import org.junit.Test; @@ -45,6 +47,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class ClientManageProcessorTest { @@ -52,9 +55,15 @@ public class ClientManageProcessorTest { @Spy private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig()); @Mock - private ChannelHandlerContext handlerContext; + private NettyChannelHandlerContextImpl handlerContext; + @Mock - private NettyChannelImpl channel; + private ChannelHandlerContext channelHandlerContext; + + private RemotingChannel remotingChannel; + + @Mock + private Channel channel; private ClientChannelInfo clientChannelInfo; private String clientId = UUID.randomUUID().toString(); @@ -63,11 +72,12 @@ public class ClientManageProcessorTest { @Before public void init() { -// when(handlerContext.channel()).thenReturn(channel); + when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext); + when(channelHandlerContext.channel()).thenReturn(channel); clientManageProcessor = new ClientManageProcessor(brokerController); - clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100); + remotingChannel = new NettyChannelImpl(channel); + clientChannelInfo = new ClientChannelInfo(remotingChannel, clientId, LanguageCode.JAVA, 100); brokerController.getProducerManager().registerProducer(group, clientChannelInfo); - ConsumerData consumerData = createConsumerData(group, topic); brokerController.getConsumerManager().registerConsumer( consumerData.getGroupName(), @@ -84,11 +94,10 @@ public class ClientManageProcessorTest { brokerController.getProducerManager().registerProducer(group, clientChannelInfo); HashMap channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group); assertThat(channelMap).isNotNull(); - assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo); + assertThat(channelMap.get(remotingChannel)).isEqualTo(clientChannelInfo); RemotingCommand request = createUnRegisterProducerCommand(); - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = clientManageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); @@ -102,9 +111,8 @@ public class ClientManageProcessorTest { assertThat(consumerGroupInfo).isNotNull(); RemotingCommand request = createUnRegisterConsumerCommand(); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = clientManageProcessor.processRequest(nettyChannelHandlerContext, request); assertThat(response).isNotNull(); + RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request); + assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group); @@ -118,7 +126,7 @@ public class ClientManageProcessorTest { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); request.setLanguage(LanguageCode.JAVA); request.setVersion(100); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } @@ -129,7 +137,7 @@ public class ClientManageProcessorTest { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader); request.setLanguage(LanguageCode.JAVA); request.setVersion(100); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } } \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java index 82c0a7d2..6da07986 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.processor; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.transaction.OperationResult; @@ -28,10 +29,11 @@ import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; @@ -49,6 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -57,7 +60,10 @@ public class EndTransactionProcessorTest { private EndTransactionProcessor endTransactionProcessor; @Mock - private ChannelHandlerContext handlerContext; + private NettyChannelHandlerContextImpl handlerContext; + + @Mock + private ChannelHandlerContext channelHandlerContext; @Spy private BrokerController @@ -74,10 +80,13 @@ public class EndTransactionProcessorTest { public void init() { brokerController.setMessageStore(messageStore); brokerController.setTransactionalMessageService(transactionMsgService); + Channel mockChannel = mock(Channel.class); + when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext); + when(channelHandlerContext.channel()).thenReturn(mockChannel); endTransactionProcessor = new EndTransactionProcessor(brokerController); } - private OperationResult createResponse(int status) { + private OperationResult createResponse(int status){ OperationResult response = new OperationResult(); response.setPrepareMessage(createDefaultMessageExt()); response.setResponseCode(status); @@ -91,9 +100,7 @@ public class EndTransactionProcessorTest { when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @@ -103,16 +110,14 @@ public class EndTransactionProcessorTest { when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult (PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true); - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @Test public void testProcessRequest_NotType() throws RemotingCommandException { RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_NOT_TYPE, true); - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request); assertThat(response).isNull(); } @@ -120,8 +125,7 @@ public class EndTransactionProcessorTest { public void testProcessRequest_RollBack() throws RemotingCommandException { when(transactionMsgService.rollbackMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS)); RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, true); - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @@ -152,7 +156,7 @@ public class EndTransactionProcessorTest { private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) { EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java index a5a32c42..fd99c08e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java @@ -38,11 +38,12 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.CodecHelper; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; @@ -69,9 +70,11 @@ public class PullMessageProcessorTest { @Spy private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig()); @Mock - private ChannelHandlerContext handlerContext; + private NettyChannelHandlerContextImpl handlerContext; + @Mock - private Channel channel; + private ChannelHandlerContext channelHandlerContext; + @Mock private MessageStore messageStore; private ClientChannelInfo clientChannelInfo; @@ -82,11 +85,13 @@ public class PullMessageProcessorTest { public void init() { brokerController.setMessageStore(messageStore); pullMessageProcessor = new PullMessageProcessor(brokerController); - RemotingChannel mockChannel = mock(RemotingChannel.class); + Channel mockChannel = mock(Channel.class); + RemotingChannel remotingChannel = mock(RemotingChannel.class); when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); -// when(handlerContext.channel()).thenReturn(mockChannel); + when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext); + when(channelHandlerContext.channel()).thenReturn(mockChannel); brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); - clientChannelInfo = new ClientChannelInfo(mockChannel); + clientChannelInfo = new ClientChannelInfo(remotingChannel); ConsumerData consumerData = createConsumerData(group, topic); brokerController.getConsumerManager().registerConsumer( consumerData.getGroupName(), @@ -102,8 +107,7 @@ public class PullMessageProcessorTest { public void testProcessRequest_TopicNotExist() throws RemotingCommandException { brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST); assertThat(response.getRemark()).contains("topic[" + topic + "] not exist"); @@ -113,9 +117,7 @@ public class PullMessageProcessorTest { public void testProcessRequest_SubNotExist() throws RemotingCommandException { brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, false); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_EXIST); assertThat(response.getRemark()).contains("consumer's group info not exist"); @@ -125,8 +127,7 @@ public class PullMessageProcessorTest { public void testProcessRequest_SubNotLatest() throws RemotingCommandException { final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); request.addExtField("subVersion", String.valueOf(101)); - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_LATEST); assertThat(response.getRemark()).contains("subscription not latest"); @@ -138,9 +139,7 @@ public class PullMessageProcessorTest { when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @@ -169,9 +168,7 @@ public class PullMessageProcessorTest { consumeMessageHookList.add(consumeMessageHook); pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(messageContext[0]).isNotNull(); @@ -187,9 +184,7 @@ public class PullMessageProcessorTest { when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY); } @@ -201,9 +196,7 @@ public class PullMessageProcessorTest { when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult); final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request); + RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED); } @@ -220,7 +213,7 @@ public class PullMessageProcessorTest { requestHeader.setSysFlag(0); requestHeader.setSubVersion(100L); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index ac8a1060..c6a5b51b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -18,6 +18,10 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; @@ -32,10 +36,11 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.netty.CodecHelper; +import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; @@ -53,11 +58,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -69,7 +69,10 @@ import static org.mockito.Mockito.when; public class SendMessageProcessorTest { private SendMessageProcessor sendMessageProcessor; @Mock - private ChannelHandlerContext handlerContext; + private NettyChannelHandlerContextImpl handlerContext; + + @Mock + private ChannelHandlerContext channelHandlerContext; @Spy private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig()); @Mock @@ -87,7 +90,8 @@ public class SendMessageProcessorTest { when(messageStore.now()).thenReturn(System.currentTimeMillis()); Channel mockChannel = mock(Channel.class); when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024)); - when(handlerContext.channel()).thenReturn(mockChannel); + when(handlerContext.getChannelHandlerContext()).thenReturn(channelHandlerContext); + when(channelHandlerContext.channel()).thenReturn(mockChannel); when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt()); sendMessageProcessor = new SendMessageProcessor(brokerController); } @@ -182,9 +186,7 @@ public class SendMessageProcessorTest { final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK); sendMessageProcessor = new SendMessageProcessor(brokerController); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand response = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request); + final RemotingCommand response = sendMessageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } @@ -201,11 +203,8 @@ public class SendMessageProcessorTest { response[0] = invocation.getArgument(0); return null; } - }).when(handlerContext).writeAndFlush(any(Object.class)); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand responseToReturn = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request); - + }).when(channelHandlerContext).writeAndFlush(any(Object.class)); + RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request); if (responseToReturn != null) { assertThat(response[0]).isNull(); response[0] = responseToReturn; @@ -213,7 +212,6 @@ public class SendMessageProcessorTest { assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS); } - private RemotingCommand createSendTransactionMsgCommand(int requestCode) { SendMessageRequestHeader header = createSendMsgRequestHeader(); int sysFlag = header.getSysFlag(); @@ -224,7 +222,7 @@ public class SendMessageProcessorTest { header.setSysFlag(sysFlag); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header); request.setBody(new byte[] {'a'}); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } @@ -247,7 +245,7 @@ public class SendMessageProcessorTest { RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); request.setBody(new byte[] {'a'}); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } @@ -260,7 +258,7 @@ public class SendMessageProcessorTest { requestHeader.setOffset(123L); RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader); -// request.makeCustomHeaderToNet(); + CodecHelper.makeCustomHeaderToNet(request); return request; } @@ -273,10 +271,8 @@ public class SendMessageProcessorTest { response[0] = invocation.getArgument(0); return null; } - }).when(handlerContext).writeAndFlush(any(Object.class)); - - NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext); - RemotingCommand responseToReturn = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request); + }).when(channelHandlerContext).writeAndFlush(any(Object.class)); + RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request); if (responseToReturn != null) { assertThat(response[0]).isNull(); response[0] = responseToReturn; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index f3e2f4e7..9087505f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.consumer.store; -import com.sun.org.apache.regexp.internal.RE; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -31,10 +30,10 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.exception.RemotingException; /** diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 98a626c0..5b4d3181 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -135,8 +135,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -144,12 +144,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.interceptor.Interceptor; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.netty.ResponseFuture; -import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.RemotingSerializable; import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 769164bf..321f804d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -62,7 +62,6 @@ import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.filter.ExpressionType; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; @@ -75,11 +74,10 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.interceptor.Interceptor; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index ff2fb78b..72247949 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -147,7 +147,6 @@ public class DefaultMQPushConsumerTest { }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); - doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 3b1866e4..b1a8d419 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.common; -import com.alibaba.fastjson.parser.ParserConfig; -import com.alibaba.fastjson.util.TypeUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -31,12 +29,10 @@ import java.net.NetworkInterface; import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.Enumeration; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java index 58bbade9..57e9aa86 100644 --- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java @@ -37,14 +37,11 @@ public class FlowControlConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private String flowControlFileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + private static final String DEFAULT_FLOW_CONTROL_FILE = "conf/flowControl.yml"; private String flowControlFileName = System.getProperty("rocketmq.flow.control.file", DEFAULT_FLOW_CONTROL_FILE); - private List rules; - - public static final String defaultResourceName = "overallFlowControl"; - private Map>> plainFlowControlRules; public FlowControlConfig() { diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java index 8735ff96..c286c873 100644 --- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.common.flowcontrol; -import org.apache.rocketmq.remoting.interceptor.Interceptor; - public class FlowControlRule { private String flowControlResourceName; private Integer flowControlGrade; diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index ce2b83f9..1ba42ea3 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -24,7 +24,6 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -33,8 +32,8 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index b2609c80..b87aada2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -20,7 +20,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.remoting.common.RemotingHelper; /** diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index a7da99f5..e3136fa9 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -295,10 +295,9 @@ public class DefaultRequestProcessor implements RequestProcessor { public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); - final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); - log.info("requestHeader: " + requestHeader); + final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); if (!checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match"); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java index e0cdd959..77f387c4 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java @@ -39,22 +39,28 @@ public class BrokerHousekeepingService implements ChannelEventListener { @Override public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) { - NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; - Channel channel = nettyChannel.getChannel(); - this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); + if (remotingChannel != null) { + NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; + Channel channel = nettyChannel.getChannel(); + this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); + } } @Override public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) { - NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; - Channel channel = nettyChannel.getChannel(); - this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); + if (remotingChannel != null) { + NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; + Channel channel = nettyChannel.getChannel(); + this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); + } } @Override public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) { - NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; - Channel channel = nettyChannel.getChannel(); - this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); + if (remotingChannel != null) { + NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel; + Channel channel = nettyChannel.getChannel(); + this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); + } } } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java index 8cd75f89..a2609ee7 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; import org.apache.rocketmq.common.protocol.RequestCode; @@ -37,11 +36,12 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.assertj.core.util.Maps; import org.junit.Before; @@ -91,8 +91,10 @@ public class DefaultRequestProcessorTest { request.addExtField("namespace", "namespace"); request.addExtField("key", "key"); request.addExtField("value", "value"); - - RemotingCommand response = defaultRequestProcessor.processRequest(null, request); + NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class); + ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); + when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext); + RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getRemark()).isNull(); @@ -111,7 +113,10 @@ public class DefaultRequestProcessorTest { request.addExtField("namespace", "namespace"); request.addExtField("key", "key"); - RemotingCommand response = defaultRequestProcessor.processRequest(null, request); + NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class); + ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); + when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext); + RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getRemark()).isNull(); @@ -130,7 +135,10 @@ public class DefaultRequestProcessorTest { request.addExtField("namespace", "namespace"); request.addExtField("key", "key"); - RemotingCommand response = defaultRequestProcessor.processRequest(null, request); + NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class); + ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); + when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext); + RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request); assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND); assertThat(response.getRemark()).isEqualTo("No config item, Namespace: namespace Key: key"); @@ -151,8 +159,11 @@ public class DefaultRequestProcessorTest { request.addExtField("namespace", "namespace"); request.addExtField("key", "key"); - RemotingCommand response = defaultRequestProcessor.processRequest(null, request); - + NettyChannelHandlerContextImpl remotingChannel = mock(NettyChannelHandlerContextImpl.class); + ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); + when(remotingChannel.getChannelHandlerContext()).thenReturn(channelHandlerContext); + RemotingCommand response = defaultRequestProcessor.processRequest(remotingChannel, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); assertThat(response.getRemark()).isNull(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java index c0b8558d..a0420656 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java @@ -21,7 +21,6 @@ import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface RemotingServer extends RemotingService { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java index a00ef052..f1746fab 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java @@ -99,7 +99,7 @@ public class NettyChannelHandlerContextImpl implements RemotingChannel { final NettyChannelHandlerContextImpl that = (NettyChannelHandlerContextImpl) o; - return channelHandlerContext != null ? channelHandlerContext.equals(that.channelHandlerContext) : that.channelHandlerContext == null; + return channelHandlerContext.channel() != null ? channelHandlerContext.channel().equals(that.channelHandlerContext.channel()) : that.channelHandlerContext.channel() == null; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java index fe33a522..90f2b31d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java @@ -32,7 +32,7 @@ public class NettyChannelImpl implements RemotingChannel { private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING); - private final io.netty.channel.Channel channel; + private final Channel channel; public NettyChannelImpl(Channel channel) { this.channel = channel; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java index 4aa5d689..761d2bfb 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.remoting.transport.http2; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -41,26 +40,21 @@ import javax.net.ssl.SSLException; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.common.Pair; +import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.interceptor.Interceptor; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; -import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker; -import org.apache.rocketmq.remoting.netty.NettyChannelImpl; -import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; -import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder; -import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract; +import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; +import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder; import org.apache.rocketmq.remoting.util.ThreadUtils; public class Http2ClientImpl extends NettyRemotingClientAbstract implements RemotingClient { @@ -248,7 +242,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo @Override public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { - executor = (executor == null ? this.publicExecutor : executor); + executor = executor == null ? this.publicExecutor : executor; registerNettyProcessor(requestCode, processor, executor); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java index d86e4caa..77913922 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java @@ -48,24 +48,22 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingServer; +import org.apache.rocketmq.remoting.RequestProcessor; +import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.interceptor.Interceptor; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; -import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; -import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder; -import org.apache.rocketmq.remoting.RequestProcessor; -import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract; +import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; +import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder; import org.apache.rocketmq.remoting.util.JvmUtils; import org.apache.rocketmq.remoting.util.ThreadUtils; @@ -139,7 +137,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo @Override public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { - executor = (executor == null ? this.publicExecutor : executor); + executor = executor == null ? this.publicExecutor : executor; registerNettyProcessor(requestCode, processor, executor); } @@ -204,25 +202,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo public void start() { super.start(); final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - this.serverBootstrap.group(this.bossGroup, this.ioGroup). - channel(socketChannelClass).childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - channels.add(ch); - - ChannelPipeline cp = ch.pipeline(); - cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels)); - cp.addLast(workerGroup, - Http2Handler.newHandler(true), - new NettyEncoder(), - new NettyDecoder(), - new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(), - serverConfig.getConnectionChannelWriterIdleSeconds(), - serverConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnectManageHandler(), - new NettyServerHandler()); - } - }); + this.serverBootstrap.group(this.bossGroup, this.ioGroup).channel(socketChannelClass).childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + channels.add(ch); + ChannelPipeline cp = ch.pipeline(); + cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels)); + cp.addLast(workerGroup, + Http2Handler.newHandler(true), + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(), + serverConfig.getConnectionChannelWriterIdleSeconds(), + serverConfig.getServerChannelMaxIdleTimeSeconds()), + new NettyConnectManageHandler(), + new NettyServerHandler()); + } + }); applyOptions(serverBootstrap); ChannelFuture channelFuture = this.serverBootstrap.bind(this.port).syncUninterruptibly(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java index 46581474..b1616385 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java @@ -36,10 +36,11 @@ import java.util.concurrent.ExecutorService; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; +import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; @@ -47,14 +48,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.ClientConfig; -import org.apache.rocketmq.remoting.RequestProcessor; -import org.apache.rocketmq.remoting.interceptor.ExceptionContext; -import org.apache.rocketmq.remoting.interceptor.Interceptor; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; -import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker; -import org.apache.rocketmq.remoting.interceptor.RequestContext; -import org.apache.rocketmq.remoting.interceptor.ResponseContext; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.TlsHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java index ec02a28d..fa3673c2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java @@ -52,7 +52,6 @@ import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.remoting.interceptor.Interceptor; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.netty.FileRegionEncoder; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; @@ -222,7 +221,7 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements @Override public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { - executor = (executor == null ? this.publicExecutor : executor); + executor = executor == null ? this.publicExecutor : executor; registerNettyProcessor(requestCode, processor, executor); } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java index 13bccca9..3c5b9f06 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java @@ -31,7 +31,7 @@ public class NettyRemotingClientTest { private NettyRemotingClient remotingClient = new NettyRemotingClient(new ClientConfig()); @Test - public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException { + public void testSetCallbackExecutor() { ExecutorService customized = Executors.newCachedThreadPool(); remotingClient.setCallbackExecutor(customized); diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java index 243d7f3a..c9bf0b87 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java @@ -109,7 +109,7 @@ public class RemotingCommandTest { @Test public void testNotNullField() throws Exception { RemotingCommand remotingCommand = new RemotingCommand(); - Method method = RemotingCommand.class.getDeclaredMethod("isFieldNullable", Field.class); + Method method = CodecHelper.class.getDeclaredMethod("isFieldNullable", Field.class); method.setAccessible(true); Field nullString = FieldTestClass.class.getDeclaredField("nullString"); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java b/snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java index df74e930..90a5e993 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java @@ -20,7 +20,9 @@ import java.util.List; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.snode.SnodeController; -//TODO Filter implementation +/** + * TODO Filter + */ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener { private final SnodeController snodeController; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java index 65838f3b..ea1fbfae 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java @@ -15,12 +15,13 @@ * limitations under the License. */ package org.apache.rocketmq.snode.constant; + public class SnodeConstant { - public static final long heartbeatTimeout = 3000; + public static final long HEARTBEAT_TIME_OUT = 3000; - public static final long oneWaytimeout = 10; + public static final long ONE_WAY_TIMEOUT = 10; - public static final long defaultTimeoutMills = 3000L; + public static final long DEFAULT_TIMEOUT_MILLS = 3000L; public static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java index 90886941..956e79f5 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.snode.flowcontrol; -import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 5bdae5cc..d66c027b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -61,7 +61,7 @@ public class HeartbeatProcessor implements RequestProcessor { private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) { HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( - new NettyChannelImpl((((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel())), + new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel()), heartbeatData.getClientID(), request.getLanguage(), request.getVersion() diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index 09cd90bc..f9f6be22 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -54,7 +54,6 @@ public class SendMessageProcessor implements RequestProcessor { boolean isSendBack = false; if (request.getCode() == RequestCode.SEND_MESSAGE_V2) { sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); - System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2); enodeName = sendMessageRequestHeaderV2.getN(); } else { isSendBack = true; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java index abbddd71..7a709976 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java @@ -29,13 +29,10 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; -import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader; -import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.logging.InternalLogger; @@ -71,7 +68,7 @@ public class EnodeServiceImpl implements EnodeService { String enodeAddr = entry.getValue().get(MixAll.MASTER_ID); if (enodeAddr != null) { try { - this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills); + this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } catch (Exception ex) { log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex); } @@ -114,7 +111,7 @@ public class EnodeServiceImpl implements EnodeService { CompletableFuture future = new CompletableFuture<>(); try { String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); - this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> { + this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS, (responseFuture) -> { future.complete(responseFuture.getResponseCommand()); }); } catch (Exception ex) { @@ -139,7 +136,7 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { - this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout); + this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.ONE_WAY_TIMEOUT); } catch (Exception e) { log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e); } @@ -164,7 +161,7 @@ public class EnodeServiceImpl implements EnodeService { public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { synchronized (this) { - ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills); + ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.DEFAULT_TIMEOUT_MILLS); if (clusterInfo != null) { HashMap> enodeAddress = clusterInfo.getClusterAddrTable(); for (Map.Entry> entry : enodeAddress.entrySet()) { @@ -189,7 +186,7 @@ public class EnodeServiceImpl implements EnodeService { String enodeAddress = entry.getValue().get(MixAll.MASTER_ID); try { RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress, - request, SnodeConstant.defaultTimeoutMills); + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); if (response != null && response.getCode() == ResponseCode.SUCCESS) { persist = true; } else { @@ -215,7 +212,7 @@ public class EnodeServiceImpl implements EnodeService { requestHeader.setCommitOffset(offset); requestHeader.setEnodeName(enodeName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); - this.snodeController.getRemotingClient().invokeOneway(address, request, SnodeConstant.defaultTimeoutMills); + this.snodeController.getRemotingClient().invokeOneway(address, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } catch (Exception ex) { log.error("Persist offset to Enode error!"); } @@ -231,7 +228,7 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), - request, SnodeConstant.defaultTimeoutMills); + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } @Override @@ -246,7 +243,7 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr), - request, SnodeConstant.defaultTimeoutMills); + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } @Override @@ -254,7 +251,7 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException { String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), - request, SnodeConstant.defaultTimeoutMills); + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } @Override @@ -262,7 +259,7 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr), - request, SnodeConstant.defaultTimeoutMills); + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } @Override @@ -279,6 +276,6 @@ public class EnodeServiceImpl implements EnodeService { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); return this.snodeController.getRemotingClient().invokeSync(address, - request, SnodeConstant.defaultTimeoutMills); + request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index e1c20170..8853b1fa 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -66,7 +66,7 @@ public class NnodeServiceImpl implements NnodeService { if (nnodeAddressList != null && nnodeAddressList.size() > 0) { for (String nodeAddress : nnodeAddressList) { try { - this.snodeController.getRemotingClient().invokeSync(nodeAddress, remotingCommand, SnodeConstant.heartbeatTimeout); + this.snodeController.getRemotingClient().invokeSync(nodeAddress, remotingCommand, SnodeConstant.HEARTBEAT_TIME_OUT); } catch (Exception ex) { log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex); } @@ -93,7 +93,7 @@ public class NnodeServiceImpl implements NnodeService { requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); - RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills); + RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); log.info("GetTopicRouteInfoFromNameServer response: " + response); assert response != null; switch (response.getCode()) { @@ -167,7 +167,7 @@ public class NnodeServiceImpl implements NnodeService { RemotingSendRequestException, RemotingConnectException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); - RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills); + RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS); switch (response.getCode()) { case ResponseCode.SUCCESS: { ClusterInfo clusterInfo = ClusterInfo.decode(response.getBody(), ClusterInfo.class); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 1ea56c16..8a42b1cb 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -90,7 +90,7 @@ public class PushServiceImpl implements PushService { RemotingChannel remotingChannel = clientChannelInfoEntry.getValue().getChannel(); if (remotingChannel.isWritable()) { log.warn("Push message to topic: {} queueId: {} consumer group:{}, message:{}", topic, queueId, clientChannelInfoEntry.getKey(), pushMessage); - snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills); + snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS); } } } else { -- GitLab