提交 94525752 编写于 作者: D dongeforever

Refine the interface for msg_trace

上级 10d84bbe
...@@ -285,17 +285,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -285,17 +285,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param consumerGroup Consume queue. * @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command. * @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm. * @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message trace. * @param enableMsgTrace switch flag instance for message trace.
* @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) { AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup; this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (msgTraceSwitch) { if (enableMsgTrace) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
...@@ -315,15 +315,26 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -315,15 +315,26 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
} }
/**
* Constructor specifying consumer group.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace switch flag instance for message trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
/** /**
* Constructor specifying consumer group. * Constructor specifying consumer group.
* *
* @param consumerGroup Consumer group. * @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message trace. * @param enableMsgTrace switch flag instance for message trace.
* @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) { public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName); this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
} }
/** /**
......
...@@ -154,16 +154,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -154,16 +154,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution. * @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message trace. * @param enableMsgTrace switch flag instance for message trace.
* @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. * @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) { public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature //if client open the message trace feature
if (msgTraceSwitch) { if (enableMsgTrace) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook( this.getDefaultMQProducerImpl().registerSendMessageHook(
...@@ -187,11 +187,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -187,11 +187,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying producer group. * Constructor specifying producer group.
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message trace. * @param enableMsgTrace switch flag instance for message trace.
* @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) { public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
this(producerGroup, null, msgTraceSwitch, traceTopicName); this(producerGroup, null, enableMsgTrace, null);
}
/**
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace switch flag instance for message trace.
* @param customizedTraceTopic the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(producerGroup, null, enableMsgTrace, customizedTraceTopic);
} }
/** /**
......
...@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; ...@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TraceProducer { public class TraceProducer {
public static void main(String[] args) throws MQClientException, InterruptedException { public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true, ""); DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.start(); producer.start();
for (int i = 0; i < 128; i++) for (int i = 0; i < 128; i++)
......
...@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class TracePushConsumer { public class TracePushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException { public static void main(String[] args) throws InterruptedException, MQClientException {
//here,we use the default message track trace topic name //here,we use the default message track trace topic name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true, ""); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.subscribe("TopicTest", "*"); consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800 //wrong time format 2017_0422_221800
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册