未验证 提交 fea769da 编写于 作者: Z zhangjidi2016 提交者: GitHub

[ISSUE #3284]Optimize the buildMessage method (#3324)

* [ISSUE #3284]Optimize the buildMessage method
Co-authored-by: Nzhangjidi2016 <zhangjidi@cmss.chinamobile.com>
上级 d7a830cf
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
*/ */
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -33,6 +33,7 @@ import org.apache.commons.cli.Option; ...@@ -33,6 +33,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
...@@ -44,13 +45,14 @@ import org.apache.rocketmq.client.producer.SendStatus; ...@@ -44,13 +45,14 @@ import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
public class BatchProducer { public class BatchProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { private static byte[] msgBody;
public static void main(String[] args) throws MQClientException {
Options options = ServerUtil.buildCommandlineOptions(new Options()); Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser());
...@@ -74,6 +76,12 @@ public class BatchProducer { ...@@ -74,6 +76,12 @@ public class BatchProducer {
System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n", System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n",
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable); topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
sb.append(RandomStringUtils.randomAlphanumeric(1));
}
msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);
final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(); final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer();
statsBenchmark.start(); statsBenchmark.start();
...@@ -87,14 +95,7 @@ public class BatchProducer { ...@@ -87,14 +95,7 @@ public class BatchProducer {
@Override @Override
public void run() { public void run() {
while (true) { while (true) {
List<Message> msgs; List<Message> msgs = buildBathMessage(batchSize, topic);
try {
msgs = buildBathMessage(batchSize, messageSize, topic);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return;
}
if (CollectionUtils.isEmpty(msgs)) { if (CollectionUtils.isEmpty(msgs)) {
return; return;
...@@ -236,23 +237,12 @@ public class BatchProducer { ...@@ -236,23 +237,12 @@ public class BatchProducer {
return defaultValue; return defaultValue;
} }
private static List<Message> buildBathMessage(int batchSize, int messageSize, private static List<Message> buildBathMessage(final int batchSize, final String topic) {
String topic) throws UnsupportedEncodingException {
List<Message> batchMessage = new ArrayList<>(batchSize); List<Message> batchMessage = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
Message msg = new Message(); Message msg = new Message(topic, msgBody);
msg.setTopic(topic);
StringBuilder sb = new StringBuilder();
for (int j = 0; j < messageSize; j += 10) {
sb.append("hello baby");
}
msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
batchMessage.add(msg); batchMessage.add(msg);
} }
return batchMessage; return batchMessage;
} }
......
...@@ -16,11 +16,13 @@ ...@@ -16,11 +16,13 @@
*/ */
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -29,11 +31,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; ...@@ -29,11 +31,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.UnsupportedEncodingException;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Random; import java.util.Random;
...@@ -47,7 +47,9 @@ import java.util.concurrent.atomic.AtomicLong; ...@@ -47,7 +47,9 @@ import java.util.concurrent.atomic.AtomicLong;
public class Producer { public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { private static byte[] msgBody;
public static void main(String[] args) throws MQClientException {
Options options = ServerUtil.buildCommandlineOptions(new Options()); Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
...@@ -70,6 +72,12 @@ public class Producer { ...@@ -70,6 +72,12 @@ public class Producer {
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n", System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel); topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i++) {
sb.append(RandomStringUtils.randomAlphanumeric(1));
}
msgBody = sb.toString().getBytes(StandardCharsets.UTF_8);
final InternalLogger log = ClientLogger.getLog(); final InternalLogger log = ClientLogger.getLog();
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
...@@ -142,13 +150,7 @@ public class Producer { ...@@ -142,13 +150,7 @@ public class Producer {
int num = 0; int num = 0;
while (true) { while (true) {
try { try {
final Message msg; final Message msg = buildMessage(topic);
try {
msg = buildMessage(messageSize, topic);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return;
}
final long beginTimestamp = System.currentTimeMillis(); final long beginTimestamp = System.currentTimeMillis();
if (keyEnable) { if (keyEnable) {
msg.setKeys(String.valueOf(beginTimestamp / 1000)); msg.setKeys(String.valueOf(beginTimestamp / 1000));
...@@ -290,18 +292,8 @@ public class Producer { ...@@ -290,18 +292,8 @@ public class Producer {
return options; return options;
} }
private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException { private static Message buildMessage(final String topic) {
Message msg = new Message(); return new Message(topic, msgBody);
msg.setTopic(topic);
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby");
}
msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
return msg;
} }
private static void doPrintStats(final LinkedList<Long[]> snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) { private static void doPrintStats(final LinkedList<Long[]> snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册