diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index a33652d6b7bd3e8d577bd3f42f2b8bde5bcaa654..c54399aa5ed24346fc801e4d9b15050ee7a2b997 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -165,6 +165,16 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon */ private TraceDispatcher traceDispatcher = null; + /** + * The flag for message trace + */ + private boolean enableMsgTrace = false; + + /** + * The name value of message trace topic.If you don't config,you can use the default trace topic name. + */ + private String customizedTraceTopic; + /** * Default constructor. */ @@ -200,57 +210,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this(null, consumerGroup, rpcHook); } - /** - * Constructor specifying consumer group and enabled msg trace flag. - * - * @param consumerGroup Consumer group. - * @param enableMsgTrace Switch flag instance for message trace. - */ - public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace) { - this(null, consumerGroup, null, enableMsgTrace, null); - } - - /** - * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name. - * - * @param consumerGroup Consumer group. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. - */ - public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace, - final String customizedTraceTopic) { - this(null, consumerGroup, null, enableMsgTrace, customizedTraceTopic); - } - - /** - * Constructor specifying namespace, consumer group, RPC hook, enabled msg trace flag and customized trace topic - * name. - * - * @param namespace Namespace for this MQ Producer instance. - * @param consumerGroup Consume queue. - * @param rpcHook RPC hook to execute before each remoting command. - * @param enableMsgTrace Switch flag instance for message trace. - * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default - * trace topic name. - */ - public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, - boolean enableMsgTrace, final String customizedTraceTopic) { - this.namespace = namespace; - this.consumerGroup = consumerGroup; - defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); - if (enableMsgTrace) { - try { - this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); - this.defaultLitePullConsumerImpl.registerConsumeMessageHook( - new ConsumeMessageTraceHookImpl(traceDispatcher)); - } catch (Throwable e) { - log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); - } - } - } - - /** * Constructor specifying namespace, consumer group and RPC hook. * @@ -265,6 +224,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void start() throws MQClientException { + setTraceDispatcher(); setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultLitePullConsumerImpl.start(); if (null != traceDispatcher) { @@ -567,4 +527,32 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } + + private void setTraceDispatcher() { + if (isEnableMsgTrace()) { + try { + this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + this.defaultLitePullConsumerImpl.registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public boolean isEnableMsgTrace() { + return enableMsgTrace; + } + + public void setEnableMsgTrace(boolean enableMsgTrace) { + this.enableMsgTrace = enableMsgTrace; + } } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index d79790d19cd6f8875b30fd454e06a0c0d845458f..67ae194b880ce5a497b7d0aaac72566276fb1ec5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -148,7 +148,8 @@ public class DefaultMQLitePullConsumerWithTraceTest { private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true); + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setEnableMsgTrace(true); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe(topic, "*"); suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); @@ -158,7 +159,9 @@ public class DefaultMQLitePullConsumerWithTraceTest { } private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception { - DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true, customerTraceTopic); + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); + litePullConsumer.setEnableMsgTrace(true); + litePullConsumer.setCustomizedTraceTopic(customerTraceTopic); litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.subscribe(topic, "*"); suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);