未验证 提交 f58dbc3e 编写于 作者: 张旭 提交者: GitHub

[ISSUE #2300] Enhancement: Benchmark support acl and msg trace (#2301)

* [Benchmark]Support acl, add msgTraceEnable option for producer and consumer, fix Algorithm HmacSHA1 not available using openjdk

* add apache header

* add aclEnable option
上级 73f20710
......@@ -56,9 +56,9 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPe
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_run_class_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -XX:+PerfDisableSharedMem"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.benchmark;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.remoting.RPCHook;
public class AclClient {
private static final String ACL_ACCESS_KEY = "rocketmq2";
private static final String ACL_SECRET_KEY = "12345678";
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
}
......@@ -34,10 +34,12 @@ import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
public class Consumer {
......@@ -55,12 +57,16 @@ public class Consumer {
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
String group = groupPrefix;
if (Boolean.parseBoolean(isSuffixEnable)) {
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
}
System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s%n", topic, group, isSuffixEnable, filterType, expression);
System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
......@@ -111,7 +117,8 @@ public class Consumer {
}
}, 10000, 10000);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
consumer.setNamesrvAddr(ns);
......@@ -192,6 +199,14 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......
......@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
......@@ -53,8 +54,11 @@ public class Producer {
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0;
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s traceEnable %s aclEnable %s%n",
topic, threadCount, messageSize, keyEnable, msgTraceEnable, aclEnable);
final InternalLogger log = ClientLogger.getLog();
......@@ -100,7 +104,8 @@ public class Producer {
}
}, 10000, 10000);
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
if (commandLine.hasOption('n')) {
......@@ -210,6 +215,14 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......
......@@ -68,6 +68,7 @@ public class TransactionProducer {
config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0;
config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
......@@ -122,7 +123,8 @@ public class TransactionProducer {
}, 10000, 10000);
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
final TransactionMQProducer producer =
new TransactionMQProducer("benchmark_transaction_producer", config.aclEnable ? AclClient.getAclRPCHook() : null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
......@@ -250,6 +252,10 @@ public class TransactionProducer {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
return options;
}
}
......@@ -432,6 +438,7 @@ class TxSendConfig {
double checkUnknownRate;
long batchId;
int sendInterval;
boolean aclEnable;
}
class LRUMap<K, V> extends LinkedHashMap<K, V> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册