提交 66e15cfa 编写于 作者: S ShannonDing

Send subscription to snode when using real push

上级 4dbf4160
...@@ -914,10 +914,14 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -914,10 +914,14 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
topic, subExpression); topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) { if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); if (realPushModel) {
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
} else {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} }
} catch (Exception e) { } catch (Exception e) {
throw new MQClientException("subscription exception", e); throw new MQClientException("Subscription exception", e);
} }
} }
...@@ -930,11 +934,15 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -930,11 +934,15 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
subscriptionData.setFilterClassSource(filterClassSource); subscriptionData.setFilterClassSource(filterClassSource);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) { if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); if (realPushModel) {
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
} else {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} }
} catch (Exception e) { } catch (Exception e) {
throw new MQClientException("subscription exception", e); throw new MQClientException("Subscription exception", e);
} }
} }
...@@ -950,7 +958,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner { ...@@ -950,7 +958,11 @@ public class DefaultMQRealPushConsumerImpl implements MQPushConsumerInner {
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) { if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); if (realPushModel) {
this.mQClientFactory.sendHeartbeatToAllSnodeWithLock();
} else {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} }
} catch (Exception e) { } catch (Exception e) {
throw new MQClientException("subscription exception", e); throw new MQClientException("subscription exception", e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册