diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 754cbd55ac4598a576064ae59ac2a9ea70a4505c..7ff8bd77e03152f2469db28ee90891ff1dd59be3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.trace; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.common.ThreadLocalIndex; @@ -64,7 +64,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { // The last discard number of log private AtomicLong discardCount; private Thread worker; - private ArrayBlockingQueue traceContextQueue; + private final ArrayBlockingQueue traceContextQueue; private ArrayBlockingQueue appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; @@ -78,7 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private String group; private Type type; - public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) { + public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; @@ -95,12 +95,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC; } this.traceExecutor = new ThreadPoolExecutor(// - 10, // - 20, // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.appenderQueue, // - new ThreadFactoryImpl("MQTraceSendThread_")); + 10, // + 20, // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.appenderQueue, // + new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); } @@ -180,10 +180,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } @Override - public void flush() throws IOException { + public void flush() { // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return. long end = System.currentTimeMillis() + 500; - while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) { + while (System.currentTimeMillis() <= end) { + synchronized (traceContextQueue) { + if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) { + break; + } + } try { Thread.sleep(1); } catch (InterruptedException e) { @@ -196,6 +201,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { @Override public void shutdown() { this.stopped = true; + flush(); this.traceExecutor.shutdown(); if (isStarted.get()) { traceProducer.shutdown(); @@ -212,11 +218,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void run() { synchronized (this) { if (!this.hasShutdown) { - try { - flush(); - } catch (IOException e) { - log.error("system MQTrace hook shutdown failed ,maybe loss some trace data"); - } + flush(); } } } @@ -242,25 +244,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void run() { while (!stopped) { List contexts = new ArrayList(batchSize); - for (int i = 0; i < batchSize; i++) { - TraceContext context = null; - try { - //get trace data element from blocking Queue — traceContextQueue - context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { + synchronized (traceContextQueue) { + for (int i = 0; i < batchSize; i++) { + TraceContext context = null; + try { + //get trace data element from blocking Queue - traceContextQueue + context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + if (context != null) { + contexts.add(context); + } else { + break; + } } - if (context != null) { - contexts.add(context); - } else { - break; + if (contexts.size() > 0) { + AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); + traceExecutor.submit(request); + } else if (AsyncTraceDispatcher.this.stopped) { + this.stopped = true; } } - if (contexts.size() > 0) { - AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); - traceExecutor.submit(request); - } else if (AsyncTraceDispatcher.this.stopped) { - this.stopped = true; - } } } @@ -352,7 +356,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { * Send message trace data * * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) - * @param data the message trace data in this batch + * @param data the message trace data in this batch */ private void sendTraceDataByMQ(Set keySet, final String data, String dataTopic, String regionId) { String traceTopic = traceTopicName;