diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index c8c7aa78d8bf7a95fd22d42992b83265a82cbfb8..a550ecc59dff62354c58948d93fc6c968fbfdba4 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.snode.client; -import io.netty.channel.Channel; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -27,8 +26,9 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.netty.NettyChannelImpl; - +/** + * TODO Refactor housekeeping service + */ public class ClientHousekeepingService implements ChannelEventListener { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ProducerManager producerManager; @@ -74,7 +74,7 @@ public class ClientHousekeepingService implements ChannelEventListener { public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) { log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel); - this.producerManager.doChannelCloseEvent(remoteAddr, remotingChannel); + this.consumerManager.doChannelCloseEvent(remoteAddr, remotingChannel); } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java index c8140c59f1646e3b3319f341a1530dda13680bcb..28a67dc41213f1f787ae46ebc12f88cd2be9f38b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java @@ -37,8 +37,8 @@ public class ConsumerGroupInfo { private final String groupName; private final ConcurrentMap subscriptionTable = new ConcurrentHashMap<>(); - private final ConcurrentMap channelInfoTable = - new ConcurrentHashMap<>(16); + private final ConcurrentMap channelInfoTable = new ConcurrentHashMap<>(16); + private ConcurrentHashMap> channelSubscriptionTable = new ConcurrentHashMap<>(2048); private volatile ConsumeType consumeType; @@ -129,7 +129,10 @@ public class ConsumerGroupInfo { } public void removeChannelSubscription(final RemotingChannel remotingChannel) { - this.channelSubscriptionTable.remove(remotingChannel); + Set subscriptionDataSet = this.channelSubscriptionTable.remove(remotingChannel); + if (subscriptionDataSet != null) { + log.debug("Unregister a push session[{}] from consumerGroupInfo {}", this.groupName, subscriptionDataSet); + } } public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java index dc3bb028e31ca69ee023fa4eb9e3196967279f52..11d9eee263447bb77fabe724235f7b756c6da9ed 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java @@ -32,15 +32,19 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; +/** + * TODO Refactor this manager + */ public class ConsumerManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; - private final ConcurrentMap consumerTable = - new ConcurrentHashMap<>(1024); + + private final ConcurrentMap consumerTable = new ConcurrentHashMap<>(1024); + + private final ConcurrentHashMap>> topicConsumerTable = new ConcurrentHashMap<>(2048); private final ConsumerIdsChangeListener consumerIdsChangeListener; - private final ConcurrentHashMap> topicConsumerTable = new ConcurrentHashMap<>(2048); + private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) { this.consumerIdsChangeListener = consumerIdsChangeListener; @@ -68,10 +72,10 @@ public class ConsumerManager { return 0; } - private void removePushSession(final ConsumerGroupInfo info, final RemotingChannel channel) { + private void clearPushSession(final String consumerGroup, final ConsumerGroupInfo info, + final RemotingChannel channel) { Set subscriptionDataSet = info.getSubscriotionDataSet(channel); - removeConsumerTopicTable(subscriptionDataSet, channel); - + removeConsumerTopicTable(consumerGroup, subscriptionDataSet, channel); } public void doChannelCloseEvent(final String remoteAddr, final RemotingChannel channel) { @@ -79,7 +83,7 @@ public class ConsumerManager { while (it.hasNext()) { Entry next = it.next(); ConsumerGroupInfo info = next.getValue(); - removePushSession(info, channel); + clearPushSession(info.getGroupName(), info, channel); boolean removed = info.doChannelCloseEvent(remoteAddr, channel); if (removed) { if (info.getChannelInfoTable().isEmpty()) { @@ -105,9 +109,9 @@ public class ConsumerManager { consumerGroupInfo = prev != null ? prev : tmp; } - boolean r1 = - consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, - consumeFromWhere); + boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, + consumeFromWhere); + boolean r2 = consumerGroupInfo.updateSubscription(subList); consumerGroupInfo.updateChannelSubscription(clientChannelInfo, subList); @@ -128,7 +132,7 @@ public class ConsumerManager { if (null != consumerGroupInfo) { consumerGroupInfo.unregisterChannel(clientChannelInfo); consumerGroupInfo.removeChannelSubscription(clientChannelInfo.getChannel()); - removePushSession(consumerGroupInfo, clientChannelInfo.getChannel()); + clearPushSession(group, consumerGroupInfo, clientChannelInfo.getChannel()); if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { @@ -189,46 +193,67 @@ public class ConsumerManager { return groups; } - public void updateTopicConsumerTable(Set subscriptionDataSet, + public void registerPushSession(String consumerGroup, Set subscriptionDataSet, ClientChannelInfo clientChannelInfo) { - for (SubscriptionData subscriptionData : subscriptionDataSet) { - String topic = subscriptionData.getTopic(); - for (Integer queueId : subscriptionData.getQueueIdSet()) { - ConcurrentHashMap clientChannelInfoMap = this.topicConsumerTable.get(topic); - if (clientChannelInfoMap == null) { - clientChannelInfoMap = new ConcurrentHashMap<>(); - ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap); - if (prev != null) { - clientChannelInfoMap = prev; + if (clientChannelInfo != null) { + for (SubscriptionData subscriptionData : subscriptionDataSet) { + String topic = subscriptionData.getTopic(); + for (Integer queueId : subscriptionData.getQueueIdSet()) { + ConcurrentHashMap> clientChannelInfoMap = this.topicConsumerTable.get(topic); + if (clientChannelInfoMap == null) { + clientChannelInfoMap = new ConcurrentHashMap<>(); + ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap); + if (prev != null) { + clientChannelInfoMap = prev; + } } + log.info("Register push for consumer group: {} topic: {}, queueId: {}", consumerGroup, topic, queueId); + ConcurrentHashMap consumerGroupChannelTable = clientChannelInfoMap.get(queueId); + if (consumerGroupChannelTable == null) { + consumerGroupChannelTable = new ConcurrentHashMap<>(); + ConcurrentHashMap preMap = clientChannelInfoMap.putIfAbsent(queueId, consumerGroupChannelTable); + if (preMap != null) { + consumerGroupChannelTable = preMap; + } + } + consumerGroupChannelTable.putIfAbsent(consumerGroup, clientChannelInfo); + clientChannelInfoMap.put(queueId, consumerGroupChannelTable); } - clientChannelInfoMap.put(queueId, clientChannelInfo); } } } - public ClientChannelInfo getClientInfoTable(String topic, Integer queueId) { - ConcurrentHashMap clientChannelInfoMap = this.topicConsumerTable.get(topic); + public ConcurrentHashMap getClientInfoTable(String topic, Integer queueId) { + ConcurrentHashMap> clientChannelInfoMap = this.topicConsumerTable.get(topic); if (clientChannelInfoMap != null) { return clientChannelInfoMap.get(queueId); } return null; } - public void removeConsumerTopicTable(Set subscriptionDataSet, + public void removeConsumerTopicTable(String consumerGroup, Set subscriptionDataSet, RemotingChannel remotingChannel) { - for (SubscriptionData subscriptionData : subscriptionDataSet) { - String topic = subscriptionData.getTopic(); - for (Integer queueId : subscriptionData.getQueueIdSet()) { - ConcurrentHashMap clientChannelInfoMap = this.topicConsumerTable.get(topic); - if (clientChannelInfoMap != null) { - ClientChannelInfo old = clientChannelInfoMap.get(queueId); - if (old != null && old.getChannel() == remotingChannel) { - clientChannelInfoMap.remove(queueId, old); + if (subscriptionDataSet != null) { + for (SubscriptionData subscriptionData : subscriptionDataSet) { + String topic = subscriptionData.getTopic(); + for (Integer queueId : subscriptionData.getQueueIdSet()) { + ConcurrentHashMap> clientChannelInfoMap = this.topicConsumerTable.get(topic); + if (clientChannelInfoMap != null) { + ConcurrentHashMap queueConsumerGroupMap = clientChannelInfoMap.get(queueId); + if (queueConsumerGroupMap != null) { + ClientChannelInfo clientChannelInfo = queueConsumerGroupMap.get(consumerGroup); + if (clientChannelInfo.getChannel().equals(remotingChannel)) { + log.info("Remove push topic: {}, queueId: {}, consumerGroup:{} session", topic, queueId, consumerGroup); + queueConsumerGroupMap.remove(consumerGroup, clientChannelInfo); + } + } + if (clientChannelInfoMap.isEmpty()) { + log.info("All consumer offline, so remove this map"); + this.topicConsumerTable.remove(topic, clientChannelInfoMap); + } } } } } } - } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 2ea53eabed3eb97c1c7505e93e5e3f0441bd337a..5bdae5cc8a66e01c395717ae0fa2055eb86c28f8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -101,7 +101,7 @@ public class HeartbeatProcessor implements RequestProcessor { } if (data.isRealPushEnable()) { - this.snodeController.getConsumerManager().updateTopicConsumerTable(data.getSubscriptionDataSet(), clientChannelInfo); + this.snodeController.getConsumerManager().registerPushSession(data.getGroupName(), data.getSubscriptionDataSet(), clientChannelInfo); } } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 0476a6b286491b895d0e074c5cf6c50f8e27e4af..1ea56c16b0e2219daef57d2ba2430e4625175a15 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -16,7 +16,11 @@ */ package org.apache.rocketmq.snode.service.impl; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -76,15 +80,19 @@ public class PushServiceImpl implements PushService { pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset()); pushMessageHeader.setTopic(topic); pushMessageHeader.setQueueId(queueId); - RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class); + RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader); pushMessage.setBody(message); - pushMessage.setCustomHeader(pushMessageHeader); - pushMessage.setCode(RequestCode.SNODE_PUSH_MESSAGE); - ClientChannelInfo clientChannelInfo = snodeController.getConsumerManager().getClientInfoTable(topic, queueId); - if (clientChannelInfo != null) { - log.warn("Push message to topic: {} queueId: {}, message:{}", topic, queueId, pushMessage); - RemotingChannel remotingChannel = clientChannelInfo.getChannel(); - snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills); + ConcurrentHashMap consumerGroupTable = snodeController.getConsumerManager().getClientInfoTable(topic, queueId); + if (consumerGroupTable != null) { + Iterator> itChannel = consumerGroupTable.entrySet().iterator(); + while (itChannel.hasNext()) { + Entry clientChannelInfoEntry = itChannel.next(); + RemotingChannel remotingChannel = clientChannelInfoEntry.getValue().getChannel(); + if (remotingChannel.isWritable()) { + log.warn("Push message to topic: {} queueId: {} consumer group:{}, message:{}", topic, queueId, clientChannelInfoEntry.getKey(), pushMessage); + snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills); + } + } } else { log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId); } @@ -115,8 +123,8 @@ public class PushServiceImpl implements PushService { @Override public void pushMessage(final String topic, final Integer queueId, final byte[] message, final RemotingCommand response) { - ClientChannelInfo clientChannelInfo = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId); - if (clientChannelInfo != null) { + ConcurrentHashMap clientChannelInfoTable = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId); + if (clientChannelInfoTable != null) { PushTask pushTask = new PushTask(topic, queueId, message, response); pushMessageExecutorService.submit(pushTask); } else {