diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index a8bbaa557dc4e8186343392e53d1f619858f4239..2c2f014a892f6ab284f3ec11f73431f07516e67d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -59,11 +58,7 @@ public class RebalancePushImpl extends RebalanceImpl { log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); - Set queueIdSet = new HashSet(); - for (MessageQueue messageQueue : mqAll) { - queueIdSet.add(messageQueue); - } - subscriptionData.setMessageQueueSet(queueIdSet); + subscriptionData.setMessageQueueSet(mqDivided); int currentQueueCount = this.processQueueTable.size(); if (currentQueueCount != 0) { int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index c75c224d9b3e3549284ab1b3bc1da4ff0524ef06..1f5ecd878805644b00ad1756ea4a390ddd181f19 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { @Override public void registerPushSession(Set subscriptionDataSet, RemotingChannel remotingChannel, String groupId) { + log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet); Set prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set keySet = new HashSet<>(); for (SubscriptionData subscriptionData : subscriptionDataSet) { @@ -56,7 +57,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { Set prev = pushTable.putIfAbsent(messageQueue, clientSet); clientSet = prev != null ? prev : clientSet; } - log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); + log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); clientSet.add(remotingChannel); } } @@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { if (keySet.size() > 0) { this.clientSubscriptionTable.put(remotingChannel, keySet); } + if (prevSubSet != null) { for (MessageQueue messageQueue : prevSubSet) { if (!keySet.contains(messageQueue)) { @@ -75,6 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { } } } + log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel)); } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 08e342eb1816a8e3d1ca78f4b28cbf8d721ef767..3a7d822db37101e3e3c4e118c86cd1e0299409ef 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -71,7 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor { private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) { HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); - log.info("heartbeatData: {}", heartbeatData); + log.debug("heartbeatData: {}", heartbeatData); Channel channel = null; Attribute clientAttribute = null; if (remotingChannel instanceof NettyChannelHandlerContextImpl) { @@ -85,7 +85,6 @@ public class HeartbeatProcessor implements RequestProcessor { client.setClientRole(ClientRole.Producer); this.snodeController.getProducerManager().register(producerData.getGroupName(), client); } - Set groupSet = new HashSet<>(); for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) { client.setClientRole(ClientRole.Consumer); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index cadc4d7d3a215f561958cd9c1586175d1c18c955..c3fd2fe8972af9410fa55f51391261a8267b0094 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -99,7 +99,6 @@ public class SendMessageProcessor implements RequestProcessor { remotingChannel.reply(data); this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length); if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { - log.info("Send message response: {}", data); this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data); } } else {