From 34c3454957c0811198b477646710625aa1ece3b8 Mon Sep 17 00:00:00 2001 From: huangli Date: Fri, 6 Dec 2019 10:35:01 +0800 Subject: [PATCH] Optimise tx benchmark producer (#1628) --- .../benchmark/TransactionProducer.java | 364 +++++++++++++----- .../apache/rocketmq/srvutil/ServerUtil.java | 3 +- 2 files changed, 266 insertions(+), 101 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 04707e1a..3531eb52 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -18,11 +18,19 @@ package org.apache.rocketmq.example.benchmark; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; @@ -30,35 +38,44 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.srvutil.ServerUtil; public class TransactionProducer { + private static final long START_TIME = System.currentTimeMillis(); + private static final AtomicLong MSG_COUNT = new AtomicLong(0); + + //broker max check times should less than this value + static final int MAX_CHECK_RESULT_IN_MSG = 20; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser()); - - final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; - final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32; - final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048; - final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : false; - final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r')) : true; - - final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); + TxSendConfig config = new TxSendConfig(); + config.topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + config.threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32; + config.messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048; + config.sendRollbackRate = commandLine.hasOption("sr") ? Double.parseDouble(commandLine.getOptionValue("sr")) : 0.0; + config.sendUnknownRate = commandLine.hasOption("su") ? Double.parseDouble(commandLine.getOptionValue("su")) : 0.0; + config.checkRollbackRate = commandLine.hasOption("cr") ? Double.parseDouble(commandLine.getOptionValue("cr")) : 0.0; + config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0; + config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis(); + config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0; + + final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount); final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); final Timer timer = new Timer("BenchmarkTimerThread", true); - final LinkedList snapshotList = new LinkedList(); + final LinkedList snapshotList = new LinkedList<>(); timer.scheduleAtFixedRate(new TimerTask() { @Override @@ -73,16 +90,24 @@ public class TransactionProducer { timer.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { - Long[] begin = snapshotList.getFirst(); - Long[] end = snapshotList.getLast(); + Snapshot begin = snapshotList.getFirst(); + Snapshot end = snapshotList.getLast(); - final long sendTps = - (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + final long sendCount = (end.sendRequestSuccessCount - begin.sendRequestSuccessCount) + + (end.sendRequestFailedCount - begin.sendRequestFailedCount); + final long sendTps = (sendCount * 1000L) / (end.endTime - begin.endTime); + final double averageRT = (end.sendMessageTimeTotal - begin.sendMessageTimeTotal) / (double) (end.sendRequestSuccessCount - begin.sendRequestSuccessCount); + + final long failCount = end.sendRequestFailedCount - begin.sendRequestFailedCount; + final long checkCount = end.checkCount - begin.checkCount; + final long unexpectedCheck = end.unexpectedCheckCount - begin.unexpectedCheckCount; + final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck; System.out.printf( - "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); + "Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n", + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount, + unexpectedCheck, dupCheck); + statsBenchmark.getSendMessageMaxRT().set(0); } } @@ -96,11 +121,10 @@ public class TransactionProducer { } }, 10000, 10000); - final TransactionCheckListener transactionCheckListener = - new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); + final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); producer.setInstanceName(Long.toString(System.currentTimeMillis())); - producer.setTransactionCheckListener(transactionCheckListener); + producer.setTransactionListener(transactionCheckListener); producer.setDefaultTopicQueueNums(1000); if (commandLine.hasOption('n')) { String ns = commandLine.getOptionValue('n'); @@ -108,37 +132,42 @@ public class TransactionProducer { } producer.start(); - final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); - - for (int i = 0; i < threadCount; i++) { + for (int i = 0; i < config.threadCount; i++) { sendThreadPool.execute(new Runnable() { @Override public void run() { while (true) { + boolean success = false; + final long beginTimestamp = System.currentTimeMillis(); try { - // Thread.sleep(1000); - final long beginTimestamp = System.currentTimeMillis(); SendResult sendResult = - producer.sendMessageInTransaction(buildMessage(messageSize, topic), tranExecuter, null); - if (sendResult != null) { - statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); - statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); - } - + producer.sendMessageInTransaction(buildMessage(config), null); + success = sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK; + } catch (Throwable e) { + success = false; + } finally { final long currentRT = System.currentTimeMillis() - beginTimestamp; - statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); + statsBenchmark.getSendMessageTimeTotal().addAndGet(currentRT); long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); while (currentRT > prevMaxRT) { - boolean updated = - statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, - currentRT); + boolean updated = statsBenchmark.getSendMessageMaxRT() + .compareAndSet(prevMaxRT, currentRT); if (updated) break; prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); } - } catch (MQClientException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + if (success) { + statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); + } else { + statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + } + if (config.sendInterval > 0) { + try { + Thread.sleep(config.sendInterval); + } catch (InterruptedException e) { + } + } } } } @@ -146,20 +175,42 @@ public class TransactionProducer { } } - private static Message buildMessage(final int messageSize, String topic) { - try { - Message msg = new Message(); - msg.setTopic(topic); + private static Message buildMessage(TxSendConfig config) { + byte[] bs = new byte[config.messageSize]; + ThreadLocalRandom r = ThreadLocalRandom.current(); + r.nextBytes(bs); + + ByteBuffer buf = ByteBuffer.wrap(bs); + buf.putLong(config.batchId); + long sendMachineId = START_TIME << 32; + long msgId = sendMachineId | MSG_COUNT.getAndIncrement(); + buf.putLong(msgId); + + // save send tx result in message + if (r.nextDouble() < config.sendRollbackRate) { + buf.put((byte) LocalTransactionState.ROLLBACK_MESSAGE.ordinal()); + } else if (r.nextDouble() < config.sendUnknownRate) { + buf.put((byte) LocalTransactionState.UNKNOW.ordinal()); + } else { + buf.put((byte) LocalTransactionState.COMMIT_MESSAGE.ordinal()); + } - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i += 10) { - sb.append("hello baby"); + // save check tx result in message + for (int i = 0; i < MAX_CHECK_RESULT_IN_MSG; i++) { + if (r.nextDouble() < config.checkRollbackRate) { + buf.put((byte) LocalTransactionState.ROLLBACK_MESSAGE.ordinal()); + } else if (r.nextDouble() < config.checkUnknownRate) { + buf.put((byte) LocalTransactionState.UNKNOW.ordinal()); + } else { + buf.put((byte) LocalTransactionState.COMMIT_MESSAGE.ordinal()); } - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); - return msg; - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); } + + Message msg = new Message(); + msg.setTopic(config.topic); + + msg.setBody(bs); + return msg; } public static Options buildCommandlineOptions(final Options options) { @@ -175,84 +226,171 @@ public class TransactionProducer { opt.setRequired(false); options.addOption(opt); - opt = new Option("c", "check", true, "Check the message, Default: false"); + opt = new Option("sr", "send rollback rate", true, "Send rollback rate, Default: 0.0"); opt.setRequired(false); options.addOption(opt); - opt = new Option("r", "checkResult", true, "Message check result, Default: true"); + opt = new Option("su", "send unknown rate", true, "Send unknown rate, Default: 0.0"); opt.setRequired(false); options.addOption(opt); + opt = new Option("cr", "check rollback rate", true, "Check rollback rate, Default: 0.0"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("cu", "check unknown rate", true, "Check unknown rate, Default: 0.0"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "test batch id", true, "test batch id, Default: System.currentMillis()"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "send interval", true, "sleep interval in millis between messages, Default: 0"); + opt.setRequired(false); + options.addOption(opt); return options; } } -class TransactionExecuterBImpl implements LocalTransactionExecuter { +class TransactionListenerImpl implements TransactionListener { + private StatsBenchmarkTProducer statBenchmark; + private TxSendConfig sendConfig; + private final LRUMap cache = new LRUMap<>(200000); - private boolean ischeck; + private class MsgMeta { + long batchId; + long msgId; + LocalTransactionState sendResult; + List checkResult; + } - public TransactionExecuterBImpl(boolean ischeck) { - this.ischeck = ischeck; + public TransactionListenerImpl(StatsBenchmarkTProducer statsBenchmark, TxSendConfig sendConfig) { + this.statBenchmark = statsBenchmark; + this.sendConfig = sendConfig; } @Override - public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { - if (ischeck) { - return LocalTransactionState.UNKNOW; - } - return LocalTransactionState.COMMIT_MESSAGE; + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return parseFromMsg(msg).sendResult; } -} -class TransactionCheckListenerBImpl implements TransactionCheckListener { - private boolean ischeckffalse; - private StatsBenchmarkTProducer statsBenchmarkTProducer; - - public TransactionCheckListenerBImpl(boolean ischeckffalse, - StatsBenchmarkTProducer statsBenchmarkTProducer) { - this.ischeckffalse = ischeckffalse; - this.statsBenchmarkTProducer = statsBenchmarkTProducer; + private MsgMeta parseFromMsg(Message msg) { + byte[] bs = msg.getBody(); + ByteBuffer buf = ByteBuffer.wrap(bs); + MsgMeta msgMeta = new MsgMeta(); + msgMeta.batchId = buf.getLong(); + msgMeta.msgId = buf.getLong(); + msgMeta.sendResult = LocalTransactionState.values()[buf.get()]; + msgMeta.checkResult = new ArrayList<>(); + for (int i = 0; i < TransactionProducer.MAX_CHECK_RESULT_IN_MSG; i++) { + msgMeta.checkResult.add(LocalTransactionState.values()[buf.get()]); + } + return msgMeta; } @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); - if (ischeckffalse) { + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + MsgMeta msgMeta = parseFromMsg(msg); + if (msgMeta.batchId != sendConfig.batchId) { + // message not generated in this test + return LocalTransactionState.ROLLBACK_MESSAGE; + } + statBenchmark.getCheckCount().incrementAndGet(); + int times = 0; + try { + String checkTimes = msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES); + times = Integer.parseInt(checkTimes); + } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } + times = times <= 0 ? 1 : times; + + boolean dup; + synchronized (cache) { + Integer oldCheckLog = cache.get(msgMeta.msgId); + Integer newCheckLog; + if (oldCheckLog == null) { + newCheckLog = 1 << (times - 1); + } else { + newCheckLog = oldCheckLog | (1 << (times - 1)); + } + dup = newCheckLog.equals(oldCheckLog); + } + if (dup) { + statBenchmark.getDuplicatedCheckCount().incrementAndGet(); + } + if (msgMeta.sendResult != LocalTransactionState.UNKNOW) { + System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n", + new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()), + msg.getMsgId(), msg.getTransactionId(), + msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), + msgMeta.sendResult.toString()); + statBenchmark.getUnexpectedCheckCount().incrementAndGet(); + return msgMeta.sendResult; + } - return LocalTransactionState.COMMIT_MESSAGE; + for (int i = 0; i < times - 1; i++) { + LocalTransactionState s = msgMeta.checkResult.get(i); + if (s != LocalTransactionState.UNKNOW) { + System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult,lastCheckResult=%s\n", + new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()), + msg.getMsgId(), msg.getTransactionId(), + msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s); + statBenchmark.getUnexpectedCheckCount().incrementAndGet(); + return s; + } + } + return msgMeta.checkResult.get(times - 1); } } +class Snapshot { + long endTime; + + long sendRequestSuccessCount; + + long sendRequestFailedCount; + + long sendMessageTimeTotal; + + long sendMessageMaxRT; + + long checkCount; + + long unexpectedCheckCount; + + long duplicatedCheck; +} + class StatsBenchmarkTProducer { private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); - private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); - - private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); - - private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + private final AtomicLong sendMessageTimeTotal = new AtomicLong(0L); private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); - private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L); + private final AtomicLong checkCount = new AtomicLong(0L); + + private final AtomicLong unexpectedCheckCount = new AtomicLong(0L); - public Long[] createSnapshot() { - Long[] snap = new Long[] { - System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.receiveResponseSuccessCount.get(), - this.receiveResponseFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), - this.checkRequestSuccessCount.get()}; + private final AtomicLong duplicatedCheckCount = new AtomicLong(0); - return snap; + public Snapshot createSnapshot() { + Snapshot s = new Snapshot(); + s.endTime = System.currentTimeMillis(); + s.sendRequestSuccessCount = sendRequestSuccessCount.get(); + s.sendRequestFailedCount = sendRequestFailedCount.get(); + s.sendMessageTimeTotal = sendMessageTimeTotal.get(); + s.sendMessageMaxRT = sendMessageMaxRT.get(); + s.checkCount = checkCount.get(); + s.unexpectedCheckCount = unexpectedCheckCount.get(); + s.duplicatedCheck = duplicatedCheckCount.get(); + return s; } public AtomicLong getSendRequestSuccessCount() { @@ -263,23 +401,49 @@ class StatsBenchmarkTProducer { return sendRequestFailedCount; } - public AtomicLong getReceiveResponseSuccessCount() { - return receiveResponseSuccessCount; + public AtomicLong getSendMessageTimeTotal() { + return sendMessageTimeTotal; + } + + public AtomicLong getSendMessageMaxRT() { + return sendMessageMaxRT; } - public AtomicLong getReceiveResponseFailedCount() { - return receiveResponseFailedCount; + public AtomicLong getCheckCount() { + return checkCount; } - public AtomicLong getSendMessageSuccessTimeTotal() { - return sendMessageSuccessTimeTotal; + public AtomicLong getUnexpectedCheckCount() { + return unexpectedCheckCount; } - public AtomicLong getSendMessageMaxRT() { - return sendMessageMaxRT; + public AtomicLong getDuplicatedCheckCount() { + return duplicatedCheckCount; } +} + +class TxSendConfig { + String topic; + int threadCount; + int messageSize; + double sendRollbackRate; + double sendUnknownRate; + double checkRollbackRate; + double checkUnknownRate; + long batchId; + int sendInterval; +} + +class LRUMap extends LinkedHashMap { - public AtomicLong getCheckRequestSuccessCount() { - return checkRequestSuccessCount; + private int maxSize; + + public LRUMap(int maxSize) { + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; } } diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java index 066d36ce..421eedb4 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java @@ -49,10 +49,11 @@ public class ServerUtil { commandLine = parser.parse(options, args); if (commandLine.hasOption('h')) { hf.printHelp(appName, options, true); - return null; + System.exit(0); } } catch (ParseException e) { hf.printHelp(appName, options, true); + System.exit(1); } return commandLine; -- GitLab