提交 2dc75d32 编写于 作者: Z zhangjidi2016

The messageTrace related configurations is set as a property

上级 f244ee35
...@@ -165,6 +165,16 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -165,6 +165,16 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/ */
private TraceDispatcher traceDispatcher = null; private TraceDispatcher traceDispatcher = null;
/**
* The flag for message trace
*/
private boolean enableMsgTrace = false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic;
/** /**
* Default constructor. * Default constructor.
*/ */
...@@ -200,57 +210,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -200,57 +210,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this(null, consumerGroup, rpcHook); this(null, consumerGroup, rpcHook);
} }
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(null, consumerGroup, null, enableMsgTrace, null);
}
/**
* Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(null, consumerGroup, null, enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying namespace, consumer group, RPC hook, enabled msg trace flag and customized trace topic
* name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/** /**
* Constructor specifying namespace, consumer group and RPC hook. * Constructor specifying namespace, consumer group and RPC hook.
* *
...@@ -265,6 +224,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -265,6 +224,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
setTraceDispatcher();
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start(); this.defaultLitePullConsumerImpl.start();
if (null != traceDispatcher) { if (null != traceDispatcher) {
...@@ -567,4 +527,32 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -567,4 +527,32 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public TraceDispatcher getTraceDispatcher() { public TraceDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
try {
this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
} }
...@@ -148,7 +148,8 @@ public class DefaultMQLitePullConsumerWithTraceTest { ...@@ -148,7 +148,8 @@ public class DefaultMQLitePullConsumerWithTraceTest {
private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setEnableMsgTrace(true);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*"); litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
...@@ -158,7 +159,9 @@ public class DefaultMQLitePullConsumerWithTraceTest { ...@@ -158,7 +159,9 @@ public class DefaultMQLitePullConsumerWithTraceTest {
} }
private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception { private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true, customerTraceTopic); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setEnableMsgTrace(true);
litePullConsumer.setCustomizedTraceTopic(customerTraceTopic);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*"); litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册