未验证 提交 d5cb67ff 编写于 作者: 张旭 提交者: GitHub

[ISSUE #2165] Slave read enable not work sometimes When cluster deployed on DLedger mode (#2167)

* [Client] Fix slaveReadEnable=true not work sometimes When cluster deployed on DLedger mode

* [Client] Add unit test for findBrokerAddressInSubscribe
Co-authored-by: Nzhangxu16 <zhangxu16@xiaomi.com>
上级 c932941f
...@@ -1043,6 +1043,11 @@ public class MQClientInstance { ...@@ -1043,6 +1043,11 @@ public class MQClientInstance {
slave = brokerId != MixAll.MASTER_ID; slave = brokerId != MixAll.MASTER_ID;
found = brokerAddr != null; found = brokerAddr != null;
if (!found && slave) {
brokerAddr = map.get(brokerId + 1);
found = brokerAddr != null;
}
if (!found && !onlyThisBroker) { if (!found && !onlyThisBroker) {
Entry<Long, String> entry = map.entrySet().iterator().next(); Entry<Long, String> entry = map.entrySet().iterator().next();
brokerAddr = entry.getValue(); brokerAddr = entry.getValue();
......
...@@ -19,9 +19,12 @@ package org.apache.rocketmq.client.impl.factory; ...@@ -19,9 +19,12 @@ package org.apache.rocketmq.client.impl.factory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner; import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
...@@ -30,8 +33,10 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; ...@@ -30,8 +33,10 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -42,6 +47,12 @@ public class MQClientInstanceTest { ...@@ -42,6 +47,12 @@ public class MQClientInstanceTest {
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar"; private String topic = "FooBar";
private String group = "FooBarGroup"; private String group = "FooBarGroup";
private ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
@Before
public void init() throws Exception {
FieldSetter.setField(mqClientInstance, MQClientInstance.class.getDeclaredField("brokerAddrTable"), brokerAddrTable);
}
@Test @Test
public void testTopicRouteData2TopicPublishInfo() { public void testTopicRouteData2TopicPublishInfo() {
...@@ -74,6 +85,34 @@ public class MQClientInstanceTest { ...@@ -74,6 +85,34 @@ public class MQClientInstanceTest {
assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4); assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4);
} }
@Test
public void testFindBrokerAddressInSubscribe() {
// dledger normal case
String brokerName = "BrokerA";
HashMap<Long, String> addrMap = new HashMap<Long, String>();
addrMap.put(0L, "127.0.0.1:10911");
addrMap.put(1L, "127.0.0.1:10912");
addrMap.put(2L, "127.0.0.1:10913");
brokerAddrTable.put(brokerName, addrMap);
long brokerId = 1;
FindBrokerResult brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
assertThat(brokerResult).isNotNull();
assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
assertThat(brokerResult.isSlave()).isTrue();
// dledger case, when node n0 was voted as the leader
brokerName = "BrokerB";
HashMap<Long, String> addrMapNew = new HashMap<Long, String>();
addrMapNew.put(0L, "127.0.0.1:10911");
addrMapNew.put(2L, "127.0.0.1:10912");
addrMapNew.put(3L, "127.0.0.1:10913");
brokerAddrTable.put(brokerName, addrMapNew);
brokerResult = mqClientInstance.findBrokerAddressInSubscribe(brokerName, brokerId, false);
assertThat(brokerResult).isNotNull();
assertThat(brokerResult.getBrokerAddr()).isEqualTo("127.0.0.1:10912");
assertThat(brokerResult.isSlave()).isTrue();
}
@Test @Test
public void testRegisterProducer() { public void testRegisterProducer() {
boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class)); boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册