From e0414c005034471d9b461b478a581a5027c793ec Mon Sep 17 00:00:00 2001 From: Heng Du Date: Mon, 13 May 2019 16:39:46 +0800 Subject: [PATCH] [ISSUE #1200] Polish default message trace topic trace (#1201) Polish message trace default trace topic implementation --- .../client/trace/AsyncTraceDispatcher.java | 74 +++++++++++-------- .../rocketmq/client/trace/TraceConstants.java | 3 + 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 87a795e4..0aaadb18 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -16,6 +16,20 @@ */ package org.apache.rocketmq.client.trace; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -34,21 +48,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; -import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.UUID; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.Set; -import java.util.HashSet; - - import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; public class AsyncTraceDispatcher implements TraceDispatcher { @@ -73,7 +72,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); - public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; @@ -88,12 +86,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } this.traceExecuter = new ThreadPoolExecutor(// - 10, // - 20, // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.appenderQueue, // - new ThreadFactoryImpl("MQTraceSendThread_")); + 10, // + 20, // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.appenderQueue, // + new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); } @@ -266,8 +264,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } // Topic value corresponding to original message entity content String topic = context.getTraceBeans().get(0).getTopic(); + String regionId = context.getRegionId(); // Use original message entity's topic as key String key = topic; + if (!StringUtils.isBlank(regionId)) { + key = key + TraceConstants.CONTENT_SPLITOR + regionId; + } List transBeanList = transBeanMap.get(key); if (transBeanList == null) { transBeanList = new ArrayList(); @@ -277,14 +279,21 @@ public class AsyncTraceDispatcher implements TraceDispatcher { transBeanList.add(traceData); } for (Map.Entry> entry : transBeanMap.entrySet()) { - flushData(entry.getValue()); + String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR)); + String dataTopic = entry.getKey(); + String regionId = null; + if (key.length > 1) { + dataTopic = key[0]; + regionId = key[1]; + } + flushData(entry.getValue(), dataTopic, regionId); } } /** * Batch sending data actually */ - private void flushData(List transBeanList) { + private void flushData(List transBeanList, String dataTopic, String regionId) { if (transBeanList.size() == 0) { return; } @@ -300,7 +309,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { count++; // Ensure that the size of the package should not exceed the upper limit. if (buffer.length() >= traceProducer.getMaxMessageSize()) { - sendTraceDataByMQ(keySet, buffer.toString()); + sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId); // Clear temporary buffer after finishing buffer.delete(0, buffer.length()); keySet.clear(); @@ -308,7 +317,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } } if (count > 0) { - sendTraceDataByMQ(keySet, buffer.toString()); + sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId); } transBeanList.clear(); } @@ -317,16 +326,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher { * Send message trace data * * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) - * @param data the message trace data in this batch + * @param data the message trace data in this batch */ - private void sendTraceDataByMQ(Set keySet, final String data) { - String topic = traceTopicName; - final Message message = new Message(topic, data.getBytes()); + private void sendTraceDataByMQ(Set keySet, final String data, String dataTopic, String regionId) { + String traceTopic = traceTopicName; + if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) { + traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId; + } + final Message message = new Message(traceTopic, data.getBytes()); // Keyset of message trace includes msgId of or original message message.setKeys(keySet); try { - Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); + Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic); SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index b9fd8778..e61ea9d1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -16,10 +16,13 @@ */ package org.apache.rocketmq.client.trace; +import org.apache.rocketmq.common.MixAll; + public class TraceConstants { public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; + public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; } -- GitLab