From 1dde4fe7f297474e5cf6be945ad5fac82f99ed04 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Tue, 19 Feb 2019 21:40:23 +0800 Subject: [PATCH] Polish rebalance process in real push mode --- .../rocketmq/client/impl/consumer/RebalancePushImpl.java | 7 +------ .../snode/client/impl/SubscriptionManagerImpl.java | 5 ++++- .../rocketmq/snode/processor/HeartbeatProcessor.java | 3 +-- .../rocketmq/snode/processor/SendMessageProcessor.java | 1 - 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index a8bbaa55..2c2f014a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -59,11 +58,7 @@ public class RebalancePushImpl extends RebalanceImpl { log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); - Set queueIdSet = new HashSet(); - for (MessageQueue messageQueue : mqAll) { - queueIdSet.add(messageQueue); - } - subscriptionData.setMessageQueueSet(queueIdSet); + subscriptionData.setMessageQueueSet(mqDivided); int currentQueueCount = this.processQueueTable.size(); if (currentQueueCount != 0) { int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index c75c224d..1f5ecd87 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { @Override public void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId) { + log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet); Set prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set keySet = new HashSet<>(); for (SubscriptionData subscriptionData : subscriptionDataSet) { @@ -56,7 +57,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { Set prev = pushTable.putIfAbsent(messageQueue, clientSet); clientSet = prev != null ? prev : clientSet; } - log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); + log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); clientSet.add(remotingChannel); } } @@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { if (keySet.size() > 0) { this.clientSubscriptionTable.put(remotingChannel, keySet); } + if (prevSubSet != null) { for (MessageQueue messageQueue : prevSubSet) { if (!keySet.contains(messageQueue)) { @@ -75,6 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { } } } + log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel)); } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 08e342eb..3a7d822d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -71,7 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor { private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) { HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); - log.info("heartbeatData: {}", heartbeatData); + log.debug("heartbeatData: {}", heartbeatData); Channel channel = null; Attribute clientAttribute = null; if (remotingChannel instanceof NettyChannelHandlerContextImpl) { @@ -85,7 +85,6 @@ public class HeartbeatProcessor implements RequestProcessor { client.setClientRole(ClientRole.Producer); this.snodeController.getProducerManager().register(producerData.getGroupName(), client); } - Set groupSet = new HashSet<>(); for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) { client.setClientRole(ClientRole.Consumer); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index cadc4d7d..c3fd2fe8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -99,7 +99,6 @@ public class SendMessageProcessor implements RequestProcessor { remotingChannel.reply(data); this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length); if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { - log.info("Send message response: {}", data); this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data); } } else { -- GitLab