diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 339f799f9ac867688ad30d463a79540c6ac852c0..6ad0fc308ecfbf11951a4f56ac778c1f58a289ec 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); if (enableMsgTrace) { try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index b4acf8f1c3241ce96a6c0729a00937bf97c54653..9b36cf0f3361d17333cab940c5fd19f9ac2aebfc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -171,7 +171,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { //if client open the message trace feature if (enableMsgTrace) { try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.defaultMQProducerImpl); traceDispatcher = dispatcher; this.defaultMQProducerImpl.registerSendMessageHook( @@ -256,7 +256,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { //if client open the message trace feature if (enableMsgTrace) { try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index ca3bcfa261a574cd6df668d99f4b8170d4eb40da..b987d96ceeb4272a4d7c16511f9c890d46ff2ed2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); private AccessChannel accessChannel = AccessChannel.LOCAL; + private String group; + private Type type; - public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { + public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue(1024); + this.group = group; + this.type = type; + this.appenderQueue = new ArrayBlockingQueue(queueSize); if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; @@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance == null) { traceProducerInstance = new DefaultMQProducer(rpcHook); - traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); + traceProducerInstance.setProducerGroup(genGroupNameForTrace()); traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K @@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher { return traceProducerInstance; } + private String genGroupNameForTrace() { + return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ; + } + @Override public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index e61ea9d1a0591cf8cdcf6e9688ad73b498b4d61f..cb4a246517d62e711bd7b7dd171fb4e97d575482 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll; public class TraceConstants { - public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; + public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER"; public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 51cc0deb87d61fc81552cd8b04b8080945ce6042..33341cf6b8f8b6e5eb7be2c9c4fddd6fe22f7586 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -24,7 +24,10 @@ import java.io.IOException; * Interface of asynchronous transfer data */ public interface TraceDispatcher { - + enum Type { + PRODUCE, + CONSUME + } /** * Initialize asynchronous transfer data module */