diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java index f3e8b604c1e81fb30704ac295b1fa382b6922268..cf207cd4518c9c2112e031c7a8005381871bfc99 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.example.benchmark; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -33,6 +33,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -44,13 +45,14 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; public class BatchProducer { - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + private static byte[] msgBody; + + public static void main(String[] args) throws MQClientException { Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser()); @@ -74,6 +76,12 @@ public class BatchProducer { System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n", topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable); + StringBuilder sb = new StringBuilder(messageSize); + for (int i = 0; i < messageSize; i++) { + sb.append(RandomStringUtils.randomAlphanumeric(1)); + } + msgBody = sb.toString().getBytes(StandardCharsets.UTF_8); + final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(); statsBenchmark.start(); @@ -87,14 +95,7 @@ public class BatchProducer { @Override public void run() { while (true) { - List msgs; - - try { - msgs = buildBathMessage(batchSize, messageSize, topic); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - return; - } + List msgs = buildBathMessage(batchSize, topic); if (CollectionUtils.isEmpty(msgs)) { return; @@ -236,23 +237,12 @@ public class BatchProducer { return defaultValue; } - private static List buildBathMessage(int batchSize, int messageSize, - String topic) throws UnsupportedEncodingException { + private static List buildBathMessage(final int batchSize, final String topic) { List batchMessage = new ArrayList<>(batchSize); - for (int i = 0; i < batchSize; i++) { - Message msg = new Message(); - msg.setTopic(topic); - - StringBuilder sb = new StringBuilder(); - for (int j = 0; j < messageSize; j += 10) { - sb.append("hello baby"); - } - - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + Message msg = new Message(topic, msgBody); batchMessage.add(msg); } - return batchMessage; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index cc2999485a4f17ed5d61a48d7e9250ab79f48a81..feb25cd910c1dc8e88786e54296ebac6fa75ffe6 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -16,11 +16,13 @@ */ package org.apache.rocketmq.example.benchmark; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.LongAdder; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -29,11 +31,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; -import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.LinkedList; import java.util.Random; @@ -47,7 +47,9 @@ import java.util.concurrent.atomic.AtomicLong; public class Producer { - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + private static byte[] msgBody; + + public static void main(String[] args) throws MQClientException { Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); @@ -70,6 +72,12 @@ public class Producer { System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n", topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel); + StringBuilder sb = new StringBuilder(messageSize); + for (int i = 0; i < messageSize; i++) { + sb.append(RandomStringUtils.randomAlphanumeric(1)); + } + msgBody = sb.toString().getBytes(StandardCharsets.UTF_8); + final InternalLogger log = ClientLogger.getLog(); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); @@ -142,13 +150,7 @@ public class Producer { int num = 0; while (true) { try { - final Message msg; - try { - msg = buildMessage(messageSize, topic); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - return; - } + final Message msg = buildMessage(topic); final long beginTimestamp = System.currentTimeMillis(); if (keyEnable) { msg.setKeys(String.valueOf(beginTimestamp / 1000)); @@ -290,18 +292,8 @@ public class Producer { return options; } - private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException { - Message msg = new Message(); - msg.setTopic(topic); - - StringBuilder sb = new StringBuilder(messageSize); - for (int i = 0; i < messageSize; i += 10) { - sb.append("hello baby"); - } - - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); - - return msg; + private static Message buildMessage(final String topic) { + return new Message(topic, msgBody); } private static void doPrintStats(final LinkedList snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) {