diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 48cc18851d721358d3672c227f830734700bfad1..b5aaeb8e438900146c7bb9d1721edc49b5074190 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1043,6 +1043,11 @@ public class MQClientInstance { slave = brokerId != MixAll.MASTER_ID; found = brokerAddr != null; + if (!found && slave) { + brokerAddr = map.get(brokerId + 1); + found = brokerAddr != null; + } + if (!found && !onlyThisBroker) { Entry entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index bb2132111c179e8fb13dc283aa4634f2313e6040..e0506aa144a5cd42b78bf0f5aead2fbc2cc0f3fe 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -19,9 +19,12 @@ package org.apache.rocketmq.client.impl.factory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; @@ -30,8 +33,10 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.internal.util.reflection.FieldSetter; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -42,6 +47,12 @@ public class MQClientInstanceTest { private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private String topic = "FooBar"; private String group = "FooBarGroup"; + private ConcurrentMap> brokerAddrTable = new ConcurrentHashMap>(); + + @Before + public void init() throws Exception { + FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable); + } @Test public void testTopicRouteData2TopicPublishInfo() { @@ -74,6 +85,34 @@ public class MQClientInstanceTest { assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4); } + @Test + public void testFindBrokerAddressInSubscribe() { + // dledger normal case + String brokerName = "BrokerA"; + HashMap addrMap = new HashMap(); + addrMap.put(0L, "127.0.0.1:10911"); + addrMap.put(1L, "127.0.0.1:10912"); + addrMap.put(2L, "127.0.0.1:10913"); + brokerAddrTable.put(brokerName, addrMap); + long brokerId = 1; + FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false); + assertThat(brokerResult).isNotNull(); + assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912"); + assertThat(brokerResult.isSlave()).isTrue(); + + // dledger case, when node n0 was voted as the leader + brokerName = "BrokerB"; + HashMap addrMapNew = new HashMap(); + addrMapNew.put(0L, "127.0.0.1:10911"); + addrMapNew.put(2L, "127.0.0.1:10912"); + addrMapNew.put(3L, "127.0.0.1:10913"); + brokerAddrTable.put(brokerName, addrMapNew); + brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false); + assertThat(brokerResult).isNotNull(); + assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912"); + assertThat(brokerResult.isSlave()).isTrue(); + } + @Test public void testRegisterProducer() { boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));