diff --git a/.travis.yml b/.travis.yml
index dd57ba3030c259e8ca26bbe3f4c0048325750c97..916cac5765dda3f9dec70ad52d140274f35c28ca 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,4 +39,5 @@ script:
- travis_retry mvn -B package jacoco:report coveralls:report
after_success:
+ - mvn clean install -Pit-test
- mvn sonar:sonar
diff --git a/BUILDING b/BUILDING
index a92cbd53f9412bf76bab31e1890f592a6a25e9fb..1498b3e323c0fd60ef2f7c9eed12e7c0e5476281 100644
--- a/BUILDING
+++ b/BUILDING
@@ -34,4 +34,4 @@ Then, import to eclipse by specifying the root directory of the project via:
Execute the following command in order to build the tar.gz packages and install JAR to the local repository:
-$ mvn clean package install -Prelease-all assembly:assembly -U
\ No newline at end of file
+$ mvn clean install -Prelease-all assembly:assembly -U
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8ec8170b84b8591e72dd9c8ea78801bf91e28fa6..8b8c468426877e646bb8bccc93536408cd3f26fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,9 +160,6 @@
1.7
1.7
-
-
- -Xms512m -Xmx1024m
jacoco
https://builds.apache.org/analysis
@@ -181,6 +178,7 @@
example
filtersrv
srvutil
+ test
@@ -236,25 +234,6 @@
true
-
- maven-surefire-plugin
- 2.19.1
-
- 1
- true
-
- **/*Test.java
-
-
-
-
- maven-failsafe-plugin
- 2.19.1
-
- 1
- true
-
-
maven-javadoc-plugin
2.10.4
@@ -310,6 +289,7 @@
UTF-8
true
true
+ false
check
@@ -352,12 +332,20 @@
prepare-agent
+
+ ${project.build.directory}/jacoco.exec
+
default-prepare-agent-integration
+ pre-integration-test
prepare-agent-integration
+
+ ${project.build.directory}/jacoco-it.exec
+ failsafeArgLine
+
default-report
@@ -373,6 +361,14 @@
+
+ maven-surefire-plugin
+ 2.19.1
+
+ 1
+ true
+
+
org.codehaus.mojo
findbugs-maven-plugin
@@ -475,6 +471,37 @@
+
+ it-test
+
+
+
+ maven-failsafe-plugin
+ 2.19.1
+
+ 1
+ true
+ @{failsafeArgLine}
+
+ **/NormalMsgDelayIT.java
+ **/BroadCastNormalMsgNotRecvIT.java
+ **/TagMessageWithSameGroupConsumerIT.java
+ **/AsyncSendWithMessageQueueSelectorIT.java
+ **/AsyncSendWithMessageQueueIT.java
+
+
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
@@ -537,7 +564,7 @@
${project.groupId}
- rocketmq-qatest
+ rocketmq-test
${project.version}
diff --git a/test/pom.xml b/test/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..09ec9d3aa2e2f5897752b3a8ed68439a90594d55
--- /dev/null
+++ b/test/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+ rocketmq-all
+ org.apache.rocketmq
+ 4.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rocketmq-test
+
+
+
+
+ log4j
+ log4j
+ 1.2.17
+
+
+ ${project.groupId}
+ rocketmq-broker
+
+
+ ${project.groupId}
+ rocketmq-namesrv
+
+
+ com.google.truth
+ truth
+ 0.30
+
+
+
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..6b2357bd83c609c381405ad221dfe97f63668f72
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.mq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class MQAsyncProducer {
+ private static Logger logger = Logger.getLogger(MQAsyncProducer.class);
+ private AbstractMQProducer producer = null;
+ private long msgNum;
+ private int intervalMills;
+ private Thread sendT;
+ private AtomicBoolean bPause = new AtomicBoolean(false);
+
+ public MQAsyncProducer(final AbstractMQProducer producer, final long msgNum,
+ final int intervalMills) {
+ this.producer = producer;
+ this.msgNum = msgNum;
+ this.intervalMills = intervalMills;
+
+ sendT = new Thread(new Runnable() {
+ public void run() {
+ for (int i = 0; i < msgNum; i++) {
+ if (!bPause.get()) {
+ producer.send();
+ TestUtil.waitForMonment(intervalMills);
+ } else {
+ while (true) {
+ if (bPause.get()) {
+ TestUtil.waitForMonment(10);
+ } else
+ break;
+ }
+ }
+
+ }
+ }
+ });
+
+ }
+
+ public void start() {
+ sendT.start();
+ }
+
+ public void waitSendAll(int waitMills) {
+ long startTime = System.currentTimeMillis();
+ while ((producer.getAllMsgBody().size() + producer.getSendErrorMsg().size()) < msgNum) {
+ if (System.currentTimeMillis() - startTime < waitMills) {
+ TestUtil.waitForMonment(200);
+ } else {
+ logger.error(String.format("time elapse:%s, but the message sending has not finished",
+ System.currentTimeMillis() - startTime));
+ break;
+ }
+ }
+ }
+
+ public void pauseProducer() {
+ bPause.set(true);
+ }
+
+ public void notifyProducer() {
+ bPause.set(false);
+ }
+
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..4a2ce2b7ad975a50d204b5ab4afd3719a4e13d46
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class RMQAsyncSendProducer extends AbstractMQProducer {
+ private static Logger logger = Logger
+ .getLogger(RMQAsyncSendProducer.class);
+ private String nsAddr = null;
+ private DefaultMQProducer producer = null;
+ private SendCallback sendCallback = null;
+ private List successSendResult = new ArrayList();
+ private AtomicInteger exceptionMsgCount = new AtomicInteger(
+ 0);
+ private int msgSize = 0;
+
+ public RMQAsyncSendProducer(String nsAddr, String topic) {
+ super(topic);
+ this.nsAddr = nsAddr;
+ sendCallback = new SendCallback() {
+ public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+ successSendResult.add(sendResult);
+ }
+
+ public void onException(Throwable throwable) {
+ exceptionMsgCount.getAndIncrement();
+ }
+ };
+
+ create();
+ start();
+ }
+
+ public int getSuccessMsgCount() {
+ return successSendResult.size();
+ }
+
+ public List getSuccessSendResult() {
+ return successSendResult;
+ }
+
+ public int getExceptionMsgCount() {
+ return exceptionMsgCount.get();
+ }
+
+ private void create() {
+ producer = new DefaultMQProducer();
+ producer.setProducerGroup(RandomUtil.getStringByUUID());
+ producer.setInstanceName(RandomUtil.getStringByUUID());
+
+ if (nsAddr != null) {
+ producer.setNamesrvAddr(nsAddr);
+ }
+
+ }
+
+ private void start() {
+ try {
+ producer.start();
+ } catch (MQClientException e) {
+ logger.error("producer start failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public SendResult send(Object msg, Object arg) {
+ return null;
+ }
+
+ public void shutdown() {
+ producer.shutdown();
+ }
+
+ public void asyncSend(Object msg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.send(metaqMsg, sendCallback);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void asyncSend(int msgSize) {
+ this.msgSize = msgSize;
+
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.asyncSend(msg);
+ }
+ }
+
+ public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.send(metaqMsg, selector, arg, sendCallback);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void asyncSend(int msgSize, MessageQueueSelector selector) {
+ this.msgSize = msgSize;
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.asyncSend(msg, selector, i);
+ }
+ }
+
+ public void asyncSend(Object msg, MessageQueue mq) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.send(metaqMsg, mq, sendCallback);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void asyncSend(int msgSize, MessageQueue mq) {
+ this.msgSize = msgSize;
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.asyncSend(msg, mq);
+ }
+ }
+
+ public void waitForResponse(int timeoutMills) {
+ long startTime = System.currentTimeMillis();
+ while (this.successSendResult.size() != this.msgSize) {
+ if (System.currentTimeMillis() - startTime < timeoutMills) {
+ TestUtil.waitForMonment(100);
+ } else {
+ logger.info("timeout but still not recv all response!");
+ break;
+ }
+ }
+ }
+
+ public void sendOneWay(Object msg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.sendOneway(metaqMsg);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendOneWay(int msgSize) {
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.sendOneWay(msg);
+ }
+ }
+
+ public void sendOneWay(Object msg, MessageQueue mq) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.sendOneway(metaqMsg, mq);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendOneWay(int msgSize, MessageQueue mq) {
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.sendOneWay(msg, mq);
+ }
+ }
+
+ public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) {
+ Message metaqMsg = (Message) msg;
+ try {
+ producer.sendOneway(metaqMsg, selector, arg);
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void sendOneWay(int msgSize, MessageQueueSelector selector) {
+ for (int i = 0; i < msgSize; i++) {
+ Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
+ this.sendOneWay(msg, selector, i);
+ }
+ }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..8af49eac4b5efe887c8416a4ccfc695ea973b9a6
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+public class RMQBroadCastConsumer extends RMQNormalConsumer {
+ private static Logger logger = Logger.getLogger(RMQBroadCastConsumer.class);
+
+ public RMQBroadCastConsumer(String nsAddr, String topic, String subExpression,
+ String consumerGroup, AbstractListener listner) {
+ super(nsAddr, topic, subExpression, consumerGroup, listner);
+ }
+
+ @Override
+ public void create() {
+ super.create();
+ consumer.setMessageModel(MessageModel.BROADCASTING);
+ }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..3f185d3a16336fc77f6ec355681d2c77bf3ad078
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
+import org.apache.rocketmq.test.listener.AbstractListener;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+public class RMQNormalConsumer extends AbstractMQConsumer {
+ private static Logger logger = Logger.getLogger(RMQNormalConsumer.class);
+ protected DefaultMQPushConsumer consumer = null;
+
+ public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
+ String consumerGroup, AbstractListener listner) {
+ super(nsAddr, topic, subExpression, consumerGroup, listner);
+ }
+
+ public AbstractListener getListner() {
+ return listner;
+ }
+
+ public void setListner(AbstractListener listner) {
+ this.listner = listner;
+ }
+
+ public void create() {
+ consumer = new DefaultMQPushConsumer(consumerGroup);
+ consumer.setInstanceName(RandomUtil.getStringByUUID());
+ consumer.setNamesrvAddr(nsAddr);
+ try {
+ consumer.subscribe(topic, subExpression);
+ } catch (MQClientException e) {
+ logger.error("consumer subscribe failed!");
+ e.printStackTrace();
+ }
+ consumer.setMessageListener(listner);
+ }
+
+ public void start() {
+ try {
+ consumer.start();
+ logger.info(String.format("consumer[%s] started!", consumer.getConsumerGroup()));
+ } catch (MQClientException e) {
+ logger.error("consumer start failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public void subscribe(String topic, String subExpression) {
+ try {
+ consumer.subscribe(topic, subExpression);
+ } catch (MQClientException e) {
+ logger.error("consumer subscribe failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public void shutdown() {
+ consumer.shutdown();
+ }
+
+ @Override
+ public void clearMsg() {
+ this.listner.clearMsg();
+ }
+
+ public void restart() {
+ consumer.shutdown();
+ create();
+ start();
+ }
+}
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..26b77fe846c7d002f8e5bad0e60dc63d55056789
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+
+public class RMQNormalProducer extends AbstractMQProducer {
+ private static Logger logger = Logger.getLogger(RMQNormalProducer.class);
+ private DefaultMQProducer producer = null;
+ private String nsAddr = null;
+
+ public RMQNormalProducer(String nsAddr, String topic) {
+ super(topic);
+ this.nsAddr = nsAddr;
+ create();
+ start();
+ }
+
+ public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
+ String producerInstanceName) {
+ super(topic);
+ this.producerGroupName = producerGroupName;
+ this.producerInstanceName = producerInstanceName;
+ this.nsAddr = nsAddr;
+
+ create();
+ start();
+ }
+
+ public DefaultMQProducer getProducer() {
+ return producer;
+ }
+
+ public void setProducer(DefaultMQProducer producer) {
+ this.producer = producer;
+ }
+
+ protected void create() {
+ producer = new DefaultMQProducer();
+ producer.setProducerGroup(getProducerGroupName());
+ producer.setInstanceName(getProducerInstanceName());
+
+ if (nsAddr != null) {
+ producer.setNamesrvAddr(nsAddr);
+ }
+
+ }
+
+ public void start() {
+ try {
+ producer.start();
+ super.setStartSuccess(true);
+ } catch (MQClientException e) {
+ super.setStartSuccess(false);
+ logger.error("producer start failed!");
+ e.printStackTrace();
+ }
+ }
+
+ public SendResult send(Object msg, Object orderKey) {
+ org.apache.rocketmq.client.producer.SendResult metaqResult = null;
+ Message metaqMsg = (Message) msg;
+ try {
+ long start = System.currentTimeMillis();
+ metaqResult = producer.send(metaqMsg);
+ this.msgRTs.addData(System.currentTimeMillis() - start);
+ if (isDebug) {
+ logger.info(metaqResult);
+ }
+ sendResult.setMsgId(metaqResult.getMsgId());
+ sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
+ sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
+ msgBodys.addData(new String(metaqMsg.getBody()));
+ originMsgs.addData(msg);
+ originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult);
+ } catch (Exception e) {
+ if (isDebug) {
+ e.printStackTrace();
+ }
+
+ sendResult.setSendResult(false);
+ sendResult.setSendException(e);
+ errorMsgs.addData(msg);
+ }
+
+ return sendResult;
+ }
+
+ public void send(Map> msgs) {
+ for (MessageQueue mq : msgs.keySet()) {
+ send(msgs.get(mq), mq);
+ }
+ }
+
+ public void send(List