未验证 提交 d93dc4df 编写于 作者: S sunhangda 提交者: GitHub

Merge branch 'apache:develop' into develop

......@@ -116,6 +116,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
......@@ -235,10 +236,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
default:
break;
return getUnknownCmdResponse(ctx, request);
}
return null;
}
@Override
......@@ -459,6 +458,13 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return null;
}
private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) {
String error = " request type " + request.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
return response;
}
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
// final GetAllTopicConfigResponseHeader responseHeader =
......
......@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.client.trace;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
......@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
......@@ -64,7 +64,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue;
private final ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
......@@ -78,7 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String group;
private Type type;
public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) {
public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
......@@ -95,12 +95,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
......@@ -180,10 +180,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
}
@Override
public void flush() throws IOException {
public void flush() {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
while (System.currentTimeMillis() <= end) {
synchronized (traceContextQueue) {
if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
break;
}
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
......@@ -196,6 +201,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override
public void shutdown() {
this.stopped = true;
flush();
this.traceExecutor.shutdown();
if (isStarted.get()) {
traceProducer.shutdown();
......@@ -212,11 +218,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
try {
flush();
} catch (IOException e) {
log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
}
flush();
}
}
}
......@@ -242,25 +244,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
synchronized (traceContextQueue) {
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue - traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (context != null) {
contexts.add(context);
} else {
break;
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
......@@ -352,7 +356,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
* @param data the message trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
......
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
sh ./runclass.sh org.apache.rocketmq.example.benchmark.BatchProducer $@ &
......@@ -74,6 +74,10 @@
<dependencySets>
<dependencySet>
<outputDirectory>lib/</outputDirectory>
<excludes>
<exclude>io.jaegertracing:jaeger-core</exclude>
<exclude>io.jaegertracing:jaeger-client</exclude>
</excludes>
</dependencySet>
</dependencySets>
</binaries>
......
......@@ -95,7 +95,7 @@ M1 | NettyServerCodecThread_%d | Worker线程池
M2 | RemotingExecutorThread_%d | 业务processor处理线程池
### 3 消息过滤
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正基于这个字段值的。
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正基于这个字段值的。
![](image/rocketmq_design_7.png)
......
......@@ -89,7 +89,7 @@ public class TransactionListenerImpl implements TransactionListener {
## 3 Usage Constraint
1. Messages of the transactional have no schedule and batch support.
2. In order to avoid a single message being checked too many times and lead to half queue message accumulation, we limited the number of checks for a single message to 15 times by default, but users can change this limit by change the ```transactionCheckMax``` parameter in the configuration of the broker, if one message has been checked over ```transactionCheckMax``` times, broker will discard this message and print an error log at the same time by default. Users can change this behavior by override the ```AbstractTransactionalMessageCheckListener``` class.
3. A transactional message will be checked after a certain period of time that determined by parameter g```transactionTimeout``` in the configuration of the broker. And users also can change this limit by set user property ```CHECK_IMMUNITY_TIME_IN_SECONDS``` when sending transactional message, this parameter takes precedence over the ```transactionTimeout``` parameter.
3. A transactional message will be checked after a certain period of time that determined by parameter ```transactionTimeout``` in the configuration of the broker. And users also can change this limit by set user property ```CHECK_IMMUNITY_TIME_IN_SECONDS``` when sending transactional message, this parameter takes precedence over the ```transactionTimeout``` parameter.
4. A transactional message maybe checked or consumed more than once.
5. Committed message reput to the user’s target topic may fail. Currently, it depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message isn’t lost and the transaction integrity is guaranteed, it is recommended to use synchronous double write. mechanism.
6. Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of message, transactional messages allow backward queries. MQ Server query clients by their Producer IDs.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
public class BatchProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
final String namesrv = getOptionValue(commandLine, 'n', "127.0.0.1:9876");
final String topic = getOptionValue(commandLine, 't', "BenchmarkTest");
final int threadCount = getOptionValue(commandLine, 'w', 64);
final int messageSize = getOptionValue(commandLine, 's', 128);
final int batchSize = getOptionValue(commandLine, 'b', 16);
final boolean keyEnable = getOptionValue(commandLine, 'k', false);
final int propertySize = getOptionValue(commandLine, 'p', 0);
final int tagCount = getOptionValue(commandLine, 'l', 0);
final boolean msgTraceEnable = getOptionValue(commandLine, 'm', false);
final boolean aclEnable = getOptionValue(commandLine, 'a', false);
final String ak = getOptionValue(commandLine, 'c', "rocketmq2");
final String sk = getOptionValue(commandLine, 'e', "12346789");
System.out.printf("topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n",
topic, threadCount, messageSize, batchSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
final StatsBenchmarkBatchProducer statsBenchmark = new StatsBenchmarkBatchProducer();
statsBenchmark.start();
final DefaultMQProducer producer = initInstance(namesrv, msgTraceEnable, aclEnable, ak, sk);
producer.start();
final InternalLogger log = ClientLogger.getLog();
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
sendThreadPool.execute(new Runnable() {
@Override
public void run() {
while (true) {
List<Message> msgs;
try {
msgs = buildBathMessage(batchSize, messageSize, topic);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return;
}
if (CollectionUtils.isEmpty(msgs)) {
return;
}
try {
long beginTimestamp = System.currentTimeMillis();
long sendSucCount = statsBenchmark.getSendMessageSuccessCount().get();
setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000));
setTags(tagCount, msgs, sendSucCount);
setProperties(propertySize, msgs);
SendResult sendResult = producer.send(msgs);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getSendMessageSuccessCount().addAndGet(msgs.size());
} else {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
}
long currentRT = System.currentTimeMillis() - beginTimestamp;
statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
if (updated) {
break;
}
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
} catch (RemotingException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}
} catch (InterruptedException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
}
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQClientException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQBrokerException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}
}
}
}
});
}
}
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("w", "threadCount", true, "Thread count, Default: 64");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "messageSize", true, "Message Size, Default: 128");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("b", "batchSize", true, "Batch Size, Default: 16");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("k", "keyEnable", true, "Message Key Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("l", "tagCount", true, "Tag count, Default: 0");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "accessKey", true, "Acl Access Key, Default: rocketmq2");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("e", "secretKey", true, "Acl Secret Key, Default: 123456789");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "propertySize", true, "Property Size, Default: 0");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("n", "namesrv", true, "name server, Default: 127.0.0.1:9876");
opt.setRequired(false);
options.addOption(opt);
return options;
}
private static String getOptionValue(CommandLine commandLine, char key, String defaultValue) {
if (commandLine.hasOption(key)) {
return commandLine.getOptionValue(key).trim();
}
return defaultValue;
}
private static int getOptionValue(CommandLine commandLine, char key, int defaultValue) {
if (commandLine.hasOption(key)) {
return Integer.parseInt(commandLine.getOptionValue(key).trim());
}
return defaultValue;
}
private static boolean getOptionValue(CommandLine commandLine, char key, boolean defaultValue) {
if (commandLine.hasOption(key)) {
return Boolean.parseBoolean(commandLine.getOptionValue(key).trim());
}
return defaultValue;
}
private static List<Message> buildBathMessage(int batchSize, int messageSize,
String topic) throws UnsupportedEncodingException {
List<Message> batchMessage = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
Message msg = new Message();
msg.setTopic(topic);
StringBuilder sb = new StringBuilder();
for (int j = 0; j < messageSize; j += 10) {
sb.append("hello baby");
}
msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
batchMessage.add(msg);
}
return batchMessage;
}
private static void setKeys(boolean keyEnable, List<Message> msgs, String keys) {
if (!keyEnable) {
return;
}
for (Message msg : msgs) {
msg.setKeys(keys);
}
}
private static void setTags(int tagCount, List<Message> msgs, long startTagId) {
if (tagCount <= 0) {
return;
}
long tagId = startTagId % tagCount;
for (Message msg : msgs) {
msg.setTags(String.format("tag%d", tagId++));
}
}
private static void setProperties(int propertySize, List<Message> msgs) {
if (propertySize <= 0) {
return;
}
for (Message msg : msgs) {
if (msg.getProperties() != null) {
msg.getProperties().clear();
}
int startValue = (new Random(System.currentTimeMillis())).nextInt(100);
int size = 0;
for (int i = 0; ; i++) {
String prop1 = "prop" + i, prop1V = "hello" + startValue;
msg.putUserProperty(prop1, prop1V);
size += prop1.length() + prop1V.length();
if (size > propertySize) {
break;
}
startValue++;
}
}
}
private static DefaultMQProducer initInstance(String namesrv, boolean traceEnable, boolean aclEnable, String ak,
String sk) {
RPCHook rpcHook = aclEnable ? new AclClientRPCHook(new SessionCredentials(ak, sk)) : null;
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_batch_producer", rpcHook, traceEnable, null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setNamesrvAddr(namesrv);
producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
return producer;
}
}
class StatsBenchmarkBatchProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
private final AtomicLong sendMessageSuccessCount = new AtomicLong(0L);
private final AtomicLong sendMessageFailedCount = new AtomicLong(0L);
private final Timer timer = new Timer("BenchmarkTimerThread", true);
private final LinkedList<Long[]> snapshotList = new LinkedList<>();
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
this.sendRequestSuccessCount.get(),
this.sendRequestFailedCount.get(),
this.sendMessageSuccessCount.get(),
this.sendMessageFailedCount.get(),
this.sendMessageSuccessTimeTotal.get(),
};
return snap;
}
public AtomicLong getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
public AtomicLong getSendRequestFailedCount() {
return sendRequestFailedCount;
}
public AtomicLong getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}
public AtomicLong getSendMessageMaxRT() {
return sendMessageMaxRT;
}
public AtomicLong getSendMessageSuccessCount() {
return sendMessageSuccessCount;
}
public AtomicLong getSendMessageFailedCount() {
return sendMessageFailedCount;
}
public void start() {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
}, 1000, 1000);
timer.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
final long sendTps = (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
final long sendMps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[1] - begin[1]);
final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n",
System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().get(), averageRT, averageMsgRT, end[2], end[4]);
}
}
@Override
public void run() {
try {
this.printStats();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 10000, 10000);
}
public void shutdown() {
timer.cancel();
}
}
\ No newline at end of file
......@@ -61,9 +61,11 @@ public class Producer {
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0;
final boolean delayEnable = commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d'));
final int delayLevel = commandLine.hasOption('e') ? Integer.parseInt(commandLine.getOptionValue('e')) : 1;
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum);
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n delayEnable: %s%n delayLevel: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum, delayEnable, delayLevel);
final InternalLogger log = ClientLogger.getLog();
......@@ -147,6 +149,9 @@ public class Producer {
if (keyEnable) {
msg.setKeys(String.valueOf(beginTimestamp / 1000));
}
if (delayEnable) {
msg.setDelayTimeLevel(delayLevel);
}
if (tagCount > 0) {
long sendSucCount = statsBenchmark.getReceiveResponseSuccessCount().get();
msg.setTags(String.format("tag%d", sendSucCount % tagCount));
......@@ -266,6 +271,14 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("d", "delayEnable", true, "Delay message Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("e", "delayLevel", true, "Delay message level, Default: 1");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册