From 83633deae16f6aeecf6aded8a9169d7c98ea7bd9 Mon Sep 17 00:00:00 2001
From: Heng Du
Date: Fri, 29 Nov 2019 12:23:18 +0800
Subject: [PATCH] feat(client) plug-in message tracing feature (#1624)
* feat(client) seprated the messaging tracing to ons
* fix(trace) fix the rpchook is null exception
---
client/pom.xml | 13 +-
.../consumer/DefaultMQPushConsumer.java | 106 +++--
.../client/producer/DefaultMQProducer.java | 74 ++--
.../client/trace/AsyncTraceDispatcher.java | 413 ------------------
.../rocketmq/client/trace/TraceBean.java | 144 ------
.../rocketmq/client/trace/TraceConstants.java | 28 --
.../rocketmq/client/trace/TraceContext.java | 136 ------
.../client/trace/TraceDataEncoder.java | 173 --------
.../client/trace/TraceDispatcher.java | 51 ---
.../client/trace/TraceDispatcherType.java | 22 -
.../client/trace/TraceTransferBean.java | 44 --
.../rocketmq/client/trace/TraceType.java | 23 -
.../hook/ConsumeMessageTraceHookImpl.java | 114 -----
.../trace/hook/SendMessageTraceHookImpl.java | 98 -----
.../trace/DefaultMQConsumerWithTraceTest.java | 5 +-
.../trace/DefaultMQProducerWithTraceTest.java | 7 +-
pom.xml | 7 +-
17 files changed, 130 insertions(+), 1328 deletions(-)
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
diff --git a/client/pom.xml b/client/pom.xml
index 1cf292be..f1b61d3a 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -15,7 +15,8 @@
limitations under the License.
-->
-
+
org.apache.rocketmq
rocketmq-all
@@ -62,5 +63,15 @@
log4j-slf4j-impl
test
+
+ org.apache.rocketmq
+ ons-trace-core
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 339f799f..07239f89 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.consumer;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -30,9 +32,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -42,6 +41,11 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsConsumeMessageHookImpl;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -100,17 +104,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* -
*
CONSUME_FROM_LAST_OFFSET
: consumer clients pick up where it stopped previously.
- * If it were a newly booting up consumer client, according aging of the consumer group, there are two
- * cases:
+ * If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
*
* -
- * if the consumer group is created so recently that the earliest message being subscribed has yet
- * expired, which means the consumer group represents a lately launched business, consuming will
- * start from the very beginning;
+ * if the consumer group is created so recently that the earliest message being subscribed has yet expired, which
+ * means the consumer group represents a lately launched business, consuming will start from the very beginning;
*
* -
- * if the earliest message being subscribed has expired, consuming will start from the latest
- * messages, meaning messages born prior to the booting timestamp would be ignored.
+ * if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning
+ * messages born prior to the booting timestamp would be ignored.
*
*
*
@@ -126,10 +128,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
- * Backtracking consumption time with second precision. Time format is
- * 20131223171201
- * Implying Seventeen twelve and 01 seconds on December 23, 2013 year
- * Default backtracking consumption time Half an hour ago.
+ * Backtracking consumption time with second precision. Time format is 20131223171201
Implying Seventeen twelve
+ * and 01 seconds on December 23, 2013 year
Default backtracking consumption time Half an hour ago.
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
@@ -174,8 +174,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
- * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
- * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+ * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
+ * the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/
private int pullThresholdForQueue = 1000;
@@ -191,8 +191,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Flow control threshold on topic level, default value is -1(Unlimited)
*
- * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
- * {@code pullThresholdForTopic} if it is't unlimited
+ * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code
+ * pullThresholdForTopic} if it is't unlimited
*
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100
@@ -202,11 +202,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
*
- * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
- * {@code pullThresholdSizeForTopic} if it is't unlimited
+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code
+ * pullThresholdSizeForTopic} if it is't unlimited
*
- * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
- * assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
+ * consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/
private int pullThresholdSizeForTopic = -1;
@@ -257,7 +257,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Interface of asynchronous transfer data
*/
- private TraceDispatcher traceDispatcher = null;
+ private AsyncDispatcher traceDispatcher = null;
/**
* Default constructor.
@@ -285,7 +285,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely());
}
-
/**
* Constructor specifying RPC hook.
*
@@ -349,52 +348,70 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
+ public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
-
/**
- * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
+ * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
+ * customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic);
}
/**
- * Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
+ * Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace
+ * flag and customized trace topic name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
- dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
- traceDispatcher = dispatcher;
- this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
- new ConsumeMessageTraceHookImpl(traceDispatcher));
+ Properties tempProperties = new Properties();
+ tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
+ tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
+ tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
+ tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
+ tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_CONSUMER");
+ tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
+ if (customizedTraceTopic != null) {
+ tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, customizedTraceTopic);
+ } else {
+ tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, MixAll.RMQ_SYS_TRACE_TOPIC);
+ }
+ this.traceDispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
+
+ ((AsyncArrayDispatcher) traceDispatcher).setHostConsumer(this.getDefaultMQPushConsumerImpl());
+ this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new OnsConsumeMessageHookImpl(traceDispatcher));
} catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
}
}
}
@@ -693,7 +710,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
- traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
+ if (this.accessChannel == AccessChannel.CLOUD) {
+ ((AsyncArrayDispatcher) this.traceDispatcher).setCustomizedTraceTopic(null);
+ }
+ traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
@@ -744,8 +764,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
- * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
- * if null or * expression,meaning subscribe all
+ * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
if
+ * null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
@@ -886,7 +906,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeTimeout = consumeTimeout;
}
- public TraceDispatcher getTraceDispatcher() {
+ public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher;
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index faa79f54..63b337f8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.producer;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
@@ -27,9 +29,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
@@ -39,6 +38,11 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -51,8 +55,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* This class aggregates various send
methods to deliver messages to brokers. Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding.
*
- * Thread Safety: After configuring and starting process, this class can be regarded as thread-safe
- * and used among multiple threads context.
+ * Thread Safety: After configuring and starting process, this class can be regarded as
+ * thread-safe and used among multiple threads context.
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
@@ -118,7 +122,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Interface of asynchronous transfer data
*/
- private TraceDispatcher traceDispatcher = null;
+ private AsyncDispatcher traceDispatcher = null;
/**
* Default constructor.
@@ -158,17 +162,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
final String customizedTraceTopic) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- //if client open the message trace feature
if (enableMsgTrace) {
- try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
- dispatcher.setHostProducer(this.defaultMQProducerImpl);
- traceDispatcher = dispatcher;
- this.defaultMQProducerImpl.registerSendMessageHook(
- new SendMessageTraceHookImpl(traceDispatcher));
- } catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
- }
+ enableTrace(customizedTraceTopic, rpcHook);
}
}
@@ -243,17 +238,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- //if client open the message trace feature
if (enableMsgTrace) {
- try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
- dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
- traceDispatcher = dispatcher;
- this.getDefaultMQProducerImpl().registerSendMessageHook(
- new SendMessageTraceHookImpl(traceDispatcher));
- } catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ enableTrace(customizedTraceTopic, rpcHook);
+ }
+ }
+
+ private void enableTrace(String customizedTraceTopic, RPCHook rpcHook) {
+ try {
+ Properties tempProperties = new Properties();
+ tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
+ tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
+ tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
+ tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
+ tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
+ tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
+ if (customizedTraceTopic != null) {
+ tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, customizedTraceTopic);
+ } else {
+ tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, MixAll.RMQ_SYS_TRACE_TOPIC);
}
+ this.traceDispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
+
+ ((AsyncArrayDispatcher) traceDispatcher).setHostProducer(this.defaultMQProducerImpl);
+ this.defaultMQProducerImpl.registerSendMessageHook(new OnsClientSendMessageHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
@@ -271,7 +280,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
- traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
+ if (this.accessChannel == AccessChannel.CLOUD) {
+ ((AsyncArrayDispatcher) this.traceDispatcher).setCustomizedTraceTopic(null);
+ }
+ traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
@@ -566,7 +578,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message.
+ * Send request message in synchronous mode. This method returns only when the consumer consume the request message
+ * and reply a message.
*
* Warn: this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
@@ -589,8 +602,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Request asynchronously.
- * This method returns immediately. On receiving reply message, requestCallback
will be executed.
+ * Request asynchronously. This method returns immediately. On receiving reply message,
+ * requestCallback
will be executed.
*
* Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
@@ -1062,8 +1075,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
- public TraceDispatcher getTraceDispatcher() {
+ public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher;
}
-
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
deleted file mode 100644
index 06a28e44..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.common.ThreadLocalIndex;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
-import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.MessageQueueSelector;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.remoting.RPCHook;
-
-import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
-
-public class AsyncTraceDispatcher implements TraceDispatcher {
-
- private final static InternalLogger log = ClientLogger.getLog();
- private final int queueSize;
- private final int batchSize;
- private final int maxMsgSize;
- private final DefaultMQProducer traceProducer;
- private final ThreadPoolExecutor traceExecutor;
- // The last discard number of log
- private AtomicLong discardCount;
- private Thread worker;
- private ArrayBlockingQueue traceContextQueue;
- private ArrayBlockingQueue appenderQueue;
- private volatile Thread shutDownHook;
- private volatile boolean stopped = false;
- private DefaultMQProducerImpl hostProducer;
- private DefaultMQPushConsumerImpl hostConsumer;
- private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
- private String dispatcherId = UUID.randomUUID().toString();
- private String traceTopicName;
- private AtomicBoolean isStarted = new AtomicBoolean(false);
- private AccessChannel accessChannel = AccessChannel.LOCAL;
-
- public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
- // queueSize is greater than or equal to the n power of 2 of value
- this.queueSize = 2048;
- this.batchSize = 100;
- this.maxMsgSize = 128000;
- this.discardCount = new AtomicLong(0L);
- this.traceContextQueue = new ArrayBlockingQueue(1024);
- this.appenderQueue = new ArrayBlockingQueue(queueSize);
- if (!UtilAll.isBlank(traceTopicName)) {
- this.traceTopicName = traceTopicName;
- } else {
- this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
- }
- this.traceExecutor = new ThreadPoolExecutor(//
- 10, //
- 20, //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.appenderQueue, //
- new ThreadFactoryImpl("MQTraceSendThread_"));
- traceProducer = getAndCreateTraceProducer(rpcHook);
- }
-
- public AccessChannel getAccessChannel() {
- return accessChannel;
- }
-
- public void setAccessChannel(AccessChannel accessChannel) {
- this.accessChannel = accessChannel;
- }
-
- public String getTraceTopicName() {
- return traceTopicName;
- }
-
- public void setTraceTopicName(String traceTopicName) {
- this.traceTopicName = traceTopicName;
- }
-
- public DefaultMQProducer getTraceProducer() {
- return traceProducer;
- }
-
- public DefaultMQProducerImpl getHostProducer() {
- return hostProducer;
- }
-
- public void setHostProducer(DefaultMQProducerImpl hostProducer) {
- this.hostProducer = hostProducer;
- }
-
- public DefaultMQPushConsumerImpl getHostConsumer() {
- return hostConsumer;
- }
-
- public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
- this.hostConsumer = hostConsumer;
- }
-
- public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
- if (isStarted.compareAndSet(false, true)) {
- traceProducer.setNamesrvAddr(nameSrvAddr);
- traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
- traceProducer.start();
- }
- this.accessChannel = accessChannel;
- this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
- this.worker.setDaemon(true);
- this.worker.start();
- this.registerShutDownHook();
- }
-
- private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
- DefaultMQProducer traceProducerInstance = this.traceProducer;
- if (traceProducerInstance == null) {
- traceProducerInstance = new DefaultMQProducer(rpcHook);
- traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
- traceProducerInstance.setSendMsgTimeout(5000);
- traceProducerInstance.setVipChannelEnabled(false);
- // The max size of message is 128K
- traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
- }
- return traceProducerInstance;
- }
-
- @Override
- public boolean append(final Object ctx) {
- boolean result = traceContextQueue.offer((TraceContext) ctx);
- if (!result) {
- log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
- }
- return result;
- }
-
- @Override
- public void flush() throws IOException {
- // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
- long end = System.currentTimeMillis() + 500;
- while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- break;
- }
- }
- log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
- }
-
- @Override
- public void shutdown() {
- this.stopped = true;
- this.traceExecutor.shutdown();
- if (isStarted.get()) {
- traceProducer.shutdown();
- }
- this.removeShutdownHook();
- }
-
- public void registerShutDownHook() {
- if (shutDownHook == null) {
- shutDownHook = new Thread(new Runnable() {
- private volatile boolean hasShutdown = false;
-
- @Override
- public void run() {
- synchronized (this) {
- if (!this.hasShutdown) {
- try {
- flush();
- } catch (IOException e) {
- log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
- }
- }
- }
- }
- }, "ShutdownHookMQTrace");
- Runtime.getRuntime().addShutdownHook(shutDownHook);
- }
- }
-
- public void removeShutdownHook() {
- if (shutDownHook != null) {
- try {
- Runtime.getRuntime().removeShutdownHook(shutDownHook);
- } catch (IllegalStateException e) {
- // ignore - VM is already shutting down
- }
- }
- }
-
- class AsyncRunnable implements Runnable {
- private boolean stopped;
-
- @Override
- public void run() {
- while (!stopped) {
- List contexts = new ArrayList(batchSize);
- for (int i = 0; i < batchSize; i++) {
- TraceContext context = null;
- try {
- //get trace data element from blocking Queue — traceContextQueue
- context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- }
- if (context != null) {
- contexts.add(context);
- } else {
- break;
- }
- }
- if (contexts.size() > 0) {
- AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
- traceExecutor.submit(request);
- } else if (AsyncTraceDispatcher.this.stopped) {
- this.stopped = true;
- }
- }
-
- }
- }
-
- class AsyncAppenderRequest implements Runnable {
- List contextList;
-
- public AsyncAppenderRequest(final List contextList) {
- if (contextList != null) {
- this.contextList = contextList;
- } else {
- this.contextList = new ArrayList(1);
- }
- }
-
- @Override
- public void run() {
- sendTraceData(contextList);
- }
-
- public void sendTraceData(List contextList) {
- Map> transBeanMap = new HashMap>();
- for (TraceContext context : contextList) {
- if (context.getTraceBeans().isEmpty()) {
- continue;
- }
- // Topic value corresponding to original message entity content
- String topic = context.getTraceBeans().get(0).getTopic();
- String regionId = context.getRegionId();
- // Use original message entity's topic as key
- String key = topic;
- if (!StringUtils.isBlank(regionId)) {
- key = key + TraceConstants.CONTENT_SPLITOR + regionId;
- }
- List transBeanList = transBeanMap.get(key);
- if (transBeanList == null) {
- transBeanList = new ArrayList();
- transBeanMap.put(key, transBeanList);
- }
- TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
- transBeanList.add(traceData);
- }
- for (Map.Entry> entry : transBeanMap.entrySet()) {
- String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
- String dataTopic = entry.getKey();
- String regionId = null;
- if (key.length > 1) {
- dataTopic = key[0];
- regionId = key[1];
- }
- flushData(entry.getValue(), dataTopic, regionId);
- }
- }
-
- /**
- * Batch sending data actually
- */
- private void flushData(List transBeanList, String dataTopic, String regionId) {
- if (transBeanList.size() == 0) {
- return;
- }
- // Temporary buffer
- StringBuilder buffer = new StringBuilder(1024);
- int count = 0;
- Set keySet = new HashSet();
-
- for (TraceTransferBean bean : transBeanList) {
- // Keyset of message trace includes msgId of or original message
- keySet.addAll(bean.getTransKey());
- buffer.append(bean.getTransData());
- count++;
- // Ensure that the size of the package should not exceed the upper limit.
- if (buffer.length() >= traceProducer.getMaxMessageSize()) {
- sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
- // Clear temporary buffer after finishing
- buffer.delete(0, buffer.length());
- keySet.clear();
- count = 0;
- }
- }
- if (count > 0) {
- sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
- }
- transBeanList.clear();
- }
-
- /**
- * Send message trace data
- *
- * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
- * @param data the message trace data in this batch
- */
- private void sendTraceDataByMQ(Set keySet, final String data, String dataTopic, String regionId) {
- String traceTopic = traceTopicName;
- if (AccessChannel.CLOUD == accessChannel) {
- traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
- }
- final Message message = new Message(traceTopic, data.getBytes());
- // Keyset of message trace includes msgId of or original message
- message.setKeys(keySet);
- try {
- Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
- SendCallback callback = new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
-
- }
-
- @Override
- public void onException(Throwable e) {
- log.info("send trace data ,the traceData is " + data);
- }
- };
- if (traceBrokerSet.isEmpty()) {
- // No cross set
- traceProducer.send(message, callback, 5000);
- } else {
- traceProducer.send(message, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List mqs, Message msg, Object arg) {
- Set brokerSet = (Set) arg;
- List filterMqs = new ArrayList();
- for (MessageQueue queue : mqs) {
- if (brokerSet.contains(queue.getBrokerName())) {
- filterMqs.add(queue);
- }
- }
- int index = sendWhichQueue.getAndIncrement();
- int pos = Math.abs(index) % filterMqs.size();
- if (pos < 0) {
- pos = 0;
- }
- return filterMqs.get(pos);
- }
- }, traceBrokerSet, callback);
- }
-
- } catch (Exception e) {
- log.info("send trace data,the traceData is" + data);
- }
- }
-
- private Set tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
- Set brokerSet = new HashSet();
- TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
- if (null == topicPublishInfo || !topicPublishInfo.ok()) {
- producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
- producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
- topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
- }
- if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
- for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
- brokerSet.add(queue.getBrokerName());
- }
- }
- return brokerSet;
- }
- }
-
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
deleted file mode 100644
index f93aa38b..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageType;
-
-public class TraceBean {
- private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
- private String topic = "";
- private String msgId = "";
- private String offsetMsgId = "";
- private String tags = "";
- private String keys = "";
- private String storeHost = LOCAL_ADDRESS;
- private String clientHost = LOCAL_ADDRESS;
- private long storeTime;
- private int retryTimes;
- private int bodyLength;
- private MessageType msgType;
-
-
- public MessageType getMsgType() {
- return msgType;
- }
-
-
- public void setMsgType(final MessageType msgType) {
- this.msgType = msgType;
- }
-
-
- public String getOffsetMsgId() {
- return offsetMsgId;
- }
-
-
- public void setOffsetMsgId(final String offsetMsgId) {
- this.offsetMsgId = offsetMsgId;
- }
-
- public String getTopic() {
- return topic;
- }
-
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
- public String getMsgId() {
- return msgId;
- }
-
-
- public void setMsgId(String msgId) {
- this.msgId = msgId;
- }
-
-
- public String getTags() {
- return tags;
- }
-
-
- public void setTags(String tags) {
- this.tags = tags;
- }
-
-
- public String getKeys() {
- return keys;
- }
-
-
- public void setKeys(String keys) {
- this.keys = keys;
- }
-
-
- public String getStoreHost() {
- return storeHost;
- }
-
-
- public void setStoreHost(String storeHost) {
- this.storeHost = storeHost;
- }
-
-
- public String getClientHost() {
- return clientHost;
- }
-
-
- public void setClientHost(String clientHost) {
- this.clientHost = clientHost;
- }
-
-
- public long getStoreTime() {
- return storeTime;
- }
-
-
- public void setStoreTime(long storeTime) {
- this.storeTime = storeTime;
- }
-
-
- public int getRetryTimes() {
- return retryTimes;
- }
-
-
- public void setRetryTimes(int retryTimes) {
- this.retryTimes = retryTimes;
- }
-
-
- public int getBodyLength() {
- return bodyLength;
- }
-
-
- public void setBodyLength(int bodyLength) {
- this.bodyLength = bodyLength;
- }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
deleted file mode 100644
index e61ea9d1..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.MixAll;
-
-public class TraceConstants {
-
- public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
- public static final char CONTENT_SPLITOR = (char) 1;
- public static final char FIELD_SPLITOR = (char) 2;
- public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
- public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
deleted file mode 100644
index f61ba888..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-
-import java.util.List;
-
-/**
- * The context of Trace
- */
-public class TraceContext implements Comparable {
-
- private TraceType traceType;
- private long timeStamp = System.currentTimeMillis();
- private String regionId = "";
- private String regionName = "";
- private String groupName = "";
- private int costTime = 0;
- private boolean isSuccess = true;
- private String requestId = MessageClientIDSetter.createUniqID();
- private int contextCode = 0;
- private List traceBeans;
-
- public int getContextCode() {
- return contextCode;
- }
-
- public void setContextCode(final int contextCode) {
- this.contextCode = contextCode;
- }
-
- public List getTraceBeans() {
- return traceBeans;
- }
-
- public void setTraceBeans(List traceBeans) {
- this.traceBeans = traceBeans;
- }
-
- public String getRegionId() {
- return regionId;
- }
-
- public void setRegionId(String regionId) {
- this.regionId = regionId;
- }
-
- public TraceType getTraceType() {
- return traceType;
- }
-
- public void setTraceType(TraceType traceType) {
- this.traceType = traceType;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public String getGroupName() {
- return groupName;
- }
-
- public void setGroupName(String groupName) {
- this.groupName = groupName;
- }
-
- public int getCostTime() {
- return costTime;
- }
-
- public void setCostTime(int costTime) {
- this.costTime = costTime;
- }
-
- public boolean isSuccess() {
- return isSuccess;
- }
-
- public void setSuccess(boolean success) {
- isSuccess = success;
- }
-
- public String getRequestId() {
- return requestId;
- }
-
- public void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
- public String getRegionName() {
- return regionName;
- }
-
- public void setRegionName(String regionName) {
- this.regionName = regionName;
- }
-
- @Override
- public int compareTo(TraceContext o) {
- return (int) (this.timeStamp - o.getTimeStamp());
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(1024);
- sb.append(traceType).append("_").append(groupName)
- .append("_").append(regionId).append("_").append(isSuccess).append("_");
- if (traceBeans != null && traceBeans.size() > 0) {
- for (TraceBean bean : traceBeans) {
- sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
- }
- }
- return "TraceContext{" + sb.toString() + '}';
- }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
deleted file mode 100644
index 5a1afaf3..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.common.message.MessageType;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Encode/decode for Trace Data
- */
-public class TraceDataEncoder {
-
- /**
- * Resolving traceContext list From trace data String
- *
- * @param traceData
- * @return
- */
- public static List decoderFromTraceDataString(String traceData) {
- List resList = new ArrayList();
- if (traceData == null || traceData.length() <= 0) {
- return resList;
- }
- String[] contextList = traceData.split(String.valueOf(TraceConstants.FIELD_SPLITOR));
- for (String context : contextList) {
- String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
- if (line[0].equals(TraceType.Pub.name())) {
- TraceContext pubContext = new TraceContext();
- pubContext.setTraceType(TraceType.Pub);
- pubContext.setTimeStamp(Long.parseLong(line[1]));
- pubContext.setRegionId(line[2]);
- pubContext.setGroupName(line[3]);
- TraceBean bean = new TraceBean();
- bean.setTopic(line[4]);
- bean.setMsgId(line[5]);
- bean.setTags(line[6]);
- bean.setKeys(line[7]);
- bean.setStoreHost(line[8]);
- bean.setBodyLength(Integer.parseInt(line[9]));
- pubContext.setCostTime(Integer.parseInt(line[10]));
- bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]);
-
- if (line.length == 13) {
- pubContext.setSuccess(Boolean.parseBoolean(line[12]));
- } else if (line.length == 14) {
- bean.setOffsetMsgId(line[12]);
- pubContext.setSuccess(Boolean.parseBoolean(line[13]));
- }
- pubContext.setTraceBeans(new ArrayList(1));
- pubContext.getTraceBeans().add(bean);
- resList.add(pubContext);
- } else if (line[0].equals(TraceType.SubBefore.name())) {
- TraceContext subBeforeContext = new TraceContext();
- subBeforeContext.setTraceType(TraceType.SubBefore);
- subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
- subBeforeContext.setRegionId(line[2]);
- subBeforeContext.setGroupName(line[3]);
- subBeforeContext.setRequestId(line[4]);
- TraceBean bean = new TraceBean();
- bean.setMsgId(line[5]);
- bean.setRetryTimes(Integer.parseInt(line[6]));
- bean.setKeys(line[7]);
- subBeforeContext.setTraceBeans(new ArrayList(1));
- subBeforeContext.getTraceBeans().add(bean);
- resList.add(subBeforeContext);
- } else if (line[0].equals(TraceType.SubAfter.name())) {
- TraceContext subAfterContext = new TraceContext();
- subAfterContext.setTraceType(TraceType.SubAfter);
- subAfterContext.setRequestId(line[1]);
- TraceBean bean = new TraceBean();
- bean.setMsgId(line[2]);
- bean.setKeys(line[5]);
- subAfterContext.setTraceBeans(new ArrayList(1));
- subAfterContext.getTraceBeans().add(bean);
- subAfterContext.setCostTime(Integer.parseInt(line[3]));
- subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
- if (line.length >= 7) {
- // add the context type
- subAfterContext.setContextCode(Integer.parseInt(line[6]));
- }
- resList.add(subAfterContext);
- }
- }
- return resList;
- }
-
- /**
- * Encoding the trace context into data strings and keyset sets
- *
- * @param ctx
- * @return
- */
- public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
- if (ctx == null) {
- return null;
- }
- //build message trace of the transfering entity content bean
- TraceTransferBean transferBean = new TraceTransferBean();
- StringBuilder sb = new StringBuilder(256);
- switch (ctx.getTraceType()) {
- case Pub: {
- TraceBean bean = ctx.getTraceBeans().get(0);
- //append the content of context and traceBean to transferBean's TransData
- sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
- }
- break;
- case SubBefore: {
- for (TraceBean bean : ctx.getTraceBeans()) {
- sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
- }
- }
- break;
- case SubAfter: {
- for (TraceBean bean : ctx.getTraceBeans()) {
- sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
- }
- }
- break;
- default:
- }
- transferBean.setTransData(sb.toString());
- for (TraceBean bean : ctx.getTraceBeans()) {
-
- transferBean.getTransKey().add(bean.getMsgId());
- if (bean.getKeys() != null && bean.getKeys().length() > 0) {
- transferBean.getTransKey().add(bean.getKeys());
- }
- }
- return transferBean;
- }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
deleted file mode 100644
index 51cc0deb..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import org.apache.rocketmq.client.AccessChannel;
-import org.apache.rocketmq.client.exception.MQClientException;
-import java.io.IOException;
-
-/**
- * Interface of asynchronous transfer data
- */
-public interface TraceDispatcher {
-
- /**
- * Initialize asynchronous transfer data module
- */
- void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
-
- /**
- * Append the transfering data
- * @param ctx data infomation
- * @return
- */
- boolean append(Object ctx);
-
- /**
- * Write flush action
- *
- * @throws IOException
- */
- void flush() throws IOException;
-
- /**
- * Close the trace Hook
- */
- void shutdown();
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java
deleted file mode 100644
index f09c9b8d..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-public enum TraceDispatcherType {
- PRODUCER,
- CONSUMER
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
deleted file mode 100644
index 052ca365..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Trace transfering bean
- */
-public class TraceTransferBean {
- private String transData;
- private Set transKey = new HashSet();
-
- public String getTransData() {
- return transData;
- }
-
- public void setTransData(String transData) {
- this.transData = transData;
- }
-
- public Set getTransKey() {
- return transKey;
- }
-
- public void setTransKey(Set transKey) {
- this.transKey = transKey;
- }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
deleted file mode 100644
index 79b19c17..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace;
-
-public enum TraceType {
- Pub,
- SubBefore,
- SubAfter,
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
deleted file mode 100644
index f30b1211..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace.hook;
-
-import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
-import org.apache.rocketmq.client.hook.ConsumeMessageContext;
-import org.apache.rocketmq.client.hook.ConsumeMessageHook;
-import org.apache.rocketmq.client.trace.TraceContext;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
-import org.apache.rocketmq.client.trace.TraceType;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-
-public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
-
- private TraceDispatcher localDispatcher;
-
- public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
- this.localDispatcher = localDispatcher;
- }
-
- @Override
- public String hookName() {
- return "ConsumeMessageTraceHook";
- }
-
- @Override
- public void consumeMessageBefore(ConsumeMessageContext context) {
- if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
- return;
- }
- TraceContext traceContext = new TraceContext();
- context.setMqTraceContext(traceContext);
- traceContext.setTraceType(TraceType.SubBefore);//
- traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
- List beans = new ArrayList();
- for (MessageExt msg : context.getMsgList()) {
- if (msg == null) {
- continue;
- }
- String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
- String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
-
- if (traceOn != null && traceOn.equals("false")) {
- // If trace switch is false ,skip it
- continue;
- }
- TraceBean traceBean = new TraceBean();
- traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
- traceBean.setMsgId(msg.getMsgId());//
- traceBean.setTags(msg.getTags());//
- traceBean.setKeys(msg.getKeys());//
- traceBean.setStoreTime(msg.getStoreTimestamp());//
- traceBean.setBodyLength(msg.getStoreSize());//
- traceBean.setRetryTimes(msg.getReconsumeTimes());//
- traceContext.setRegionId(regionId);//
- beans.add(traceBean);
- }
- if (beans.size() > 0) {
- traceContext.setTraceBeans(beans);
- traceContext.setTimeStamp(System.currentTimeMillis());
- localDispatcher.append(traceContext);
- }
- }
-
- @Override
- public void consumeMessageAfter(ConsumeMessageContext context) {
- if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
- return;
- }
- TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
-
- if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
- // If subbefore bean is null ,skip it
- return;
- }
- TraceContext subAfterContext = new TraceContext();
- subAfterContext.setTraceType(TraceType.SubAfter);//
- subAfterContext.setRegionId(subBeforeContext.getRegionId());//
- subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
- subAfterContext.setRequestId(subBeforeContext.getRequestId());//
- subAfterContext.setSuccess(context.isSuccess());//
-
- // Caculate the cost time for processing messages
- int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
- subAfterContext.setCostTime(costTime);//
- subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
- String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
- if (contextType != null) {
- subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
- }
- localDispatcher.append(subAfterContext);
- }
-}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
deleted file mode 100644
index 80c7babd..00000000
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.client.trace.hook;
-
-import java.util.ArrayList;
-import org.apache.rocketmq.client.hook.SendMessageContext;
-import org.apache.rocketmq.client.hook.SendMessageHook;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceBean;
-import org.apache.rocketmq.client.trace.TraceContext;
-import org.apache.rocketmq.client.trace.TraceDispatcher;
-import org.apache.rocketmq.client.trace.TraceType;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-
-public class SendMessageTraceHookImpl implements SendMessageHook {
-
- private TraceDispatcher localDispatcher;
-
- public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
- this.localDispatcher = localDispatcher;
- }
-
- @Override
- public String hookName() {
- return "SendMessageTraceHook";
- }
-
- @Override
- public void sendMessageBefore(SendMessageContext context) {
- //if it is message trace data,then it doesn't recorded
- if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
- return;
- }
- //build the context content of TuxeTraceContext
- TraceContext tuxeContext = new TraceContext();
- tuxeContext.setTraceBeans(new ArrayList(1));
- context.setMqTraceContext(tuxeContext);
- tuxeContext.setTraceType(TraceType.Pub);
- tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
- //build the data bean object of message trace
- TraceBean traceBean = new TraceBean();
- traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
- traceBean.setTags(context.getMessage().getTags());
- traceBean.setKeys(context.getMessage().getKeys());
- traceBean.setStoreHost(context.getBrokerAddr());
- traceBean.setBodyLength(context.getMessage().getBody().length);
- traceBean.setMsgType(context.getMsgType());
- tuxeContext.getTraceBeans().add(traceBean);
- }
-
- @Override
- public void sendMessageAfter(SendMessageContext context) {
- //if it is message trace data,then it doesn't recorded
- if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
- || context.getMqTraceContext() == null) {
- return;
- }
- if (context.getSendResult() == null) {
- return;
- }
-
- if (context.getSendResult().getRegionId() == null
- || !context.getSendResult().isTraceOn()) {
- // if switch is false,skip it
- return;
- }
-
- TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
- TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
- int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
- tuxeContext.setCostTime(costTime);
- if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
- tuxeContext.setSuccess(true);
- } else {
- tuxeContext.setSuccess(false);
- }
- tuxeContext.setRegionId(context.getSendResult().getRegionId());
- traceBean.setMsgId(context.getSendResult().getMsgId());
- traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
- traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
- localDispatcher.append(tuxeContext);
- }
-}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 496c5143..83474f02 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
@@ -104,7 +105,7 @@ public class DefaultMQConsumerWithTraceTest {
private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer;
- private AsyncTraceDispatcher asyncTraceDispatcher;
+ private AsyncArrayDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
@@ -121,7 +122,7 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
- asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher();
+ asyncTraceDispatcher = (AsyncArrayDispatcher) pushConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index 3759acba..392b5c9a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
@@ -64,7 +65,7 @@ public class DefaultMQProducerWithTraceTest {
@Mock
private MQClientAPIImpl mQClientAPIImpl;
- private AsyncTraceDispatcher asyncTraceDispatcher;
+ private AsyncArrayDispatcher asyncTraceDispatcher;
private DefaultMQProducer producer;
private DefaultMQProducer customTraceTopicproducer;
@@ -88,8 +89,8 @@ public class DefaultMQProducerWithTraceTest {
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
- asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
- asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
+ asyncTraceDispatcher = (AsyncArrayDispatcher) producer.getTraceDispatcher();
+ asyncTraceDispatcher.setCustomizedTraceTopic(customerTraceTopic);
asyncTraceDispatcher.getHostProducer();
asyncTraceDispatcher.getHostConsumer();
traceProducer = asyncTraceDispatcher.getTraceProducer();
diff --git a/pom.xml b/pom.xml
index f8a391b2..f6db46b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,8 +625,11 @@
commons-validator
1.6
-
-
+
+ org.apache.rocketmq
+ ons-trace-core
+ 1.2.0-SNAPSHOT
+
--
GitLab