提交 74ffae6d 编写于 作者: K King 提交者: von gosling

[ISSUE 504] Polish 'No route info of this topic' exception (#1415)

* Polish 'no route info of this topic' exception.

* Write exception to log

* Modify logic that cannot connect to nameserver

* Solve the problem that the regression test does not pass.

* Adapt to lite pull consumer

* Make checkNameServerSetting function private and change its name.
上级 9edeb4ec
......@@ -40,11 +40,11 @@ public class MQClientManager {
return instance;
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {
return getAndCreateMQClientInstance(clientConfig, null);
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
......
......@@ -272,7 +272,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
......
......@@ -634,7 +634,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.defaultMQPullConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
......
......@@ -579,7 +579,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
......
......@@ -362,7 +362,6 @@ public class MQClientInstance {
}
/**
*
* @param offsetTable
* @param namespace
* @return newOffsetTable
......@@ -381,6 +380,7 @@ public class MQClientInstance {
return newOffsetTable;
}
/**
* Remove offline broker
*/
......@@ -672,10 +672,13 @@ public class MQClientInstance {
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
......@@ -739,9 +742,10 @@ public class MQClientInstance {
return false;
}
/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
* This method will be removed in the version 5.0.0,because filterServer was removed,and method
* <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*/
@Deprecated
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
......
......@@ -180,7 +180,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
......@@ -271,6 +271,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
*
* @return
*/
@Override
......@@ -469,8 +470,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
......@@ -505,7 +507,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
......@@ -514,6 +515,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
private void validateNameServerSetting() throws MQClientException {
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
}
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
......@@ -522,7 +532,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
......@@ -653,13 +662,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
......@@ -990,8 +995,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param mq
* @param sendCallback
......@@ -1105,6 +1111,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
......@@ -1117,8 +1124,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param selector
* @param arg
......@@ -1129,7 +1137,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
......
......@@ -53,8 +53,10 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -68,10 +70,11 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultLitePullConsumerImpl.class)
public class DefaultLitePullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
......@@ -88,6 +91,7 @@ public class DefaultLitePullConsumerTest {
@Before
public void init() throws Exception {
PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
......
......@@ -54,7 +54,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPullConsumerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQPullConsumer pullConsumer;
......
......@@ -59,8 +59,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
......@@ -73,7 +75,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
......@@ -102,10 +105,12 @@ public class DefaultMQPushConsumerTest {
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private String topic = "FooBar";
private String group = "FooBarGroup";
......
......@@ -66,7 +66,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
......@@ -184,6 +184,7 @@ public class DefaultMQProducerTest {
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@Test
public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
......@@ -211,12 +212,12 @@ public class DefaultMQProducerTest {
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
producer.send(new Message(),sendCallback);
producer.send(message,sendCallback,1000);
producer.send(message,new MessageQueue(),sendCallback);
producer.send(new Message(),new MessageQueue(),sendCallback,1000);
producer.send(new Message(),messageQueueSelector,null,sendCallback);
producer.send(message,messageQueueSelector,null,sendCallback,1000);
producer.send(new Message(), sendCallback);
producer.send(message, sendCallback, 1000);
producer.send(message, new MessageQueue(), sendCallback);
producer.send(new Message(), new MessageQueue(), sendCallback, 1000);
producer.send(new Message(), messageQueueSelector, null, sendCallback);
producer.send(message, messageQueueSelector, null, sendCallback, 1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(6);
......
......@@ -70,8 +70,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
......@@ -83,7 +85,8 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
......@@ -101,7 +104,6 @@ public class DefaultMQConsumerWithTraceTest {
private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer;
private AsyncTraceDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
......@@ -112,17 +114,16 @@ public class DefaultMQConsumerWithTraceTest {
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, "");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,"");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, false, "");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher();
asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
......@@ -131,12 +132,14 @@ public class DefaultMQConsumerWithTraceTest {
}
});
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
......
......@@ -60,7 +60,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQProducerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
......@@ -87,7 +87,7 @@ public class DefaultMQProducerWithTraceTest {
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[]{'a', 'b', 'c'});
message = new Message(topic, new byte[] {'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
asyncTraceDispatcher.getHostProducer();
......@@ -108,7 +108,6 @@ public class DefaultMQProducerWithTraceTest {
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
......
......@@ -52,7 +52,7 @@ import static org.mockito.Mockito.when;
public class ClusterTestRequestProcessorTest {
private ClusterTestRequestProcessor clusterTestProcessor;
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
private ChannelHandlerContext ctx;
......
......@@ -106,6 +106,7 @@
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
<sonar.exclusions>file:**/generated-sources/**,**/test/**</sonar.exclusions>
<powermock.version>2.0.2</powermock.version>
</properties>
......@@ -458,6 +459,18 @@
<version>2.23.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
......
......@@ -24,6 +24,6 @@ public class RemotingConnectException extends RemotingException {
}
public RemotingConnectException(String addr, Throwable cause) {
super("connect to <" + addr + "> failed", cause);
super("connect to " + addr + " failed", cause);
}
}
......@@ -358,8 +358,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
......@@ -393,7 +391,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
}
......@@ -406,7 +404,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return this.createChannel(addr);
}
private Channel getAndCreateNameserverChannel() throws InterruptedException {
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
......@@ -440,9 +438,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
}
} catch (Exception e) {
log.error("getAndCreateNameserverChannel: create name server channel exception", e);
} finally {
this.lockNamesrvChannel.unlock();
}
......@@ -587,7 +584,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
return channelEventListener;
}
@Override
public ExecutorService getCallbackExecutor() {
return callbackExecutor != null ? callbackExecutor : publicExecutor;
......
......@@ -116,7 +116,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.defaultMQAdminExt.changeInstanceNameToPID();
this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);
boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
if (!registerOK) {
......
......@@ -85,7 +85,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQAdminExtTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static Properties properties = new Properties();
private static TopicList topicList = new TopicList();
......
......@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class CommandUtilTest {
private DefaultMQAdminExt defaultMQAdminExt;
private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private MQClientAPIImpl mQClientAPIImpl;
@Before
......
......@@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.anyBoolean;
......@@ -54,7 +53,7 @@ public class BrokerConsumeStatsSubCommadTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class BrokerStatusSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
public class CleanExpiredCQSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
public class CleanUnusedTopicCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
public class GetBrokerConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock;
public class SendMsgStatusCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -44,7 +44,7 @@ import static org.mockito.Mockito.mock;
public class UpdateBrokerConfigSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -54,7 +54,7 @@ import static org.mockito.Mockito.when;
public class ConsumerConnectionSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -49,7 +49,7 @@ import static org.mockito.Mockito.when;
public class ProducerConnectionSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
public class ConsumerProgressSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -65,7 +65,7 @@ import static org.mockito.Mockito.when;
public class ConsumerStatusSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -61,7 +61,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static MQAdminImpl mQAdminImpl;
......
......@@ -41,7 +41,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
......@@ -52,7 +51,7 @@ import static org.mockito.Mockito.when;
public class GetNamesrvConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
public class UpdateKvConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -49,7 +49,7 @@ import static org.mockito.Mockito.when;
public class WipeWritePermSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -47,7 +47,7 @@ import static org.mockito.Mockito.when;
public class GetConsumerStatusCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
public class ResetOffsetByTimeCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
......
......@@ -69,7 +69,7 @@ import static org.mockito.Mockito.when;
public class MonitorServiceTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
private static MonitorConfig monitorConfig;
private static MonitorListener monitorListener;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册