From c048e588c1c9a297ce5c4d00c9d04fb6e6ef6669 Mon Sep 17 00:00:00 2001 From: Hu Zongtang Date: Fri, 3 Jan 2020 15:27:10 +0800 Subject: [PATCH] [ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process can trace only one (#1275) (#1303) * fix trace problem when multi produce/consumer in the same process * uniform parameter manner * variable rename * consumer groups may be same with the producer group Co-authored-by: zhengwen zhu --- .../client/consumer/DefaultMQPushConsumer.java | 2 +- .../rocketmq/client/producer/DefaultMQProducer.java | 4 ++-- .../rocketmq/client/trace/AsyncTraceDispatcher.java | 13 +++++++++++-- .../rocketmq/client/trace/TraceConstants.java | 2 +- .../rocketmq/client/trace/TraceDispatcher.java | 5 ++++- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 339f799f..6ad0fc30 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); if (enableMsgTrace) { try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index faa79f54..04573410 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -161,7 +161,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { //if client open the message trace feature if (enableMsgTrace) { try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.defaultMQProducerImpl); traceDispatcher = dispatcher; this.defaultMQProducerImpl.registerSendMessageHook( @@ -246,7 +246,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { //if client open the message trace feature if (enableMsgTrace) { try { - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 06a28e44..8c3d8865 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); private AccessChannel accessChannel = AccessChannel.LOCAL; + private String group; + private Type type; - public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { + public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue(1024); + this.group = group; + this.type = type; + this.appenderQueue = new ArrayBlockingQueue(queueSize); if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; @@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance == null) { traceProducerInstance = new DefaultMQProducer(rpcHook); - traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); + traceProducerInstance.setProducerGroup(genGroupNameForTrace()); traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K @@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher { return traceProducerInstance; } + private String genGroupNameForTrace() { + return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ; + } + @Override public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index e61ea9d1..cb4a2465 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll; public class TraceConstants { - public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; + public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER"; public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 51cc0deb..33341cf6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -24,7 +24,10 @@ import java.io.IOException; * Interface of asynchronous transfer data */ public interface TraceDispatcher { - + enum Type { + PRODUCE, + CONSUME + } /** * Initialize asynchronous transfer data module */ -- GitLab