提交 1dde4fe7 编写于 作者: D duhenglucky

Polish rebalance process in real push mode

上级 3c264c8f
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.client.impl.consumer; package org.apache.rocketmq.client.impl.consumer;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -59,11 +58,7 @@ public class RebalancePushImpl extends RebalanceImpl { ...@@ -59,11 +58,7 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion); subscriptionData.setSubVersion(newVersion);
Set<MessageQueue> queueIdSet = new HashSet<MessageQueue>(); subscriptionData.setMessageQueueSet(mqDivided);
for (MessageQueue messageQueue : mqAll) {
queueIdSet.add(messageQueue);
}
subscriptionData.setMessageQueueSet(queueIdSet);
int currentQueueCount = this.processQueueTable.size(); int currentQueueCount = this.processQueueTable.size();
if (currentQueueCount != 0) { if (currentQueueCount != 0) {
int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
......
...@@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -44,6 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override @Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel, public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, RemotingChannel remotingChannel,
String groupId) { String groupId) {
log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>(); Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) { for (SubscriptionData subscriptionData : subscriptionDataSet) {
...@@ -56,7 +57,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -56,7 +57,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet); Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet; clientSet = prev != null ? prev : clientSet;
} }
log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel); clientSet.add(remotingChannel);
} }
} }
...@@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -64,6 +65,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
if (keySet.size() > 0) { if (keySet.size() > 0) {
this.clientSubscriptionTable.put(remotingChannel, keySet); this.clientSubscriptionTable.put(remotingChannel, keySet);
} }
if (prevSubSet != null) { if (prevSubSet != null) {
for (MessageQueue messageQueue : prevSubSet) { for (MessageQueue messageQueue : prevSubSet) {
if (!keySet.contains(messageQueue)) { if (!keySet.contains(messageQueue)) {
...@@ -75,6 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -75,6 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
} }
} }
} }
log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: {}", groupId, remotingChannel.remoteAddress(), this.clientSubscriptionTable.get(remotingChannel));
} }
@Override @Override
......
...@@ -71,7 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor { ...@@ -71,7 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) { private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
log.info("heartbeatData: {}", heartbeatData); log.debug("heartbeatData: {}", heartbeatData);
Channel channel = null; Channel channel = null;
Attribute<Client> clientAttribute = null; Attribute<Client> clientAttribute = null;
if (remotingChannel instanceof NettyChannelHandlerContextImpl) { if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
...@@ -85,7 +85,6 @@ public class HeartbeatProcessor implements RequestProcessor { ...@@ -85,7 +85,6 @@ public class HeartbeatProcessor implements RequestProcessor {
client.setClientRole(ClientRole.Producer); client.setClientRole(ClientRole.Producer);
this.snodeController.getProducerManager().register(producerData.getGroupName(), client); this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
} }
Set<String> groupSet = new HashSet<>(); Set<String> groupSet = new HashSet<>();
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) { for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
client.setClientRole(ClientRole.Consumer); client.setClientRole(ClientRole.Consumer);
......
...@@ -99,7 +99,6 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -99,7 +99,6 @@ public class SendMessageProcessor implements RequestProcessor {
remotingChannel.reply(data); remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length); this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
log.info("Send message response: {}", data);
this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data); this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
} }
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册