diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index 31553a67ae5fd28cfc2605ecc633f7aaf5edbeb4..053c049c9cd703832afe9044b92fd8e82fa2b719 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -40,11 +40,11 @@ public class MQClientManager { return instance; } - public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) { - return getAndCreateMQClientInstance(clientConfig, null); + public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) { + return getOrCreateMQClientInstance(clientConfig, null); } - public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { + public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 5217a315398a2d6965933e8e70a1e453e6405edf..72b29e203042d8973cccf5853cb4be8b1ba8bcf8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -272,7 +272,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } private void initMQClientFactory() throws MQClientException { - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index c12d8357eae3bc365266a72a476a01be0efa3dd0..afd72a08002c6c9124feb6260f011e9810407842 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -634,7 +634,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.defaultMQPullConsumer.changeInstanceNameToPID(); } - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 48fe41ada6d6c376cec49d589b2cf6881e72f2c7..d1b5de15eec1f26de0e4b042c8d55a24c1316006 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -579,7 +579,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.defaultMQPushConsumer.changeInstanceNameToPID(); } - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 4f9d42cc57d1e8a131d238ffb7cc229641c3c30c..bbd2eecb1c3785f7520ade1d830b8f2ce19cb1e4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -362,7 +362,6 @@ public class MQClientInstance { } /** - * * @param offsetTable * @param namespace * @return newOffsetTable @@ -381,6 +380,7 @@ public class MQClientInstance { return newOffsetTable; } + /** * Remove offline broker */ @@ -672,10 +672,13 @@ public class MQClientInstance { } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } - } catch (Exception e) { + } catch (MQClientException e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } + } catch (RemotingException e) { + log.error("updateTopicRouteInfoFromNameServer Exception", e); + throw new IllegalStateException(e); } finally { this.lockNamesrv.unlock(); } @@ -739,9 +742,10 @@ public class MQClientInstance { return false; } + /** - * This method will be removed in the version 5.0.0,because filterServer was removed,and method subscribe(final String topic, final MessageSelector messageSelector) - * is recommended. + * This method will be removed in the version 5.0.0,because filterServer was removed,and method + * subscribe(final String topic, final MessageSelector messageSelector) is recommended. */ @Deprecated private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 62aaef3b13ff9996c65a712867120dc4321360e5..95696d91b475cc353a14288330e36631b965a7e0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -180,7 +180,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.defaultMQProducer.changeInstanceNameToPID(); } - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); + this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { @@ -271,6 +271,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { /** * This method will be removed in the version 5.0.0 and getCheckListener is recommended. + * * @return */ @Override @@ -464,13 +465,14 @@ public class DefaultMQProducerImpl implements MQProducerInner { * DEFAULT ASYNC ------------------------------------------------------- */ public void send(Message msg, - SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout()); } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param sendCallback * @param timeout the sendCallback will be invoked at most time @@ -505,7 +507,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { } - public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } @@ -514,6 +515,15 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); } + private void validateNameServerSetting() throws MQClientException { + List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); + if (null == nsList || nsList.isEmpty()) { + throw new MQClientException( + "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); + } + + } + private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, @@ -522,7 +532,6 @@ public class DefaultMQProducerImpl implements MQProducerInner { ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); - final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; @@ -653,13 +662,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { throw mqClientException; } - List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); - if (null == nsList || nsList.isEmpty()) { - throw new MQClientException( - "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); - } + validateNameServerSetting(); - throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), + throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } @@ -681,11 +686,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { } private SendResult sendKernelImpl(final Message msg, - final MessageQueue mq, - final CommunicationMode communicationMode, - final SendCallback sendCallback, - final TopicPublishInfo topicPublishInfo, - final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { @@ -990,8 +995,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param mq * @param sendCallback @@ -1105,6 +1111,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } + validateNameServerSetting(); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); } @@ -1117,8 +1124,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } /** - * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. - * A new one will be provided in next version + * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be + * provided in next version + * * @param msg * @param selector * @param arg @@ -1129,7 +1137,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { * @throws InterruptedException */ @Deprecated - public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) + public void send(final Message msg, final MessageQueueSelector selector, final Object arg, + final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); ExecutorService executor = this.getAsyncSenderExecutor(); @@ -1173,7 +1182,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter localTransactionExecuter, final Object arg) + final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java index 7d496acf0200113996598a139a0063f481027550..b55f8a0a1c89867753e2a2b1b0b404966ad04b1e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java @@ -53,8 +53,10 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -68,10 +70,11 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultLitePullConsumerImpl.class) public class DefaultLitePullConsumerTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; @@ -88,6 +91,7 @@ public class DefaultLitePullConsumerTest { @Before public void init() throws Exception { + PowerMockito.suppress(PowerMockito.method(DefaultLitePullConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); field.setAccessible(true); RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java index 569055dea2566068b0ca165d893b710dcd08b751..7afaf2bea05738779d6379effaa0d358c3c4a16a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java @@ -54,7 +54,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQPullConsumerTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; private DefaultMQPullConsumer pullConsumer; diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index ff2fb78bbfb9df89596abc2ad7b35fcc33a2df3e..e6f0e866882ec460ba4505d007fca8e0ebfd995c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -59,8 +59,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -73,7 +75,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultMQPushConsumerImpl.class) public class DefaultMQPushConsumerTest { private String consumerGroup; private String topic = "FooBar"; @@ -102,10 +105,12 @@ public class DefaultMQPushConsumerTest { }); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); field.set(pushConsumerImpl, rebalancePushImpl); + pushConsumer.subscribe(topic, "*"); pushConsumer.start(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index 171a95a8b68f520c1c9e833ad85fc66880b2cd35..bb2132111c179e8fb13dc283aa4634f2313e6040 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock; @RunWith(MockitoJUnitRunner.class) public class MQClientInstanceTest { - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private String topic = "FooBar"; private String group = "FooBarGroup"; diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 9540755fe339f37651f983fbe1ab524e7fa94004..b9be49c033b538915c70f015313402ba6e8bf612 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -66,7 +66,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DefaultMQProducerTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; @Mock @@ -184,6 +184,7 @@ public class DefaultMQProducerTest { }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); } + @Test public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException { final AtomicInteger cc = new AtomicInteger(0); @@ -211,12 +212,12 @@ public class DefaultMQProducerTest { Message message = new Message(); message.setTopic("test"); message.setBody("hello world".getBytes()); - producer.send(new Message(),sendCallback); - producer.send(message,sendCallback,1000); - producer.send(message,new MessageQueue(),sendCallback); - producer.send(new Message(),new MessageQueue(),sendCallback,1000); - producer.send(new Message(),messageQueueSelector,null,sendCallback); - producer.send(message,messageQueueSelector,null,sendCallback,1000); + producer.send(new Message(), sendCallback); + producer.send(message, sendCallback, 1000); + producer.send(message, new MessageQueue(), sendCallback); + producer.send(new Message(), new MessageQueue(), sendCallback, 1000); + producer.send(new Message(), messageQueueSelector, null, sendCallback); + producer.send(message, messageQueueSelector, null, sendCallback, 1000); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(6); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 0d00c9bdb448841226c093e1cf0d00d532c62104..496c5143e02387f5a39289f8a77ce61021fbd927 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -70,8 +70,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -83,7 +85,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultMQPushConsumerImpl.class) public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; @@ -101,7 +104,6 @@ public class DefaultMQConsumerWithTraceTest { private DefaultMQPushConsumer normalPushConsumer; private DefaultMQPushConsumer customTraceTopicpushConsumer; - private AsyncTraceDispatcher asyncTraceDispatcher; private MQClientInstance mQClientTraceFactory; @Mock @@ -112,17 +114,16 @@ public class DefaultMQConsumerWithTraceTest { @Before public void init() throws Exception { consumerGroup = "FooBarGroup" + System.currentTimeMillis(); - pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,""); + pushConsumer = new DefaultMQPushConsumer(consumerGroup, true, ""); consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis(); - normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,""); - customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic); + normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal, false, ""); + customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); - asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher(); + asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher(); traceProducer = asyncTraceDispatcher.getTraceProducer(); - pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, @@ -131,12 +132,14 @@ public class DefaultMQConsumerWithTraceTest { } }); + PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); field.setAccessible(true); field.set(pushConsumerImpl, rebalancePushImpl); pushConsumer.subscribe(topic, "*"); + pushConsumer.start(); mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 903be01cddbf13031368db73daa9df8ab6198ebb..3759acba139f9d7eb3d343855acbc27e338512b7 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -60,7 +60,7 @@ import static org.mockito.Mockito.when; public class DefaultMQProducerWithTraceTest { @Spy - private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; @@ -87,7 +87,7 @@ public class DefaultMQProducerWithTraceTest { producer.setNamesrvAddr("127.0.0.1:9876"); normalProducer.setNamesrvAddr("127.0.0.1:9877"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); - message = new Message(topic, new byte[]{'a', 'b', 'c'}); + message = new Message(topic, new byte[] {'a', 'b', 'c'}); asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); asyncTraceDispatcher.getHostProducer(); @@ -108,14 +108,13 @@ public class DefaultMQProducerWithTraceTest { field.setAccessible(true); field.set(mQClientFactory, mQClientAPIImpl); - producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenReturn(createSendResult(SendStatus.SEND_OK)); + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); } diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java index a0e8137524afe2488663fa33b8ac36f667ce4db6..c8bf057606d6507c1c0ef434d37a02a332488a1b 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java @@ -52,7 +52,7 @@ import static org.mockito.Mockito.when; public class ClusterTestRequestProcessorTest { private ClusterTestRequestProcessor clusterTestProcessor; private DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private MQClientAPIImpl mQClientAPIImpl; private ChannelHandlerContext ctx; diff --git a/pom.xml b/pom.xml index bdc44062ec1221044bbf74ef6fc7a6584b252138..46b88ceb04c88ca4f506b1a7b5a5972f4a79a26b 100644 --- a/pom.xml +++ b/pom.xml @@ -106,6 +106,7 @@ ${project.basedir}/../test/target/jacoco-it.exec file:**/generated-sources/**,**/test/** + 2.0.2 @@ -458,6 +459,18 @@ 2.23.0 test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito2 + ${powermock.version} + test + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java index 8286177c2efb15b6341cd22771f5a31339650b93..40e3b5aeefbe43db9421baf2dc143809dd9bc053 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingConnectException.java @@ -24,6 +24,6 @@ public class RemotingConnectException extends RemotingException { } public RemotingConnectException(String addr, Throwable cause) { - super("connect to <" + addr + "> failed", cause); + super("connect to " + addr + " failed", cause); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index fc9df37c652dcaef17b95a77bf2bffdaf60c9ba7..49fe2462dfa44c200c35aad123a4a9978f381a6a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -358,8 +358,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - - @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { @@ -393,7 +391,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - private Channel getAndCreateChannel(final String addr) throws InterruptedException { + private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException { if (null == addr) { return getAndCreateNameserverChannel(); } @@ -406,7 +404,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return this.createChannel(addr); } - private Channel getAndCreateNameserverChannel() throws InterruptedException { + private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); @@ -440,9 +438,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelNew; } } + throw new RemotingConnectException(addrList.toString()); } - } catch (Exception e) { - log.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.lockNamesrvChannel.unlock(); } @@ -587,7 +584,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return channelEventListener; } - @Override public ExecutorService getCallbackExecutor() { return callbackExecutor != null ? callbackExecutor : publicExecutor; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 210d5a997d4d5d4c7171d71a8e1af91b47b12e99..0051ceb52bd7faec0a8d852455ca73864aea5186 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -116,7 +116,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { this.defaultMQAdminExt.changeInstanceNameToPID(); - this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); + this.mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this); if (!registerOK) { diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 78659800219742a7b2cf19cf188679d2584b13b8..3146b1781154792a2d7928e0306e9cf172b5ddac 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -85,7 +85,7 @@ import static org.mockito.Mockito.when; public class DefaultMQAdminExtTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; private static Properties properties = new Properties(); private static TopicList topicList = new TopicList(); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java index 33b449768d105b6d223c84d6b46ae5d22beba916..b556e5c1975bf5f2ccee6cccfa9d0a3548c9060b 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class CommandUtilTest { private DefaultMQAdminExt defaultMQAdminExt; private DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private MQClientAPIImpl mQClientAPIImpl; @Before diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java index 1089c1b70cd7475503662c9a8c71a78a7e7dfd6c..1abd8575b0d2817f045fe3bb563e05a8c36c5a24 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java @@ -39,7 +39,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; import org.apache.rocketmq.tools.command.SubCommandException; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -54,7 +53,7 @@ public class BrokerConsumeStatsSubCommadTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java index b1d92d79a3fc8ae24f7c3912cc8bbf7ad5e75f24..c850d71d881534d4006645c494fd4731814b47e4 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class BrokerStatusSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java index a5ba24f6503187a81a0230c510e9e37ae9dccd42..241ae8829abfb28349e9d2fc537bf29dc4c6fe8b 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.when; public class CleanExpiredCQSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java index 95373800727c5a3f3209ecfb0978c66da41a2fcf..759f783e49522b9ac5a721986fd2d66e506a936c 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.when; public class CleanUnusedTopicCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java index 978a2fd59616c19347c3d6186f7ea58fcf9f519a..8bb40793790312e7a3bc36feb1dd4a4a11175fe6 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class GetBrokerConfigCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java index c0f7639d93dfdcc1a5e2a6ccb96a30b399956d47..9e9bc789d93eb0c95169bf44b01bdaaac40cfddc 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java @@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock; public class SendMsgStatusCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java index 46c6eb3e74596f0d48a2469b7acd8de404a6e6b5..c74107edf921be37b0af4b818a3e0b307d74a2bd 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java @@ -44,7 +44,7 @@ import static org.mockito.Mockito.mock; public class UpdateBrokerConfigSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java index 3d64ce2422cee9f6342576cbaad91fb397432dbe..584943ce442ad29d105a2dd9a682cec2b07ddeed 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java @@ -54,7 +54,7 @@ import static org.mockito.Mockito.when; public class ConsumerConnectionSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java index 76c8dc4b3ccc50a5363e59e15fabd7640332f240..060ba9383d5fc4fc3caef2f2d08275b0cf952d0e 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java @@ -49,7 +49,7 @@ import static org.mockito.Mockito.when; public class ProducerConnectionSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java index 6d69c10b5abbe752f2cfedb43177e6ca3de226d3..19d903cea6f8151313e68e566f977d7d65d66315 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java @@ -53,7 +53,7 @@ import static org.mockito.Mockito.when; public class ConsumerProgressSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java index aa6f27de09f0881486338025453e63d72b541a52..7f44af8225ce6968623d639c7ad9f983ee96502b 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java @@ -65,7 +65,7 @@ import static org.mockito.Mockito.when; public class ConsumerStatusSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java index e757608280ff8f912f95563e72a6734422712060..504b46567808c98bb77de104e13b5c5f37aeb9c4 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java @@ -61,7 +61,7 @@ public class QueryMsgByUniqueKeySubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; private static MQAdminImpl mQAdminImpl; diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java index 8163fd15bc939c1aa1f7a978df5d59d048ad5b60..dde80eb3e672165338cc6a50928bc19255c2975c 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java @@ -41,7 +41,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; import org.apache.rocketmq.tools.command.SubCommandException; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatchers; @@ -52,7 +51,7 @@ import static org.mockito.Mockito.when; public class GetNamesrvConfigCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java index af4deace0a9cd563944a336e8548f97f5d992797..c4edcafe6f2dc52c49a840b059f742989401b145 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java @@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock; public class UpdateKvConfigCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java index 11711d08866e80f2f185fbfc6cef15b325f0203e..9befdf8948ab016ee920c43d9f307eae3de26818 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java @@ -49,7 +49,7 @@ import static org.mockito.Mockito.when; public class WipeWritePermSubCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java index 86454008c220cdcc5f4d88bc78a5cabfc5971a40..a01bf8167ae65c36046a97fa495686020a691002 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java @@ -47,7 +47,7 @@ import static org.mockito.Mockito.when; public class GetConsumerStatusCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java index b7af8c870b56d62a58738d16ee687d5cdebc1687..d73a996b3fbcd60a73f998cf2475340b5562bbb3 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java @@ -53,7 +53,7 @@ import static org.mockito.Mockito.when; public class ResetOffsetByTimeCommandTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; @BeforeClass diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java index 4989a9b55ab37920a97415fbae24de81a7334300..57278b9b222fd61b0227c87ab4d2445f8da73408 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java @@ -69,7 +69,7 @@ import static org.mockito.Mockito.when; public class MonitorServiceTest { private static DefaultMQAdminExt defaultMQAdminExt; private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; - private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig()); + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private static MQClientAPIImpl mQClientAPIImpl; private static MonitorConfig monitorConfig; private static MonitorListener monitorListener;