NOTE: only use it for measuring.
+ * http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/System.html#nanoTime%28%29
+ *
+ * @return current time in nanoseconds.
+ */
+ public static long nowInNano() {
+ return System.nanoTime();
+ }
+
+ /**
+ * Milliseconds elapsed since the time specified, the input is nanoTime
+ * the only conversion happens when computing the elapsed time.
+ *
+ * @param startNanoTime the start of the interval that we are measuring
+ * @return elapsed time in milliseconds.
+ */
+ public static long elapsedMSec(long startNanoTime) {
+ return (System.nanoTime() - startNanoTime) / NANOSECONDS_PER_MILLISECOND;
+ }
+
+ /**
+ * Microseconds elapsed since the time specified, the input is nanoTime
+ * the only conversion happens when computing the elapsed time.
+ *
+ * @param startNanoTime the start of the interval that we are measuring
+ * @return elapsed time in milliseconds.
+ */
+ public static long elapsedMicroSec(long startNanoTime) {
+ return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
+ }
+
+ /**
+ * Nanoseconds elapsed since the time specified, the input is nanoTime
+ * the only conversion happens when computing the elapsed time.
+ *
+ * @param startNanoTime the start of the interval that we are measuring
+ * @return elapsed time in milliseconds.
+ */
+ public static long elapsedNanos(long startNanoTime) {
+ return System.nanoTime() - startNanoTime;
+ }
+}
diff --git a/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..0dab4fad8c7ce38e22fc073fe5d8edf715d0aebe
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/rocketmq/mqtt/util/orderedexecutor/MdcUtils.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.rocketmq.mqtt.util.orderedexecutor;
+
+import java.util.Map;
+import org.slf4j.MDC;
+
+/**
+ * Utils for work with Slf4j MDC.
+ */
+public class MdcUtils {
+
+ public static void restoreContext(Map Metrics are not recorded, making this receiver useful in unit tests and as defaults in
+ * situations where metrics are not strictly required.
+ */
+public class NullStatsLogger implements StatsLogger {
+
+ public static final NullStatsLogger INSTANCE = new NullStatsLogger();
+
+ /**
+ * A no-op {@code OpStatsLogger}.
+ */
+ static class NullOpStatsLogger implements OpStatsLogger {
+ final OpStatsData nullOpStats = new OpStatsData(0, 0, 0, new long[6]);
+
+ @Override
+ public void registerFailedEvent(long eventLatency, TimeUnit unit) {
+ // nop
+ }
+
+ @Override
+ public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
+ // nop
+ }
+
+ @Override
+ public void registerSuccessfulValue(long value) {
+ // nop
+ }
+
+ @Override
+ public void registerFailedValue(long value) {
+ // nop
+ }
+
+ @Override
+ public OpStatsData toOpStatsData() {
+ return nullOpStats;
+ }
+
+ @Override
+ public void clear() {
+ // nop
+ }
+ }
+ static NullOpStatsLogger nullOpStatsLogger = new NullOpStatsLogger();
+
+ /**
+ * A no-op {@code Counter}.
+ */
+ static class NullCounter implements Counter {
+ @Override
+ public void clear() {
+ // nop
+ }
+
+ @Override
+ public void inc() {
+ // nop
+ }
+
+ @Override
+ public void dec() {
+ // nop
+ }
+
+ @Override
+ public void add(long delta) {
+ // nop
+ }
+
+ @Override
+ public Long get() {
+ return 0L;
+ }
+ }
+ static NullCounter nullCounter = new NullCounter();
+
+ @Override
+ public OpStatsLogger getOpStatsLogger(String name) {
+ return nullOpStatsLogger;
+ }
+
+ @Override
+ public Counter getCounter(String name) {
+ return nullCounter;
+ }
+
+ @Override
+ public 1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
+ * This means that exceptions in scheduled tasks wont go unnoticed and will be logged.
+ *
+ * 2. It supports submitting tasks with an ordering key, so that tasks submitted
+ * with the same key will always be executed in order, but tasks across different keys can be unordered. This retains
+ * parallelism while retaining the basic amount of ordering we want (e.g. , per ledger handle). Ordering is achieved by
+ * hashing the key objects to threads by their {@link #hashCode()} method.
+ */
+public class OrderedExecutor implements ExecutorService {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.MQTT_LOGGER_NAME);
+ public static final int NO_TASK_LIMIT = -1;
+ protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
+
+ final String name;
+ final ExecutorService threads[];
+ final long threadIds[];
+ final Random rand = new Random();
+ final OpStatsLogger taskExecutionStats;
+ final OpStatsLogger taskPendingStats;
+ final boolean traceTaskExecution;
+ final boolean preserveMdcForTaskExecution;
+ final long warnTimeMicroSec;
+ final int maxTasksInQueue;
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder class for an OrderedExecutor.
+ */
+ public static class Builder extends AbstractBuilder Eg:
+ * Eg:
+ *
+ *
+ */
+ static SafeRunnable safeRun(Runnable runnable) {
+ return new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ runnable.run();
+ }
+ };
+ }
+
+ /**
+ * Utility method to use SafeRunnable from lambdas with
+ * a custom exception handler.
+ *
+ *
+ * executor.submit(SafeRunnable.safeRun(() -> {
+ * // My not-safe code
+ * });
+ *
+ *
+ *
+ *
+ * @param runnable
+ * @param exceptionHandler
+ * handler that will be called when there are any exception
+ * @return
+ */
+ static SafeRunnable safeRun(Runnable runnable, Consumer
+ * executor.submit(SafeRunnable.safeRun(() -> {
+ * // My not-safe code
+ * }, exception -> {
+ * // Handle exception
+ * );
+ *
+ *