diff --git a/README.md b/README.md
index a5f47e597cee42340069a9bf63fd0e51eb6e3372..dc53b3afc730656acc69c9564d1082c2d5d1f263 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
## Apache RocketMQ [![Build Status](https://travis-ci.org/apache/rocketmq.svg?branch=master)](https://travis-ci.org/apache/rocketmq) [![Coverage Status](https://coveralls.io/repos/github/apache/rocketmq/badge.svg?branch=master)](https://coveralls.io/github/apache/rocketmq?branch=master)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.rocketmq/rocketmq-all/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Corg.apache.rocketmq)
-[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://rocketmq.apache.org/dowloading/releases)
+[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)]
+(https://rocketmq.apache.org/dowloading/releases)
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
**[Apache RocketMQ](https://rocketmq.apache.org) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.**
@@ -47,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu
----------
## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
+
+
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index b5e608558cbfc148388f28f44c76f4e1cb315fd1..971237929ab0bbcca01237677881becee84d3c4c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0a4f9086a87d88603f00f0335330af560d..4b055e0fc2f90a13253084cfce0ebebaa2547bf0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -548,10 +548,10 @@ public class MQClientInstance {
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
- log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
+ log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
- id, addr);
+ id, addr, e);
}
}
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index 66f7f5dc8f5fc4b2f545081315692951d612d34d..78457dbfe8a7e78052c2c2a65ac719edd9ad56b0 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@ -42,9 +42,9 @@ public class DefaultMessageStoreShutDownTest {
public void init() throws Exception {
messageStore = spy(buildMessageStore());
boolean load = messageStore.load();
- when(messageStore.dispatchBehindBytes()).thenReturn(100L);
assertTrue(load);
messageStore.start();
+ when(messageStore.dispatchBehindBytes()).thenReturn(100L);
}
@Test
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index fb9a7798df5b6fb2c81511cc76b9f5cd0c5e7893..78a6837727c44abd139065961f48958960eb4b12 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -115,7 +115,7 @@ public class MessageStoreTestBase extends StoreTestBase {
protected void doGetMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) {
for (int i = 0; i < num; i++) {
- GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, beginLogicsOffset + i, 3, null);
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, queueId, beginLogicsOffset + i, 3, null);
Assert.assertNotNull(getMessageResult);
Assert.assertTrue(!getMessageResult.getMessageBufferList().isEmpty());
MessageExt messageExt = MessageDecoder.decode(getMessageResult.getMessageBufferList().get(0));
diff --git a/store/src/test/resources/logback-test.xml b/store/src/test/resources/logback-test.xml
index 875b6715ac993f55f30946d038231aa3a85cd17d..a033816ddad7359e1edcdb5a4ae5a0cd9aa001a1 100644
--- a/store/src/test/resources/logback-test.xml
+++ b/store/src/test/resources/logback-test.xml
@@ -28,7 +28,7 @@
-
+
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index c37cf1f96ba57c647ef951405f1aaad50e2fc2ec..48508462668e8f523d5e5ab6e389c93d9a3e49ef 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory;
+import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
@@ -64,6 +65,7 @@ public class ConsumerFactory {
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
+ defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
defaultMQPullConsumer.setNamesrvAddr(nsAddr);
defaultMQPullConsumer.start();
return defaultMQPullConsumer;
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
index 66767cc9f0d682da8112ff2c989186b745aecd3c..76e6e09d0bf7f88899960b4197d24ea1e48cbd0c 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.factory;
+import java.util.UUID;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.test.util.RandomUtil;
@@ -25,6 +26,7 @@ public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
+ producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns);
try {
producer.start();
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
index bd151d0561ef6e37732857ce75aca599184a996e..8863ee3e52df1911385031b6645a87ef3918bd04 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.test.util;
import java.util.HashMap;
import java.util.Set;
+import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
@@ -40,6 +41,7 @@ public class MQAdmin {
int queueNum, int waitTimeSec) {
boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setInstanceName(UUID.randomUUID().toString());
mqAdminExt.setNamesrvAddr(nameSrvAddr);
try {
mqAdminExt.start();
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 1f4fe7175ececfb92e6ecb0f362820e434d0e758..d9955c04de333390520532f5cbeaa3b0e3d7ac73 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -39,7 +39,7 @@ public class BaseConf {
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
- protected static int consumeTime = 5 * 60 * 1000;
+ protected static int consumeTime = 2 * 60 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 0ac321e4a3121d6ffa4e4385e0940870daaeb06b..0eacd584fadd0843129911ac33e1891f2b64d1e1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -50,10 +50,10 @@ public class IntegrationTestBase {
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
protected static final int INDEX_NUM = 1000;
- private static final AtomicInteger port = new AtomicInteger(50000);
+ private static final AtomicInteger port = new AtomicInteger(40000);
public static synchronized int nextPort() {
- return port.addAndGet(5);
+ return port.addAndGet(random.nextInt(10) + 10);
}
protected static Random random = new Random();
@@ -110,7 +110,7 @@ public class IntegrationTestBase {
namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties");
- nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
+ nameServerNettyServerConfig.setListenPort(nextPort());
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try {
Assert.assertTrue(namesrvController.initialize());
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
index 813447f21fa55561d6cb8c6d1b6a7fc1169e1002..7fd229749f4d43411759a317f782bdb50336fbb7 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
@@ -19,12 +19,13 @@ import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.factory.ProducerFactory;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import static org.apache.rocketmq.test.base.IntegrationTestBase.nextPort;
import static sun.util.locale.BaseLocale.SEP;
-public class ProduceAndConsumeTest {
+public class DLedgerProduceAndConsumeIT {
public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
BrokerConfig brokerConfig = new BrokerConfig();
@@ -58,7 +59,7 @@ public class ProduceAndConsumeTest {
BrokerConfig brokerConfig = buildBrokerConfig(cluster, brokerName);
MessageStoreConfig storeConfig = buildStoreConfig(brokerName, peers, selfId);
BrokerController brokerController = IntegrationTestBase.createAndStartBroker(storeConfig, brokerConfig);
- Thread.sleep(1000);
+ Thread.sleep(3000);
Assert.assertEquals(BrokerRole.SYNC_MASTER, storeConfig.getBrokerRole());
@@ -97,6 +98,8 @@ public class ProduceAndConsumeTest {
Assert.assertArrayEquals(("Hello" + i).getBytes(), messageExt.getBody());
}
+ producer.shutdown();
+ consumer.shutdown();
brokerController.shutdown();
}
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
index 9294c3fd93411e0226c4e6a09fe2f17a33dabd1c..1d7ea2033acc039a05b028bb5a29f4c04c3550f8 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
@@ -29,10 +29,15 @@ import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
+/**
+ * Currently, dose not support the ordered broadcast message
+ */
+@Ignore
public class OrderMsgBroadCastIT extends BaseBroadCastIT {
private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class);
private RMQNormalProducer producer = null;
diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
index 8cb0f41c1c01fa33f4118147c8fce4d63a027d26..b97b0637b043b243f791188d5a1b10c981fe436c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
@@ -51,7 +51,8 @@ public class NormalMsgDelayIT extends DelayConf {
}
@Test
- public void testDelayLevell() {
+ public void testDelayLevel1() throws Exception {
+ Thread.sleep(3000);
int delayLevel = 1;
List