From 74ffae6dc276f06e6c5710b73f3ea3ce5babd536 Mon Sep 17 00:00:00 2001 From: King <794220751@qq.com> Date: Wed, 28 Aug 2019 11:11:56 +0800 Subject: [PATCH] [ISSUE 504] Polish 'No route info of this topic' exception (#1415) * Polish 'no route info of this topic' exception. * Write exception to log * Modify logic that cannot connect to nameserver * Solve the problem that the regression test does not pass. * Adapt to lite pull consumer * Make checkNameServerSetting function private and change its name. --- .../rocketmq/client/impl/MQClientManager.java | 6 +- .../consumer/DefaultLitePullConsumerImpl.java | 2 +- .../consumer/DefaultMQPullConsumerImpl.java | 2 +- .../consumer/DefaultMQPushConsumerImpl.java | 2 +- .../client/impl/factory/MQClientInstance.java | 12 ++-- .../impl/producer/DefaultMQProducerImpl.java | 55 +++++++++++-------- .../consumer/DefaultLitePullConsumerTest.java | 10 +++- .../consumer/DefaultMQPullConsumerTest.java | 2 +- .../consumer/DefaultMQPushConsumerTest.java | 9 ++- .../impl/factory/MQClientInstanceTest.java | 2 +- .../producer/DefaultMQProducerTest.java | 15 ++--- .../trace/DefaultMQConsumerWithTraceTest.java | 19 ++++--- .../trace/DefaultMQProducerWithTraceTest.java | 11 ++-- .../ClusterTestRequestProcessorTest.java | 2 +- pom.xml | 13 +++++ .../exception/RemotingConnectException.java | 2 +- .../remoting/netty/NettyRemotingClient.java | 10 +--- .../tools/admin/DefaultMQAdminExtImpl.java | 2 +- .../tools/admin/DefaultMQAdminExtTest.java | 2 +- .../tools/command/CommandUtilTest.java | 2 +- .../BrokerConsumeStatsSubCommadTest.java | 3 +- .../broker/BrokerStatusSubCommandTest.java | 2 +- .../broker/CleanExpiredCQSubCommandTest.java | 2 +- .../broker/CleanUnusedTopicCommandTest.java | 2 +- .../broker/GetBrokerConfigCommandTest.java | 2 +- .../broker/SendMsgStatusCommandTest.java | 2 +- .../UpdateBrokerConfigSubCommandTest.java | 2 +- .../ConsumerConnectionSubCommandTest.java | 2 +- .../ProducerConnectionSubCommandTest.java | 2 +- .../ConsumerProgressSubCommandTest.java | 2 +- .../ConsumerStatusSubCommandTest.java | 2 +- .../QueryMsgByUniqueKeySubCommandTest.java | 2 +- .../namesrv/GetNamesrvConfigCommandTest.java | 3 +- .../namesrv/UpdateKvConfigCommandTest.java | 2 +- .../namesrv/WipeWritePermSubCommandTest.java | 2 +- .../offset/GetConsumerStatusCommandTest.java | 2 +- .../offset/ResetOffsetByTimeCommandTest.java | 2 +- .../tools/monitor/MonitorServiceTest.java | 2 +- 38 files changed, 125 insertions(+), 93 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index 31553a67..053c049c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -40,11 +40,11 @@ public class MQClientManager { return instance; } - public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) { - return getAndCreateMQClientInstance(clientConfig, null); + public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) { + return getOrCreateMQClientInstance(clientConfig, null); } - public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { + public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 5217a315..72b29e20 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -272,7 +272,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private void initMQClientFactory() throws MQClientException { - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index c12d8357..afd72a08 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -634,7 +634,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.defaultMQPullConsumer.changeInstanceNameToPID(); } - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 48fe41ad..d1b5de15 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -579,7 +579,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.defaultMQPushConsumer.changeInstanceNameToPID(); } - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 4f9d42cc..bbd2eecb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -362,7 +362,6 @@ public class MQClientInstance { } /** - * * @param offsetTable * @param namespace * @return newOffsetTable @@ -381,6 +380,7 @@ public class MQClientInstance { return newOffsetTable; } + /** * Remove offline broker */ @@ -672,10 +672,13 @@ public class MQClientInstance { } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } - } catch (Exception e) { + } catch (MQClientException e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } + } catch (RemotingException e) { + log.error("updateTopicRouteInfoFromNameServer Exception", e); + throw new IllegalStateException(e); } finally { this.lockNamesrv.unlock(); } @@ -739,9 +742,10 @@ public class MQClientInstance { return false; } + /** - * This method will be removed in the version 5.0.0,because filterServer was removed,and method subscribe(final String topic, final MessageSelector messageSelector) - * is recommended. + * This method will be removed in the version 5.0.0,because filterServer was removed,and method + * subscribe(final String topic, final MessageSelector messageSelector) is recommended. */ @Deprecated private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 62aaef3b..95696d91 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -180,7 +180,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.defaultMQProducer.changeInstanceNameToPID(); } - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { @@ -271,6 +271,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { /** * This method will be removed in the version 5.0.0 and getCheckListener is recommended. + * * @return */ @Override @@ -464,13 +465,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { * DEFAULT ASYNC ------------------------------------------------------- */ public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param sendCallback * @param timeout the sendCallback will be invoked at most time @@ -505,7 +507,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { } - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } @@ -514,6 +515,15 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); } + private void validateNameServerSetting() throws MQClientException { + List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); + if (null == nsList || nsList.isEmpty()) { + throw new MQClientException( + "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); + } + + } + private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, @@ -522,7 +532,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); - final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; @@ -653,13 +662,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { throw mqClientException; } - List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); - if (null == nsList || nsList.isEmpty()) { - throw new MQClientException( - "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); - } + validateNameServerSetting(); - throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), + throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } @@ -681,11 +686,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private SendResult sendKernelImpl(final Message msg, - final MessageQueue mq, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final TopicPublishInfo topicPublishInfo, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { @@ -990,8 +995,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param mq * @param sendCallback @@ -1105,6 +1111,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } + validateNameServerSetting(); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); } @@ -1117,8 +1124,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param selector * @param arg @@ -1129,7 +1137,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { * @throws InterruptedException */ @Deprecated - public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); ExecutorService executor = this.getAsyncSenderExecutor(); @@ -1173,7 +1182,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter localTransactionExecuter, final Object arg) + final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 7d496acf..b55f8a0a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -53,8 +53,10 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -68,10 +70,11 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultLitePullConsumerImpl.class) public class DefaultLitePullConsumerTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; @@ -88,6 +91,7 @@ public class DefaultLitePullConsumerTest { @Before public void init() throws Exception { + PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); field.setAccessible(true); RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index 569055de..7afaf2be 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -54,7 +54,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQPullConsumerTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; private DefaultMQPullConsumer pullConsumer; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index ff2fb78b..e6f0e866 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -59,8 +59,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -73,7 +75,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultMQPushConsumerImpl.class) public class DefaultMQPushConsumerTest { private String consumerGroup; private String topic = "FooBar"; @@ -102,10 +105,12 @@ public class DefaultMQPushConsumerTest { }); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); field.set(pushConsumerImpl, rebalancePushImpl); + pushConsumer.subscribe(topic, "*"); pushConsumer.start(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index 171a95a8..bb213211 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock; @RunWith(MockitoJUnitRunner.class) public class MQClientInstanceTest { - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private String topic = "FooBar"; private String group = "FooBarGroup"; diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 9540755f..b9be49c0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -66,7 +66,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQProducerTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; @Mock @@ -184,6 +184,7 @@ public class DefaultMQProducerTest { }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); } + @Test public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException { final AtomicInteger cc = new AtomicInteger(0); @@ -211,12 +212,12 @@ public class DefaultMQProducerTest { Message message = new Message(); message.setTopic("test"); message.setBody("hello world".getBytes()); - producer.send(new Message(),sendCallback); - producer.send(message,sendCallback,1000); - producer.send(message,new MessageQueue(),sendCallback); - producer.send(new Message(),new MessageQueue(),sendCallback,1000); - producer.send(new Message(),messageQueueSelector,null,sendCallback); - producer.send(message,messageQueueSelector,null,sendCallback,1000); + producer.send(new Message(), sendCallback); + producer.send(message, sendCallback, 1000); + producer.send(message, new MessageQueue(), sendCallback); + producer.send(new Message(), new MessageQueue(), sendCallback, 1000); + producer.send(new Message(), messageQueueSelector, null, sendCallback); + producer.send(message, messageQueueSelector, null, sendCallback, 1000); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(6); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 0d00c9bd..496c5143 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -70,8 +70,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -83,7 +85,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultMQPushConsumerImpl.class) public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; @@ -101,7 +104,6 @@ public class DefaultMQConsumerWithTraceTest { private DefaultMQPushConsumer normalPushConsumer; private DefaultMQPushConsumer customTraceTopicpushConsumer; - private AsyncTraceDispatcher asyncTraceDispatcher; private MQClientInstance mQClientTraceFactory; @Mock @@ -112,17 +114,16 @@ public class DefaultMQConsumerWithTraceTest { @Before public void init() throws Exception { consumerGroup = "FooBarGroup" + System.currentTimeMillis(); - pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,""); + pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, ""); consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis(); - normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,""); - customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic); + normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, false, ""); + customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); - asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher(); + asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher(); traceProducer = asyncTraceDispatcher.getTraceProducer(); - pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, @@ -131,12 +132,14 @@ public class DefaultMQConsumerWithTraceTest { } }); + PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); field.set(pushConsumerImpl, rebalancePushImpl); pushConsumer.subscribe(topic, "*"); + pushConsumer.start(); mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 903be01c..3759acba 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -60,7 +60,7 @@ import static org.mockito.Mockito.when; public class DefaultMQProducerWithTraceTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; @@ -87,7 +87,7 @@ public class DefaultMQProducerWithTraceTest { producer.setNamesrvAddr("127.0.0.1:9876"); normalProducer.setNamesrvAddr("127.0.0.1:9877"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); - message = new Message(topic, new byte[]{'a', 'b', 'c'}); + message = new Message(topic, new byte[] {'a', 'b', 'c'}); asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); asyncTraceDispatcher.getHostProducer(); @@ -108,14 +108,13 @@ public class DefaultMQProducerWithTraceTest { field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); - producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java index a0e81375..c8bf0576 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java @@ -52,7 +52,7 @@ import static org.mockito.Mockito.when; public class ClusterTestRequestProcessorTest { private ClusterTestRequestProcessor clusterTestProcessor; private DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private MQClientAPIImpl mQClientAPIImpl; private ChannelHandlerContext ctx; diff --git a/pom.xml b/pom.xml index bdc44062..46b88ceb 100644 --- a/pom.xml +++ b/pom.xml @@ -106,6 +106,7 @@ ${project.basedir}/../test/target/jacoco-it.exec file:**/generated-sources/**,**/test/** + 2.0.2 @@ -458,6 +459,18 @@ 2.23.0 test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito2 + ${powermock.version} + test + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java index 8286177c..40e3b5ae 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java @@ -24,6 +24,6 @@ public class RemotingConnectException extends RemotingException { } public RemotingConnectException(String addr, Throwable cause) { - super("connect to <" + addr + "> failed", cause); + super("connect to " + addr + " failed", cause); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index fc9df37c..49fe2462 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -358,8 +358,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - - @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { @@ -393,7 +391,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - private Channel getAndCreateChannel(final String addr) throws InterruptedException { + private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException { if (null == addr) { return getAndCreateNameserverChannel(); } @@ -406,7 +404,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return this.createChannel(addr); } - private Channel getAndCreateNameserverChannel() throws InterruptedException { + private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); @@ -440,9 +438,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelNew; } } + throw new RemotingConnectException(addrList.toString()); } - } catch (Exception e) { - log.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.lockNamesrvChannel.unlock(); } @@ -587,7 +584,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelEventListener; } - @Override public ExecutorService getCallbackExecutor() { return callbackExecutor != null ? callbackExecutor : publicExecutor; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 210d5a99..0051ceb5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -116,7 +116,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { this.defaultMQAdminExt.changeInstanceNameToPID(); - this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); + this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this); if (!registerOK) { diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 78659800..3146b178 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -85,7 +85,7 @@ import static org.mockito.Mockito.when; public class DefaultMQAdminExtTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; private static Properties properties = new Properties(); private static TopicList topicList = new TopicList(); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java index 33b44976..b556e5c1 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class CommandUtilTest { private DefaultMQAdminExt defaultMQAdminExt; private DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private MQClientAPIImpl mQClientAPIImpl; @Before diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java index 1089c1b7..1abd8575 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java @@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; import org.apache.rocketmq.tools.command.SubCommandException; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -54,7 +53,7 @@ public class BrokerConsumeStatsSubCommadTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java index b1d92d79..c850d71d 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class BrokerStatusSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java index a5ba24f6..241ae882 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.when; public class CleanExpiredCQSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java index 95373800..759f783e 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.when; public class CleanUnusedTopicCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java index 978a2fd5..8bb40793 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class GetBrokerConfigCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java index c0f7639d..9e9bc789 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java @@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock; public class SendMsgStatusCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java index 46c6eb3e..c74107ed 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java @@ -44,7 +44,7 @@ import static org.mockito.Mockito.mock; public class UpdateBrokerConfigSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java index 3d64ce24..584943ce 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java @@ -54,7 +54,7 @@ import static org.mockito.Mockito.when; public class ConsumerConnectionSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java index 76c8dc4b..060ba938 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java @@ -49,7 +49,7 @@ import static org.mockito.Mockito.when; public class ProducerConnectionSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java index 6d69c10b..19d903ce 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java @@ -53,7 +53,7 @@ import static org.mockito.Mockito.when; public class ConsumerProgressSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java index aa6f27de..7f44af82 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java @@ -65,7 +65,7 @@ import static org.mockito.Mockito.when; public class ConsumerStatusSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java index e7576082..504b4656 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java @@ -61,7 +61,7 @@ public class QueryMsgByUniqueKeySubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; private static MQAdminImpl mQAdminImpl; diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java index 8163fd15..dde80eb3 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java @@ -41,7 +41,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; import org.apache.rocketmq.tools.command.SubCommandException; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatchers; @@ -52,7 +51,7 @@ import static org.mockito.Mockito.when; public class GetNamesrvConfigCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java index af4deace..c4edcafe 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java @@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock; public class UpdateKvConfigCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java index 11711d08..9befdf89 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java @@ -49,7 +49,7 @@ import static org.mockito.Mockito.when; public class WipeWritePermSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java index 86454008..a01bf816 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java @@ -47,7 +47,7 @@ import static org.mockito.Mockito.when; public class GetConsumerStatusCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java index b7af8c87..d73a996b 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java @@ -53,7 +53,7 @@ import static org.mockito.Mockito.when; public class ResetOffsetByTimeCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java index 4989a9b5..57278b9b 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java @@ -69,7 +69,7 @@ import static org.mockito.Mockito.when; public class MonitorServiceTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; private static MonitorConfig monitorConfig; private static MonitorListener monitorListener; -- GitLab