diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 9087505fe50b4dccb1e362ea5feb51782fafe27e..7752393549893f4303e436c6a87aa6bcab82ddfa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -124,14 +124,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { if (offset != null) { if (mqs.contains(mq)) { try { - this.updateConsumeOffsetToBroker(mq, offset.get()); - log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + this.updateConsumeOffsetToSnode(mq, offset.get()); + log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { - log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); + log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e); } } else { unusedMQ.add(mq); @@ -153,14 +153,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { AtomicLong offset = this.offsetTable.get(mq); if (offset != null) { try { - this.updateConsumeOffsetToBroker(mq, offset.get()); - log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", + this.updateConsumeOffsetToSnode(mq, offset.get()); + log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToSnode {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { - log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); + log.error("updateConsumeOffsetToSnode exception, " + mq.toString(), e); } } } @@ -193,6 +193,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true); } + private void updateConsumeOffsetToSnode(MessageQueue mq, long offset) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException { + updateConsumeOffsetToBroker(mq, offset, true); + } /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. @@ -200,14 +204,14 @@ public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); - if (null == findBrokerResult) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (null == snodeAddr){ + this.mQClientFactory.updateSnodeInfoFromNameServer(); + snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); } - if (findBrokerResult != null) { + if (snodeAddr != null) { UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); @@ -216,35 +220,34 @@ public class RemoteBrokerOffsetStore implements OffsetStore { requestHeader.setEnodeName(mq.getBrokerName()); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + snodeAddr, requestHeader, 1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + snodeAddr, requestHeader, 1000 * 5); } } else { - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + throw new MQClientException("Update offset to Broker[" + mq.getBrokerName() + "] failed, Snode is null.", null); } } private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); - if (null == findBrokerResult) { - - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + String snodeAddr = this.mQClientFactory.findSnodeAddressInPublish(); + if (null == snodeAddr){ + this.mQClientFactory.updateSnodeInfoFromNameServer(); + snodeAddr= this.mQClientFactory.findSnodeAddressInPublish(); } - if (findBrokerResult != null) { + if (snodeAddr != null) { QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setEnodeName(mq.getBrokerName()); return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( - findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); + snodeAddr, requestHeader, 1000 * 5); } else { - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + throw new MQClientException("Get Offset from broker[" + mq.getBrokerName() + "] failed, Snode is not exist", null); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 30c9a4d911c0a89cb186087ef3697a35f691b7aa..ed67077830ea9d6e2cd9681e882d612b925bbdca 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -627,10 +627,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { default: break; } - + this.tryToFindSnodePublishInfo(); this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + this.mQClientFactory.sendHeartbeatToAllSnodeWithLock(); this.mQClientFactory.rebalanceImmediately(); } @@ -1138,4 +1138,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.consumeMessageService = consumeMessageService; } + private void tryToFindSnodePublishInfo() { + this.mQClientFactory.updateSnodeInfoFromNameServer(); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index b92b6bedd3ac81aa93022e4d35414bda121fe35e..4f85cc0d12430db8b396c289ec8c2793eab1e291 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -84,7 +84,8 @@ public class RebalancePushImpl extends RebalanceImpl { } // notify broker - this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); + //this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); + this.getmQClientFactory().sendHeartbeatToAllSnodeWithLock(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 3c34fdba9af0585dc854625b905448538ad92300..f881d981878463ab13d5c333e729866c0a19169b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -501,7 +501,8 @@ public class MQClientInstance { } // may need to check one broker every cluster... // assume that the configs of every broker in cluster are the the same. - String addr = findBrokerAddrByTopic(subscriptionData.getTopic()); + //String addr = findBrokerAddrByTopic(subscriptionData.getTopic()); + String addr = findSnodeAddressInPublish(); if (addr != null) { try { @@ -1191,17 +1192,17 @@ public class MQClientInstance { } public List findConsumerIdList(final String topic, final String group) { - String brokerAddr = this.findBrokerAddrByTopic(topic); - if (null == brokerAddr) { - this.updateTopicRouteInfoFromNameServer(topic); - brokerAddr = this.findBrokerAddrByTopic(topic); + String snodeAddr = this.findSnodeAddressInPublish(); + if (null == snodeAddr) { + this.updateSnodeInfoFromNameServer(); + snodeAddr = this.findSnodeAddressInPublish(); } - if (null != brokerAddr) { + if (null != snodeAddr) { try { - return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000); + return this.mQClientAPIImpl.getConsumerIdListByGroup(snodeAddr, group, 3000); } catch (Exception e) { - log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e); + log.warn("getConsumerIdListByGroup exception, " + snodeAddr + " " + group, e); } }