diff --git a/README.md b/README.md index 45766990db4ea2d038375894c452122d2609a389..a47efbcea346022445452299a0a0ea778a8a2438 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,12 @@ It offers a variety of features: -* Pub/Sub messaging model +* Messageing patterns including publish/subscribe, request/reply and streaming * Financial grade transactional message +* Built-in fault tolerance and high availability configuration options base on [DLedger](https://github.com/openmessaging/openmessaging-storage-dledger) * A variety of cross language clients, such as Java, C/C++, Python, Go * Pluggable transport protocols, such as TCP, SSL, AIO -* Inbuilt message tracing capability, also support opentracing +* Built-in message tracing capability, also support opentracing * Versatile big-data and streaming ecosytem integration * Message retroactivity by time or offset * Reliable FIFO and strict ordered messaging in the same queue @@ -27,7 +28,8 @@ It offers a variety of features: * Various message filter mechanics such as SQL and Tag * Docker images for isolated testing and cloud isolated clusters * Feature-rich administrative dashboard for configuration, metrics and monitoring -* Authentication and authorisation +* Authentication and authorization +* Free open source connectors, for both sources and sinks ---------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 4724a1d7ab69b7d9338cd130e12f2fef257d728d..08897faa2636359e8bb5c982afd71c7a265fbb4a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -22,7 +22,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -49,15 +51,16 @@ public class Consumer { final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; - final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; + final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null; + final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0; String group = groupPrefix; - if (Boolean.parseBoolean(isPrefixEnable)) { - group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100); + if (Boolean.parseBoolean(isSuffixEnable)) { + group = groupPrefix + "_" + (System.currentTimeMillis() % 100); } - System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s, expression: %s%n", topic, group, isPrefixEnable, filterType, expression); + System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s%n", topic, group, isSuffixEnable, filterType, expression); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); @@ -85,9 +88,15 @@ public class Consumer { (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); + final long failCount = end[4] - begin[4]; + final long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get(); + final long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get(); + + statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0); + statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0); - System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", - consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] + System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", + consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax ); } } @@ -144,7 +153,12 @@ public class Consumer { compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + if (ThreadLocalRandom.current().nextDouble() < failRate) { + statsBenchmarkConsumer.getFailCount().incrementAndGet(); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } else { + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } } }); @@ -174,6 +188,10 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); + opt = new Option("r", "fail rate", true, "consumer fail rate, default 0"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -200,14 +218,15 @@ class StatsBenchmarkConsumer { private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L); + private final AtomicLong failCount = new AtomicLong(0L); + public Long[] createSnapshot() { Long[] snap = new Long[] { System.currentTimeMillis(), this.receiveMessageTotalCount.get(), this.born2ConsumerTotalRT.get(), this.store2ConsumerTotalRT.get(), - this.born2ConsumerMaxRT.get(), - this.store2ConsumerMaxRT.get(), + this.failCount.get() }; return snap; @@ -232,4 +251,8 @@ class StatsBenchmarkConsumer { public AtomicLong getStore2ConsumerMaxRT() { return store2ConsumerMaxRT; } + + public AtomicLong getFailCount() { + return failCount; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3d89fe4c5198cafc22fd33033716215a577fe6ef..352585cbb5fb91c62aff9cfc002de7fde81404d7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -596,7 +596,7 @@ public class CommitLog { msg.setStoreHostAddressV6Flag(); } - long eclipsedTimeInLock = 0; + long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); @@ -647,14 +647,14 @@ public class CommitLog { return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } - eclipsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); } - if (eclipsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipsedTimeInLock, msg.getBody().length, result); + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { @@ -752,7 +752,7 @@ public class CommitLog { messageExtBatch.setStoreHostAddressV6Flag(); } - long eclipsedTimeInLock = 0; + long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); @@ -807,14 +807,14 @@ public class CommitLog { return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } - eclipsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; + elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); } - if (eclipsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipsedTimeInLock, messageExtBatch.getBody().length, result); + if (elapsedTimeInLock > 500) { + log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {