diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java index 755ed589a511ca5a6678187b87497c315ed42044..e684dd1fd86bb831094e57bff509212ebabbd82f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQRealPushConsumerImpl.java @@ -914,10 +914,14 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + if (realPushModel) { + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + } else { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } } } catch (Exception e) { - throw new MQClientException("subscription exception", e); + throw new MQClientException("Subscription exception", e); } } @@ -930,11 +934,15 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { subscriptionData.setFilterClassSource(filterClassSource); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + if (realPushModel) { + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + } else { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } } } catch (Exception e) { - throw new MQClientException("subscription exception", e); + throw new MQClientException("Subscription exception", e); } } @@ -950,7 +958,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + if (realPushModel) { + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); + } else { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } } } catch (Exception e) { throw new MQClientException("subscription exception", e);