提交 ab013861 编写于 作者: S shtykh_roman 提交者: dongeforever

[ROCKETMQ-76] Expose IntegrationTestBase to be used by other integration...

[ROCKETMQ-76] Expose IntegrationTestBase to be used by other integration projects closes apache/incubator-rocketmq#52
上级 7e37799e
...@@ -47,4 +47,20 @@ ...@@ -47,4 +47,20 @@
<version>0.30</version> <version>0.30</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>
...@@ -52,7 +52,7 @@ public class MQAdmin { ...@@ -52,7 +52,7 @@ public class MQAdmin {
while (!createResult) { while (!createResult) {
createResult = checkTopicExist(mqAdminExt, topic); createResult = checkTopicExist(mqAdminExt, topic);
if (System.currentTimeMillis() - startTime < waitTimeSec * 1000) { if (System.currentTimeMillis() - startTime < waitTimeSec * 1000) {
TestUtils.waitForMonment(100); TestUtils.waitForMoment(100);
} else { } else {
log.error(String.format("timeout,but create topic[%s] failed!", topic)); log.error(String.format("timeout,but create topic[%s] failed!", topic));
break; break;
......
...@@ -20,7 +20,7 @@ package org.apache.rocketmq.test.util; ...@@ -20,7 +20,7 @@ package org.apache.rocketmq.test.util;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class TestUtils { public class TestUtils {
public static void waitForMonment(long time) { public static void waitForMoment(long time) {
try { try {
Thread.sleep(time); Thread.sleep(time);
} catch (InterruptedException var3) { } catch (InterruptedException var3) {
......
...@@ -42,7 +42,6 @@ public class BaseConf { ...@@ -42,7 +42,6 @@ public class BaseConf {
protected static int brokerNum; protected static int brokerNum;
protected static int waitTime = 5; protected static int waitTime = 5;
protected static int consumeTime = 1 * 60 * 1000; protected static int consumeTime = 1 * 60 * 1000;
protected static int topicCreateTime = 30 * 1000;
protected static NamesrvController namesrvController; protected static NamesrvController namesrvController;
protected static BrokerController brokerController1; protected static BrokerController brokerController1;
protected static BrokerController brokerController2; protected static BrokerController brokerController2;
...@@ -66,22 +65,8 @@ public class BaseConf { ...@@ -66,22 +65,8 @@ public class BaseConf {
} }
public static String initTopic() { public static String initTopic() {
long startTime = System.currentTimeMillis();
String topic = MQRandomUtils.getRandomTopic(); String topic = MQRandomUtils.getRandomTopic();
boolean createResult = false; IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
while (true) {
createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
System.currentTimeMillis() - startTime));
break;
} else {
TestUtils.waitForMonment(500);
continue;
}
}
return topic; return topic;
} }
......
...@@ -30,18 +30,23 @@ import org.apache.rocketmq.namesrv.NamesrvController; ...@@ -30,18 +30,23 @@ import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class IntegrationTestBase { public class IntegrationTestBase {
public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
protected static final String SEP = File.separator; protected static final String SEP = File.separator;
protected static final String BROKER_NAME_PREFIX = "TestBrokerName_"; protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0); protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
protected static final List<File> TMPE_FILES = new ArrayList<>(); protected static final List<File> TMPE_FILES = new ArrayList<>();
protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>(); protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>();
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>(); protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class); protected static int topicCreateTime = 30 * 1000;
protected static Random random = new Random(); protected static Random random = new Random();
static { static {
...@@ -125,6 +130,27 @@ public class IntegrationTestBase { ...@@ -125,6 +130,27 @@ public class IntegrationTestBase {
return brokerController; return brokerController;
} }
public static boolean initTopic(String topic, String nsAddr, String clusterName) {
long startTime = System.currentTimeMillis();
boolean createResult;
while (true) {
createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
System.currentTimeMillis() - startTime));
break;
} else {
TestUtils.waitForMoment(500);
continue;
}
}
return createResult;
}
public static void deleteFile(File file) { public static void deleteFile(File file) {
if (!file.exists()) { if (!file.exists()) {
return; return;
......
...@@ -78,10 +78,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -78,10 +78,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag, RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100); producer.send(tag, msgSize, 100);
TestUtils.waitForMonment(5); TestUtils.waitForMoment(5);
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag, RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), tag,
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
TestUtils.waitForMonment(5); TestUtils.waitForMoment(5);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
...@@ -102,10 +102,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf { ...@@ -102,10 +102,10 @@ public class TagMessageWithSameGroupConsumerIT extends BaseConf {
new RMQNormalListner(originMsgDCName, msgBodyDCName)); new RMQNormalListner(originMsgDCName, msgBodyDCName));
producer.send(tag, msgSize, 100); producer.send(tag, msgSize, 100);
TestUtils.waitForMonment(5); TestUtils.waitForMoment(5);
consumer2.shutdown(); consumer2.shutdown();
mqClients.remove(1); mqClients.remove(1);
TestUtils.waitForMonment(5); TestUtils.waitForMoment(5);
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
......
...@@ -112,7 +112,7 @@ public class AsyncSendExceptionIT extends BaseConf { ...@@ -112,7 +112,7 @@ public class AsyncSendExceptionIT extends BaseConf {
int checkNum = 50; int checkNum = 50;
while (!sendFail && checkNum > 0) { while (!sendFail && checkNum > 0) {
checkNum--; checkNum--;
TestUtils.waitForMonment(100); TestUtils.waitForMoment(100);
} }
producer.shutdown(); producer.shutdown();
assertThat(sendFail).isEqualTo(true); assertThat(sendFail).isEqualTo(true);
...@@ -141,7 +141,7 @@ public class AsyncSendExceptionIT extends BaseConf { ...@@ -141,7 +141,7 @@ public class AsyncSendExceptionIT extends BaseConf {
int checkNum = 50; int checkNum = 50;
while (sendFail && checkNum > 0) { while (sendFail && checkNum > 0) {
checkNum--; checkNum--;
TestUtils.waitForMonment(100); TestUtils.waitForMoment(100);
} }
producer.shutdown(); producer.shutdown();
assertThat(sendFail).isEqualTo(false); assertThat(sendFail).isEqualTo(false);
......
...@@ -64,7 +64,7 @@ public class QueryMsgByIdIT extends BaseConf { ...@@ -64,7 +64,7 @@ public class QueryMsgByIdIT extends BaseConf {
MessageExt recvMsg = (MessageExt) consumer.getListner().getFirstMsg(); MessageExt recvMsg = (MessageExt) consumer.getListner().getFirstMsg();
MessageExt queryMsg = null; MessageExt queryMsg = null;
try { try {
TestUtils.waitForMonment(3000); TestUtils.waitForMoment(3000);
queryMsg = producer.getProducer().viewMessage(((MessageClientExt) recvMsg).getOffsetMsgId()); queryMsg = producer.getProducer().viewMessage(((MessageClientExt) recvMsg).getOffsetMsgId());
} catch (Exception e) { } catch (Exception e) {
} }
......
...@@ -59,7 +59,7 @@ public class QueryMsgByKeyIT extends BaseConf { ...@@ -59,7 +59,7 @@ public class QueryMsgByKeyIT extends BaseConf {
List<MessageExt> queryMsgs = null; List<MessageExt> queryMsgs = null;
try { try {
TestUtils.waitForMonment(500 * 3); TestUtils.waitForMoment(500 * 3);
queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 5000, queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 5000,
System.currentTimeMillis() + 5000).getMessageList(); System.currentTimeMillis() + 5000).getMessageList();
} catch (Exception e) { } catch (Exception e) {
...@@ -89,7 +89,7 @@ public class QueryMsgByKeyIT extends BaseConf { ...@@ -89,7 +89,7 @@ public class QueryMsgByKeyIT extends BaseConf {
i--; i--;
queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000, queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000,
System.currentTimeMillis() + 15000).getMessageList(); System.currentTimeMillis() + 15000).getMessageList();
TestUtils.waitForMonment(1000); TestUtils.waitForMoment(1000);
if (i == 0 || (queryMsgs != null && queryMsgs.size() == max)) { if (i == 0 || (queryMsgs != null && queryMsgs.size() == max)) {
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册