diff --git a/distribution/benchmark/runclass.sh b/distribution/benchmark/runclass.sh index 339e11a2e4351945cb6b10c92bf2aa26dca39e67..12802ddf90d36221a26447a35219fc75a394778f 100644 --- a/distribution/benchmark/runclass.sh +++ b/distribution/benchmark/runclass.sh @@ -56,9 +56,9 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPe JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_run_class_gc_%p_%t.log -XX:+PrintGCDetails" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" -JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" JAVA_OPT="${JAVA_OPT} -XX:+PerfDisableSharedMem" +JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}" diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java new file mode 100644 index 0000000000000000000000000000000000000000..04ef5d5ef6b7c255cc859f897f04b32fbd49482c --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/AclClient.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.benchmark; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.remoting.RPCHook; + +public class AclClient { + + private static final String ACL_ACCESS_KEY = "rocketmq2"; + + private static final String ACL_SECRET_KEY = "12345678"; + + static RPCHook getAclRPCHook() { + return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 08897faa2636359e8bb5c982afd71c7a265fbb4a..c0a2a8b53a6ef34df63273bfb090c864bac14999 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -34,10 +34,12 @@ import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; public class Consumer { @@ -55,12 +57,16 @@ public class Consumer { final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null; final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0; + final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); + final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); + String group = groupPrefix; if (Boolean.parseBoolean(isSuffixEnable)) { group = groupPrefix + "_" + (System.currentTimeMillis() % 100); } - System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s%n", topic, group, isSuffixEnable, filterType, expression); + System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n", + topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); @@ -111,7 +117,8 @@ public class Consumer { } }, 10000, 10000); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null); if (commandLine.hasOption('n')) { String ns = commandLine.getOptionValue('n'); consumer.setNamesrvAddr(ns); @@ -192,6 +199,14 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); + opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + return options; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index ce2b83f97755c51a3fabda1f22ef13d276245929..dbad169221aac6c5c1e0dfdd22fe3fc7014284ef 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; @@ -53,8 +54,11 @@ public class Producer { final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k')); final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0; + final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); + final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); - System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); + System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s traceEnable %s aclEnable %s%n", + topic, threadCount, messageSize, keyEnable, msgTraceEnable, aclEnable); final InternalLogger log = ClientLogger.getLog(); @@ -100,7 +104,8 @@ public class Producer { } }, 10000, 10000); - final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); + RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; + final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null); producer.setInstanceName(Long.toString(System.currentTimeMillis())); if (commandLine.hasOption('n')) { @@ -210,6 +215,14 @@ public class Producer { opt.setRequired(false); options.addOption(opt); + opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + return options; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 3531eb528fdf55de4d82d43fca9b36345903e15a..951b718d1ed925506dbd788605f87fa9390fba50 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -68,6 +68,7 @@ public class TransactionProducer { config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0; config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis(); config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0; + config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount); @@ -122,7 +123,8 @@ public class TransactionProducer { }, 10000, 10000); final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); - final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); + final TransactionMQProducer producer = + new TransactionMQProducer("benchmark_transaction_producer", config.aclEnable ? AclClient.getAclRPCHook() : null); producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setTransactionListener(transactionCheckListener); producer.setDefaultTopicQueueNums(1000); @@ -250,6 +252,10 @@ public class TransactionProducer { opt.setRequired(false); options.addOption(opt); + opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false"); + opt.setRequired(false); + options.addOption(opt); + return options; } } @@ -432,6 +438,7 @@ class TxSendConfig { double checkUnknownRate; long batchId; int sendInterval; + boolean aclEnable; } class LRUMap extends LinkedHashMap {