未验证 提交 83633dea 编写于 作者: H Heng Du 提交者: GitHub

feat(client) plug-in message tracing feature (#1624)

* feat(client) seprated the messaging tracing to ons

* fix(trace) fix the rpchook is null exception
上级 44569e4d
...@@ -15,7 +15,8 @@ ...@@ -15,7 +15,8 @@
limitations under the License. limitations under the License.
--> -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<groupId>org.apache.rocketmq</groupId> <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId> <artifactId>rocketmq-all</artifactId>
...@@ -62,5 +63,15 @@ ...@@ -62,5 +63,15 @@
<artifactId>log4j-slf4j-impl</artifactId> <artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>ons-trace-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.consumer; ...@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.consumer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.consumer.listener.MessageListener;
...@@ -30,9 +32,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException; ...@@ -30,9 +32,6 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
...@@ -42,6 +41,11 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -42,6 +41,11 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.ons.open.trace.core.hook.OnsConsumeMessageHookImpl;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
...@@ -100,17 +104,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -100,17 +104,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* <ul> * <ul>
* <li> * <li>
* <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. * <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
* If it were a newly booting up consumer client, according aging of the consumer group, there are two * If it were a newly booting up consumer client, according aging of the consumer group, there are two cases:
* cases:
* <ol> * <ol>
* <li> * <li>
* if the consumer group is created so recently that the earliest message being subscribed has yet * if the consumer group is created so recently that the earliest message being subscribed has yet expired, which
* expired, which means the consumer group represents a lately launched business, consuming will * means the consumer group represents a lately launched business, consuming will start from the very beginning;
* start from the very beginning;
* </li> * </li>
* <li> * <li>
* if the earliest message being subscribed has expired, consuming will start from the latest * if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning
* messages, meaning messages born prior to the booting timestamp would be ignored. * messages born prior to the booting timestamp would be ignored.
* </li> * </li>
* </ol> * </ol>
* </li> * </li>
...@@ -126,10 +128,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -126,10 +128,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/** /**
* Backtracking consumption time with second precision. Time format is * Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve
* 20131223171201<br> * and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago.
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
* Default backtracking consumption time Half an hour ago.
*/ */
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
...@@ -174,8 +174,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -174,8 +174,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000; private int consumeConcurrentlyMaxSpan = 2000;
/** /**
* Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, Consider
* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit * the {@code pullBatchSize}, the instantaneous value may exceed the limit
*/ */
private int pullThresholdForQueue = 1000; private int pullThresholdForQueue = 1000;
...@@ -191,8 +191,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -191,8 +191,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Flow control threshold on topic level, default value is -1(Unlimited) * Flow control threshold on topic level, default value is -1(Unlimited)
* <p> * <p>
* The value of {@code pullThresholdForQueue} will be overwrote and calculated based on * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on {@code
* {@code pullThresholdForTopic} if it is't unlimited * pullThresholdForTopic} if it is't unlimited
* <p> * <p>
* For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
* then pullThresholdForQueue will be set to 100 * then pullThresholdForQueue will be set to 100
...@@ -202,11 +202,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -202,11 +202,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Limit the cached message size on topic level, default value is -1 MiB(Unlimited) * Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
* <p> * <p>
* The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on {@code
* {@code pullThresholdSizeForTopic} if it is't unlimited * pullThresholdSizeForTopic} if it is't unlimited
* <p> * <p>
* For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are assigned to this
* assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB * consumer, then pullThresholdSizeForQueue will be set to 100 MiB
*/ */
private int pullThresholdSizeForTopic = -1; private int pullThresholdSizeForTopic = -1;
...@@ -257,7 +257,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -257,7 +257,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
private TraceDispatcher traceDispatcher = null; private AsyncDispatcher traceDispatcher = null;
/** /**
* Default constructor. * Default constructor.
...@@ -285,7 +285,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -285,7 +285,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely()); this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely());
} }
/** /**
* Constructor specifying RPC hook. * Constructor specifying RPC hook.
* *
...@@ -349,52 +348,70 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -349,52 +348,70 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* *
* @param consumerGroup Consumer group. * @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) { public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic); this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
} }
/** /**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. * Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and
* customized trace topic name.
* *
* @param consumerGroup Consume queue. * @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command. * @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm. * @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic); this(null, consumerGroup, rpcHook, allocateMessageQueueStrategy, enableMsgTrace, customizedTraceTopic);
} }
/** /**
* Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name. * Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace
* flag and customized trace topic name.
* *
* @param namespace Namespace for this MQ Producer instance. * @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue. * @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command. * @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm. * @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace. * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/ */
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) { AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
this.namespace = namespace; this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) { if (enableMsgTrace) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); Properties tempProperties = new Properties();
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
traceDispatcher = dispatcher; tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
new ConsumeMessageTraceHookImpl(traceDispatcher)); tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_CONSUMER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
if (customizedTraceTopic != null) {
tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, customizedTraceTopic);
} else {
tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, MixAll.RMQ_SYS_TRACE_TOPIC);
}
this.traceDispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
((AsyncArrayDispatcher) traceDispatcher).setHostConsumer(this.getDefaultMQPushConsumerImpl());
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new OnsConsumeMessageHookImpl(traceDispatcher));
} catch (Throwable e) { } catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
} }
} }
} }
...@@ -693,7 +710,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -693,7 +710,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.start(); this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); if (this.accessChannel == AccessChannel.CLOUD) {
((AsyncArrayDispatcher) this.traceDispatcher).setCustomizedTraceTopic(null);
}
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
} }
...@@ -744,8 +764,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -744,8 +764,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Subscribe a topic to consuming subscription. * Subscribe a topic to consuming subscription.
* *
* @param topic topic to subscribe. * @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
* if null or * expression,meaning subscribe all * null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error. * @throws MQClientException if there is any client error.
*/ */
@Override @Override
...@@ -886,7 +906,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -886,7 +906,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeTimeout = consumeTimeout; this.consumeTimeout = consumeTimeout;
} }
public TraceDispatcher getTraceDispatcher() { public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
} }
...@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.producer; ...@@ -18,7 +18,9 @@ package org.apache.rocketmq.client.producer;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
...@@ -27,9 +29,6 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -27,9 +29,6 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageBatch;
...@@ -39,6 +38,11 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -39,6 +38,11 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
...@@ -51,8 +55,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException; ...@@ -51,8 +55,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
* This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and * This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
* cons; you'd better understand strengths and weakness of them before actually coding. </p> * cons; you'd better understand strengths and weakness of them before actually coding. </p>
* *
* <p> <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe * <p> <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as
* and used among multiple threads context. </p> * thread-safe and used among multiple threads context. </p>
*/ */
public class DefaultMQProducer extends ClientConfig implements MQProducer { public class DefaultMQProducer extends ClientConfig implements MQProducer {
...@@ -118,7 +122,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -118,7 +122,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/** /**
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
private TraceDispatcher traceDispatcher = null; private AsyncDispatcher traceDispatcher = null;
/** /**
* Default constructor. * Default constructor.
...@@ -158,17 +162,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -158,17 +162,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
final String customizedTraceTopic) { final String customizedTraceTopic) {
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) { if (enableMsgTrace) {
try { enableTrace(customizedTraceTopic, rpcHook);
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
} }
} }
...@@ -243,19 +238,33 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -243,19 +238,33 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.namespace = namespace; this.namespace = namespace;
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) { if (enableMsgTrace) {
enableTrace(customizedTraceTopic, rpcHook);
}
}
private void enableTrace(String customizedTraceTopic, RPCHook rpcHook) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); Properties tempProperties = new Properties();
dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
traceDispatcher = dispatcher; tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
this.getDefaultMQProducerImpl().registerSendMessageHook( tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
new SendMessageTraceHookImpl(traceDispatcher)); tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
if (customizedTraceTopic != null) {
tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, customizedTraceTopic);
} else {
tempProperties.put(OnsTraceConstants.CustomizedTraceTopic, MixAll.RMQ_SYS_TRACE_TOPIC);
}
this.traceDispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
((AsyncArrayDispatcher) traceDispatcher).setHostProducer(this.defaultMQProducerImpl);
this.defaultMQProducerImpl.registerSendMessageHook(new OnsClientSendMessageHookImpl(traceDispatcher));
} catch (Throwable e) { } catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
} }
} }
}
/** /**
* Start this producer instance. </p> * Start this producer instance. </p>
...@@ -271,7 +280,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -271,7 +280,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.start(); this.defaultMQProducerImpl.start();
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); if (this.accessChannel == AccessChannel.CLOUD) {
((AsyncArrayDispatcher) this.traceDispatcher).setCustomizedTraceTopic(null);
}
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
} }
...@@ -566,7 +578,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -566,7 +578,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p> * Send request message in synchronous mode. This method returns only when the consumer consume the request message
* and reply a message. </p>
* *
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
...@@ -589,8 +602,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -589,8 +602,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
/** /**
* Request asynchronously. </p> * Request asynchronously. </p> This method returns immediately. On receiving reply message,
* This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p> * <code>requestCallback</code> will be executed. </p>
* *
* Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
...@@ -1062,8 +1075,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -1062,8 +1075,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
} }
public TraceDispatcher getTraceDispatcher() { public AsyncDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
public class AsyncTraceDispatcher implements TraceDispatcher {
private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
public AccessChannel getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(AccessChannel accessChannel) {
this.accessChannel = accessChannel;
}
public String getTraceTopicName() {
return traceTopicName;
}
public void setTraceTopicName(String traceTopicName) {
this.traceTopicName = traceTopicName;
}
public DefaultMQProducer getTraceProducer() {
return traceProducer;
}
public DefaultMQProducerImpl getHostProducer() {
return hostProducer;
}
public void setHostProducer(DefaultMQProducerImpl hostProducer) {
this.hostProducer = hostProducer;
}
public DefaultMQPushConsumerImpl getHostConsumer() {
return hostConsumer;
}
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
this.hostConsumer = hostConsumer;
}
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
}
return traceProducerInstance;
}
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
if (!result) {
log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
}
return result;
}
@Override
public void flush() throws IOException {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
}
@Override
public void shutdown() {
this.stopped = true;
this.traceExecutor.shutdown();
if (isStarted.get()) {
traceProducer.shutdown();
}
this.removeShutdownHook();
}
public void registerShutDownHook() {
if (shutDownHook == null) {
shutDownHook = new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
try {
flush();
} catch (IOException e) {
log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
}
}
}
}
}, "ShutdownHookMQTrace");
Runtime.getRuntime().addShutdownHook(shutDownHook);
}
}
public void removeShutdownHook() {
if (shutDownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(shutDownHook);
} catch (IllegalStateException e) {
// ignore - VM is already shutting down
}
}
}
class AsyncRunnable implements Runnable {
private boolean stopped;
@Override
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecutor.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
class AsyncAppenderRequest implements Runnable {
List<TraceContext> contextList;
public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TraceContext>(1);
}
}
@Override
public void run() {
sendTraceData(contextList);
}
public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
// Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();
String regionId = context.getRegionId();
// Use original message entity's topic as key
String key = topic;
if (!StringUtils.isBlank(regionId)) {
key = key + TraceConstants.CONTENT_SPLITOR + regionId;
}
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
String dataTopic = entry.getKey();
String regionId = null;
if (key.length > 1) {
dataTopic = key[0];
regionId = key[1];
}
flushData(entry.getValue(), dataTopic, regionId);
}
}
/**
* Batch sending data actually
*/
private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
if (transBeanList.size() == 0) {
return;
}
// Temporary buffer
StringBuilder buffer = new StringBuilder(1024);
int count = 0;
Set<String> keySet = new HashSet<String>();
for (TraceTransferBean bean : transBeanList) {
// Keyset of message trace includes msgId of or original message
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;
// Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
// Clear temporary buffer after finishing
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
}
transBeanList.clear();
}
/**
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
if (AccessChannel.CLOUD == accessChannel) {
traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
log.info("send trace data ,the traceData is " + data);
}
};
if (traceBrokerSet.isEmpty()) {
// No cross set
traceProducer.send(message, callback, 5000);
} else {
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Set<String> brokerSet = (Set<String>) arg;
List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}
int index = sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, traceBrokerSet, callback);
}
} catch (Exception e) {
log.info("send trace data,the traceData is" + data);
}
}
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
Set<String> brokerSet = new HashSet<String>();
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
brokerSet.add(queue.getBrokerName());
}
}
return brokerSet;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
public class TraceBean {
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = "";
private String msgId = "";
private String offsetMsgId = "";
private String tags = "";
private String keys = "";
private String storeHost = LOCAL_ADDRESS;
private String clientHost = LOCAL_ADDRESS;
private long storeTime;
private int retryTimes;
private int bodyLength;
private MessageType msgType;
public MessageType getMsgType() {
return msgType;
}
public void setMsgType(final MessageType msgType) {
this.msgType = msgType;
}
public String getOffsetMsgId() {
return offsetMsgId;
}
public void setOffsetMsgId(final String offsetMsgId) {
this.offsetMsgId = offsetMsgId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKeys() {
return keys;
}
public void setKeys(String keys) {
this.keys = keys;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
public String getClientHost() {
return clientHost;
}
public void setClientHost(String clientHost) {
this.clientHost = clientHost;
}
public long getStoreTime() {
return storeTime;
}
public void setStoreTime(long storeTime) {
this.storeTime = storeTime;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.MixAll;
public class TraceConstants {
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC_PREFIX = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import java.util.List;
/**
* The context of Trace
*/
public class TraceContext implements Comparable<TraceContext> {
private TraceType traceType;
private long timeStamp = System.currentTimeMillis();
private String regionId = "";
private String regionName = "";
private String groupName = "";
private int costTime = 0;
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
private List<TraceBean> traceBeans;
public int getContextCode() {
return contextCode;
}
public void setContextCode(final int contextCode) {
this.contextCode = contextCode;
}
public List<TraceBean> getTraceBeans() {
return traceBeans;
}
public void setTraceBeans(List<TraceBean> traceBeans) {
this.traceBeans = traceBeans;
}
public String getRegionId() {
return regionId;
}
public void setRegionId(String regionId) {
this.regionId = regionId;
}
public TraceType getTraceType() {
return traceType;
}
public void setTraceType(TraceType traceType) {
this.traceType = traceType;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public int getCostTime() {
return costTime;
}
public void setCostTime(int costTime) {
this.costTime = costTime;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getRegionName() {
return regionName;
}
public void setRegionName(String regionName) {
this.regionName = regionName;
}
@Override
public int compareTo(TraceContext o) {
return (int) (this.timeStamp - o.getTimeStamp());
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(1024);
sb.append(traceType).append("_").append(groupName)
.append("_").append(regionId).append("_").append(isSuccess).append("_");
if (traceBeans != null && traceBeans.size() > 0) {
for (TraceBean bean : traceBeans) {
sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
}
}
return "TraceContext{" + sb.toString() + '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.message.MessageType;
import java.util.ArrayList;
import java.util.List;
/**
* Encode/decode for Trace Data
*/
public class TraceDataEncoder {
/**
* Resolving traceContext list From trace data String
*
* @param traceData
* @return
*/
public static List<TraceContext> decoderFromTraceDataString(String traceData) {
List<TraceContext> resList = new ArrayList<TraceContext>();
if (traceData == null || traceData.length() <= 0) {
return resList;
}
String[] contextList = traceData.split(String.valueOf(TraceConstants.FIELD_SPLITOR));
for (String context : contextList) {
String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
if (line[0].equals(TraceType.Pub.name())) {
TraceContext pubContext = new TraceContext();
pubContext.setTraceType(TraceType.Pub);
pubContext.setTimeStamp(Long.parseLong(line[1]));
pubContext.setRegionId(line[2]);
pubContext.setGroupName(line[3]);
TraceBean bean = new TraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setBodyLength(Integer.parseInt(line[9]));
pubContext.setCostTime(Integer.parseInt(line[10]));
bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]);
if (line.length == 13) {
pubContext.setSuccess(Boolean.parseBoolean(line[12]));
} else if (line.length == 14) {
bean.setOffsetMsgId(line[12]);
pubContext.setSuccess(Boolean.parseBoolean(line[13]));
}
pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
pubContext.getTraceBeans().add(bean);
resList.add(pubContext);
} else if (line[0].equals(TraceType.SubBefore.name())) {
TraceContext subBeforeContext = new TraceContext();
subBeforeContext.setTraceType(TraceType.SubBefore);
subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
subBeforeContext.setRegionId(line[2]);
subBeforeContext.setGroupName(line[3]);
subBeforeContext.setRequestId(line[4]);
TraceBean bean = new TraceBean();
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
} else if (line[0].equals(TraceType.SubAfter.name())) {
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);
subAfterContext.setRequestId(line[1]);
TraceBean bean = new TraceBean();
bean.setMsgId(line[2]);
bean.setKeys(line[5]);
subAfterContext.setTraceBeans(new ArrayList<TraceBean>(1));
subAfterContext.getTraceBeans().add(bean);
subAfterContext.setCostTime(Integer.parseInt(line[3]));
subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
if (line.length >= 7) {
// add the context type
subAfterContext.setContextCode(Integer.parseInt(line[6]));
}
resList.add(subAfterContext);
}
}
return resList;
}
/**
* Encoding the trace context into data strings and keyset sets
*
* @param ctx
* @return
*/
public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
if (ctx == null) {
return null;
}
//build message trace of the transfering entity content bean
TraceTransferBean transferBean = new TraceTransferBean();
StringBuilder sb = new StringBuilder(256);
switch (ctx.getTraceType()) {
case Pub: {
TraceBean bean = ctx.getTraceBeans().get(0);
//append the content of context and traceBean to transferBean's TransData
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}
break;
case SubBefore: {
for (TraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
}
}
break;
case SubAfter: {
for (TraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
}
}
break;
default:
}
transferBean.setTransData(sb.toString());
for (TraceBean bean : ctx.getTraceBeans()) {
transferBean.getTransKey().add(bean.getMsgId());
if (bean.getKeys() != null && bean.getKeys().length() > 0) {
transferBean.getTransKey().add(bean.getKeys());
}
}
return transferBean;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
/**
* Interface of asynchronous transfer data
*/
public interface TraceDispatcher {
/**
* Initialize asynchronous transfer data module
*/
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
/**
* Append the transfering data
* @param ctx data infomation
* @return
*/
boolean append(Object ctx);
/**
* Write flush action
*
* @throws IOException
*/
void flush() throws IOException;
/**
* Close the trace Hook
*/
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
public enum TraceDispatcherType {
PRODUCER,
CONSUMER
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.util.HashSet;
import java.util.Set;
/**
* Trace transfering bean
*/
public class TraceTransferBean {
private String transData;
private Set<String> transKey = new HashSet<String>();
public String getTransData() {
return transData;
}
public void setTransData(String transData) {
this.transData = transData;
}
public Set<String> getTransKey() {
return transKey;
}
public void setTransKey(Set<String> transKey) {
this.transKey = transKey;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
public enum TraceType {
Pub,
SubBefore,
SubAfter,
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
private TraceDispatcher localDispatcher;
public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "ConsumeMessageTraceHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);//
traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
if (traceOn != null && traceOn.equals("false")) {
// If trace switch is false ,skip it
continue;
}
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());//
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
if (beans.size() > 0) {
traceContext.setTraceBeans(beans);
traceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(traceContext);
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it
return;
}
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
// Caculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
localDispatcher.append(subAfterContext);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
public class SendMessageTraceHookImpl implements SendMessageHook {
private TraceDispatcher localDispatcher;
public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "SendMessageTraceHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}
TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
localDispatcher.append(tuxeContext);
}
}
...@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; ...@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
...@@ -104,7 +105,7 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -104,7 +105,7 @@ public class DefaultMQConsumerWithTraceTest {
private DefaultMQPushConsumer normalPushConsumer; private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer; private DefaultMQPushConsumer customTraceTopicpushConsumer;
private AsyncTraceDispatcher asyncTraceDispatcher; private AsyncArrayDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory; private MQClientInstance mQClientTraceFactory;
@Mock @Mock
private MQClientAPIImpl mQClientTraceAPIImpl; private MQClientAPIImpl mQClientTraceAPIImpl;
...@@ -121,7 +122,7 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -121,7 +122,7 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000); pushConsumer.setPullInterval(60 * 1000);
asyncTraceDispatcher = (AsyncTraceDispatcher) pushConsumer.getTraceDispatcher(); asyncTraceDispatcher = (AsyncArrayDispatcher) pushConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer(); traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() { pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
......
...@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; ...@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
...@@ -64,7 +65,7 @@ public class DefaultMQProducerWithTraceTest { ...@@ -64,7 +65,7 @@ public class DefaultMQProducerWithTraceTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
private AsyncTraceDispatcher asyncTraceDispatcher; private AsyncArrayDispatcher asyncTraceDispatcher;
private DefaultMQProducer producer; private DefaultMQProducer producer;
private DefaultMQProducer customTraceTopicproducer; private DefaultMQProducer customTraceTopicproducer;
...@@ -88,8 +89,8 @@ public class DefaultMQProducerWithTraceTest { ...@@ -88,8 +89,8 @@ public class DefaultMQProducerWithTraceTest {
normalProducer.setNamesrvAddr("127.0.0.1:9877"); normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b', 'c'}); message = new Message(topic, new byte[] {'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); asyncTraceDispatcher = (AsyncArrayDispatcher) producer.getTraceDispatcher();
asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); asyncTraceDispatcher.setCustomizedTraceTopic(customerTraceTopic);
asyncTraceDispatcher.getHostProducer(); asyncTraceDispatcher.getHostProducer();
asyncTraceDispatcher.getHostConsumer(); asyncTraceDispatcher.getHostConsumer();
traceProducer = asyncTraceDispatcher.getTraceProducer(); traceProducer = asyncTraceDispatcher.getTraceProducer();
......
...@@ -625,8 +625,11 @@ ...@@ -625,8 +625,11 @@
<artifactId>commons-validator</artifactId> <artifactId>commons-validator</artifactId>
<version>1.6</version> <version>1.6</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>ons-trace-core</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册