From f58dbc3edaf7d4136a0e3e0cd90d6f3a58198957 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=97=AD?= Date: Fri, 25 Sep 2020 20:44:14 +0800 Subject: [PATCH] [ISSUE #2300] Enhancement: Benchmark support acl and msg trace (#2301) * [Benchmark]Support acl, add msgTraceEnable option for producer and consumer, fix Algorithm HmacSHA1 not available using openjdk * add apache header * add aclEnable option --- distribution/benchmark/runclass.sh | 2 +- .../rocketmq/example/benchmark/AclClient.java | 33 +++++++++++++++++++ .../rocketmq/example/benchmark/Consumer.java | 19 +++++++++-- .../rocketmq/example/benchmark/Producer.java | 17 ++++++++-- .../benchmark/TransactionProducer.java | 9 ++++- 5 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java diff --git a/distribution/benchmark/runclass.sh b/distribution/benchmark/runclass.sh index 339e11a2..12802ddf 100644 --- a/distribution/benchmark/runclass.sh +++ b/distribution/benchmark/runclass.sh @@ -56,9 +56,9 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPe JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_run_class_gc_%p_%t.log -XX:+PrintGCDetails" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" -JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" JAVA_OPT="${JAVA_OPT} -XX:+PerfDisableSharedMem" +JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java new file mode 100644 index 00000000..04ef5d5e --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.benchmark; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.remoting.RPCHook; + +public class AclClient { + + private static final String ACL_ACCESS_KEY = "rocketmq2"; + + private static final String ACL_SECRET_KEY = "12345678"; + + static RPCHook getAclRPCHook() { + return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 08897faa..c0a2a8b5 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -34,10 +34,12 @@ import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; public class Consumer { @@ -55,12 +57,16 @@ public class Consumer { final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null; final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0; + final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); + final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); + String group = groupPrefix; if (Boolean.parseBoolean(isSuffixEnable)) { group = groupPrefix + "_" + (System.currentTimeMillis() % 100); } - System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s%n", topic, group, isSuffixEnable, filterType, expression); + System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n", + topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); @@ -111,7 +117,8 @@ public class Consumer { } }, 10000, 10000); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null); if (commandLine.hasOption('n')) { String ns = commandLine.getOptionValue('n'); consumer.setNamesrvAddr(ns); @@ -192,6 +199,14 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); + opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + return options; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index ce2b83f9..dbad1692 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; @@ -53,8 +54,11 @@ public class Producer { final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k')); final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0; + final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); + final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); - System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); + System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s traceEnable %s aclEnable %s%n", + topic, threadCount, messageSize, keyEnable, msgTraceEnable, aclEnable); final InternalLogger log = ClientLogger.getLog(); @@ -100,7 +104,8 @@ public class Producer { } }, 10000, 10000); - final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); + RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; + final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null); producer.setInstanceName(Long.toString(System.currentTimeMillis())); if (commandLine.hasOption('n')) { @@ -210,6 +215,14 @@ public class Producer { opt.setRequired(false); options.addOption(opt); + opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + return options; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 3531eb52..951b718d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -68,6 +68,7 @@ public class TransactionProducer { config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0; config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis(); config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0; + config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount); @@ -122,7 +123,8 @@ public class TransactionProducer { }, 10000, 10000); final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); - final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); + final TransactionMQProducer producer = + new TransactionMQProducer("benchmark_transaction_producer", config.aclEnable ? AclClient.getAclRPCHook() : null); producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setTransactionListener(transactionCheckListener); producer.setDefaultTopicQueueNums(1000); @@ -250,6 +252,10 @@ public class TransactionProducer { opt.setRequired(false); options.addOption(opt); + opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + return options; } } @@ -432,6 +438,7 @@ class TxSendConfig { double checkUnknownRate; long batchId; int sendInterval; + boolean aclEnable; } class LRUMap extends LinkedHashMap { -- GitLab