提交 df59df37 编写于 作者: H huangli

Add -n (nameServer) option to benchmark consumer; Enable...

Add -n (nameServer) option to benchmark consumer; Enable options(-n/-t/w/-s/-c/-r) to TransactionProducer
上级 d63678dc
......@@ -103,6 +103,10 @@ public class Consumer {
}, 10000, 10000);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
consumer.setNamesrvAddr(ns);
}
consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
if (filterType == null || expression == null) {
......
......@@ -24,6 +24,11 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
......@@ -33,20 +38,19 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.srvutil.ServerUtil;
public class TransactionProducer {
private static int threadCount;
private static int messageSize;
private static boolean ischeck;
private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
final Message msg = buildMessage(messageSize);
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32;
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048;
final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : false;
final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r')) : true;
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
......@@ -98,6 +102,10 @@ public class TransactionProducer {
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
producer.setNamesrvAddr(ns);
}
producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
......@@ -111,7 +119,7 @@ public class TransactionProducer {
// Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.sendMessageInTransaction(buildMessage(messageSize, topic), tranExecuter, null);
if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
......@@ -138,18 +146,45 @@ public class TransactionProducer {
}
}
private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
Message msg = new Message();
msg.setTopic("BenchmarkTest");
private static Message buildMessage(final int messageSize, String topic) {
try {
Message msg = new Message();
msg.setTopic(topic);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby");
}
msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
return msg;
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("w", "threadCount", true, "Thread count, Default: 32");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "messageSize", true, "Message Size, Default: 2048");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "check", true, "Check the message, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("r", "checkResult", true, "Message check result, Default: true");
opt.setRequired(false);
options.addOption(opt);
msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
return msg;
return options;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册