提交 c048e588 编写于 作者: H Hu Zongtang 提交者: Heng Du

[ISSSUE 1188]Fix the problem when more than one producer or consumer in the...

[ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process can trace only one (#1275) (#1303)

* fix trace problem when multi produce/consumer in the same process

* uniform parameter manner

* variable rename

* consumer groups may be same with the producer group
Co-authored-by: Nzhengwen zhu <ahuazhu@gmail.com>
上级 9db11c74
...@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) { if (enableMsgTrace) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
......
...@@ -161,7 +161,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -161,7 +161,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
//if client open the message trace feature //if client open the message trace feature
if (enableMsgTrace) { if (enableMsgTrace) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl); dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook( this.defaultMQProducerImpl.registerSendMessageHook(
...@@ -246,7 +246,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -246,7 +246,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
//if client open the message trace feature //if client open the message trace feature
if (enableMsgTrace) { if (enableMsgTrace) {
try { try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook( this.getDefaultMQProducerImpl().registerSendMessageHook(
......
...@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String traceTopicName; private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL; private AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) { public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048; this.queueSize = 2048;
this.batchSize = 100; this.batchSize = 100;
this.maxMsgSize = 128000; this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L); this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024); this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.group = group;
this.type = type;
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) { if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName; this.traceTopicName = traceTopicName;
...@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
DefaultMQProducer traceProducerInstance = this.traceProducer; DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) { if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setProducerGroup(genGroupNameForTrace());
traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false); traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K // The max size of message is 128K
...@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
return traceProducerInstance; return traceProducerInstance;
} }
private String genGroupNameForTrace() {
return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
}
@Override @Override
public boolean append(final Object ctx) { public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx); boolean result = traceContextQueue.offer((TraceContext) ctx);
......
...@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
public class TraceConstants { public class TraceConstants {
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1; public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2; public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
......
...@@ -24,7 +24,10 @@ import java.io.IOException; ...@@ -24,7 +24,10 @@ import java.io.IOException;
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
public interface TraceDispatcher { public interface TraceDispatcher {
enum Type {
PRODUCE,
CONSUME
}
/** /**
* Initialize asynchronous transfer data module * Initialize asynchronous transfer data module
*/ */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册