提交 e0414c00 编写于 作者: H Heng Du 提交者: dinglei

[ISSUE #1200] Polish default message trace topic trace (#1201)

Polish message trace default trace topic implementation
上级 84d2260b
...@@ -16,6 +16,20 @@ ...@@ -16,6 +16,20 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
...@@ -34,21 +48,6 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -34,21 +48,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
public class AsyncTraceDispatcher implements TraceDispatcher { public class AsyncTraceDispatcher implements TraceDispatcher {
...@@ -73,7 +72,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -73,7 +72,6 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String traceTopicName; private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isStarted = new AtomicBoolean(false);
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048; this.queueSize = 2048;
...@@ -88,12 +86,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -88,12 +86,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
} }
this.traceExecuter = new ThreadPoolExecutor(// this.traceExecuter = new ThreadPoolExecutor(//
10, // 10, //
20, // 20, //
1000 * 60, // 1000 * 60, //
TimeUnit.MILLISECONDS, // TimeUnit.MILLISECONDS, //
this.appenderQueue, // this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_")); new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook); traceProducer = getAndCreateTraceProducer(rpcHook);
} }
...@@ -266,8 +264,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -266,8 +264,12 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
// Topic value corresponding to original message entity content // Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic(); String topic = context.getTraceBeans().get(0).getTopic();
String regionId = context.getRegionId();
// Use original message entity's topic as key // Use original message entity's topic as key
String key = topic; String key = topic;
if (!StringUtils.isBlank(regionId)) {
key = key + TraceConstants.CONTENT_SPLITOR + regionId;
}
List<TraceTransferBean> transBeanList = transBeanMap.get(key); List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) { if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>(); transBeanList = new ArrayList<TraceTransferBean>();
...@@ -277,14 +279,21 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -277,14 +279,21 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
transBeanList.add(traceData); transBeanList.add(traceData);
} }
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) { for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
flushData(entry.getValue()); String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
String dataTopic = entry.getKey();
String regionId = null;
if (key.length > 1) {
dataTopic = key[0];
regionId = key[1];
}
flushData(entry.getValue(), dataTopic, regionId);
} }
} }
/** /**
* Batch sending data actually * Batch sending data actually
*/ */
private void flushData(List<TraceTransferBean> transBeanList) { private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
if (transBeanList.size() == 0) { if (transBeanList.size() == 0) {
return; return;
} }
...@@ -300,7 +309,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -300,7 +309,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
count++; count++;
// Ensure that the size of the package should not exceed the upper limit. // Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) { if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString()); sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
// Clear temporary buffer after finishing // Clear temporary buffer after finishing
buffer.delete(0, buffer.length()); buffer.delete(0, buffer.length());
keySet.clear(); keySet.clear();
...@@ -308,7 +317,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -308,7 +317,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
} }
if (count > 0) { if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString()); sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
} }
transBeanList.clear(); transBeanList.clear();
} }
...@@ -317,16 +326,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -317,16 +326,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
* Send message trace data * Send message trace data
* *
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch * @param data the message trace data in this batch
*/ */
private void sendTraceDataByMQ(Set<String> keySet, final String data) { private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String topic = traceTopicName; String traceTopic = traceTopicName;
final Message message = new Message(topic, data.getBytes()); if (StringUtils.isNotEmpty(regionId) && MixAll.RMQ_SYS_TRACE_TOPIC.equals(traceTopic)) {
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message // Keyset of message trace includes msgId of or original message
message.setKeys(keySet); message.setKeys(keySet);
try { try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
SendCallback callback = new SendCallback() { SendCallback callback = new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
......
...@@ -16,10 +16,13 @@ ...@@ -16,10 +16,13 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.MixAll;
public class TraceConstants { public class TraceConstants {
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1; public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2; public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册