未验证 提交 a2f8810c 编写于 作者: Y yuz10 提交者: GitHub

[ISSUE #2988] fix fail to send trace of last message before shutting down producer

上级 6b519ef1
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
...@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit; ...@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
...@@ -64,7 +64,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -64,7 +64,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
// The last discard number of log // The last discard number of log
private AtomicLong discardCount; private AtomicLong discardCount;
private Thread worker; private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue; private final ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue; private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook; private volatile Thread shutDownHook;
private volatile boolean stopped = false; private volatile boolean stopped = false;
...@@ -78,7 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -78,7 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String group; private String group;
private Type type; private Type type;
public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) { public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048; this.queueSize = 2048;
this.batchSize = 100; this.batchSize = 100;
...@@ -95,12 +95,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -95,12 +95,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC; this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
} }
this.traceExecutor = new ThreadPoolExecutor(// this.traceExecutor = new ThreadPoolExecutor(//
10, // 10, //
20, // 20, //
1000 * 60, // 1000 * 60, //
TimeUnit.MILLISECONDS, // TimeUnit.MILLISECONDS, //
this.appenderQueue, // this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_")); new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook); traceProducer = getAndCreateTraceProducer(rpcHook);
} }
...@@ -180,10 +180,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -180,10 +180,15 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
@Override @Override
public void flush() throws IOException { public void flush() {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return. // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500; long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) { while (System.currentTimeMillis() <= end) {
synchronized (traceContextQueue) {
if (traceContextQueue.size() == 0 && appenderQueue.size() == 0) {
break;
}
}
try { try {
Thread.sleep(1); Thread.sleep(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -196,6 +201,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -196,6 +201,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
@Override @Override
public void shutdown() { public void shutdown() {
this.stopped = true; this.stopped = true;
flush();
this.traceExecutor.shutdown(); this.traceExecutor.shutdown();
if (isStarted.get()) { if (isStarted.get()) {
traceProducer.shutdown(); traceProducer.shutdown();
...@@ -212,11 +218,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -212,11 +218,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() { public void run() {
synchronized (this) { synchronized (this) {
if (!this.hasShutdown) { if (!this.hasShutdown) {
try { flush();
flush();
} catch (IOException e) {
log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
}
} }
} }
} }
...@@ -242,25 +244,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -242,25 +244,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void run() { public void run() {
while (!stopped) { while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize); List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) { synchronized (traceContextQueue) {
TraceContext context = null; for (int i = 0; i < batchSize; i++) {
try { TraceContext context = null;
//get trace data element from blocking Queue — traceContextQueue try {
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); //get trace data element from blocking Queue - traceContextQueue
} catch (InterruptedException e) { context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
} }
if (context != null) { if (contexts.size() > 0) {
contexts.add(context); AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
} else { traceExecutor.submit(request);
break; } else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
} }
} }
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
} }
} }
...@@ -352,7 +356,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -352,7 +356,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
* Send message trace data * Send message trace data
* *
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch * @param data the message trace data in this batch
*/ */
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) { private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName; String traceTopic = traceTopicName;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册