diff --git a/.travis.yml b/.travis.yml
index dd57ba3030c259e8ca26bbe3f4c0048325750c97..916cac5765dda3f9dec70ad52d140274f35c28ca 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,4 +39,5 @@ script:
- travis_retry mvn -B package jacoco:report coveralls:report
after_success:
+ - mvn clean install -Pit-test
- mvn sonar:sonar
diff --git a/BUILDING b/BUILDING
index a92cbd53f9412bf76bab31e1890f592a6a25e9fb..1498b3e323c0fd60ef2f7c9eed12e7c0e5476281 100644
--- a/BUILDING
+++ b/BUILDING
@@ -34,4 +34,4 @@ Then, import to eclipse by specifying the root directory of the project via:
Execute the following command in order to build the tar.gz packages and install JAR to the local repository:
-$ mvn clean package install -Prelease-all assembly:assembly -U
\ No newline at end of file
+$ mvn clean install -Prelease-all assembly:assembly -U
\ No newline at end of file
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 8726c697d584df7d2af9052c5bf20dd8bbb58a5e..039c94298488a06ababcb84d7ba31eec72cc7a76 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -173,7 +173,6 @@ public class BrokerOuterAPI {
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
- result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 92dc5e72b25185b833c44ad1281af3282776af2d..bdf2a01efea6b426e0e16370ddd8e6c664d58fe0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -184,7 +184,7 @@ public class SubscriptionGroupManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
} else {
- log.warn("delete subscription group failed, subscription group: {} not exist", old);
+ log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
}
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 6f2c9a38d60252b970fd4ea4ec137653484e4328..f596b8365e758e1b72d66ab6ba74b050927bb365 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -53,8 +53,9 @@ public class MQClientManager {
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
+ log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
- log.warn("Previous MQClientInstance has created for clientId:[{}]", clientId);
+ log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..861e284138e29d016accd2e18cf49925773f1f5a
--- /dev/null
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.assertj.core.util.Maps;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.*;
+
+public class DefaultRequestProcessorTest {
+ /** Test Target */
+ private DefaultRequestProcessor defaultRequestProcessor;
+
+ private NamesrvController namesrvController;
+
+ private NamesrvConfig namesrvConfig;
+
+ private NettyServerConfig nettyServerConfig;
+
+ private Logger logger;
+
+ @Before
+ public void init() throws Exception {
+ namesrvConfig = new NamesrvConfig();
+ nettyServerConfig = new NettyServerConfig();
+
+ namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
+ defaultRequestProcessor = new DefaultRequestProcessor(namesrvController);
+
+ logger = mock(Logger.class);
+ when(logger.isInfoEnabled()).thenReturn(false);
+ setFinalStatic(DefaultRequestProcessor.class.getDeclaredField("log"), logger);
+ }
+
+ @Test
+ public void testProcessRequest_PutKVConfig() throws RemotingCommandException {
+ PutKVConfigRequestHeader header = new PutKVConfigRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG,
+ header);
+ request.addExtField("namespace", "namespace");
+ request.addExtField("key", "key");
+ request.addExtField("value", "value");
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+
+ assertThat(namesrvController.getKvConfigManager().getKVConfig("namespace", "key"))
+ .isEqualTo("value");
+ }
+
+ @Test
+ public void testProcessRequest_GetKVConfigReturnNotNull() throws RemotingCommandException {
+ namesrvController.getKvConfigManager().putKVConfig("namespace", "key", "value");
+
+ GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG,
+ header);
+ request.addExtField("namespace", "namespace");
+ request.addExtField("key", "key");
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+
+ GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response
+ .readCustomHeader();
+
+ assertThat(responseHeader.getValue()).isEqualTo("value");
+ }
+
+ @Test
+ public void testProcessRequest_GetKVConfigReturnNull() throws RemotingCommandException {
+ GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG,
+ header);
+ request.addExtField("namespace", "namespace");
+ request.addExtField("key", "key");
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
+ assertThat(response.getRemark()).isEqualTo("No config item, Namespace: namespace Key: key");
+
+ GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response
+ .readCustomHeader();
+
+ assertThat(responseHeader.getValue()).isNull();
+ }
+
+ @Test
+ public void testProcessRequest_DeleteKVConfig() throws RemotingCommandException {
+ namesrvController.getKvConfigManager().putKVConfig("namespace", "key", "value");
+
+ DeleteKVConfigRequestHeader header = new DeleteKVConfigRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG,
+ header);
+ request.addExtField("namespace", "namespace");
+ request.addExtField("key", "key");
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+
+ assertThat(namesrvController.getKvConfigManager().getKVConfig("namespace", "key"))
+ .isNull();
+ }
+
+ @Test
+ public void testProcessRequest_RegisterBroker() throws RemotingCommandException,
+ NoSuchFieldException, IllegalAccessException {
+ RemotingCommand request = genSampleRegisterCmd(true);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+
+ RouteInfoManager routes = namesrvController.getRouteInfoManager();
+ Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
+ brokerAddrTable.setAccessible(true);
+
+ BrokerData broker = new BrokerData();
+ broker.setBrokerName("broker");
+ broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
+
+ assertThat((Map) brokerAddrTable.get(routes))
+ .contains(new HashMap.SimpleEntry("broker", broker));
+ }
+
+ @Test
+ public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
+ NoSuchFieldException, IllegalAccessException {
+ RemotingCommand request = genSampleRegisterCmd(true);
+
+ // version >= MQVersion.Version.V3_0_11.ordinal() to register with filter server
+ request.setVersion(100);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+
+ RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getRemark()).isNull();
+
+ RouteInfoManager routes = namesrvController.getRouteInfoManager();
+ Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
+ brokerAddrTable.setAccessible(true);
+
+ BrokerData broker = new BrokerData();
+ broker.setBrokerName("broker");
+ broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
+
+ assertThat((Map) brokerAddrTable.get(routes))
+ .contains(new HashMap.SimpleEntry("broker", broker));
+ }
+
+ @Test
+ public void testProcessRequest_UnregisterBroker() throws RemotingCommandException, NoSuchFieldException, IllegalAccessException {
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.channel()).thenReturn(null);
+
+ //Register broker
+ RemotingCommand regRequest = genSampleRegisterCmd(true);
+ defaultRequestProcessor.processRequest(ctx, regRequest);
+
+ //Unregister broker
+ RemotingCommand unregRequest = genSampleRegisterCmd(false);
+ RemotingCommand unregResponse = defaultRequestProcessor.processRequest(ctx, unregRequest);
+
+ assertThat(unregResponse.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(unregResponse.getRemark()).isNull();
+
+ RouteInfoManager routes = namesrvController.getRouteInfoManager();
+ Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
+ brokerAddrTable.setAccessible(true);
+
+ assertThat((Map)brokerAddrTable.get(routes)).isEmpty();
+ }
+
+
+ private static RemotingCommand genSampleRegisterCmd(boolean reg) {
+ RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
+ header.setBrokerName("broker");
+ RemotingCommand request = RemotingCommand.createRequestCommand(
+ reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, header);
+ request.addExtField("brokerName", "broker");
+ request.addExtField("brokerAddr", "10.10.1.1");
+ request.addExtField("clusterName", "cluster");
+ request.addExtField("haServerAddr", "10.10.2.1");
+ request.addExtField("brokerId", "2333");
+ return request;
+ }
+
+
+ private static void setFinalStatic(Field field, Object newValue) throws Exception {
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(null, newValue);
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8ec8170b84b8591e72dd9c8ea78801bf91e28fa6..8b8c468426877e646bb8bccc93536408cd3f26fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,9 +160,6 @@
1.7
1.7
-
-
- -Xms512m -Xmx1024m
jacoco
https://builds.apache.org/analysis
@@ -181,6 +178,7 @@
example
filtersrv
srvutil
+ test
@@ -236,25 +234,6 @@
true
-
- maven-surefire-plugin
- 2.19.1
-
- 1
- true
-
- **/*Test.java
-
-
-
-
- maven-failsafe-plugin
- 2.19.1
-
- 1
- true
-
-
maven-javadoc-plugin
2.10.4
@@ -310,6 +289,7 @@
UTF-8
true
true
+ false
check
@@ -352,12 +332,20 @@
prepare-agent
+
+ ${project.build.directory}/jacoco.exec
+
default-prepare-agent-integration
+ pre-integration-test
prepare-agent-integration
+
+ ${project.build.directory}/jacoco-it.exec
+ failsafeArgLine
+
default-report
@@ -373,6 +361,14 @@
+
+ maven-surefire-plugin
+ 2.19.1
+
+ 1
+ true
+
+
org.codehaus.mojo
findbugs-maven-plugin
@@ -475,6 +471,37 @@
+
+ it-test
+
+
+
+ maven-failsafe-plugin
+ 2.19.1
+
+ 1
+ true
+ @{failsafeArgLine}
+
+ **/NormalMsgDelayIT.java
+ **/BroadCastNormalMsgNotRecvIT.java
+ **/TagMessageWithSameGroupConsumerIT.java
+ **/AsyncSendWithMessageQueueSelectorIT.java
+ **/AsyncSendWithMessageQueueIT.java
+
+
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
@@ -537,7 +564,7 @@
${project.groupId}
- rocketmq-qatest
+ rocketmq-test
${project.version}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
index a9c0a62945366fe3f9ce8dbc6d93f3f609e31989..f80ff14c1074f78a50dffef2efe803c39e69c79a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -20,7 +20,7 @@ import com.alibaba.fastjson.JSON;
import java.nio.charset.Charset;
public abstract class RemotingSerializable {
- public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 0ebe795a12fcd498bbfff768c68ebd963d4d6c07..86dab46d41715a92f6d4102a8350114bfecc406e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -23,14 +23,14 @@ import java.util.Iterator;
import java.util.Map;
public class RocketMQSerializable {
- public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
- remarkBytes = cmd.getRemark().getBytes(RemotingSerializable.CHARSET_UTF8);
+ remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
@@ -89,9 +89,9 @@ public class RocketMQSerializable {
if (entry.getKey() != null && entry.getValue() != null) {
kvLength =
// keySize + Key
- 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length
+ 2 + entry.getKey().getBytes(CHARSET_UTF8).length
// valSize + val
- + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length;
+ + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
totalLength += kvLength;
}
}
@@ -103,8 +103,8 @@ public class RocketMQSerializable {
while (it.hasNext()) {
Map.Entry entry = it.next();
if (entry.getKey() != null && entry.getValue() != null) {
- key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8);
- val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8);
+ key = entry.getKey().getBytes(CHARSET_UTF8);
+ val = entry.getValue().getBytes(CHARSET_UTF8);
content.putShort((short) key.length);
content.put(key);
@@ -154,7 +154,7 @@ public class RocketMQSerializable {
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
- cmd.setRemark(new String(remarkContent, RemotingSerializable.CHARSET_UTF8));
+ cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap extFields
@@ -187,8 +187,7 @@ public class RocketMQSerializable {
valContent = new byte[valSize];
byteBuffer.get(valContent);
- map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent,
- RemotingSerializable.CHARSET_UTF8));
+ map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
}
return map;
}
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
index 776b3054852c87cdc336ff42ed2a8af1e9703a98..2872eb720248ac1ab3235464c72276a295e7e352 100644
--- a/style/rmq_checkstyle.xml
+++ b/style/rmq_checkstyle.xml
@@ -121,7 +121,6 @@
-
diff --git a/test/pom.xml b/test/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..09ec9d3aa2e2f5897752b3a8ed68439a90594d55
--- /dev/null
+++ b/test/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+ rocketmq-all
+ org.apache.rocketmq
+ 4.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rocketmq-test
+
+
+
+
+ log4j
+ log4j
+ 1.2.17
+
+
+ ${project.groupId}
+ rocketmq-broker
+
+
+ ${project.groupId}
+ rocketmq-namesrv
+
+
+ com.google.truth
+ truth
+ 0.30
+
+
+
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..6b2357bd83c609c381405ad221dfe97f63668f72
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.mq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class MQAsyncProducer {
+ private static Logger logger = Logger.getLogger(MQAsyncProducer.class);
+ private AbstractMQProducer producer = null;
+ private long msgNum;
+ private int intervalMills;
+ private Thread sendT;
+ private AtomicBoolean bPause = new AtomicBoolean(false);
+
+ public MQAsyncProducer(final AbstractMQProducer producer, final long msgNum,
+ final int intervalMills) {
+ this.producer = producer;
+ this.msgNum = msgNum;
+ this.intervalMills = intervalMills;
+
+ sendT = new Thread(new Runnable() {
+ public void run() {
+ for (int i = 0; i < msgNum; i++) {
+ if (!bPause.get()) {
+ producer.send();
+ TestUtil.waitForMonment(intervalMills);
+ } else {
+ while (true) {
+ if (bPause.get()) {
+ TestUtil.waitForMonment(10);
+ } else
+ break;
+ }
+ }
+
+ }
+ }
+ });
+
+ }
+
+ public void start() {
+ sendT.start();
+ }
+
+ public void waitSendAll(int waitMills) {
+ long startTime = System.currentTimeMillis();
+ while ((producer.getAllMsgBody().size() + producer.getSendErrorMsg().size()) < msgNum) {
+ if (System.currentTimeMillis() - startTime < waitMills) {
+ TestUtil.waitForMonment(200);
+ } else {
+ logger.error(String.format("time elapse:%s, but the message sending has not finished",
+ System.currentTimeMillis() - startTime));
+ break;
+ }
+ }
+ }
+
+ public void pauseProducer() {
+ bPause.set(true);
+ }
+
+ public void notifyProducer() {
+ bPause.set(false);
+ }
+
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..4a2ce2b7ad975a50d204b5ab4afd3719a4e13d46
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class RMQAsyncSendProducer extends AbstractMQProducer {
+ private static Logger logger = Logger
+ .getLogger(RMQAsyncSendProducer.class);
+ private String nsAddr = null;
+ private DefaultMQProducer producer = null;
+ private SendCallback sendCallback = null;
+ private List successSendResult = new ArrayList();
+ private AtomicInteger exceptionMsgCount = new AtomicInteger(
+ 0);
+ private int msgSize = 0;
+
+ public RMQAsyncSendProducer(String nsAddr, String topic) {
+ super(topic);
+ this.nsAddr = nsAddr;
+ sendCallback = new SendCallback() {
+ public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+ successSendResult.add(sendResult);
+ }
+
+ public void onException(Throwable throwable) {
+ exceptionMsgCount.getAndIncrement();
+ }
+ };
+
+ create();
+ start();
+ }
+
+ public int getSuccessMsgCount() {
+ return successSendResult.size();
+ }
+
+ public List getSuccessSendResult() {
+ return successSendResult;
+ }
+
+ public int getExceptionMsgCount() {
+ return exceptionMsgCount.get();
+ }
+
+ private void create() {
+ producer = new DefaultMQProducer();
+ producer.setProducerGroup(RandomUtil.getStringByUUID());
+ producer.setInstanceName(RandomUtil.getStringByUUID());
+
+ if (nsAddr != null) {
+ producer.setNamesrvAddr(nsAddr);
+ }
+
+ }
+
+ private void start() {
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ logger.error("producer start failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public SendResult send(Object msg, Object arg) {
+ return null;
+ }
+
+ public void shutdown() {
+ producer.shutdown();
+ }
+
+ public void asyncSend(Object msg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.send(metaqMsg, sendCallback);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void asyncSend(int msgSize) {
+ this.msgSize = msgSize;
+
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.asyncSend(msg);
+ }
+ }
+
+ public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.send(metaqMsg, selector, arg, sendCallback);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void asyncSend(int msgSize, MessageQueueSelector selector) {
+ this.msgSize = msgSize;
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.asyncSend(msg, selector, i);
+ }
+ }
+
+ public void asyncSend(Object msg, MessageQueue mq) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.send(metaqMsg, mq, sendCallback);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void asyncSend(int msgSize, MessageQueue mq) {
+ this.msgSize = msgSize;
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.asyncSend(msg, mq);
+ }
+ }
+
+ public void waitForResponse(int timeoutMills) {
+ long startTime = System.currentTimeMillis();
+ while (this.successSendResult.size() != this.msgSize) {
+ if (System.currentTimeMillis() - startTime < timeoutMills) {
+ TestUtil.waitForMonment(100);
+ } else {
+ logger.info("timeout but still not recv all response!");
+ break;
+ }
+ }
+ }
+
+ public void sendOneWay(Object msg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.sendOneway(metaqMsg);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendOneWay(int msgSize) {
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.sendOneWay(msg);
+ }
+ }
+
+ public void sendOneWay(Object msg, MessageQueue mq) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.sendOneway(metaqMsg, mq);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendOneWay(int msgSize, MessageQueue mq) {
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.sendOneWay(msg, mq);
+ }
+ }
+
+ public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.sendOneway(metaqMsg, selector, arg);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendOneWay(int msgSize, MessageQueueSelector selector) {
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.sendOneWay(msg, selector, i);
+ }
+ }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..8af49eac4b5efe887c8416a4ccfc695ea973b9a6
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+public class RMQBroadCastConsumer extends RMQNormalConsumer {
+ private static Logger logger = Logger.getLogger(RMQBroadCastConsumer.class);
+
+ public RMQBroadCastConsumer(String nsAddr, String topic, String subExpression,
+ String consumerGroup, AbstractListener listner) {
+ super(nsAddr, topic, subExpression, consumerGroup, listner);
+ }
+
+ @Override
+ public void create() {
+ super.create();
+ consumer.setMessageModel(MessageModel.BROADCASTING);
+ }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..3f185d3a16336fc77f6ec355681d2c77bf3ad078
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
+import org.apache.rocketmq.test.listener.AbstractListener;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+public class RMQNormalConsumer extends AbstractMQConsumer {
+ private static Logger logger = Logger.getLogger(RMQNormalConsumer.class);
+ protected DefaultMQPushConsumer consumer = null;
+
+ public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
+ String consumerGroup, AbstractListener listner) {
+ super(nsAddr, topic, subExpression, consumerGroup, listner);
+ }
+
+ public AbstractListener getListner() {
+ return listner;
+ }
+
+ public void setListner(AbstractListener listner) {
+ this.listner = listner;
+ }
+
+ public void create() {
+ consumer = new DefaultMQPushConsumer(consumerGroup);
+ consumer.setInstanceName(RandomUtil.getStringByUUID());
+ consumer.setNamesrvAddr(nsAddr);
+ try {
+ consumer.subscribe(topic, subExpression);
+ } catch (MQClientException e) {
+ logger.error("consumer subscribe failed!");
+ e.printStackTrace();
+ }
+ consumer.setMessageListener(listner);
+ }
+
+ public void start() {
+ try {
+ consumer.start();
+ logger.info(String.format("consumer[%s] started!", consumer.getConsumerGroup()));
+ } catch (MQClientException e) {
+ logger.error("consumer start failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public void subscribe(String topic, String subExpression) {
+ try {
+ consumer.subscribe(topic, subExpression);
+ } catch (MQClientException e) {
+ logger.error("consumer subscribe failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public void shutdown() {
+ consumer.shutdown();
+ }
+
+ @Override
+ public void clearMsg() {
+ this.listner.clearMsg();
+ }
+
+ public void restart() {
+ consumer.shutdown();
+ create();
+ start();
+ }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..26b77fe846c7d002f8e5bad0e60dc63d55056789
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+
+public class RMQNormalProducer extends AbstractMQProducer {
+ private static Logger logger = Logger.getLogger(RMQNormalProducer.class);
+ private DefaultMQProducer producer = null;
+ private String nsAddr = null;
+
+ public RMQNormalProducer(String nsAddr, String topic) {
+ super(topic);
+ this.nsAddr = nsAddr;
+ create();
+ start();
+ }
+
+ public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
+ String producerInstanceName) {
+ super(topic);
+ this.producerGroupName = producerGroupName;
+ this.producerInstanceName = producerInstanceName;
+ this.nsAddr = nsAddr;
+
+ create();
+ start();
+ }
+
+ public DefaultMQProducer getProducer() {
+ return producer;
+ }
+
+ public void setProducer(DefaultMQProducer producer) {
+ this.producer = producer;
+ }
+
+ protected void create() {
+ producer = new DefaultMQProducer();
+ producer.setProducerGroup(getProducerGroupName());
+ producer.setInstanceName(getProducerInstanceName());
+
+ if (nsAddr != null) {
+ producer.setNamesrvAddr(nsAddr);
+ }
+
+ }
+
+ public void start() {
+ try {
+ producer.start();
+ super.setStartSuccess(true);
+ } catch (MQClientException e) {
+ super.setStartSuccess(false);
+ logger.error("producer start failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public SendResult send(Object msg, Object orderKey) {
+ org.apache.rocketmq.client.producer.SendResult metaqResult = null;
+ Message metaqMsg = (Message) msg;
+ try {
+ long start = System.currentTimeMillis();
+ metaqResult = producer.send(metaqMsg);
+ this.msgRTs.addData(System.currentTimeMillis() - start);
+ if (isDebug) {
+ logger.info(metaqResult);
+ }
+ sendResult.setMsgId(metaqResult.getMsgId());
+ sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
+ sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult);
+ } catch (Exception e) {
+ if (isDebug) {
+ e.printStackTrace();
+ }
+
+ sendResult.setSendResult(false);
+ sendResult.setSendException(e);
+ errorMsgs.addData(msg);
+ }
+
+ return sendResult;
+ }
+
+ public void send(Map> msgs) {
+ for (MessageQueue mq : msgs.keySet()) {
+ send(msgs.get(mq), mq);
+ }
+ }
+
+ public void send(List