From 66e15cfa11531c8f09045ab5a2cf4c7a55aa3a50 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Thu, 28 Feb 2019 17:50:52 +0800 Subject: [PATCH] Send subscription to snode when using real push --- .../DefaultMQRealPushConsumerImpl.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java index 755ed589..e684dd1f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java @@ -914,10 +914,14 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + if (realPushModel) { + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + } else { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } } } catch (Exception e) { - throw new MQClientException("subscription exception", e); + throw new MQClientException("Subscription exception", e); } } @@ -930,11 +934,15 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { subscriptionData.setFilterClassSource(filterClassSource); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + if (realPushModel) { + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + } else { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } } } catch (Exception e) { - throw new MQClientException("subscription exception", e); + throw new MQClientException("Subscription exception", e); } } @@ -950,7 +958,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + if (realPushModel) { + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + } else { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } } } catch (Exception e) { throw new MQClientException("subscription exception", e); -- GitLab