未验证 提交 7c7e9aca 编写于 作者: Z zhangjidi2016 提交者: GitHub

[ISSUE #3284]Optimizing benchmark code (#3285)

* [ISSUE #3284]Optimizing benchmark code

* Reduce calls to the longValue method in the loop
Co-authored-by: Nzhangjidi <zhangjidi@cmss.chinamobile.com>
上级 9e66da4b
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.benchmark;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
......@@ -155,20 +156,20 @@ public class Consumer {
MessageExt msg = msgs.get(0);
long now = System.currentTimeMillis();
statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet();
statsBenchmarkConsumer.getReceiveMessageTotalCount().increment();
long born2ConsumerRT = now - msg.getBornTimestamp();
statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT);
statsBenchmarkConsumer.getBorn2ConsumerTotalRT().add(born2ConsumerRT);
long store2ConsumerRT = now - msg.getStoreTimestamp();
statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT);
statsBenchmarkConsumer.getStore2ConsumerTotalRT().add(store2ConsumerRT);
compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT);
compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT);
if (ThreadLocalRandom.current().nextDouble() < failRate) {
statsBenchmarkConsumer.getFailCount().incrementAndGet();
statsBenchmarkConsumer.getFailCount().increment();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
......@@ -233,39 +234,39 @@ public class Consumer {
}
class StatsBenchmarkConsumer {
private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L);
private final LongAdder receiveMessageTotalCount = new LongAdder();
private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L);
private final LongAdder born2ConsumerTotalRT = new LongAdder();
private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L);
private final LongAdder store2ConsumerTotalRT = new LongAdder();
private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L);
private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L);
private final AtomicLong failCount = new AtomicLong(0L);
private final LongAdder failCount = new LongAdder();
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
this.receiveMessageTotalCount.get(),
this.born2ConsumerTotalRT.get(),
this.store2ConsumerTotalRT.get(),
this.failCount.get()
this.receiveMessageTotalCount.longValue(),
this.born2ConsumerTotalRT.longValue(),
this.store2ConsumerTotalRT.longValue(),
this.failCount.longValue()
};
return snap;
}
public AtomicLong getReceiveMessageTotalCount() {
public LongAdder getReceiveMessageTotalCount() {
return receiveMessageTotalCount;
}
public AtomicLong getBorn2ConsumerTotalRT() {
public LongAdder getBorn2ConsumerTotalRT() {
return born2ConsumerTotalRT;
}
public AtomicLong getStore2ConsumerTotalRT() {
public LongAdder getStore2ConsumerTotalRT() {
return store2ConsumerTotalRT;
}
......@@ -277,7 +278,7 @@ class StatsBenchmarkConsumer {
return store2ConsumerMaxRT;
}
public AtomicLong getFailCount() {
public LongAdder getFailCount() {
return failCount;
}
}
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.example.benchmark;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
......@@ -156,8 +157,7 @@ public class Producer {
msg.setDelayTimeLevel(delayLevel);
}
if (tagCount > 0) {
long sendSucCount = statsBenchmark.getReceiveResponseSuccessCount().get();
msg.setTags(String.format("tag%d", sendSucCount % tagCount));
msg.setTags(String.format("tag%d", System.currentTimeMillis() % tagCount));
}
if (propertySize > 0) {
if (msg.getProperties() != null) {
......@@ -180,20 +180,20 @@ public class Producer {
}
}
producer.send(msg);
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
statsBenchmark.getSendRequestSuccessCount().increment();
statsBenchmark.getReceiveResponseSuccessCount().increment();
final long currentRT = System.currentTimeMillis() - beginTimestamp;
statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT);
long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
while (currentRT > prevMaxRT) {
boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT);
if (updated)
break;
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue();
}
} catch (RemotingException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendRequestFailedCount().increment();
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
......@@ -201,16 +201,16 @@ public class Producer {
} catch (InterruptedException ignored) {
}
} catch (InterruptedException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendRequestFailedCount().increment();
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
}
} catch (MQClientException e) {
statsBenchmark.getSendRequestFailedCount().incrementAndGet();
statsBenchmark.getSendRequestFailedCount().increment();
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
} catch (MQBrokerException e) {
statsBenchmark.getReceiveResponseFailedCount().incrementAndGet();
statsBenchmark.getReceiveResponseFailedCount().increment();
log.error("[BENCHMARK_PRODUCER] Send Exception", e);
try {
Thread.sleep(3000);
......@@ -237,8 +237,8 @@ public class Producer {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get());
statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
statsBenchmark.getSendRequestFailedCount().longValue(), statsBenchmark.getReceiveResponseFailedCount().longValue());
}
producer.shutdown();
} catch (InterruptedException e) {
......@@ -294,7 +294,7 @@ public class Producer {
Message msg = new Message();
msg.setTopic(topic);
StringBuilder sb = new StringBuilder();
StringBuilder sb = new StringBuilder(messageSize);
for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby");
}
......@@ -313,58 +313,58 @@ public class Producer {
if (done) {
System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(),
sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
} else {
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]);
}
}
}
class StatsBenchmarkProducer {
private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L);
private final LongAdder sendRequestSuccessCount = new LongAdder();
private final AtomicLong sendRequestFailedCount = new AtomicLong(0L);
private final LongAdder sendRequestFailedCount = new LongAdder();
private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L);
private final LongAdder receiveResponseSuccessCount = new LongAdder();
private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L);
private final LongAdder receiveResponseFailedCount = new LongAdder();
private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L);
private final LongAdder sendMessageSuccessTimeTotal = new LongAdder();
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
public Long[] createSnapshot() {
Long[] snap = new Long[] {
System.currentTimeMillis(),
this.sendRequestSuccessCount.get(),
this.sendRequestFailedCount.get(),
this.receiveResponseSuccessCount.get(),
this.receiveResponseFailedCount.get(),
this.sendMessageSuccessTimeTotal.get(),
this.sendRequestSuccessCount.longValue(),
this.sendRequestFailedCount.longValue(),
this.receiveResponseSuccessCount.longValue(),
this.receiveResponseFailedCount.longValue(),
this.sendMessageSuccessTimeTotal.longValue(),
};
return snap;
}
public AtomicLong getSendRequestSuccessCount() {
public LongAdder getSendRequestSuccessCount() {
return sendRequestSuccessCount;
}
public AtomicLong getSendRequestFailedCount() {
public LongAdder getSendRequestFailedCount() {
return sendRequestFailedCount;
}
public AtomicLong getReceiveResponseSuccessCount() {
public LongAdder getReceiveResponseSuccessCount() {
return receiveResponseSuccessCount;
}
public AtomicLong getReceiveResponseFailedCount() {
public LongAdder getReceiveResponseFailedCount() {
return receiveResponseFailedCount;
}
public AtomicLong getSendMessageSuccessTimeTotal() {
public LongAdder getSendMessageSuccessTimeTotal() {
return sendMessageSuccessTimeTotal;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册