From cb90409ee06534807c99dc448ef907b76d421be4 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 7 Feb 2019 22:30:47 +0800 Subject: [PATCH] Fix connection closed but not clean session issue --- .../store/RemoteBrokerOffsetStore.java | 10 +- .../client/exception/MQSnodeException.java | 5 +- .../consumer/DefaultMQPushConsumerImpl.java | 4 +- .../client/impl/factory/MQClientInstance.java | 2 - .../rocketmq/snode/SnodeController.java | 185 +++++++++--------- .../client/ClientHousekeepingService.java | 19 +- .../rocketmq/snode/client/ClientManager.java | 3 +- .../snode/client/impl/ClientManagerImpl.java | 68 ++++--- .../processor/ConsumerManageProcessor.java | 6 +- .../snode/service/impl/NnodeServiceImpl.java | 2 +- .../snode/service/NnodeServiceImplTest.java | 2 +- 11 files changed, 157 insertions(+), 149 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 77523935..d7ec1693 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; @@ -193,6 +192,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true); } + private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true); @@ -206,9 +206,9 @@ public class RemoteBrokerOffsetStore implements OffsetStore { MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - if (null == snodeAddr){ + if (null == snodeAddr) { this.mQClientFactory.updateSnodeInfoFromNameServer(); - snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } if (snodeAddr != null) { @@ -233,9 +233,9 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); - if (null == snodeAddr){ + if (null == snodeAddr) { this.mQClientFactory.updateSnodeInfoFromNameServer(); - snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); + snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); } if (snodeAddr != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java index 440e5779..b8c804e9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java +++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQSnodeException.java @@ -16,12 +16,9 @@ */ package org.apache.rocketmq.client.exception; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.help.FAQUrl; - public class MQSnodeException extends MQBrokerException { public MQSnodeException(int responseCode, String errorMessage) { - super(responseCode,errorMessage); + super(responseCode, errorMessage); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index ed670778..82e63c06 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -56,7 +56,6 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -72,8 +71,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultMQPushConsumerImpl implements MQConsumerInner { @@ -1138,6 +1137,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.consumeMessageService = consumeMessageService; } + private void tryToFindSnodePublishInfo() { this.mQClientFactory.updateSnodeInfoFromNameServer(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f881d981..6c2be58e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.common.ThreadLocalIndex; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.ClientRemotingProcessor; import org.apache.rocketmq.client.impl.FindBrokerResult; @@ -80,7 +79,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 43b0b5f6..18fc905f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -86,7 +86,7 @@ import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; public class SnodeController { private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.SNODE_LOGGER_NAME); + .getLogger(LoggerName.SNODE_LOGGER_NAME); private final SnodeConfig snodeConfig; private final ServerConfig nettyServerConfig; @@ -124,12 +124,12 @@ public class SnodeController { private SlowConsumerService slowConsumerService; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "SnodeControllerScheduledThread")); + .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "SnodeControllerScheduledThread")); public SnodeController(ServerConfig nettyServerConfig, - ClientConfig nettyClientConfig, - SnodeConfig snodeConfig) { + ClientConfig nettyClientConfig, + SnodeConfig snodeConfig) { this.nettyClientConfig = nettyClientConfig; this.nettyServerConfig = nettyServerConfig; this.snodeConfig = snodeConfig; @@ -137,69 +137,69 @@ public class SnodeController { this.nnodeService = new NnodeServiceImpl(this); this.scheduledService = new ScheduledServiceImpl(this); this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient() - .init(this.getNettyClientConfig(), null); + .init(this.getNettyClientConfig(), null); this.mqttRemotingClient = RemotingClientFactory.getInstance() - .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) - .init(this.getNettyClientConfig(), null); + .createRemotingClient(RemotingUtil.MQTT_PROTOCOL) + .init(this.getNettyClientConfig(), null); this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodeSendMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodeSendMessageThread", + false); this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodePullMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodePullMessageThread", + false); this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHeartBeatCorePoolSize(), - snodeConfig.getSnodeHeartBeatMaxPoolSize(), - 1000 * 60, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), - "SnodeHeartbeatThread", - true); + snodeConfig.getSnodeHeartBeatCorePoolSize(), + snodeConfig.getSnodeHeartBeatMaxPoolSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()), + "SnodeHeartbeatThread", + true); this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "SnodePullMessageThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "SnodePullMessageThread", + false); this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeSendMessageMinPoolSize(), - snodeConfig.getSnodeSendMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), - "ConsumerManagerThread", - false); + snodeConfig.getSnodeSendMessageMinPoolSize(), + snodeConfig.getSnodeSendMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()), + "ConsumerManagerThread", + false); this.handleMqttMessageExecutor = ThreadUtils.newThreadPoolExecutor( - snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), - snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), - 3000, - TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), - "SnodeHandleMqttMessageThread", - false); + snodeConfig.getSnodeHandleMqttMessageMinPoolSize(), + snodeConfig.getSnodeHandleMqttMessageMaxPoolSize(), + 3000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(snodeConfig.getSnodeHandleMqttThreadPoolQueueCapacity()), + "SnodeHandleMqttMessageThread", + false); if (this.snodeConfig.getNamesrvAddr() != null) { this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", - this.snodeConfig.getNamesrvAddr()); + this.snodeConfig.getNamesrvAddr()); } this.subscriptionGroupManager = new SubscriptionGroupManager(this); @@ -216,7 +216,7 @@ public class SnodeController { this.consumerManager = new ConsumerManagerImpl(this); this.iotClientManager = new IOTClientManagerImpl(this); this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, - this.consumerManager, this.iotClientManager); + this.consumerManager, this.iotClientManager); this.slowConsumerService = new SlowConsumerServiceImpl(this); } @@ -226,7 +226,7 @@ public class SnodeController { private void initRemotingServerInterceptorGroup() { List remotingServerInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); + .loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) { if (this.remotingServerInterceptorGroup == null) { this.remotingServerInterceptorGroup = new InterceptorGroup(); @@ -234,17 +234,17 @@ public class SnodeController { for (Interceptor interceptor : remotingServerInterceptors) { this.remotingServerInterceptorGroup.registerInterceptor(interceptor); log.warn("Remoting server interceptor: {} registered!", - interceptor.interceptorName()); + interceptor.interceptorName()); } } } public boolean initialize() { this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer() - .init(this.nettyServerConfig, this.clientHousekeepingService); + .init(this.nettyServerConfig, this.clientHousekeepingService); this.mqttRemotingServer = RemotingServerFactory.getInstance().createRemotingServer( - RemotingUtil.MQTT_PROTOCOL) - .init(this.nettyServerConfig, this.clientHousekeepingService); + RemotingUtil.MQTT_PROTOCOL) + .init(this.nettyServerConfig, this.clientHousekeepingService); this.registerProcessor(); initSnodeInterceptorGroup(); initRemotingServerInterceptorGroup(); @@ -262,7 +262,7 @@ public class SnodeController { } List accessValidators = ServiceProvider - .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); + .loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The snode dose not load the AccessValidator"); return; @@ -282,7 +282,7 @@ public class SnodeController { //Do not catch the exception RemotingCommand request = requestContext.getRequest(); String remoteAddr = RemotingUtil.socketAddress2IpString( - requestContext.getRemotingChannel().remoteAddress()); + requestContext.getRemotingChannel().remoteAddress()); validator.validate(validator.parse(request, remoteAddr)); } @@ -300,17 +300,17 @@ public class SnodeController { private void initSnodeInterceptorGroup() { List consumeMessageInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); + .loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { this.consumeMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : consumeMessageInterceptors) { this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); log.warn("Consume message interceptor: {} registered!", - interceptor.interceptorName()); + interceptor.interceptorName()); } } List sendMessageInterceptors = InterceptorFactory.getInstance() - .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); + .loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) { this.sendMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : sendMessageInterceptors) { @@ -323,55 +323,60 @@ public class SnodeController { public void registerProcessor() { this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, - this.sendMessageExecutor); + this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, - this.sendMessageExecutor); + this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, - this.heartbeatExecutor); + this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, - this.pullMessageExecutor); + this.pullMessageExecutor); this.snodeServer - .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); - this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, + .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); + this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, + this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, - this.consumerManageExecutor); + this.consumerManageExecutor); this.snodeServer - .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, - this.consumerManageExecutor); + .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, + this.consumerManageExecutor); this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, - defaultMqttMessageProcessor, handleMqttMessageExecutor); + defaultMqttMessageProcessor, handleMqttMessageExecutor); + + defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.CONNECT, - new MqttConnectMessageHandler(this)); + new MqttConnectMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.DISCONNECT, - new MqttDisconnectMessageHandler(this)); + new MqttDisconnectMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PINGREQ, - new MqttPingreqMessageHandler(this)); + new MqttPingreqMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBLISH, - new MqttPublishMessageHandler(this)); + new MqttPublishMessageHandler(this)); defaultMqttMessageProcessor - .registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); + .registerMessageHanlder(MqttMessageType.PUBACK, new MqttPubackMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.PUBCOMP, - new MqttPubcompMessageHandler(this)); + new MqttPubcompMessageHandler(this)); defaultMqttMessageProcessor - .registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); + .registerMessageHanlder(MqttMessageType.PUBREC, new MqttPubrecMessageHandler(this)); defaultMqttMessageProcessor - .registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); + .registerMessageHanlder(MqttMessageType.PUBREL, new MqttPubrelMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.SUBSCRIBE, - new MqttSubscribeMessageHandler(this)); + new MqttSubscribeMessageHandler(this)); defaultMqttMessageProcessor.registerMessageHanlder(MqttMessageType.UNSUBSCRIBE, - new MqttUnsubscribeMessagHandler(this)); + new MqttUnsubscribeMessagHandler(this)); + + + } public void start() { @@ -496,7 +501,7 @@ public class SnodeController { } public void setRemotingServerInterceptorGroup( - InterceptorGroup remotingServerInterceptorGroup) { + InterceptorGroup remotingServerInterceptorGroup) { this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 1db3f9ce..ceba1fc1 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -25,7 +25,6 @@ import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; -import org.apache.rocketmq.snode.client.impl.ClientRole; import org.apache.rocketmq.snode.constant.SnodeConstant; public class ClientHousekeepingService implements ChannelEventListener { @@ -53,15 +52,13 @@ public class ClientHousekeepingService implements ChannelEventListener { this.iotClientManager.shutdown(); } - private ClientRole clientRole(RemotingChannel remotingChannel) { + private Client getClient(RemotingChannel remotingChannel) { if (remotingChannel instanceof NettyChannelImpl) { Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); Attribute clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY); if (clientAttribute != null) { Client client = clientAttribute.get(); - if (client != null) { - return client.getClientRole(); - } + return client; } } log.warn("RemotingChannel type error: {}", remotingChannel.getClass()); @@ -69,17 +66,17 @@ public class ClientHousekeepingService implements ChannelEventListener { } private void closeChannel(String remoteAddress, RemotingChannel remotingChannel) { - ClientRole clientRole = clientRole(remotingChannel); - if (clientRole != null) { - switch (clientRole) { + Client client = getClient(remotingChannel); + if (client != null) { + switch (client.getClientRole()) { case Consumer: - this.consumerManager.onClose(remoteAddress, remotingChannel); + this.consumerManager.onClose(client.getGroups(), remotingChannel); return; case Producer: - this.producerManager.onClose(remoteAddress, remotingChannel); + this.producerManager.onClose(client.getGroups(), remotingChannel); return; case IOTCLIENT: - this.iotClientManager.onClose(remoteAddress, remotingChannel); + this.iotClientManager.onClose(client.getGroups(), remotingChannel); return; default: } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java index b0a1892a..600dd16e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.snode.client; import java.util.List; +import java.util.Set; import org.apache.rocketmq.remoting.RemotingChannel; public interface ClientManager { @@ -24,7 +25,7 @@ public interface ClientManager { void unRegister(String groupId, RemotingChannel remotingChannel); - void onClose(String groupId, RemotingChannel remotingChannel); + void onClose(Set groupId, RemotingChannel remotingChannel); List getChannels(String groupId); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java index 71b9824e..b9cba61d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/ClientManagerImpl.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -30,20 +31,22 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.snode.client.Client; import org.apache.rocketmq.snode.client.ClientManager; public abstract class ClientManagerImpl implements ClientManager { private static final InternalLogger log = InternalLoggerFactory - .getLogger(LoggerName.SNODE_LOGGER_NAME); + .getLogger(LoggerName.SNODE_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor( - new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); + .newSingleThreadScheduledExecutor( + new ThreadFactoryImpl("ClientHousekeepingScheduledThread")); private final ConcurrentHashMap> groupClientTable = new ConcurrentHashMap<>( - 1024); + 1024); public abstract void onClosed(String group, RemotingChannel remotingChannel); @@ -76,7 +79,7 @@ public abstract class ClientManagerImpl implements ClientManager { Map.Entry entry = (Map.Entry) iterator.next(); String group = (String) entry.getKey(); ConcurrentHashMap channelTable = (ConcurrentHashMap) entry - .getValue(); + .getValue(); Iterator iter = channelTable.entrySet().iterator(); while (iter.hasNext()) { Map.Entry channelTableEntry = (Map.Entry) iter.next(); @@ -86,14 +89,14 @@ public abstract class ClientManagerImpl implements ClientManager { iter.remove(); client.getRemotingChannel().close(); log.warn( - "SCAN: Remove expired channel from {}ClientTable. channel={}, group={}", - client.getClientRole(), - RemotingHelper.parseChannelRemoteAddr( - client.getRemotingChannel().remoteAddress()), group); + "SCAN: Remove expired channel from {}ClientTable. channel={}, group={}", + client.getClientRole(), + RemotingHelper.parseChannelRemoteAddr( + client.getRemotingChannel().remoteAddress()), group); if (channelTable.isEmpty()) { iterator.remove(); log.warn("SCAN: Remove group={} channel from {}ClientTable.", group, - client.getClientRole()); + client.getClientRole()); } } } @@ -104,37 +107,41 @@ public abstract class ClientManagerImpl implements ClientManager { public boolean register(String groupId, Client client) { boolean updated = false; if (client != null) { - ConcurrentHashMap channelTable = groupClientTable.get(groupId); + ConcurrentHashMap channelTable = this.groupClientTable.get(groupId); if (channelTable == null) { channelTable = new ConcurrentHashMap(); ConcurrentHashMap prev = groupClientTable.putIfAbsent(groupId, channelTable); channelTable = prev != null ? prev : channelTable; } - - Client oldClient = channelTable.get(client.getRemotingChannel()); + log.info("*********"); + RemotingChannel remotingChannel = client.getRemotingChannel(); + if (remotingChannel instanceof NettyChannelHandlerContextImpl) { + remotingChannel = new NettyChannelImpl(((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel()); + } + Client oldClient = channelTable.get(remotingChannel); if (oldClient == null) { - Client prev = channelTable.put(client.getRemotingChannel(), client); + Client prev = channelTable.put(remotingChannel, client); if (prev != null) { log.info("New client connected, group: {} {} {} channel: {}", groupId, - client.toString()); + client.toString()); updated = true; } oldClient = client; } else { if (!oldClient.getClientId().equals(client.getClientId())) { log.error( - "[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", - groupId, - oldClient.toString(), - channelTable.toString()); - channelTable.put(client.getRemotingChannel(), client); + "[BUG] client channel exist in snode, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", + groupId, + oldClient.toString(), + channelTable.toString()); + channelTable.put(remotingChannel, client); } } oldClient.setLastUpdateTimestamp(System.currentTimeMillis()); + onRegister(groupId, remotingChannel); } - log.debug("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId, - client.getLastUpdateTimestamp()); - onRegister(groupId, client.getRemotingChannel()); + log.info("Register client role: {}, group: {}, last: {}", client.getClientRole(), groupId, + client.getLastUpdateTimestamp()); return updated; } @@ -148,7 +155,7 @@ public abstract class ClientManagerImpl implements ClientManager { if (channelTable.isEmpty()) { groupClientTable.remove(groupId); log.info("Unregister client ok, no any connection, and remove consumer group, {}", - groupId); + groupId); } } } @@ -157,12 +164,15 @@ public abstract class ClientManagerImpl implements ClientManager { public void unRegister(String groupId, RemotingChannel remotingChannel) { removeClient(groupId, remotingChannel); onUnregister(groupId, remotingChannel); + } @Override - public void onClose(String groupId, RemotingChannel remotingChannel) { - removeClient(groupId, remotingChannel); - onClosed(groupId, remotingChannel); + public void onClose(Set groups, RemotingChannel remotingChannel) { + for (String groupId : groups) { + removeClient(groupId, remotingChannel); + onClosed(groupId, remotingChannel); + } } public List getChannels(String groupId) { @@ -184,7 +194,7 @@ public abstract class ClientManagerImpl implements ClientManager { Map channelClientMap = this.groupClientTable.get(groupId); if (channelClientMap != null) { Iterator> it = channelClientMap.entrySet() - .iterator(); + .iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); Client client = entry.getValue(); @@ -201,7 +211,7 @@ public abstract class ClientManagerImpl implements ClientManager { return null; } ConcurrentHashMap channelClientMap = groupClientTable - .get(groupId); + .get(groupId); return channelClientMap.get(remotingChannel); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java index 4cd54d6b..a3e29c0c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java @@ -130,9 +130,9 @@ public class ConsumerManageProcessor implements RequestProcessor { if (!clientIds.isEmpty()) { GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody(); body.setConsumerIdList(clientIds); - response.setBody(body.encode()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); + response.setBody(body.encode()); return response; } else { log.warn("GetAllClientId failed, {} {}", requestHeader.getConsumerGroup(), @@ -163,11 +163,11 @@ public class ConsumerManageProcessor implements RequestProcessor { RemotingSendRequestException, RemotingConnectException, RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class); - final QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); final QueryConsumerOffsetRequestHeader requestHeader = (QueryConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class); + final QueryConsumerOffsetResponseHeader responseHeader = + (QueryConsumerOffsetResponseHeader) response.readCustomHeader(); long offset = this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(), diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index d593613b..8dcdf0c6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService { } @Override - public void registerSnode(SnodeConfig snodeConfig) throws Exception{ + public void registerSnode(SnodeConfig snodeConfig) throws Exception { List nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList(); RemotingCommand remotingCommand = new RemotingCommand(); RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader(); diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java index 45a19dd8..26c528ab 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java @@ -60,7 +60,7 @@ public class NnodeServiceImplTest extends SnodeTestBase { } @Test - public void registerSnodeSuccessTest() throws InterruptedException, RemotingConnectException, + public void registerSnodeTest() throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { when(snodeController.getRemotingClient().getNameServerAddressList()).thenReturn(createNnodeList()); when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse()); -- GitLab