From d5cb67ff802c5d92ba8b42f0a0ebe94a05eb9965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=97=AD?= Date: Thu, 15 Oct 2020 11:04:19 +0800 Subject: [PATCH] [ISSUE #2165] Slave read enable not work sometimes When cluster deployed on DLedger mode (#2167) * [Client] Fix slaveReadEnable=true not work sometimes When cluster deployed on DLedger mode * [Client] Add unit test for findBrokerAddressInSubscribe Co-authored-by: zhangxu16 --- .../client/impl/factory/MQClientInstance.java | 5 +++ .../impl/factory/MQClientInstanceTest.java | 39 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 48cc1885..b5aaeb8e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1043,6 +1043,11 @@ public class MQClientInstance { slave = brokerId != MixAll.MASTER_ID; found = brokerAddr != null; + if (!found && slave) { + brokerAddr = map.get(brokerId + 1); + found = brokerAddr != null; + } + if (!found && !onlyThisBroker) { Entry entry = map.entrySet().iterator().next(); brokerAddr = entry.getValue(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index bb213211..e0506aa1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -19,9 +19,12 @@ package org.apache.rocketmq.client.impl.factory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; @@ -30,8 +33,10 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.internal.util.reflection.FieldSetter; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -42,6 +47,12 @@ public class MQClientInstanceTest { private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private String topic = "FooBar"; private String group = "FooBarGroup"; + private ConcurrentMap> brokerAddrTable = new ConcurrentHashMap>(); + + @Before + public void init() throws Exception { + FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable); + } @Test public void testTopicRouteData2TopicPublishInfo() { @@ -74,6 +85,34 @@ public class MQClientInstanceTest { assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4); } + @Test + public void testFindBrokerAddressInSubscribe() { + // dledger normal case + String brokerName = "BrokerA"; + HashMap addrMap = new HashMap(); + addrMap.put(0L, "127.0.0.1:10911"); + addrMap.put(1L, "127.0.0.1:10912"); + addrMap.put(2L, "127.0.0.1:10913"); + brokerAddrTable.put(brokerName, addrMap); + long brokerId = 1; + FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false); + assertThat(brokerResult).isNotNull(); + assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912"); + assertThat(brokerResult.isSlave()).isTrue(); + + // dledger case, when node n0 was voted as the leader + brokerName = "BrokerB"; + HashMap addrMapNew = new HashMap(); + addrMapNew.put(0L, "127.0.0.1:10911"); + addrMapNew.put(2L, "127.0.0.1:10912"); + addrMapNew.put(3L, "127.0.0.1:10913"); + brokerAddrTable.put(brokerName, addrMapNew); + brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false); + assertThat(brokerResult).isNotNull(); + assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912"); + assertThat(brokerResult.isSlave()).isTrue(); + } + @Test public void testRegisterProducer() { boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class)); -- GitLab