From 2dc75d32099cf12e3caff0b32c7fa5fd1ce66a4c Mon Sep 17 00:00:00 2001 From: zhangjidi2016 Date: Wed, 26 May 2021 21:49:31 +0800 Subject: [PATCH] The messageTrace related configurations is set as a property --- .../consumer/DefaultLitePullConsumer.java | 90 ++++++++----------- ...efaultMQLitePullConsumerWithTraceTest.java | 7 +- 2 files changed, 44 insertions(+), 53 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index a33652d6..c54399aa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -165,6 +165,16 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon */ private TraceDispatcher traceDispatcher = null; + /** + * The flag for message trace + */ + private boolean enableMsgTrace = false; + + /** + * The name value of message trace topic.If you don't config,you can use the default trace topic name. + */ + private String customizedTraceTopic; + /** * Default constructor. */ @@ -200,57 +210,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this(null, consumerGroup, rpcHook); } - /** - * Constructor specifying consumer group and enabled msg trace flag. - * - * @param consumerGroup Consumer group. - * @param enableMsgTrace Switch flag instance for message trace. - */ - public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace) { - this(null, consumerGroup, null, enableMsgTrace, null); - } - - /** - * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name. - * - * @param consumerGroup Consumer group. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. - */ - public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace, - final String customizedTraceTopic) { - this(null, consumerGroup, null, enableMsgTrace, customizedTraceTopic); - } - - /** - * Constructor specifying namespace, consumer group, RPC hook, enabled msg trace flag and customized trace topic - * name. - * - * @param namespace Namespace for this MQ Producer instance. - * @param consumerGroup Consume queue. - * @param rpcHook RPC hook to execute before each remoting command. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. - */ - public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, - boolean enableMsgTrace, final String customizedTraceTopic) { - this.namespace = namespace; - this.consumerGroup = consumerGroup; - defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); - if (enableMsgTrace) { - try { - this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); - this.defaultLitePullConsumerImpl.registerConsumeMessageHook( - new ConsumeMessageTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } - } - - /** * Constructor specifying namespace, consumer group and RPC hook. * @@ -265,6 +224,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void start() throws MQClientException { + setTraceDispatcher(); setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultLitePullConsumerImpl.start(); if (null != traceDispatcher) { @@ -567,4 +527,32 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } + + private void setTraceDispatcher() { + if (isEnableMsgTrace()) { + try { + this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + this.defaultLitePullConsumerImpl.registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public boolean isEnableMsgTrace() { + return enableMsgTrace; + } + + public void setEnableMsgTrace(boolean enableMsgTrace) { + this.enableMsgTrace = enableMsgTrace; + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index d79790d1..67ae194b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -148,7 +148,8 @@ public class DefaultMQLitePullConsumerWithTraceTest { private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true); + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setEnableMsgTrace(true); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe(topic, "*"); suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); @@ -158,7 +159,9 @@ public class DefaultMQLitePullConsumerWithTraceTest { } private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true, customerTraceTopic); + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setEnableMsgTrace(true); + litePullConsumer.setCustomizedTraceTopic(customerTraceTopic); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe(topic, "*"); suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); -- GitLab