diff --git a/test/pom.xml b/test/pom.xml
index 42ebea7cf16c5f2b0dac171f77da6c097252bda3..adb51fd4a20912e8edeca8bae72408bccc82e5be 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -47,4 +47,20 @@
0.30
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
index c3e0572d4ee5b8883c7e7fe5851543ac00242694..680780aeb5bc66bfa3d11374995289af57bf247d 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
@@ -52,7 +52,7 @@ public class MQAdmin {
while (!createResult) {
createResult = checkTopicExist(mqAdminExt, topic);
if (System.currentTimeMillis() - startTime < waitTimeSec * 1000) {
- TestUtils.waitForMonment(100);
+ TestUtils.waitForMoment(100);
} else {
log.error(String.format("timeout,but create topic[%s] failed!", topic));
break;
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java
index 6326d46dc311516a2cd8754c9cdc208487fe6a27..3eb1f7d741b6dbee5623e7afb7f1c7c3b2ac6df4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.test.util;
import java.util.concurrent.TimeUnit;
public class TestUtils {
- public static void waitForMonment(long time) {
+ public static void waitForMoment(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException var3) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 57462a247a9f180eebb6e500b3af41802765e493..92f77b8533d34e0a3015455fc21c6b1d34f15a25 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -42,7 +42,6 @@ public class BaseConf {
protected static int brokerNum;
protected static int waitTime = 5;
protected static int consumeTime = 1 * 60 * 1000;
- protected static int topicCreateTime = 30 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
@@ -66,22 +65,8 @@ public class BaseConf {
}
public static String initTopic() {
- long startTime = System.currentTimeMillis();
String topic = MQRandomUtils.getRandomTopic();
- boolean createResult = false;
- while (true) {
- createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
- if (createResult) {
- break;
- } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
- Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
- System.currentTimeMillis() - startTime));
- break;
- } else {
- TestUtils.waitForMonment(500);
- continue;
- }
- }
+ IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
return topic;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index ff9996d45b911a7e8ece203ee6e436d1bd280ccd..5329991a3e3596dc3ddaee2f7f6375c9077f8ce8 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -30,18 +30,23 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.TestUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IntegrationTestBase {
+ public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
+
protected static final String SEP = File.separator;
protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
protected static final List TMPE_FILES = new ArrayList<>();
protected static final List BROKER_CONTROLLERS = new ArrayList<>();
protected static final List NAMESRV_CONTROLLERS = new ArrayList<>();
- public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
+ protected static int topicCreateTime = 30 * 1000;
+
protected static Random random = new Random();
static {
@@ -125,6 +130,27 @@ public class IntegrationTestBase {
return brokerController;
}
+ public static boolean initTopic(String topic, String nsAddr, String clusterName) {
+ long startTime = System.currentTimeMillis();
+ boolean createResult;
+
+ while (true) {
+ createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
+ if (createResult) {
+ break;
+ } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
+ Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
+ System.currentTimeMillis() - startTime));
+ break;
+ } else {
+ TestUtils.waitForMoment(500);
+ continue;
+ }
+ }
+
+ return createResult;
+ }
+
public static void deleteFile(File file) {
if (!file.exists()) {
return;
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
index 03e81eb593e96c16d47760d6abcc44039609a53c..4cf8161e39c4a0c7c8a659651882448c3d9425f7 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWithSameGroupConsumerIT.java
@@ -78,10 +78,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100);
- TestUtils.waitForMonment(5);
+ TestUtils.waitForMoment(5);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName));
- TestUtils.waitForMonment(5);
+ TestUtils.waitForMoment(5);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
@@ -102,10 +102,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100);
- TestUtils.waitForMonment(5);
+ TestUtils.waitForMoment(5);
consumer2.shutdown();
mqClients.remove(1);
- TestUtils.waitForMonment(5);
+ TestUtils.waitForMoment(5);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
index 4125433bab0cdd412f3f0a4897d8ed44ab859675..b3d258f6fbb154fe2749abeceb6642cabbb641db 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
@@ -112,7 +112,7 @@ public class AsyncSendExceptionIT extends BaseConf {
int checkNum = 50;
while (!sendFail && checkNum > 0) {
checkNum--;
- TestUtils.waitForMonment(100);
+ TestUtils.waitForMoment(100);
}
producer.shutdown();
assertThat(sendFail).isEqualTo(true);
@@ -141,7 +141,7 @@ public class AsyncSendExceptionIT extends BaseConf {
int checkNum = 50;
while (sendFail && checkNum > 0) {
checkNum--;
- TestUtils.waitForMonment(100);
+ TestUtils.waitForMoment(100);
}
producer.shutdown();
assertThat(sendFail).isEqualTo(false);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
index 92c40c7ff7d268b30f6f08ed115a10a477b38047..2cdd66c9923d33af5e9a358f578778bdd3b71dc3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
@@ -64,7 +64,7 @@ public class QueryMsgByIdIT extends BaseConf {
MessageExt recvMsg = (MessageExt) consumer.getListner().getFirstMsg();
MessageExt queryMsg = null;
try {
- TestUtils.waitForMonment(3000);
+ TestUtils.waitForMoment(3000);
queryMsg = producer.getProducer().viewMessage(((MessageClientExt) recvMsg).getOffsetMsgId());
} catch (Exception e) {
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
index ec45a2925b054d67a128f4766a3caea9c488215e..68dd8db2ddfbc014f1e31e5aa142a9730081fcf4 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
@@ -59,7 +59,7 @@ public class QueryMsgByKeyIT extends BaseConf {
List queryMsgs = null;
try {
- TestUtils.waitForMonment(500 * 3);
+ TestUtils.waitForMoment(500 * 3);
queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 5000,
System.currentTimeMillis() + 5000).getMessageList();
} catch (Exception e) {
@@ -89,7 +89,7 @@ public class QueryMsgByKeyIT extends BaseConf {
i--;
queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000,
System.currentTimeMillis() + 15000).getMessageList();
- TestUtils.waitForMonment(1000);
+ TestUtils.waitForMoment(1000);
if (i == 0 || (queryMsgs != null && queryMsgs.size() == max)) {
break;