From fea769da7c721e7497e892d3aed38115f8091679 Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Wed, 8 Sep 2021 19:58:23 +0800 Subject: [PATCH] [ISSUE #3284]Optimize the buildMessage method (#3324) * [ISSUE #3284]Optimize the buildMessage method Co-authored-by: zhangjidi2016 --- .../example/benchmark/BatchProducer.java | 38 +++++++------------ .../rocketmq/example/benchmark/Producer.java | 36 +++++++----------- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java index f3e8b604..cf207cd4 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.example.benchmark; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -33,6 +33,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -44,13 +45,14 @@ import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; public class BatchProducer { - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + private static byte[] msgBody; + + public static void main(String[] args) throws MQClientException { Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser()); @@ -74,6 +76,12 @@ public class BatchProducer { System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n", topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable); + StringBuilder sb = new StringBuilder(messageSize); + for (int i = 0; i < messageSize; i++) { + sb.append(RandomStringUtils.randomAlphanumeric(1)); + } + msgBody = sb.toString().getBytes(StandardCharsets.UTF_8); + final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer(); statsBenchmark.start(); @@ -87,14 +95,7 @@ public class BatchProducer { @Override public void run() { while (true) { - List msgs; - - try { - msgs = buildBathMessage(batchSize, messageSize, topic); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - return; - } + List msgs = buildBathMessage(batchSize, topic); if (CollectionUtils.isEmpty(msgs)) { return; @@ -236,23 +237,12 @@ public class BatchProducer { return defaultValue; } - private static List buildBathMessage(int batchSize, int messageSize, - String topic) throws UnsupportedEncodingException { + private static List buildBathMessage(final int batchSize, final String topic) { List batchMessage = new ArrayList<>(batchSize); - for (int i = 0; i < batchSize; i++) { - Message msg = new Message(); - msg.setTopic(topic); - - StringBuilder sb = new StringBuilder(); - for (int j = 0; j < messageSize; j += 10) { - sb.append("hello baby"); - } - - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + Message msg = new Message(topic, msgBody); batchMessage.add(msg); } - return batchMessage; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index cc299948..feb25cd9 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -16,11 +16,13 @@ */ package org.apache.rocketmq.example.benchmark; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.LongAdder; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -29,11 +31,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; -import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.LinkedList; import java.util.Random; @@ -47,7 +47,9 @@ import java.util.concurrent.atomic.AtomicLong; public class Producer { - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { + private static byte[] msgBody; + + public static void main(String[] args) throws MQClientException { Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser()); @@ -70,6 +72,12 @@ public class Producer { System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n", topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel); + StringBuilder sb = new StringBuilder(messageSize); + for (int i = 0; i < messageSize; i++) { + sb.append(RandomStringUtils.randomAlphanumeric(1)); + } + msgBody = sb.toString().getBytes(StandardCharsets.UTF_8); + final InternalLogger log = ClientLogger.getLog(); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); @@ -142,13 +150,7 @@ public class Producer { int num = 0; while (true) { try { - final Message msg; - try { - msg = buildMessage(messageSize, topic); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - return; - } + final Message msg = buildMessage(topic); final long beginTimestamp = System.currentTimeMillis(); if (keyEnable) { msg.setKeys(String.valueOf(beginTimestamp / 1000)); @@ -290,18 +292,8 @@ public class Producer { return options; } - private static Message buildMessage(final int messageSize, final String topic) throws UnsupportedEncodingException { - Message msg = new Message(); - msg.setTopic(topic); - - StringBuilder sb = new StringBuilder(messageSize); - for (int i = 0; i < messageSize; i += 10) { - sb.append("hello baby"); - } - - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); - - return msg; + private static Message buildMessage(final String topic) { + return new Message(topic, msgBody); } private static void doPrintStats(final LinkedList snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) { -- GitLab