提交 63de56c7 编写于 作者: S stevenschew

Merge branch 'master' into ROCKETMQ-57

......@@ -39,4 +39,5 @@ script:
- travis_retry mvn -B package jacoco:report coveralls:report
after_success:
- mvn clean install -Pit-test
- mvn sonar:sonar
......@@ -34,4 +34,4 @@ Then, import to eclipse by specifying the root directory of the project via:
Execute the following command in order to build the tar.gz packages and install JAR to the local repository:
$ mvn clean package install -Prelease-all assembly:assembly -U
\ No newline at end of file
$ mvn clean install -Prelease-all assembly:assembly -U
\ No newline at end of file
......@@ -173,7 +173,6 @@ public class BrokerOuterAPI {
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
......
......@@ -184,7 +184,7 @@ public class SubscriptionGroupManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
} else {
log.warn("delete subscription group failed, subscription group: {} not exist", old);
log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
}
}
}
......@@ -53,8 +53,9 @@ public class MQClientManager {
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.warn("Previous MQClientInstance has created for clientId:[{}]", clientId);
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.namesrv.processor;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.assertj.core.util.Maps;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
public class DefaultRequestProcessorTest {
/** Test Target */
private DefaultRequestProcessor defaultRequestProcessor;
private NamesrvController namesrvController;
private NamesrvConfig namesrvConfig;
private NettyServerConfig nettyServerConfig;
private Logger logger;
@Before
public void init() throws Exception {
namesrvConfig = new NamesrvConfig();
nettyServerConfig = new NettyServerConfig();
namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
defaultRequestProcessor = new DefaultRequestProcessor(namesrvController);
logger = mock(Logger.class);
when(logger.isInfoEnabled()).thenReturn(false);
setFinalStatic(DefaultRequestProcessor.class.getDeclaredField("log"), logger);
}
@Test
public void testProcessRequest_PutKVConfig() throws RemotingCommandException {
PutKVConfigRequestHeader header = new PutKVConfigRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG,
header);
request.addExtField("namespace", "namespace");
request.addExtField("key", "key");
request.addExtField("value", "value");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
assertThat(namesrvController.getKvConfigManager().getKVConfig("namespace", "key"))
.isEqualTo("value");
}
@Test
public void testProcessRequest_GetKVConfigReturnNotNull() throws RemotingCommandException {
namesrvController.getKvConfigManager().putKVConfig("namespace", "key", "value");
GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG,
header);
request.addExtField("namespace", "namespace");
request.addExtField("key", "key");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response
.readCustomHeader();
assertThat(responseHeader.getValue()).isEqualTo("value");
}
@Test
public void testProcessRequest_GetKVConfigReturnNull() throws RemotingCommandException {
GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG,
header);
request.addExtField("namespace", "namespace");
request.addExtField("key", "key");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
assertThat(response.getRemark()).isEqualTo("No config item, Namespace: namespace Key: key");
GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response
.readCustomHeader();
assertThat(responseHeader.getValue()).isNull();
}
@Test
public void testProcessRequest_DeleteKVConfig() throws RemotingCommandException {
namesrvController.getKvConfigManager().putKVConfig("namespace", "key", "value");
DeleteKVConfigRequestHeader header = new DeleteKVConfigRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG,
header);
request.addExtField("namespace", "namespace");
request.addExtField("key", "key");
RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
assertThat(namesrvController.getKvConfigManager().getKVConfig("namespace", "key"))
.isNull();
}
@Test
public void testProcessRequest_RegisterBroker() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {
RemotingCommand request = genSampleRegisterCmd(true);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
RouteInfoManager routes = namesrvController.getRouteInfoManager();
Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
brokerAddrTable.setAccessible(true);
BrokerData broker = new BrokerData();
broker.setBrokerName("broker");
broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
assertThat((Map) brokerAddrTable.get(routes))
.contains(new HashMap.SimpleEntry("broker", broker));
}
@Test
public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {
RemotingCommand request = genSampleRegisterCmd(true);
// version >= MQVersion.Version.V3_0_11.ordinal() to register with filter server
request.setVersion(100);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(response.getRemark()).isNull();
RouteInfoManager routes = namesrvController.getRouteInfoManager();
Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
brokerAddrTable.setAccessible(true);
BrokerData broker = new BrokerData();
broker.setBrokerName("broker");
broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
assertThat((Map) brokerAddrTable.get(routes))
.contains(new HashMap.SimpleEntry("broker", broker));
}
@Test
public void testProcessRequest_UnregisterBroker() throws RemotingCommandException, NoSuchFieldException, IllegalAccessException {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(ctx.channel()).thenReturn(null);
//Register broker
RemotingCommand regRequest = genSampleRegisterCmd(true);
defaultRequestProcessor.processRequest(ctx, regRequest);
//Unregister broker
RemotingCommand unregRequest = genSampleRegisterCmd(false);
RemotingCommand unregResponse = defaultRequestProcessor.processRequest(ctx, unregRequest);
assertThat(unregResponse.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(unregResponse.getRemark()).isNull();
RouteInfoManager routes = namesrvController.getRouteInfoManager();
Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
brokerAddrTable.setAccessible(true);
assertThat((Map)brokerAddrTable.get(routes)).isEmpty();
}
private static RemotingCommand genSampleRegisterCmd(boolean reg) {
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
header.setBrokerName("broker");
RemotingCommand request = RemotingCommand.createRequestCommand(
reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, header);
request.addExtField("brokerName", "broker");
request.addExtField("brokerAddr", "10.10.1.1");
request.addExtField("clusterName", "cluster");
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", "2333");
return request;
}
private static void setFinalStatic(Field field, Object newValue) throws Exception {
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(null, newValue);
}
}
\ No newline at end of file
......@@ -160,9 +160,6 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!-- Overwritten by the test configuration,otherwise the JaCoCo agent cannot be attached.Details see http://www.eclemma.org/jacoco/trunk/doc/prepare-agent-mojo.html -->
<argLine>-Xms512m -Xmx1024m</argLine>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- URL of the ASF SonarQube server -->
<sonar.host.url>https://builds.apache.org/analysis</sonar.host.url>
......@@ -181,6 +178,7 @@
<module>example</module>
<module>filtersrv</module>
<module>srvutil</module>
<module>test</module>
</modules>
<build>
......@@ -236,25 +234,6 @@
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
......@@ -310,6 +289,7 @@
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
......@@ -352,12 +332,20 @@
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacoco.exec</destFile>
</configuration>
</execution>
<execution>
<id>default-prepare-agent-integration</id>
<phase>pre-integration-test</phase>
<goals>
<goal>prepare-agent-integration</goal>
</goals>
<configuration>
<destFile>${project.build.directory}/jacoco-it.exec</destFile>
<propertyName>failsafeArgLine</propertyName>
</configuration>
</execution>
<execution>
<id>default-report</id>
......@@ -373,6 +361,14 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
......@@ -475,6 +471,37 @@
</plugins>
</build>
</profile>
<profile>
<id>it-test</id>
<build>
<plugins>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<argLine>@{failsafeArgLine}</argLine>
<excludes>
<exclude>**/NormalMsgDelayIT.java</exclude>
<exclude>**/BroadCastNormalMsgNotRecvIT.java</exclude>
<exclude>**/TagMessageWithSameGroupConsumerIT.java</exclude>
<exclude>**/AsyncSendWithMessageQueueSelectorIT.java</exclude>
<exclude>**/AsyncSendWithMessageQueueIT.java</exclude>
</excludes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
......@@ -537,7 +564,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-qatest</artifactId>
<artifactId>rocketmq-test</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......
......@@ -20,7 +20,7 @@ import com.alibaba.fastjson.JSON;
import java.nio.charset.Charset;
public abstract class RemotingSerializable {
public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
......
......@@ -23,14 +23,14 @@ import java.util.Iterator;
import java.util.Map;
public class RocketMQSerializable {
public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(RemotingSerializable.CHARSET_UTF8);
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
......@@ -89,9 +89,9 @@ public class RocketMQSerializable {
if (entry.getKey() != null && entry.getValue() != null) {
kvLength =
// keySize + Key
2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length
2 + entry.getKey().getBytes(CHARSET_UTF8).length
// valSize + val
+ 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length;
+ 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
totalLength += kvLength;
}
}
......@@ -103,8 +103,8 @@ public class RocketMQSerializable {
while (it.hasNext()) {
Map.Entry<String, String> entry = it.next();
if (entry.getKey() != null && entry.getValue() != null) {
key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8);
val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8);
key = entry.getKey().getBytes(CHARSET_UTF8);
val = entry.getValue().getBytes(CHARSET_UTF8);
content.putShort((short) key.length);
content.put(key);
......@@ -154,7 +154,7 @@ public class RocketMQSerializable {
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, RemotingSerializable.CHARSET_UTF8));
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
......@@ -187,8 +187,7 @@ public class RocketMQSerializable {
valContent = new byte[valSize];
byteBuffer.get(valContent);
map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent,
RemotingSerializable.CHARSET_UTF8));
map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
}
return map;
}
......
......@@ -121,7 +121,6 @@
<!--whitespace-->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
<module name="WhitespaceAfter"/>
<module name="NoWhitespaceAfter"/>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-test</artifactId>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-broker</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-namesrv</artifactId>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>0.30</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.mq;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.util.TestUtil;
public class MQAsyncProducer {
private static Logger logger = Logger.getLogger(MQAsyncProducer.class);
private AbstractMQProducer producer = null;
private long msgNum;
private int intervalMills;
private Thread sendT;
private AtomicBoolean bPause = new AtomicBoolean(false);
public MQAsyncProducer(final AbstractMQProducer producer, final long msgNum,
final int intervalMills) {
this.producer = producer;
this.msgNum = msgNum;
this.intervalMills = intervalMills;
sendT = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < msgNum; i++) {
if (!bPause.get()) {
producer.send();
TestUtil.waitForMonment(intervalMills);
} else {
while (true) {
if (bPause.get()) {
TestUtil.waitForMonment(10);
} else
break;
}
}
}
}
});
}
public void start() {
sendT.start();
}
public void waitSendAll(int waitMills) {
long startTime = System.currentTimeMillis();
while ((producer.getAllMsgBody().size() + producer.getSendErrorMsg().size()) < msgNum) {
if (System.currentTimeMillis() - startTime < waitMills) {
TestUtil.waitForMonment(200);
} else {
logger.error(String.format("time elapse:%s, but the message sending has not finished",
System.currentTimeMillis() - startTime));
break;
}
}
}
public void pauseProducer() {
bPause.set(true);
}
public void notifyProducer() {
bPause.set(false);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.sendresult.SendResult;
import org.apache.rocketmq.test.util.RandomUtil;
import org.apache.rocketmq.test.util.TestUtil;
public class RMQAsyncSendProducer extends AbstractMQProducer {
private static Logger logger = Logger
.getLogger(RMQAsyncSendProducer.class);
private String nsAddr = null;
private DefaultMQProducer producer = null;
private SendCallback sendCallback = null;
private List<org.apache.rocketmq.client.producer.SendResult> successSendResult = new ArrayList<org.apache.rocketmq.client.producer.SendResult>();
private AtomicInteger exceptionMsgCount = new AtomicInteger(
0);
private int msgSize = 0;
public RMQAsyncSendProducer(String nsAddr, String topic) {
super(topic);
this.nsAddr = nsAddr;
sendCallback = new SendCallback() {
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
successSendResult.add(sendResult);
}
public void onException(Throwable throwable) {
exceptionMsgCount.getAndIncrement();
}
};
create();
start();
}
public int getSuccessMsgCount() {
return successSendResult.size();
}
public List<org.apache.rocketmq.client.producer.SendResult> getSuccessSendResult() {
return successSendResult;
}
public int getExceptionMsgCount() {
return exceptionMsgCount.get();
}
private void create() {
producer = new DefaultMQProducer();
producer.setProducerGroup(RandomUtil.getStringByUUID());
producer.setInstanceName(RandomUtil.getStringByUUID());
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
}
private void start() {
try {
producer.start();
} catch (MQClientException e) {
logger.error("producer start failed!");
e.printStackTrace();
}
}
public SendResult send(Object msg, Object arg) {
return null;
}
public void shutdown() {
producer.shutdown();
}
public void asyncSend(Object msg) {
Message metaqMsg = (Message) msg;
try {
producer.send(metaqMsg, sendCallback);
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void asyncSend(int msgSize) {
this.msgSize = msgSize;
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
this.asyncSend(msg);
}
}
public void asyncSend(Object msg, MessageQueueSelector selector, Object arg) {
Message metaqMsg = (Message) msg;
try {
producer.send(metaqMsg, selector, arg, sendCallback);
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void asyncSend(int msgSize, MessageQueueSelector selector) {
this.msgSize = msgSize;
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
this.asyncSend(msg, selector, i);
}
}
public void asyncSend(Object msg, MessageQueue mq) {
Message metaqMsg = (Message) msg;
try {
producer.send(metaqMsg, mq, sendCallback);
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void asyncSend(int msgSize, MessageQueue mq) {
this.msgSize = msgSize;
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
this.asyncSend(msg, mq);
}
}
public void waitForResponse(int timeoutMills) {
long startTime = System.currentTimeMillis();
while (this.successSendResult.size() != this.msgSize) {
if (System.currentTimeMillis() - startTime < timeoutMills) {
TestUtil.waitForMonment(100);
} else {
logger.info("timeout but still not recv all response!");
break;
}
}
}
public void sendOneWay(Object msg) {
Message metaqMsg = (Message) msg;
try {
producer.sendOneway(metaqMsg);
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendOneWay(int msgSize) {
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
this.sendOneWay(msg);
}
}
public void sendOneWay(Object msg, MessageQueue mq) {
Message metaqMsg = (Message) msg;
try {
producer.sendOneway(metaqMsg, mq);
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendOneWay(int msgSize, MessageQueue mq) {
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
this.sendOneWay(msg, mq);
}
}
public void sendOneWay(Object msg, MessageQueueSelector selector, Object arg) {
Message metaqMsg = (Message) msg;
try {
producer.sendOneway(metaqMsg, selector, arg);
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendOneWay(int msgSize, MessageQueueSelector selector) {
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
this.sendOneWay(msg, selector, i);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQBroadCastConsumer extends RMQNormalConsumer {
private static Logger logger = Logger.getLogger(RMQBroadCastConsumer.class);
public RMQBroadCastConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) {
super(nsAddr, topic, subExpression, consumerGroup, listner);
}
@Override
public void create() {
super.create();
consumer.setMessageModel(MessageModel.BROADCASTING);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.RandomUtil;
public class RMQNormalConsumer extends AbstractMQConsumer {
private static Logger logger = Logger.getLogger(RMQNormalConsumer.class);
protected DefaultMQPushConsumer consumer = null;
public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) {
super(nsAddr, topic, subExpression, consumerGroup, listner);
}
public AbstractListener getListner() {
return listner;
}
public void setListner(AbstractListener listner) {
this.listner = listner;
}
public void create() {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
try {
consumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
logger.error("consumer subscribe failed!");
e.printStackTrace();
}
consumer.setMessageListener(listner);
}
public void start() {
try {
consumer.start();
logger.info(String.format("consumer[%s] started!", consumer.getConsumerGroup()));
} catch (MQClientException e) {
logger.error("consumer start failed!");
e.printStackTrace();
}
}
public void subscribe(String topic, String subExpression) {
try {
consumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
logger.error("consumer subscribe failed!");
e.printStackTrace();
}
}
public void shutdown() {
consumer.shutdown();
}
@Override
public void clearMsg() {
this.listner.clearMsg();
}
public void restart() {
consumer.shutdown();
create();
start();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.sendresult.SendResult;
public class RMQNormalProducer extends AbstractMQProducer {
private static Logger logger = Logger.getLogger(RMQNormalProducer.class);
private DefaultMQProducer producer = null;
private String nsAddr = null;
public RMQNormalProducer(String nsAddr, String topic) {
super(topic);
this.nsAddr = nsAddr;
create();
start();
}
public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
String producerInstanceName) {
super(topic);
this.producerGroupName = producerGroupName;
this.producerInstanceName = producerInstanceName;
this.nsAddr = nsAddr;
create();
start();
}
public DefaultMQProducer getProducer() {
return producer;
}
public void setProducer(DefaultMQProducer producer) {
this.producer = producer;
}
protected void create() {
producer = new DefaultMQProducer();
producer.setProducerGroup(getProducerGroupName());
producer.setInstanceName(getProducerInstanceName());
if (nsAddr != null) {
producer.setNamesrvAddr(nsAddr);
}
}
public void start() {
try {
producer.start();
super.setStartSuccess(true);
} catch (MQClientException e) {
super.setStartSuccess(false);
logger.error("producer start failed!");
e.printStackTrace();
}
}
public SendResult send(Object msg, Object orderKey) {
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
Message metaqMsg = (Message) msg;
try {
long start = System.currentTimeMillis();
metaqResult = producer.send(metaqMsg);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
}
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
msgBodys.addData(new String(metaqMsg.getBody()));
originMsgs.addData(msg);
originMsgIndex.put(new String(metaqMsg.getBody()), metaqResult);
} catch (Exception e) {
if (isDebug) {
e.printStackTrace();
}
sendResult.setSendResult(false);
sendResult.setSendException(e);
errorMsgs.addData(msg);
}
return sendResult;
}
public void send(Map<MessageQueue, List<Object>> msgs) {
for (MessageQueue mq : msgs.keySet()) {
send(msgs.get(mq), mq);
}
}
public void send(List<Object> msgs, MessageQueue mq) {
for (Object msg : msgs) {
sendMQ((Message) msg, mq);
}
}
public SendResult sendMQ(Message msg, MessageQueue mq) {
org.apache.rocketmq.client.producer.SendResult metaqResult = null;
try {
long start = System.currentTimeMillis();
metaqResult = producer.send(msg, mq);
this.msgRTs.addData(System.currentTimeMillis() - start);
if (isDebug) {
logger.info(metaqResult);
}
sendResult.setMsgId(metaqResult.getMsgId());
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
msgBodys.addData(new String(msg.getBody()));
originMsgs.addData(msg);
originMsgIndex.put(new String(msg.getBody()), metaqResult);
} catch (Exception e) {
if (isDebug) {
e.printStackTrace();
}
sendResult.setSendResult(false);
sendResult.setSendException(e);
errorMsgs.addData(msg);
}
return sendResult;
}
public void shutdown() {
producer.shutdown();
}
@Override
public List<MessageQueue> getMessageQueue() {
List<MessageQueue> mqs = null;
try {
mqs = producer.fetchPublishMessageQueues(topic);
} catch (MQClientException e) {
e.printStackTrace();
}
return mqs;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.clientinterface;
import org.apache.rocketmq.test.listener.AbstractListener;
public abstract class AbstractMQConsumer implements MQConsumer {
protected AbstractListener listner = null;
protected String nsAddr = null;
protected String topic = null;
protected String subExpression = null;
protected String consumerGroup = null;
protected boolean isDebug = false;
public AbstractMQConsumer() {
}
public AbstractMQConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) {
this.topic = topic;
this.subExpression = subExpression;
this.consumerGroup = consumerGroup;
this.listner = listner;
this.nsAddr = nsAddr;
}
public AbstractMQConsumer(String topic, String subExpression) {
this.topic = topic;
this.subExpression = subExpression;
}
public void setDebug() {
if (listner != null) {
listner.setDebug(true);
}
isDebug = true;
}
public void setDebug(boolean isDebug) {
if (listner != null) {
listner.setDebug(isDebug);
}
this.isDebug = isDebug;
}
public void setSubscription(String topic, String subExpression) {
this.topic = topic;
this.subExpression = subExpression;
}
public AbstractListener getListner() {
return listner;
}
public void setListner(AbstractListener listner) {
this.listner = listner;
}
public String getNsAddr() {
return nsAddr;
}
public void setNsAddr(String nsAddr) {
this.nsAddr = nsAddr;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getSubExpression() {
return subExpression;
}
public void setSubExpression(String subExpression) {
this.subExpression = subExpression;
}
public String getConsumerGroup() {
return consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
public void clearMsg() {
listner.clearMsg();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.clientinterface;
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.sendresult.SendResult;
import org.apache.rocketmq.test.util.RandomUtil;
import org.apache.rocketmq.test.util.TestUtil;
public abstract class AbstractMQProducer extends MQCollector implements MQProducer {
protected String topic = null;
protected SendResult sendResult = new SendResult();
protected boolean startSuccess = false;
protected String producerGroupName = null;
protected String producerInstanceName = null;
protected boolean isDebug = false;
public AbstractMQProducer(String topic) {
super();
producerGroupName = RandomUtil.getStringByUUID();
producerInstanceName = RandomUtil.getStringByUUID();
this.topic = topic;
}
public AbstractMQProducer(String topic, String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
producerGroupName = RandomUtil.getStringByUUID();
producerInstanceName = RandomUtil.getStringByUUID();
this.topic = topic;
}
public boolean isStartSuccess() {
return startSuccess;
}
public void setStartSuccess(boolean startSuccess) {
this.startSuccess = startSuccess;
}
public String getProducerInstanceName() {
return producerInstanceName;
}
public void setProducerInstanceName(String producerInstanceName) {
this.producerInstanceName = producerInstanceName;
}
public String getProducerGroupName() {
return producerGroupName;
}
public void setProducerGroupName(String producerGroupName) {
this.producerGroupName = producerGroupName;
}
public void setDebug() {
isDebug = true;
}
public void setDebug(boolean isDebug) {
this.isDebug = isDebug;
}
public void setRun() {
isDebug = false;
}
public List<MessageQueue> getMessageQueue() {
return null;
}
private Object getMessage() {
return this.getMessageByTag(null);
}
public Object getMessageByTag(String tag) {
Object objMsg = null;
if (this instanceof RMQNormalProducer) {
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(
topic, (RandomUtil.getStringByUUID() + "." + new Date()).getBytes());
objMsg = msg;
if (tag != null) {
msg.setTags(tag);
}
}
return objMsg;
}
public void send() {
Object msg = getMessage();
send(msg, null);
}
public void send(Object msg) {
send(msg, null);
}
public void send(long msgNum) {
for (int i = 0; i < msgNum; i++) {
this.send();
}
}
public void send(long msgNum, int intervalMills) {
for (int i = 0; i < msgNum; i++) {
this.send();
TestUtil.waitForMonment(intervalMills);
}
}
public void send(String tag, int msgSize) {
for (int i = 0; i < msgSize; i++) {
Object msg = getMessageByTag(tag);
send(msg, null);
}
}
public void send(String tag, int msgSize, int intervalMills) {
for (int i = 0; i < msgSize; i++) {
Object msg = getMessageByTag(tag);
send(msg, null);
TestUtil.waitForMonment(intervalMills);
}
}
public void send(List<Object> msgs) {
for (Object msg : msgs) {
this.send(msg, null);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.clientinterface;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.test.util.RandomUtil;
import org.apache.rocketmq.test.util.data.collect.DataCollector;
import org.apache.rocketmq.test.util.data.collect.DataCollectorManager;
public abstract class MQCollector {
protected DataCollector msgBodys = null;
protected DataCollector originMsgs = null;
protected DataCollector errorMsgs = null;
protected Map<Object, Object> originMsgIndex = null;
protected Collection<Object> msgBodysCopy = null;
protected DataCollector msgRTs = null;
public MQCollector() {
msgBodys = DataCollectorManager.getInstance()
.fetchListDataCollector(RandomUtil.getStringByUUID());
originMsgs = DataCollectorManager.getInstance()
.fetchListDataCollector(RandomUtil.getStringByUUID());
errorMsgs = DataCollectorManager.getInstance()
.fetchListDataCollector(RandomUtil.getStringByUUID());
originMsgIndex = new ConcurrentHashMap<Object, Object>();
msgRTs = DataCollectorManager.getInstance()
.fetchListDataCollector(RandomUtil.getStringByUUID());
}
public MQCollector(String originMsgCollector, String msgBodyCollector) {
originMsgs = DataCollectorManager.getInstance().fetchDataCollector(originMsgCollector);
msgBodys = DataCollectorManager.getInstance().fetchDataCollector(msgBodyCollector);
}
public Collection<Object> getAllMsgBody() {
return msgBodys.getAllData();
}
public Collection<Object> getAllOriginMsg() {
return originMsgs.getAllData();
}
public Object getFirstMsg() {
return ((List<Object>) originMsgs.getAllData()).get(0);
}
public Collection<Object> getAllUndupMsgBody() {
return msgBodys.getAllDataWithoutDuplicate();
}
public Collection<Object> getAllUndupOriginMsg() {
return originMsgs.getAllData();
}
public Collection<Object> getSendErrorMsg() {
return errorMsgs.getAllData();
}
public Collection<Object> getMsgRTs() {
return msgRTs.getAllData();
}
public Map<Object, Object> getOriginMsgIndex() {
return originMsgIndex;
}
public Collection<Object> getMsgBodysCopy() {
msgBodysCopy = new ArrayList<Object>();
msgBodysCopy.addAll(msgBodys.getAllData());
return msgBodysCopy;
}
public void clearMsg() {
msgBodys.resetData();
originMsgs.resetData();
errorMsgs.resetData();
originMsgIndex.clear();
msgRTs.resetData();
}
public void lockCollectors() {
msgBodys.lockIncrement();
originMsgs.lockIncrement();
errorMsgs.lockIncrement();
msgRTs.lockIncrement();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.clientinterface;
public interface MQConsumer {
void create();
void start();
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.clientinterface;
import org.apache.rocketmq.test.sendresult.SendResult;
public interface MQProducer {
SendResult send(Object msg, Object arg);
void setDebug();
void setRun();
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.factory;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.listener.AbstractListener;
public class ConsumerFactory {
public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listner) {
RMQNormalConsumer consumer = new RMQNormalConsumer(nsAddr, topic, subExpression,
consumerGroup, listner);
consumer.create();
consumer.start();
return consumer;
}
public static RMQBroadCastConsumer getRMQBroadCastConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listner) {
RMQBroadCastConsumer consumer = new RMQBroadCastConsumer(nsAddr, topic, subExpression,
consumerGroup, listner);
consumer.create();
consumer.start();
return consumer;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.factory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.util.RandomUtil;
public class MQMessageFactory {
private static Integer index = 0;
public static List<Object> getRMQMessage(String tag, String topic, int msgSize) {
List<Object> msgs = new ArrayList<Object>();
for (int i = 0; i < msgSize; i++) {
msgs.add(new Message(topic, tag, RandomUtil.getStringByUUID().getBytes()));
}
return msgs;
}
public static List<Object> getRMQMessage(List<String> tags, String topic, int msgSize) {
List<Object> msgs = new ArrayList<Object>();
for (int i = 0; i < msgSize; i++) {
for (String tag : tags) {
msgs.add(new Message(topic, tag, RandomUtil.getStringByUUID().getBytes()));
}
}
return msgs;
}
public static List<Object> getMessageBody(List<Object> msgs) {
List<Object> msgBodys = new ArrayList<Object>();
for (Object msg : msgs) {
msgBodys.add(new String(((Message) msg).getBody()));
}
return msgBodys;
}
public static Collection<Object> getMessage(Collection<Object>... msgs) {
Collection<Object> allMsgs = new ArrayList<Object>();
for (Collection<Object> msg : msgs) {
allMsgs.addAll(msg);
}
return allMsgs;
}
public static List<Object> getDelayMsg(String topic, int delayLevel, int msgSize) {
List<Object> msgs = new ArrayList<Object>();
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, RandomUtil.getStringByUUID().getBytes());
msg.setDelayTimeLevel(delayLevel);
msgs.add(msg);
}
return msgs;
}
public static List<Object> getKeyMsg(String topic, String key, int msgSize) {
List<Object> msgs = new ArrayList<Object>();
for (int i = 0; i < msgSize; i++) {
Message msg = new Message(topic, null, key, RandomUtil.getStringByUUID().getBytes());
msgs.add(msg);
}
return msgs;
}
public static Map<MessageQueue, List<Object>> getMsgByMQ(MessageQueue mq, int msgSize) {
List<MessageQueue> mqs = new ArrayList<MessageQueue>();
mqs.add(mq);
return getMsgByMQ(mqs, msgSize);
}
public static Map<MessageQueue, List<Object>> getMsgByMQ(List<MessageQueue> mqs, int msgSize) {
return getMsgByMQ(mqs, msgSize, null);
}
public static Map<MessageQueue, List<Object>> getMsgByMQ(List<MessageQueue> mqs, int msgSize,
String tag) {
Map<MessageQueue, List<Object>> msgs = new HashMap<MessageQueue, List<Object>>();
for (MessageQueue mq : mqs) {
msgs.put(mq, getMsg(mq.getTopic(), msgSize, tag));
}
return msgs;
}
public static List<Object> getMsg(String topic, int msgSize) {
return getMsg(topic, msgSize, null);
}
public static List<Object> getMsg(String topic, int msgSize, String tag) {
List<Object> msgs = new ArrayList<Object>();
while (msgSize > 0) {
Message msg = new Message(topic, (index++).toString().getBytes());
if (tag != null) {
msg.setTags(tag);
}
msgs.add(msg);
msgSize--;
}
return msgs;
}
public static List<MessageQueue> getMessageQueues(MessageQueue... mq) {
return Arrays.asList(mq);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.factory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.test.util.RandomUtils;
public class MessageFactory {
public static Message getRandomMessage(String topic) {
return getStringMessage(topic, RandomUtils.getStringByUUID());
}
public static Message getStringMessage(String topic, String body) {
Message msg = new Message(topic, body.getBytes());
return msg;
}
public static Message getStringMessageByTag(String topic, String tags, String body) {
Message msg = new Message(topic, tags, body.getBytes());
return msg;
}
public static Message getRandomMessageByTag(String topic, String tags) {
return getStringMessageByTag(topic, tags, RandomUtils.getStringByUUID());
}
public static Collection<Message> getRandomMessageList(String topic, int size) {
List<Message> msgList = new ArrayList<Message>();
for (int i = 0; i < size; i++) {
msgList.add(getRandomMessage(topic));
}
return msgList;
}
public static Collection<Message> getRandomMessageListByTag(String topic, String tags, int size) {
List<Message> msgList = new ArrayList<Message>();
for (int i = 0; i < size; i++) {
msgList.add(getRandomMessageByTag(topic, tags));
}
return msgList;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.factory;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.test.util.RandomUtil;
public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setNamesrvAddr(ns);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.factory;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
public class SendCallBackFactory {
public static SendCallback getSendCallBack() {
return new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
}
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.factory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TagMessage {
private List<String> tags = null;
private String topic = null;
private int msgSize = 0;
private Map<String, List<Object>> rmqMsgs = new HashMap<String, List<Object>>();
public TagMessage(String tag, String topic, int msgSize) {
String[] tags = {tag};
this.tags = Arrays.asList(tags);
this.topic = topic;
this.msgSize = msgSize;
init();
}
public TagMessage(String[] tags, String topic, int msgSize) {
this(Arrays.asList(tags), topic, msgSize);
}
public TagMessage(List<String> tags, String topic, int msgSize) {
this.tags = tags;
this.topic = topic;
this.msgSize = msgSize;
init();
}
private void init() {
for (String tag : tags) {
List<Object> tagMsgs = MQMessageFactory.getRMQMessage(tag, topic, msgSize);
rmqMsgs.put(tag, tagMsgs);
}
}
public List<Object> getMessageByTag(String tag) {
if (tags.contains(tag)) {
return rmqMsgs.get(tag);
} else {
return new ArrayList<Object>();
}
}
public List<Object> getMixedTagMessages() {
List<Object> mixedMsgs = new ArrayList<Object>();
for (int i = 0; i < msgSize; i++) {
for (String tag : tags) {
mixedMsgs.add(rmqMsgs.get(tag).get(i));
}
}
return mixedMsgs;
}
public List<Object> getMessageBodyByTag(String tag) {
if (tags.contains(tag)) {
return MQMessageFactory.getMessageBody(rmqMsgs.get(tag));
} else {
return new ArrayList<Object>();
}
}
public List<Object> getMessageBodyByTag(String... tag) {
return this.getMessageBodyByTag(Arrays.asList(tag));
}
public List<Object> getMessageBodyByTag(List<String> tags) {
List<Object> msgBodys = new ArrayList<Object>();
for (String tag : tags) {
msgBodys.addAll(MQMessageFactory.getMessageBody(rmqMsgs.get(tag)));
}
return msgBodys;
}
public List<Object> getAllTagMessageBody() {
List<Object> msgs = new ArrayList<Object>();
for (String tag : tags) {
msgs.addAll(MQMessageFactory.getMessageBody(rmqMsgs.get(tag)));
}
return msgs;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.listener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.test.clientinterface.MQCollector;
import org.apache.rocketmq.test.util.TestUtil;
public class AbstractListener extends MQCollector implements MessageListener {
public static Logger logger = Logger.getLogger(AbstractListener.class);
protected boolean isDebug = false;
protected String listnerName = null;
protected Collection<Object> allSendMsgs = null;
public AbstractListener() {
super();
}
public AbstractListener(String listnerName) {
super();
this.listnerName = listnerName;
}
public AbstractListener(String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
}
public boolean isDebug() {
return isDebug;
}
public void setDebug(boolean debug) {
isDebug = debug;
}
public void waitForMessageConsume(int timeoutMills) {
TestUtil.waitForMonment(timeoutMills);
}
public void stopRecv() {
super.lockCollectors();
}
public Collection<Object> waitForMessageConsume(Collection<Object> allSendMsgs,
int timeoutMills) {
this.allSendMsgs = allSendMsgs;
List<Object> sendMsgs = new ArrayList<Object>();
sendMsgs.addAll(allSendMsgs);
long curTime = System.currentTimeMillis();
while (!sendMsgs.isEmpty()) {
Iterator<Object> iter = sendMsgs.iterator();
while (iter.hasNext()) {
Object msg = iter.next();
if (msgBodys.getAllData().contains(msg)) {
iter.remove();
}
}
if (sendMsgs.isEmpty()) {
break;
} else {
if (System.currentTimeMillis() - curTime >= timeoutMills) {
logger.error(String.format("timeout but [%s] not recv all send messages!",
listnerName));
break;
} else {
logger.info(String.format("[%s] still [%s] msg not recv!", listnerName,
sendMsgs.size()));
TestUtil.waitForMonment(500);
}
}
}
return sendMsgs;
}
public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int timeoutMills) {
Collection<Object> notRecvMsgs = waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills);
for (Object object : notRecvMsgs) {
logger.info(sendMsgIndex.get(object));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.listener.rmq.concurrent;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.RandomUtil;
import org.apache.rocketmq.test.util.data.collect.DataCollector;
import org.apache.rocketmq.test.util.data.collect.DataCollectorManager;
public class RMQDelayListner extends AbstractListener implements MessageListenerConcurrently {
private DataCollector msgDelayTimes = null;
public RMQDelayListner() {
msgDelayTimes = DataCollectorManager.getInstance()
.fetchDataCollector(RandomUtil.getStringByUUID());
}
public Collection<Object> getMsgDelayTimes() {
return msgDelayTimes.getAllData();
}
public void resetMsgDelayTimes() {
msgDelayTimes.resetData();
}
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
long recvTime = System.currentTimeMillis();
for (MessageExt msg : msgs) {
if (isDebug) {
logger.info(listnerName + ":" + msg);
}
msgBodys.addData(new String(msg.getBody()));
originMsgs.addData(msg);
msgDelayTimes.addData(Math.abs(recvTime - msg.getBornTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.listener.rmq.concurrent;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQNormalListner extends AbstractListener implements MessageListenerConcurrently {
private ConsumeConcurrentlyStatus consumeStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
private AtomicInteger msgIndex = new AtomicInteger(0);
public RMQNormalListner() {
super();
}
public RMQNormalListner(String listnerName) {
super(listnerName);
}
public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) {
super();
this.consumeStatus = consumeStatus;
}
public RMQNormalListner(String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
}
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : msgs) {
msgIndex.getAndIncrement();
if (isDebug) {
if (listnerName != null && listnerName != "") {
logger.info(listnerName + ":" + msgIndex.get() + ":"
+ String.format("msgid:%s broker:%s queueId:%s offset:%s",
msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(),
msg.getQueueOffset()));
} else {
logger.info(msg);
}
}
msgBodys.addData(new String(msg.getBody()));
originMsgs.addData(msg);
originMsgIndex.put(new String(msg.getBody()), msg);
}
return consumeStatus;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.listener.rmq.order;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQOrderListener extends AbstractListener implements MessageListenerOrderly {
private Map<String/* brokerId_brokerIp */, Collection<Object>> msgs = new ConcurrentHashMap<String, Collection<Object>>();
public RMQOrderListener() {
super();
}
public RMQOrderListener(String listnerName) {
super(listnerName);
}
public RMQOrderListener(String originMsgCollector, String msgBodyCollector) {
super(originMsgCollector, msgBodyCollector);
}
public Collection<Collection<Object>> getMsgs() {
return msgs.values();
}
private void putMsg(MessageExt msg) {
Collection<Object> msgQueue = null;
String key = getKey(msg.getQueueId(), msg.getStoreHost().toString());
if (!msgs.containsKey(key)) {
msgQueue = new ArrayList<Object>();
} else {
msgQueue = msgs.get(key);
}
msgQueue.add(new String(msg.getBody()));
msgs.put(key, msgQueue);
}
private String getKey(int queueId, String brokerIp) {
return String.format("%s_%s", queueId, brokerIp);
}
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
if (isDebug) {
if (listnerName != null && listnerName != "") {
logger.info(listnerName + ": " + msg);
} else {
logger.info(msg);
}
}
putMsg(msg);
msgBodys.addData(new String(msg.getBody()));
originMsgs.addData(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.factory.MQMessageFactory;
public class MessageQueueMsg {
private Map<MessageQueue, List<Object>> msgsWithMQ = null;
private Map<Integer, List<Object>> msgsWithMQId = null;
private Collection<Object> msgBodys = null;
public MessageQueueMsg(List<MessageQueue> mqs, int msgSize) {
this(mqs, msgSize, null);
}
public MessageQueueMsg(List<MessageQueue> mqs, int msgSize, String tag) {
msgsWithMQ = MQMessageFactory.getMsgByMQ(mqs, msgSize, tag);
msgsWithMQId = new HashMap<Integer, List<Object>>();
msgBodys = new ArrayList<Object>();
init();
}
public Map<MessageQueue, List<Object>> getMsgsWithMQ() {
return msgsWithMQ;
}
public Map<Integer, List<Object>> getMsgWithMQId() {
return msgsWithMQId;
}
public Collection<Object> getMsgBodys() {
return msgBodys;
}
private void init() {
for (MessageQueue mq : msgsWithMQ.keySet()) {
msgsWithMQId.put(mq.getQueueId(), msgsWithMQ.get(mq));
msgBodys.addAll(MQMessageFactory.getMessageBody(msgsWithMQ.get(mq)));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.sendresult;
public class SendResult {
private boolean sendResult = false;
private String msgId = null;
private Exception sendException = null;
private String brokerIp = null;
public String getBrokerIp() {
return brokerIp;
}
public void setBrokerIp(String brokerIp) {
this.brokerIp = brokerIp;
}
public boolean isSendResult() {
return sendResult;
}
public void setSendResult(boolean sendResult) {
this.sendResult = sendResult;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public Exception getSendException() {
return sendException;
}
public void setSendException(Exception sendException) {
this.sendException = sendException;
}
@Override
public String toString() {
return String.format("sendstatus:%s msgId:%s", sendResult, msgId);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
public interface Condition {
boolean meetCondition();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class DuplicateMessageInfo<T> {
public void checkDuplicatedMessageInfo(boolean bPrintLog,
List<List<T>> lQueueList) throws IOException {
int msgListSize = lQueueList.size();
int maxmsgList = 0;
Map<T, Integer> msgIdMap = new HashMap<T, Integer>();
Map<Integer, Integer> dupMsgMap = new HashMap<Integer, Integer>();
for (int i = 0; i < msgListSize; i++) {
if (maxmsgList < lQueueList.get(i).size())
maxmsgList = lQueueList.get(i).size();
}
List<StringBuilder> strBQueue = new LinkedList<StringBuilder>();
for (int i = 0; i < msgListSize; i++)
strBQueue.add(new StringBuilder());
for (int msgListIndex = 0; msgListIndex < maxmsgList; msgListIndex++) {
for (int msgQueueListIndex = 0; msgQueueListIndex < msgListSize; msgQueueListIndex++) {
if (msgListIndex < lQueueList.get(msgQueueListIndex).size()) {
if (msgIdMap.containsKey(lQueueList.get(msgQueueListIndex).get(msgListIndex))) {
if (dupMsgMap.containsKey(msgQueueListIndex)) {
int dupMsgCount = dupMsgMap.get(msgQueueListIndex);
dupMsgCount++;
dupMsgMap.remove(msgQueueListIndex);
dupMsgMap.put(msgQueueListIndex, dupMsgCount);
} else {
dupMsgMap.put(msgQueueListIndex, 1);
}
strBQueue.get(msgQueueListIndex).append("" + msgQueueListIndex + "\t" +
msgIdMap.get(lQueueList.get(msgQueueListIndex).get(msgListIndex)) + "\t"
+ lQueueList.get(msgQueueListIndex).get(msgListIndex) + "\r\n");
} else {
msgIdMap.put(lQueueList.get(msgQueueListIndex).get(msgListIndex), msgQueueListIndex);
}
}
}
}
int msgTotalNum = getMsgTotalNumber(lQueueList);
int msgTotalDupNum = getDuplicateMsgNum(dupMsgMap);
int msgNoDupNum = msgTotalNum - msgTotalDupNum;
float msgDupRate = ((float) msgTotalDupNum / (float) msgTotalNum) * 100.0f;
StringBuilder strBuilder = new StringBuilder();
strBuilder.append("msgTotalNum:" + msgTotalNum + "\r\n");
strBuilder.append("msgTotalDupNum:" + msgTotalDupNum + "\r\n");
strBuilder.append("msgNoDupNum:" + msgNoDupNum + "\r\n");
strBuilder.append("msgDupRate" + getFloatNumString(msgDupRate) + "%\r\n");
strBuilder.append("queue\tmsg(dupNum/dupRate)\tdupRate\r\n");
for (int i = 0; i < dupMsgMap.size(); i++) {
int msgDupNum = dupMsgMap.get(i);
int msgNum = lQueueList.get(i).size();
float msgQueueDupRate = ((float) msgDupNum / (float) msgTotalDupNum) * 100.0f;
float msgQueueInnerDupRate = ((float) msgDupNum / (float) msgNum) * 100.0f;
strBuilder.append(i + "\t" + msgDupNum + "/" + getFloatNumString(msgQueueDupRate) + "%" + "\t\t" +
getFloatNumString(msgQueueInnerDupRate) + "%\r\n");
}
System.out.print(strBuilder.toString());
String titleString = "queue\tdupQueue\tdupMsg\r\n";
System.out.print(titleString);
for (int i = 0; i < msgListSize; i++)
System.out.print(strBQueue.get(i).toString());
if (bPrintLog) {
String logFileNameStr = "D:" + File.separator + "checkDuplicatedMessageInfo.txt";
File logFileNameFile = new File(logFileNameStr);
OutputStream out = new FileOutputStream(logFileNameFile, true);
String strToWrite;
byte[] byteToWrite;
strToWrite = strBuilder.toString() + titleString;
for (int i = 0; i < msgListSize; i++)
strToWrite += strBQueue.get(i).toString() + "\r\n";
byteToWrite = strToWrite.getBytes();
out.write(byteToWrite);
out.close();
}
}
private int getMsgTotalNumber(List<List<T>> lQueueList) {
int msgTotalNum = 0;
for (int i = 0; i < lQueueList.size(); i++) {
msgTotalNum += lQueueList.get(i).size();
}
return msgTotalNum;
}
private int getDuplicateMsgNum(Map<Integer, Integer> msgDupMap) {
int msgDupNum = 0;
for (int i = 0; i < msgDupMap.size(); i++) {
msgDupNum += msgDupMap.get(i);
}
return msgDupNum;
}
private String getFloatNumString(float fNum) {
DecimalFormat dcmFmt = new DecimalFormat("0.00");
return dcmFmt.format(fNum);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
public class FileUtil {
private static String lineSeperator = System.getProperty("line.separator");
private String filePath = "";
private String fileName = "";
public FileUtil(String filePath, String fileName) {
this.filePath = filePath;
this.fileName = fileName;
}
public static void main(String args[]) {
String filePath = FileUtil.class.getResource("/").getPath();
String fileName = "test.txt";
FileUtil fileUtil = new FileUtil(filePath, fileName);
Properties properties = new Properties();
properties.put("xx", "yy");
properties.put("yy", "xx");
fileUtil.writeProperties(properties);
}
public void deleteFile() {
File file = new File(filePath + File.separator + fileName);
if (file.exists()) {
file.delete();
}
}
public void appendFile(String content) {
File file = openFile();
String newContent = lineSeperator + content;
writeFile(file, newContent, true);
}
public void coverFile(String content) {
File file = openFile();
writeFile(file, content, false);
}
public void writeProperties(Properties properties) {
String content = getPropertiesAsString(properties);
this.coverFile(content);
}
private String getPropertiesAsString(Properties properties) {
StringBuilder sb = new StringBuilder();
for (Object key : properties.keySet()) {
sb.append(key).append("=").append(properties.getProperty((String) key))
.append(lineSeperator);
}
return sb.toString();
}
private void writeFile(File file, String content, boolean append) {
FileWriter writer = null;
try {
writer = new FileWriter(file.getAbsoluteFile(), append);
writer.write(content);
writer.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private File openFile() {
File file = new File(filePath + File.separator + fileName);
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
return file;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.util.HashMap;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
public class MQAdmin {
private static Logger log = Logger.getLogger(MQAdmin.class);
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
int queueNum) {
int defaultWaitTime = 5;
return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime);
}
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
int queueNum, int waitTimeSec) {
boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr(nameSrvAddr);
try {
mqAdminExt.start();
mqAdminExt.createTopic(clusterName, topic, queueNum);
} catch (Exception e) {
e.printStackTrace();
}
long startTime = System.currentTimeMillis();
while (!createResult) {
createResult = checkTopicExist(mqAdminExt, topic);
if (System.currentTimeMillis() - startTime < waitTimeSec * 1000) {
TestUtils.waitForMonment(100);
} else {
log.error(String.format("timeout,but create topic[%s] failed!", topic));
break;
}
}
mqAdminExt.shutdown();
return createResult;
}
private static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt, String topic) {
boolean createResult = false;
try {
TopicStatsTable topicInfo = mqAdminExt.examineTopicStats(topic);
createResult = !topicInfo.getOffsetTable().isEmpty();
} catch (Exception e) {
}
return createResult;
}
public static boolean createSub(String nameSrvAddr, String clusterName, String consumerId) {
boolean createResult = true;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr(nameSrvAddr);
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName(consumerId);
try {
mqAdminExt.start();
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
clusterName);
for (String addr : masterSet) {
try {
mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config);
log.info(String.format("create subscription group %s to %s success.\n", consumerId,
addr));
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000 * 1);
}
}
} catch (Exception e) {
createResult = false;
e.printStackTrace();
}
mqAdminExt.shutdown();
return createResult;
}
public static ClusterInfo getCluster(String nameSrvAddr) {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr(nameSrvAddr);
ClusterInfo clusterInfo = null;
try {
mqAdminExt.start();
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception e) {
e.printStackTrace();
}
mqAdminExt.shutdown();
return clusterInfo;
}
public static boolean isBrokerExist(String ns, String ip) {
ClusterInfo clusterInfo = getCluster(ns);
if (clusterInfo == null) {
return false;
} else {
HashMap<String, BrokerData> brokers = clusterInfo.getBrokerAddrTable();
for (String brokerName : brokers.keySet()) {
HashMap<Long, String> brokerIps = brokers.get(brokerName).getBrokerAddrs();
for (long brokerId : brokerIps.keySet()) {
if (brokerIps.get(brokerId).contains(ip))
return true;
}
}
}
return false;
}
public void getSubConnection(String nameSrvAddr, String clusterName, String consumerId) {
boolean createResult = true;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr(nameSrvAddr);
SubscriptionGroupConfig config = new SubscriptionGroupConfig();
config.setGroupName(consumerId);
try {
mqAdminExt.start();
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
clusterName);
for (String addr : masterSet) {
try {
System.out.printf("create subscription group %s to %s success.\n", consumerId,
addr);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000 * 1);
}
}
} catch (Exception e) {
createResult = false;
e.printStackTrace();
}
mqAdminExt.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
public class MQRandomUtils {
public static String getRandomTopic() {
return RandomUtils.getStringByUUID();
}
public static String getRandomConsumerGroup() {
return RandomUtils.getStringByUUID();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.listener.AbstractListener;
import static com.google.common.truth.Truth.assertThat;
public class MQWait {
private static Logger logger = Logger.getLogger(MQWait.class);
public static boolean waitConsumeAll(int timeoutMills, Collection<Object> allSendMsgs,
AbstractListener... listeners) {
boolean recvAll = false;
long startTime = System.currentTimeMillis();
Collection<Object> noDupMsgs = new ArrayList<Object>();
while (!recvAll) {
if ((System.currentTimeMillis() - startTime) < timeoutMills) {
noDupMsgs.clear();
try {
for (AbstractListener listener : listeners) {
Collection<Object> recvMsgs = Collections
.synchronizedCollection(listener.getAllUndupMsgBody());
noDupMsgs.addAll(VerifyUtils.getFilterdMessage(allSendMsgs, recvMsgs));
}
} catch (Exception e) {
e.printStackTrace();
}
try {
assertThat(noDupMsgs).containsAllIn(allSendMsgs);
recvAll = true;
break;
} catch (Throwable e) {
}
TestUtil.waitForMonment(500);
} else {
logger.error(String.format(
"timeout but still not receive all messages,expectSize[%s],realSize[%s]",
allSendMsgs.size(), noDupMsgs.size()));
break;
}
}
return recvAll;
}
public static void setCondition(Condition condition, int waitTimeMills, int intervalMills) {
long startTime = System.currentTimeMillis();
while (!condition.meetCondition()) {
if (System.currentTimeMillis() - startTime > waitTimeMills) {
logger.error("time out,but contidion still not meet!");
break;
} else {
TestUtil.waitForMonment(intervalMills);
}
}
}
public static void main(String args[]) {
long start = System.currentTimeMillis();
MQWait.setCondition(new Condition() {
int i = 0;
public boolean meetCondition() {
i++;
return i == 100;
}
}, 10 * 1000, 200);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
public final class RandomUtil {
private static final int UNICODE_START = '\u4E00';
private static final int UNICODE_END = '\u9FA0';
private static Random rd = new Random();
private RandomUtil() {
}
public static long getLong() {
return rd.nextLong();
}
public static long getLongMoreThanZero() {
long res = rd.nextLong();
while (res <= 0) {
res = rd.nextLong();
}
return res;
}
public static long getLongLessThan(long n) {
long res = rd.nextLong();
return res % n;
}
public static long getLongMoreThanZeroLessThan(long n) {
long res = getLongLessThan(n);
while (res <= 0) {
res = getLongLessThan(n);
}
return res;
}
public static long getLongBetween(long n, long m) {
if (m <= n) {
return n;
}
long res = getLongMoreThanZero();
return n + res % (m - n);
}
public static int getInteger() {
return rd.nextInt();
}
public static int getIntegerMoreThanZero() {
int res = rd.nextInt();
while (res <= 0) {
res = rd.nextInt();
}
return res;
}
public static int getIntegerLessThan(int n) {
int res = rd.nextInt();
return res % n;
}
public static int getIntegerMoreThanZeroLessThan(int n) {
int res = rd.nextInt(n);
while (res == 0) {
res = rd.nextInt(n);
}
return res;
}
public static int getIntegerBetween(int n, int m)// m��ֵ����Ϊ���أ�
{
if (m == n) {
return n;
}
int res = getIntegerMoreThanZero();
return n + res % (m - n);
}
private static char getChar(int arg[]) {
int size = arg.length;
int c = rd.nextInt(size / 2);
c = c * 2;
return (char) (getIntegerBetween(arg[c], arg[c + 1]));
}
private static String getString(int n, int arg[]) {
StringBuilder res = new StringBuilder();
for (int i = 0; i < n; i++) {
res.append(getChar(arg));
}
return res.toString();
}
public static String getStringWithCharacter(int n) {
int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
return getString(n, arg);
}
public static String getStringWithNumber(int n) {
int arg[] = new int[] {'0', '9' + 1};
return getString(n, arg);
}
public static String getStringWithNumAndCha(int n) {
int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1, '0', '9' + 1};
return getString(n, arg);
}
public static String getStringShortenThan(int n) {
int len = getIntegerMoreThanZeroLessThan(n);
return getStringWithCharacter(len);
}
public static String getStringWithNumAndChaShortenThan(int n) {
int len = getIntegerMoreThanZeroLessThan(n);
return getStringWithNumAndCha(len);
}
public static String getStringBetween(int n, int m) {
int len = getIntegerBetween(n, m);
return getStringWithCharacter(len);
}
public static String getStringWithNumAndChaBetween(int n, int m) {
int len = getIntegerBetween(n, m);
return getStringWithNumAndCha(len);
}
public static String getStringWithPrefix(int n, String prefix) {
int len = prefix.length();
if (n <= len)
return prefix;
else {
len = n - len;
StringBuilder res = new StringBuilder(prefix);
res.append(getStringWithCharacter(len));
return res.toString();
}
}
public static String getStringWithSuffix(int n, String suffix) {
int len = suffix.length();
if (n <= len)
return suffix;
else {
len = n - len;
StringBuilder res = new StringBuilder();
res.append(getStringWithCharacter(len));
res.append(suffix);
return res.toString();
}
}
public static String getStringWithBoth(int n, String prefix, String suffix) {
int len = prefix.length() + suffix.length();
StringBuilder res = new StringBuilder(prefix);
if (n <= len)
return res.append(suffix).toString();
else {
len = n - len;
res.append(getStringWithCharacter(len));
res.append(suffix);
return res.toString();
}
}
public static String getCheseWordWithPrifix(int n, String prefix) {
int len = prefix.length();
if (n <= len)
return prefix;
else {
len = n - len;
StringBuilder res = new StringBuilder(prefix);
res.append(getCheseWord(len));
return res.toString();
}
}
public static String getCheseWordWithSuffix(int n, String suffix) {
int len = suffix.length();
if (n <= len)
return suffix;
else {
len = n - len;
StringBuilder res = new StringBuilder();
res.append(getCheseWord(len));
res.append(suffix);
return res.toString();
}
}
public static String getCheseWordWithBoth(int n, String prefix, String suffix) {
int len = prefix.length() + suffix.length();
StringBuilder res = new StringBuilder(prefix);
if (n <= len)
return res.append(suffix).toString();
else {
len = n - len;
res.append(getCheseWord(len));
res.append(suffix);
return res.toString();
}
}
public static String getCheseWord(int len) {
StringBuilder res = new StringBuilder();
for (int i = 0; i < len; i++) {
char str = getCheseChar();
res.append(str);
}
return res.toString();
}
private static char getCheseChar() {
return (char) (UNICODE_START + rd.nextInt(UNICODE_END - UNICODE_START));
}
public static boolean getBoolean() {
return getIntegerMoreThanZeroLessThan(3) == 1;
}
public static String getStringByUUID() {
return UUID.randomUUID().toString();
}
public static int[] getRandomArray(int min, int max, int n) {
int len = max - min + 1;
if (max < min || n > len) {
return null;
}
int[] source = new int[len];
for (int i = min; i < min + len; i++) {
source[i - min] = i;
}
int[] result = new int[n];
Random rd = new Random();
int index = 0;
for (int i = 0; i < result.length; i++) {
index = Math.abs(rd.nextInt() % len--);
result[i] = source[index];
source[index] = source[len];
}
return result;
}
public static Collection<Integer> getRandomCollection(int min, int max, int n) {
Set<Integer> res = new HashSet<Integer>();
int mx = max;
int mn = min;
if (n == (max + 1 - min)) {
for (int i = 1; i <= n; i++) {
res.add(i);
}
return res;
}
for (int i = 0; i < n; i++) {
int v = getIntegerBetween(mn, mx);
if (v == mx) {
mx--;
}
if (v == mn) {
mn++;
}
while (res.contains(v)) {
v = getIntegerBetween(mn, mx);
if (v == mx) {
mx = v;
}
if (v == mn) {
mn = v;
}
}
res.add(v);
}
return res;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.util.Random;
import java.util.UUID;
public class RandomUtils {
private static final int UNICODE_START = '\u4E00';
private static final int UNICODE_END = '\u9FA0';
private static Random rd = new Random();
private RandomUtils() {
}
public static String getStringByUUID() {
return UUID.randomUUID().toString();
}
public static String getCheseWord(int len) {
StringBuilder res = new StringBuilder();
for (int i = 0; i < len; ++i) {
char str = getCheseChar();
res.append(str);
}
return res.toString();
}
public static String getStringWithNumber(int n) {
int arg[] = new int[] {'0', '9' + 1};
return getString(n, arg);
}
public static String getStringWithCharacter(int n) {
int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
return getString(n, arg);
}
private static String getString(int n, int arg[]) {
StringBuilder res = new StringBuilder();
for (int i = 0; i < n; i++) {
res.append(getChar(arg));
}
return res.toString();
}
private static char getChar(int arg[]) {
int size = arg.length;
int c = rd.nextInt(size / 2);
c = c * 2;
return (char) (getIntegerBetween(arg[c], arg[c + 1]));
}
public static int getIntegerBetween(int n, int m) {
if (m == n) {
return n;
}
int res = getIntegerMoreThanZero();
return n + res % (m - n);
}
public static int getIntegerMoreThanZero() {
int res = rd.nextInt();
while (res <= 0) {
res = rd.nextInt();
}
return res;
}
private static char getCheseChar() {
return (char) (UNICODE_START + rd.nextInt(UNICODE_END - UNICODE_START));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public final class TestUtil {
private TestUtil() {
}
public static Long parseStringToLong(String s, Long defval) {
Long val = defval;
try {
val = Long.parseLong(s);
} catch (NumberFormatException e) {
val = defval;
}
return val;
}
public static Integer parseStringToInteger(String s, Integer defval) {
Integer val = defval;
try {
val = Integer.parseInt(s);
} catch (NumberFormatException e) {
val = defval;
}
return val;
}
public static String addQuoteToParamater(String param) {
StringBuilder sb = new StringBuilder("'");
sb.append(param).append("'");
return sb.toString();
}
public static void waitForMonment(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void waitForSeconds(long time) {
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void waitForMinutes(long time) {
try {
TimeUnit.MINUTES.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void waitForInputQuit() {
waitForInput("quit");
}
public static void waitForInput(String keyWord) {
waitForInput(keyWord,
String.format("The thread will wait until you input stop command[%s]:", keyWord));
}
public static void waitForInput(String keyWord, String info) {
try {
byte[] b = new byte[1024];
int n = System.in.read(b);
String s = new String(b, 0, n - 1).replace("\r", "").replace("\n", "");
while (!s.equals(keyWord)) {
n = System.in.read(b);
s = new String(b, 0, n - 1);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(Map<K, V> map) {
List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
return (o1.getValue()).compareTo(o2.getValue());
}
});
Map<K, V> result = new LinkedHashMap<K, V>();
for (Map.Entry<K, V> entry : list) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.util.concurrent.TimeUnit;
public class TestUtils {
public static void waitForMonment(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException var3) {
var3.printStackTrace();
}
}
public static void waitForSeconds(long time) {
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException var3) {
var3.printStackTrace();
}
}
public static void waitForMinutes(long time) {
try {
TimeUnit.MINUTES.sleep(time);
} catch (InterruptedException var3) {
var3.printStackTrace();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.log4j.Logger;
import org.apache.rocketmq.common.message.MessageExt;
public class VerifyUtils {
private static Logger logger = Logger.getLogger(VerifyUtils.class);
public static int verify(Collection<Object> sendMsgs, Collection<Object> recvMsgs) {
int miss = 0;
for (Object msg : sendMsgs) {
if (!recvMsgs.contains(msg)) {
miss++;
}
}
return miss;
}
public static Collection<Object> getFilterdMessage(Collection<Object> sendMsgs,
Collection<Object> recvMsgs) {
Collection<Object> recvMsgsSync = Collections.synchronizedCollection(recvMsgs);
Collection<Object> filterdMsgs = new ArrayList<Object>();
int filterNum = 0;
for (Object msg : recvMsgsSync) {
if (sendMsgs.contains(msg)) {
filterdMsgs.add(msg);
} else {
filterNum++;
}
}
logger.info(String.format("[%s] messages is filterd!", filterNum));
return filterdMsgs;
}
public static int verifyUserProperty(Collection<Object> sendMsgs, Collection<Object> recvMsgs) {
return 0;
}
public static void verifyMessageQueueId(int expectId, Collection<Object> msgs) {
for (Object msg : msgs) {
MessageExt msgEx = (MessageExt) msg;
assert expectId == msgEx.getQueueId();
}
}
public static boolean verifyBalance(int msgSize, float error, int... recvSize) {
boolean balance = true;
int evenSize = msgSize / recvSize.length;
for (int size : recvSize) {
if (Math.abs(size - evenSize) > error * evenSize) {
balance = false;
break;
}
}
return balance;
}
public static boolean verifyBalance(int msgSize, int... recvSize) {
return verifyBalance(msgSize, 0.1f, recvSize);
}
public static boolean verifyDelay(long delayTimeMills, Collection<Object> recvMsgTimes,
int errorMills) {
boolean delay = true;
for (Object timeObj : recvMsgTimes) {
long time = (Long) timeObj;
if (Math.abs(time - delayTimeMills) > errorMills) {
delay = false;
logger.info(String.format("delay error:%s", Math.abs(time - delayTimeMills)));
}
}
return delay;
}
public static boolean verifyDelay(long delayTimeMills, Collection<Object> recvMsgTimes) {
int errorMills = 500;
return verifyDelay(delayTimeMills, recvMsgTimes, errorMills);
}
public static boolean verifyOrder(Collection<Collection<Object>> queueMsgs) {
for (Collection<Object> msgs : queueMsgs) {
if (!verifyOrderMsg(msgs)) {
return false;
}
}
return true;
}
public static boolean verifyOrderMsg(Collection<Object> msgs) {
int min = Integer.MIN_VALUE;
int curr;
if (msgs.size() == 0 || msgs.size() == 1) {
return true;
} else {
for (Object msg : msgs) {
curr = Integer.parseInt((String) msg);
if (curr < min) {
return false;
} else {
min = curr;
}
}
}
return true;
}
public static boolean verifyRT(Collection<Object> rts, long maxRTMills) {
boolean rtExpect = true;
for (Object obj : rts) {
long rt = (Long) obj;
if (rt > maxRTMills) {
rtExpect = false;
logger.info(String.format("%s greater thran maxtRT:%s!", rt, maxRTMills));
}
}
return rtExpect;
}
public static void main(String args[]) {
verifyBalance(400, 0.1f, 230, 190);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.data.collect;
import java.util.Collection;
public interface DataCollector {
void resetData();
Collection<Object> getAllData();
Collection<Object> getAllDataWithoutDuplicate();
void addData(Object data);
long getDataSizeWithoutDuplicate();
long getDataSize();
boolean isRepeatedData(Object data);
int getRepeatedTimeForData(Object data);
void removeData(Object data);
void lockIncrement();
void unlockIncrement();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.data.collect;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.test.util.data.collect.impl.ListDataCollectorImpl;
import org.apache.rocketmq.test.util.data.collect.impl.MapDataCollectorImpl;
public final class DataCollectorManager {
private static DataCollectorManager instance = new DataCollectorManager();
private Map<String, DataCollector> collectMap = new HashMap<String, DataCollector>();
private Object lock = new Object();
private DataCollectorManager() {
}
public static DataCollectorManager getInstance() {
return instance;
}
public DataCollector fetchDataCollector(String key) {
String realKey = key;
if (!collectMap.containsKey(realKey)) {
synchronized (lock) {
if (!collectMap.containsKey(realKey)) {
DataCollector collect = (DataCollector) new MapDataCollectorImpl();
collectMap.put(realKey, collect);
}
}
}
return collectMap.get(realKey);
}
public DataCollector fetchMapDataCollector(String key) {
String realKey = key;
if (!collectMap.containsKey(realKey)
|| collectMap.get(realKey) instanceof ListDataCollectorImpl) {
synchronized (lock) {
if (!collectMap.containsKey(realKey)
|| collectMap.get(realKey) instanceof ListDataCollectorImpl) {
DataCollector collect = null;
if (collectMap.containsKey(realKey)) {
DataCollector src = collectMap.get(realKey);
collect = new MapDataCollectorImpl(src.getAllData());
} else {
collect = new MapDataCollectorImpl();
}
collectMap.put(realKey, collect);
}
}
}
return collectMap.get(realKey);
}
public DataCollector fetchListDataCollector(String key) {
String realKey = key;
if (!collectMap.containsKey(realKey)
|| collectMap.get(realKey) instanceof MapDataCollectorImpl) {
synchronized (lock) {
if (!collectMap.containsKey(realKey)
|| collectMap.get(realKey) instanceof MapDataCollectorImpl) {
DataCollector collect = null;
if (collectMap.containsKey(realKey)) {
DataCollector src = collectMap.get(realKey);
collect = new ListDataCollectorImpl(src.getAllData());
} else {
collect = new ListDataCollectorImpl();
}
collectMap.put(realKey, collect);
}
}
}
return collectMap.get(realKey);
}
public void resetDataCollect(String key) {
if (collectMap.containsKey(key)) {
collectMap.get(key).resetData();
}
}
public void resetAll() {
for (Map.Entry<String, DataCollector> entry : collectMap.entrySet()) {
entry.getValue().resetData();
}
}
public void removeDataCollect(String key) {
collectMap.remove(key);
}
public void removeAll() {
collectMap.clear();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.data.collect;
public interface DataFilter {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.data.collect.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import org.apache.rocketmq.test.util.data.collect.DataCollector;
public class ListDataCollectorImpl implements DataCollector {
private List<Object> datas = new ArrayList<Object>();
private boolean lock = false;
public ListDataCollectorImpl() {
}
public ListDataCollectorImpl(Collection<Object> datas) {
for (Object data : datas) {
addData(data);
}
}
public Collection<Object> getAllData() {
return datas;
}
public void resetData() {
datas.clear();
unlockIncrement();
}
public long getDataSizeWithoutDuplicate() {
return getAllDataWithoutDuplicate().size();
}
public synchronized void addData(Object data) {
if (lock) {
return;
}
datas.add(data);
}
public long getDataSize() {
return datas.size();
}
public boolean isRepeatedData(Object data) {
return Collections.frequency(datas, data) == 1;
}
public Collection<Object> getAllDataWithoutDuplicate() {
return new HashSet<Object>(datas);
}
public int getRepeatedTimeForData(Object data) {
int res = 0;
for (Object obj : datas) {
if (obj.equals(data)) {
res++;
}
}
return res;
}
public void removeData(Object data) {
datas.remove(data);
}
public void lockIncrement() {
lock = true;
}
public void unlockIncrement() {
lock = false;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.data.collect.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.test.util.data.collect.DataCollector;
public class MapDataCollectorImpl implements DataCollector {
private Map<Object, AtomicInteger> datas = new ConcurrentHashMap<Object, AtomicInteger>();
private boolean lock = false;
public MapDataCollectorImpl() {
}
public MapDataCollectorImpl(Collection<Object> datas) {
for (Object data : datas) {
addData(data);
}
}
public synchronized void addData(Object data) {
if (lock) {
return;
}
if (datas.containsKey(data)) {
datas.get(data).addAndGet(1);
} else {
datas.put(data, new AtomicInteger(1));
}
}
public Collection<Object> getAllData() {
List<Object> lst = new ArrayList<Object>();
for (Entry<Object, AtomicInteger> entry : datas.entrySet()) {
for (int i = 0; i < entry.getValue().get(); i++) {
lst.add(entry.getKey());
}
}
return lst;
}
public long getDataSizeWithoutDuplicate() {
return datas.keySet().size();
}
public void resetData() {
datas.clear();
unlockIncrement();
}
public long getDataSize() {
long sum = 0;
for (AtomicInteger count : datas.values()) {
sum = sum + count.get();
}
return sum;
}
public boolean isRepeatedData(Object data) {
if (datas.containsKey(data)) {
return datas.get(data).get() == 1;
}
return false;
}
public Collection<Object> getAllDataWithoutDuplicate() {
return datas.keySet();
}
public int getRepeatedTimeForData(Object data) {
if (datas.containsKey(data)) {
return datas.get(data).intValue();
}
return 0;
}
public void removeData(Object data) {
datas.remove(data);
}
public void lockIncrement() {
lock = true;
}
public void unlockIncrement() {
lock = false;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.parallel;
import java.util.concurrent.CountDownLatch;
public abstract class ParallelTask extends Thread {
private CountDownLatch latch = null;
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public abstract void execute();
@Override
public void run() {
this.execute();
if (latch != null) {
latch.countDown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.parallel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelTaskExecutor {
public List<ParallelTask> tasks = new ArrayList<ParallelTask>();
public ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public CountDownLatch latch = null;
public ParallelTaskExecutor() {
}
public void pushTask(ParallelTask task) {
tasks.add(task);
}
public void startBlock() {
init();
startTask();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void startNoBlock() {
for (ParallelTask task : tasks) {
cachedThreadPool.execute(task);
}
}
private void init() {
latch = new CountDownLatch(tasks.size());
for (ParallelTask task : tasks) {
task.setLatch(latch);
}
}
private void startTask() {
for (ParallelTask task : tasks) {
task.start();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.util.parallel;
public class Task4Test extends ParallelTask {
private String name = "";
public Task4Test(String name) {
this.name = name;
}
@Override
public void execute() {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.base;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.TestUtils;
import org.junit.Assert;
public class BaseConf {
protected static String nsAddr;
protected static String broker1Name;
protected static String broker2Name;
protected static String clusterName;
protected static int brokerNum;
protected static int waitTime = 5;
protected static int consumeTime = 1 * 60 * 1000;
protected static int topicCreateTime = 30 * 1000;
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
protected static List<Object> mqClients = new ArrayList<Object>();
protected static boolean debug = false;
private static Logger log = Logger.getLogger(BaseConf.class);
static {
namesrvController = IntegrationTestBase.createAndStartNamesrv();
nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr);
clusterName = brokerController1.getBrokerConfig().getBrokerClusterName();
broker1Name = brokerController1.getBrokerConfig().getBrokerName();
broker2Name = brokerController2.getBrokerConfig().getBrokerName();
brokerNum = 2;
}
public BaseConf() {
}
public static String initTopic() {
long startTime = System.currentTimeMillis();
String topic = MQRandomUtils.getRandomTopic();
boolean createResult = false;
while (true) {
createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
if (createResult) {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
System.currentTimeMillis() - startTime));
break;
} else {
TestUtils.waitForMonment(500);
continue;
}
}
return topic;
}
public static String initConsumerGroup() {
String group = MQRandomUtils.getRandomConsumerGroup();
return initConsumerGroup(group);
}
public static String initConsumerGroup(String group) {
MQAdmin.createSub(nsAddr, clusterName, group);
return group;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
String instanceName) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
instanceName);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQAsyncSendProducer getAsyncProducer(String nsAddr, String topic) {
RMQAsyncSendProducer producer = new RMQAsyncSendProducer(nsAddr, topic);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
AbstractListener listner) {
String consumerGroup = initConsumerGroup();
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listner);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
String subExpression, AbstractListener listner) {
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
topic, subExpression, listner);
if (debug) {
consumer.setDebug();
}
mqClients.add(consumer);
log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup,
topic, subExpression));
return consumer;
}
public static void shutDown() {
try {
for (Object mqClient : mqClients) {
if (mqClient instanceof AbstractMQProducer) {
((AbstractMQProducer) mqClient).shutdown();
} else {
((AbstractMQConsumer) mqClient).shutdown();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.base;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IntegrationTestBase {
protected static final String SEP = File.separator;
protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
protected static final List<File> TMPE_FILES = new ArrayList<>();
protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<>();
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
protected static Random random = new Random();
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override public void run() {
for (NamesrvController namesrvController : NAMESRV_CONTROLLERS) {
if (namesrvController != null) {
namesrvController.shutdown();
}
}
for (BrokerController brokerController : BROKER_CONTROLLERS) {
if (brokerController != null) {
brokerController.shutdown();
}
}
for (File file : TMPE_FILES) {
deleteFile(file);
}
}
});
}
private static String createBaseDir() {
String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
logger.info(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir));
System.exit(1);
}
TMPE_FILES.add(file);
return baseDir;
}
public static NamesrvController createAndStartNamesrv() {
String baseDir = createBaseDir();
NamesrvConfig namesrvConfig = new NamesrvConfig();
NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties");
nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try {
Assert.assertTrue(namesrvController.initialize());
logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
namesrvController.start();
} catch (Exception e) {
logger.info("Name Server start failed");
System.exit(1);
}
NAMESRV_CONTROLLERS.add(namesrvController);
return namesrvController;
}
public static BrokerController createAndStartBroker(String nsAddr) {
String baseDir = createBaseDir();
BrokerConfig brokerConfig = new BrokerConfig();
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig();
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(nsAddr);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setHaListenPort(8000 + random.nextInt(1000));
nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try {
Assert.assertTrue(brokerController.initialize());
logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
brokerController.start();
} catch (Exception e) {
logger.info("Broker start failed");
System.exit(1);
}
BROKER_CONTROLLERS.add(brokerController);
return brokerController;
}
public static void deleteFile(File file) {
if (!file.exists()) {
return;
}
if (file.isFile()) {
file.delete();
} else if (file.isDirectory()) {
File[] files = file.listFiles();
for (File file1 : files) {
deleteFile(file1);
}
file.delete();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.consumer.balance;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
public class NormalMsgDynamicBalanceIT extends BaseConf {
private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class);
private RMQNormalProducer producer = null;
private String topic = null;
@Before
public void setUp() {
topic = initTopic();
logger.info(String.format("use topic: %s !", topic));
producer = getProducer(nsAddr, topic);
}
@After
public void tearDown() {
super.shutDown();
}
@Test
public void testTwoConsumerAndCrashOne() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
producer.send(msgSize);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
consumer2.getListner());
consumer2.shutdown();
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner());
assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size() - msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
@Test
public void test3ConsumerAndCrashOne() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
producer.send(msgSize);
MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
consumer2.getListner(), consumer3.getListner());
consumer3.shutdown();
producer.clearMsg();
consumer1.clearMsg();
consumer2.clearMsg();
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner());
assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.consumer.balance;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.util.MQWait;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
public class NormalMsgStaticBalanceIT extends BaseConf {
private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class);
private RMQNormalProducer producer = null;
private String topic = null;
@Before
public void setUp() {
topic = initTopic();
logger.info(String.format("use topic: %s !", topic));
producer = getProducer(nsAddr, topic);
}
@After
public void tearDown() {
super.shutDown();
}
@Test
public void testTwoConsumersBalance() {
int msgSize = 400;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner());
assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils.verifyBalance(msgSize,
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
@Test
public void testFourConsumersBalance() {
int msgSize = 600;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListner());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
consumer1.getListner(), consumer2.getListner(), consumer3.getListner(),
consumer4.getListner());
assertThat(recvAll).isEqualTo(true);
boolean balance = VerifyUtils
.verifyBalance(msgSize,
VerifyUtils
.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllUndupMsgBody())
.size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer3.getListner().getAllUndupMsgBody()).size(),
VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer4.getListner().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.consumer.broadcast;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
public class BaseBroadCastIT extends BaseConf {
private static Logger logger = Logger.getLogger(BaseBroadCastIT.class);
public static RMQBroadCastConsumer getBroadCastConsumer(String nsAddr, String topic,
String subExpression,
AbstractListener listner) {
String consumerGroup = initConsumerGroup();
return getBroadCastConsumer(nsAddr, consumerGroup, topic, subExpression, listner);
}
public static RMQBroadCastConsumer getBroadCastConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listner) {
RMQBroadCastConsumer consumer = ConsumerFactory.getRMQBroadCastConsumer(nsAddr,
consumerGroup, topic, subExpression, listner);
consumer.setDebug();
mqClients.add(consumer);
logger.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup,
topic, subExpression));
return consumer;
}
public void printSeperator() {
for (int i = 0; i < 3; i++) {
logger.info(
"<<<<<<<<================================================================================>>>>>>>>");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.consumer.broadcast.normal;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
private static Logger logger = Logger
.getLogger(NormalMsgTwoSameGroupConsumerIT.class);
private RMQNormalProducer producer = null;
private String topic = null;
@Before
public void setUp() {
printSeperator();
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
}
@After
public void tearDown() {
super.shutDown();
}
@Test
public void testNotConsumeAfterConsume() {
int msgSize = 16;
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), waitTime);
assertThat(consumer2.getListner().getAllMsgBody().size()).isEqualTo(0);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.consumer.broadcast.normal;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static com.google.common.truth.Truth.assertThat;
public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
private static Logger logger = Logger
.getLogger(NormalMsgTwoSameGroupConsumerIT.class);
private RMQNormalProducer producer = null;
private String topic = null;
@Before
public void setUp() {
printSeperator();
topic = initTopic();
logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic);
}
@After
public void tearDown() {
super.shutDown();
}
@Test
public void testStartTwoAndCrashOneLater() {
int msgSize = 16;
String group = initConsumerGroup();
RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
new RMQNormalListner(group + "_1"));
RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer2.getListner().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
consumer2.shutdown();
producer.clearMsg();
consumer1.clearMsg();
producer.send(msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer1.getListner().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.delay;
import org.apache.rocketmq.test.base.BaseConf;
public class DelayConf extends BaseConf {
protected static final int[] DELAY_LEVEL = {
1, 5, 10, 30, 1 * 60, 5 * 60, 10 * 60,
30 * 60, 1 * 3600, 2 * 3600, 6 * 3600, 12 * 3600, 1 * 24 * 3600};
}
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册