未验证 提交 857d28df 编写于 作者: 【keepal】 提交者: GitHub

[ISSUE #3284]Optimizing benchmark code (#3317)

上级 2cac8662
......@@ -26,6 +26,8 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
......@@ -100,22 +102,22 @@ public class BatchProducer {
try {
long beginTimestamp = System.currentTimeMillis();
long sendSucCount = statsBenchmark.getSendMessageSuccessCount().get();
long sendSucCount = statsBenchmark.getSendMessageSuccessCount().longValue();
setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000));
setTags(tagCount, msgs, sendSucCount);
setProperties(propertySize, msgs);
SendResult sendResult = producer.send(msgs);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getSendMessageSuccessCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestSuccessCount().increment();
statsBenchmark.getSendMessageSuccessCount().add(msgs.size());
} else {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestFailedCount().increment();
statsBenchmark.getSendMessageFailedCount().add(msgs.size());
}
long currentRT = System.currentTimeMillis() - beginTimestamp;
statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
if (updated) {
......@@ -125,8 +127,8 @@ public class BatchProducer {
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
} catch (RemotingException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestFailedCount().increment();
statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
......@@ -134,22 +136,22 @@ public class BatchProducer {
} catch (InterruptedException ignored) {
}
} catch (InterruptedException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestFailedCount().increment();
statsBenchmark.getSendMessageFailedCount().add(msgs.size());
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
}
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestFailedCount().increment();
statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQClientException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestFailedCount().increment();
statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQBrokerException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size());
statsBenchmark.getSendRequestFailedCount().increment();
statsBenchmark.getSendMessageFailedCount().add(msgs.size());
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
......@@ -313,17 +315,17 @@ public class BatchProducer {
class StatsBenchmarkBatchProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
private final LongAdder sendRequestSuccessCount = new LongAdder();
private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
private final LongAdder sendRequestFailedCount = new LongAdder();
private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
private final LongAdder sendMessageSuccessTimeTotal = new LongAdder();
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
private final AtomicLong sendMessageSuccessCount = new AtomicLong(0L);
private final LongAdder sendMessageSuccessCount = new LongAdder();
private final AtomicLong sendMessageFailedCount = new AtomicLong(0L);
private final LongAdder sendMessageFailedCount = new LongAdder();
private final Timer timer = new Timer("BenchmarkTimerThread", true);
......@@ -332,25 +334,25 @@ class StatsBenchmarkBatchProducer {
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
this.sendRequestSuccessCount.get(),
this.sendRequestFailedCount.get(),
this.sendMessageSuccessCount.get(),
this.sendMessageFailedCount.get(),
this.sendMessageSuccessTimeTotal.get(),
this.sendRequestSuccessCount.longValue(),
this.sendRequestFailedCount.longValue(),
this.sendMessageSuccessCount.longValue(),
this.sendMessageFailedCount.longValue(),
this.sendMessageSuccessTimeTotal.longValue(),
};
return snap;
}
public AtomicLong getSendRequestSuccessCount() {
public LongAdder getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
public AtomicLong getSendRequestFailedCount() {
public LongAdder getSendRequestFailedCount() {
return sendRequestFailedCount;
}
public AtomicLong getSendMessageSuccessTimeTotal() {
public LongAdder getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}
......@@ -358,11 +360,11 @@ class StatsBenchmarkBatchProducer {
return sendMessageMaxRT;
}
public AtomicLong getSendMessageSuccessCount() {
public LongAdder getSendMessageSuccessCount() {
return sendMessageSuccessCount;
}
public AtomicLong getSendMessageFailedCount() {
public LongAdder getSendMessageFailedCount() {
return sendMessageFailedCount;
}
......@@ -390,7 +392,7 @@ class StatsBenchmarkBatchProducer {
final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n",
System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().get(), averageRT, averageMsgRT, end[2], end[4]);
System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]);
}
}
......
......@@ -50,10 +50,11 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
public class TransactionProducer {
private static final long START_TIME = System.currentTimeMillis();
private static final AtomicLong MSG_COUNT = new AtomicLong(0);
private static final LongAdder MSG_COUNT = new LongAdder();
//broker max check times should less than this value
static final int MAX_CHECK_RESULT_IN_MSG = 20;
......@@ -158,7 +159,7 @@ public class TransactionProducer {
success = false;
} finally {
final long currentRT = System.currentTimeMillis() - beginTimestamp;
statsBenchmark.getSendMessageTimeTotal().addAndGet(currentRT);
statsBenchmark.getSendMessageTimeTotal().add(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT()
......@@ -169,9 +170,9 @@ public class TransactionProducer {
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
if (success) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getSendRequestSuccessCount().increment();
} else {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendRequestFailedCount().increment();
}
if (config.sendInterval > 0) {
try {
......@@ -194,7 +195,9 @@ public class TransactionProducer {
ByteBuffer buf = ByteBuffer.wrap(bs);
buf.putLong(config.batchId);
long sendMachineId = START_TIME << 32;
long msgId = sendMachineId | MSG_COUNT.getAndIncrement();
long count = MSG_COUNT.longValue();
long msgId = sendMachineId | count;
MSG_COUNT.increment();
buf.putLong(msgId);
// save send tx result in message
......@@ -316,7 +319,7 @@ class TransactionListenerImpl implements TransactionListener {
// message not generated in this test
return LocalTransactionState.ROLLBACK_MESSAGE;
}
statBenchmark.getCheckCount().incrementAndGet();
statBenchmark.getCheckCount().increment();
int times = 0;
try {
......@@ -339,7 +342,7 @@ class TransactionListenerImpl implements TransactionListener {
dup = newCheckLog.equals(oldCheckLog);
}
if (dup) {
statBenchmark.getDuplicatedCheckCount().incrementAndGet();
statBenchmark.getDuplicatedCheckCount().increment();
}
if (msgMeta.sendResult != LocalTransactionState.UNKNOW) {
System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n",
......@@ -347,7 +350,7 @@ class TransactionListenerImpl implements TransactionListener {
msg.getMsgId(), msg.getTransactionId(),
msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES),
msgMeta.sendResult.toString());
statBenchmark.getUnexpectedCheckCount().incrementAndGet();
statBenchmark.getUnexpectedCheckCount().increment();
return msgMeta.sendResult;
}
......@@ -358,7 +361,7 @@ class TransactionListenerImpl implements TransactionListener {
new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()),
msg.getMsgId(), msg.getTransactionId(),
msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s);
statBenchmark.getUnexpectedCheckCount().incrementAndGet();
statBenchmark.getUnexpectedCheckCount().increment();
return s;
}
}
......@@ -385,42 +388,42 @@ class Snapshot {
}
class StatsBenchmarkTProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
private final LongAdder sendRequestSuccessCount = new LongAdder();
private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
private final LongAdder sendRequestFailedCount = new LongAdder();
private final AtomicLong sendMessageTimeTotal = new AtomicLong(0L);
private final LongAdder sendMessageTimeTotal = new LongAdder();
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
private final AtomicLong checkCount = new AtomicLong(0L);
private final LongAdder checkCount = new LongAdder();
private final AtomicLong unexpectedCheckCount = new AtomicLong(0L);
private final LongAdder unexpectedCheckCount = new LongAdder();
private final AtomicLong duplicatedCheckCount = new AtomicLong(0);
private final LongAdder duplicatedCheckCount = new LongAdder();
public Snapshot createSnapshot() {
Snapshot s = new Snapshot();
s.endTime = System.currentTimeMillis();
s.sendRequestSuccessCount = sendRequestSuccessCount.get();
s.sendRequestFailedCount = sendRequestFailedCount.get();
s.sendMessageTimeTotal = sendMessageTimeTotal.get();
s.sendRequestSuccessCount = sendRequestSuccessCount.longValue();
s.sendRequestFailedCount = sendRequestFailedCount.longValue();
s.sendMessageTimeTotal = sendMessageTimeTotal.longValue();
s.sendMessageMaxRT = sendMessageMaxRT.get();
s.checkCount = checkCount.get();
s.unexpectedCheckCount = unexpectedCheckCount.get();
s.duplicatedCheck = duplicatedCheckCount.get();
s.checkCount = checkCount.longValue();
s.unexpectedCheckCount = unexpectedCheckCount.longValue();
s.duplicatedCheck = duplicatedCheckCount.longValue();
return s;
}
public AtomicLong getSendRequestSuccessCount() {
public LongAdder getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
public AtomicLong getSendRequestFailedCount() {
public LongAdder getSendRequestFailedCount() {
return sendRequestFailedCount;
}
public AtomicLong getSendMessageTimeTotal() {
public LongAdder getSendMessageTimeTotal() {
return sendMessageTimeTotal;
}
......@@ -428,15 +431,15 @@ class StatsBenchmarkTProducer {
return sendMessageMaxRT;
}
public AtomicLong getCheckCount() {
public LongAdder getCheckCount() {
return checkCount;
}
public AtomicLong getUnexpectedCheckCount() {
public LongAdder getUnexpectedCheckCount() {
return unexpectedCheckCount;
}
public AtomicLong getDuplicatedCheckCount() {
public LongAdder getDuplicatedCheckCount() {
return duplicatedCheckCount;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册