提交 0c5dae7c 编写于 作者: Z zhengwen zhu 提交者: Heng Du

[ISSSUE 1188]Fix the problem when more than one producer or consumer in the...

[ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process can trace only one (#1275)

* fix trace problem when multi produce/consumer in the same process

* uniform parameter manner

* variable rename

* consumer groups may be same with the producer group
上级 d66243c0
......@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
......
......@@ -171,7 +171,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
......@@ -256,7 +256,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
......
......@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.group = group;
this.type = type;
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
......@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducerInstance.setProducerGroup(genGroupNameForTrace());
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
......@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
return traceProducerInstance;
}
private String genGroupNameForTrace() {
return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
}
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
......
......@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
public class TraceConstants {
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
......
......@@ -24,7 +24,10 @@ import java.io.IOException;
* Interface of asynchronous transfer data
*/
public interface TraceDispatcher {
enum Type {
PRODUCE,
CONSUME
}
/**
* Initialize asynchronous transfer data module
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册