提交 06da1045 编写于 作者: H Hu Zongtang 提交者: Zhendong Liu

[ISSUE#525]restructure and optimize codes for message track (#645)

上级 b8cb82f4
...@@ -126,7 +126,7 @@ public class TopicConfigManager extends ConfigManager { ...@@ -126,7 +126,7 @@ public class TopicConfigManager extends ConfigManager {
} }
{ {
if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTrackTopicName(); String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic); TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic); this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1); topicConfig.setReadQueueNums(1);
......
...@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer; ...@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
...@@ -32,9 +31,7 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -32,9 +31,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcherType;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -288,8 +285,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -288,8 +285,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @param consumerGroup Consume queue. * @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command. * @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm. * @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message track trace. * @param msgTraceSwitch switch flag instance for message trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) { AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) {
...@@ -298,21 +295,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -298,21 +295,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (msgTraceSwitch) { if (msgTraceSwitch) {
try { try {
Properties tempProperties = new Properties(); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook);
tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.CONSUMER.name());
if (!UtilAll.isBlank(traceTopicName)) {
tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName);
} else {
tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher)); new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) { } catch (Throwable e) {
...@@ -334,8 +319,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -334,8 +319,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Constructor specifying consumer group. * Constructor specifying consumer group.
* *
* @param consumerGroup Consumer group. * @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message track trace. * @param msgTraceSwitch switch flag instance for message trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) { public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName); this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName);
...@@ -585,9 +570,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -585,9 +570,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.start(); this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
Properties tempProperties = new Properties(); traceDispatcher.start(this.getNamesrvAddr());
tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties);
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
} }
......
...@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.producer; ...@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.producer;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
...@@ -28,12 +27,9 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -28,12 +27,9 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.TraceDispatcherType;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
...@@ -158,31 +154,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -158,31 +154,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution. * @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace. * @param msgTraceSwitch switch flag instance for message trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) { public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) {
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message track trace feature //if client open the message trace feature
//TODO wrap this code to TraceDispatcherFactory
if (msgTraceSwitch) { if (msgTraceSwitch) {
try { try {
Properties tempProperties = new Properties(); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook);
tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.PRODUCER.name());
if (!UtilAll.isBlank(traceTopicName)) {
tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName);
} else {
tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook( this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher)); new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) { } catch (Throwable e) {
...@@ -204,8 +187,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -204,8 +187,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying producer group. * Constructor specifying producer group.
* *
* @param producerGroup Producer group, see the name-sake field. * @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message track trace. * @param msgTraceSwitch switch flag instance for message trace.
* @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name.
*/ */
public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) { public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) {
this(producerGroup, null, msgTraceSwitch, traceTopicName); this(producerGroup, null, msgTraceSwitch, traceTopicName);
...@@ -235,12 +218,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -235,12 +218,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
this.defaultMQProducerImpl.start(); this.defaultMQProducerImpl.start();
//TODO wrap this code to TraceDispatcherFactory
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
Properties tempProperties = new Properties(); traceDispatcher.start(this.getNamesrvAddr());
tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties);
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
...@@ -26,7 +27,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; ...@@ -26,7 +27,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
...@@ -35,7 +38,6 @@ import java.io.IOException; ...@@ -35,7 +38,6 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.HashMap; import java.util.HashMap;
import java.util.UUID; import java.util.UUID;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.HashSet; import java.util.HashSet;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -46,11 +48,14 @@ import java.util.concurrent.TimeUnit; ...@@ -46,11 +48,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
public class AsyncTraceDispatcher implements TraceDispatcher { public class AsyncTraceDispatcher implements TraceDispatcher {
private final static InternalLogger log = ClientLogger.getLog(); private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize; private final int queueSize;
private final int batchSize; private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer; private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter; private final ThreadPoolExecutor traceExecuter;
// the last discard number of log // the last discard number of log
...@@ -60,24 +65,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -60,24 +65,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private ArrayBlockingQueue<Runnable> appenderQueue; private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook; private volatile Thread shutDownHook;
private volatile boolean stopped = false; private volatile boolean stopped = false;
private String dispatcherType;
private DefaultMQProducerImpl hostProducer; private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer; private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString(); private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName; private String traceTopicName;
private static AtomicBoolean isStarted = new AtomicBoolean(false);
public AsyncTraceDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
dispatcherType = properties.getProperty(TraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); this.queueSize = 2048;
this.queueSize = queueSize; this.batchSize = 100;
batchSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_BATCH_NUM, "1")); this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L); this.discardCount = new AtomicLong(0L);
traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024); this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
traceTopicName = properties.getProperty(TraceConstants.TRACE_TOPIC); if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecuter = new ThreadPoolExecutor(// this.traceExecuter = new ThreadPoolExecutor(//
10, // 10, //
20, // 20, //
...@@ -85,7 +93,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -85,7 +93,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
TimeUnit.MILLISECONDS, // TimeUnit.MILLISECONDS, //
this.appenderQueue, // this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_")); new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); traceProducer = getAndCreateTraceProducer(rpcHook);
} }
public String getTraceTopicName() { public String getTraceTopicName() {
...@@ -116,14 +124,31 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -116,14 +124,31 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
this.hostConsumer = hostConsumer; this.hostConsumer = hostConsumer;
} }
public void start(Properties properties) throws MQClientException { public void start(String nameSrvAddr) throws MQClientException {
TraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TraceConstants.NAMESRV_ADDR)); if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.start();
}
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true); this.worker.setDaemon(true);
this.worker.start(); this.worker.start();
this.registerShutDownHook(); this.registerShutDownHook();
} }
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME);
traceProducerInstance.setVipChannelEnabled(false);
//the max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
}
return traceProducerInstance;
}
@Override @Override
public boolean append(final Object ctx) { public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx); boolean result = traceContextQueue.offer((TraceContext) ctx);
...@@ -151,7 +176,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -151,7 +176,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
public void shutdown() { public void shutdown() {
this.stopped = true; this.stopped = true;
this.traceExecuter.shutdown(); this.traceExecuter.shutdown();
TraceProducerFactory.unregisterTraceDispatcher(dispatcherId); if (isStarted.get()) {
traceProducer.shutdown();
}
this.removeShutdownHook(); this.removeShutdownHook();
} }
...@@ -193,7 +220,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -193,7 +220,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
TraceContext context = null; TraceContext context = null;
try { try {
//get track trace data element from blocking Queue — traceContextQueue //get trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
...@@ -266,7 +293,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -266,7 +293,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
Set<String> keySet = new HashSet<String>(); Set<String> keySet = new HashSet<String>();
for (TraceTransferBean bean : transBeanList) { for (TraceTransferBean bean : transBeanList) {
// keyset of message track trace includes msgId of or original message // keyset of message trace includes msgId of or original message
keySet.addAll(bean.getTransKey()); keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData()); buffer.append(bean.getTransData());
count++; count++;
...@@ -286,16 +313,16 @@ public class AsyncTraceDispatcher implements TraceDispatcher { ...@@ -286,16 +313,16 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
} }
/** /**
* send message track trace data * send message trace data
* *
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message track trace data in this batch * @param data the message trace data in this batch
*/ */
private void sendTraceDataByMQ(Set<String> keySet, final String data) { private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = traceTopicName; String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes()); final Message message = new Message(topic, data.getBytes());
//keyset of message track trace includes msgId of or original message //keyset of message trace includes msgId of or original message
message.setKeys(keySet); message.setKeys(keySet);
try { try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
......
...@@ -17,16 +17,9 @@ ...@@ -17,16 +17,9 @@
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
public class TraceConstants { public class TraceConstants {
public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
public static final String ADDRSRV_URL = "ADDRSRV_URL";
public static final String INSTANCE_NAME = "InstanceName";
public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize";
public static final String MAX_BATCH_NUM = "MaxBatchNum";
public static final String WAKE_UP_NUM = "WakeUpNum";
public static final String MAX_MSG_SIZE = "MaxMsgSize";
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME";
public static final char CONTENT_SPLITOR = (char) 1; public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2; public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_DISPATCHER_TYPE = "DispatcherType"; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
} }
...@@ -21,7 +21,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; ...@@ -21,7 +21,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter;
import java.util.List; import java.util.List;
/** /**
* The context of Track Trace * The context of Trace
*/ */
public class TraceContext implements Comparable<TraceContext> { public class TraceContext implements Comparable<TraceContext> {
......
...@@ -22,12 +22,12 @@ import java.util.ArrayList; ...@@ -22,12 +22,12 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* encode/decode for Track Trace Data * encode/decode for Trace Data
*/ */
public class TraceDataEncoder { public class TraceDataEncoder {
/** /**
* resolving traceContext list From track trace data String * resolving traceContext list From trace data String
* *
* @param traceData * @param traceData
* @return * @return
...@@ -101,7 +101,7 @@ public class TraceDataEncoder { ...@@ -101,7 +101,7 @@ public class TraceDataEncoder {
} }
/** /**
* Encoding the trace context into track data strings and keyset sets * Encoding the trace context into data strings and keyset sets
* *
* @param ctx * @param ctx
* @return * @return
...@@ -110,7 +110,7 @@ public class TraceDataEncoder { ...@@ -110,7 +110,7 @@ public class TraceDataEncoder {
if (ctx == null) { if (ctx == null) {
return null; return null;
} }
//build message track trace of the transfering entity content bean //build message trace of the transfering entity content bean
TraceTransferBean transferBean = new TraceTransferBean(); TraceTransferBean transferBean = new TraceTransferBean();
StringBuilder sb = new StringBuilder(256); StringBuilder sb = new StringBuilder(256);
switch (ctx.getTraceType()) { switch (ctx.getTraceType()) {
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.util.Properties;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException; import java.io.IOException;
...@@ -28,7 +27,7 @@ public interface TraceDispatcher { ...@@ -28,7 +27,7 @@ public interface TraceDispatcher {
/** /**
* Initialize asynchronous transfer data module * Initialize asynchronous transfer data module
*/ */
void start(Properties properties) throws MQClientException; void start(String nameSrvAddr) throws MQClientException;
/** /**
* append the transfering data * append the transfering data
...@@ -45,7 +44,7 @@ public interface TraceDispatcher { ...@@ -45,7 +44,7 @@ public interface TraceDispatcher {
void flush() throws IOException; void flush() throws IOException;
/** /**
* close the track trace Hook * close the trace Hook
*/ */
void shutdown(); void shutdown();
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.remoting.RPCHook;
@Deprecated
public class TraceProducerFactory {
private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>();
private static AtomicBoolean isStarted = new AtomicBoolean(false);
private static DefaultMQProducer traceProducer;
public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
if (traceProducer == null) {
traceProducer = new DefaultMQProducer(rpcHook);
traceProducer.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
String nameSrv = properties.getProperty(TraceConstants.NAMESRV_ADDR);
if (nameSrv == null) {
TopAddressing topAddressing = new TopAddressing(properties.getProperty(TraceConstants.ADDRSRV_URL));
nameSrv = topAddressing.fetchNSAddr();
}
traceProducer.setNamesrvAddr(nameSrv);
traceProducer.setVipChannelEnabled(false);
//the max size of message is 128K
int maxSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_MSG_SIZE, "128000"));
traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
}
return traceProducer;
}
public static void registerTraceDispatcher(String dispatcherId, String nameSrvAddr) throws MQClientException {
dispatcherTable.put(dispatcherId, new Object());
if (traceProducer != null && isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.start();
}
}
public static void unregisterTraceDispatcher(String dispatcherId) {
dispatcherTable.remove(dispatcherId);
if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) {
traceProducer.shutdown();
}
}
}
...@@ -20,7 +20,7 @@ import java.util.HashSet; ...@@ -20,7 +20,7 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
/** /**
* track trace transfering bean * trace transfering bean
*/ */
public class TraceTransferBean { public class TraceTransferBean {
private String transData; private String transData;
......
...@@ -36,12 +36,12 @@ public class SendMessageTraceHookImpl implements SendMessageHook { ...@@ -36,12 +36,12 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
@Override @Override
public String hookName() { public String hookName() {
return "SendMessageTrackHook"; return "SendMessageTraceHook";
} }
@Override @Override
public void sendMessageBefore(SendMessageContext context) { public void sendMessageBefore(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded //if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return; return;
} }
...@@ -51,8 +51,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { ...@@ -51,8 +51,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
context.setMqTraceContext(tuxeContext); context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup()); tuxeContext.setGroupName(context.getProducerGroup());
//build the data bean object of message trace
//build the data bean object of message track trace
TraceBean traceBean = new TraceBean(); TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags()); traceBean.setTags(context.getMessage().getTags());
...@@ -65,7 +64,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { ...@@ -65,7 +64,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
@Override @Override
public void sendMessageAfter(SendMessageContext context) { public void sendMessageAfter(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded //if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) { || context.getMqTraceContext() == null) {
return; return;
......
...@@ -87,7 +87,7 @@ import static org.mockito.Mockito.when; ...@@ -87,7 +87,7 @@ import static org.mockito.Mockito.when;
public class DefaultMQConsumerWithTraceTest { public class DefaultMQConsumerWithTraceTest {
private String consumerGroup; private String consumerGroup;
private String consumerGroupNormal; private String consumerGroupNormal;
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String topic = "FooBar"; private String topic = "FooBar";
private String brokerName = "BrokerA"; private String brokerName = "BrokerA";
...@@ -107,7 +107,7 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -107,7 +107,7 @@ public class DefaultMQConsumerWithTraceTest {
@Mock @Mock
private MQClientAPIImpl mQClientTraceAPIImpl; private MQClientAPIImpl mQClientTraceAPIImpl;
private DefaultMQProducer traceProducer; private DefaultMQProducer traceProducer;
private String customerTraceTopic = "rmq_track_trace_topic_12345"; private String customerTraceTopic = "rmq_trace_topic_12345";
@Before @Before
public void init() throws Exception { public void init() throws Exception {
......
...@@ -83,8 +83,8 @@ public class DefaultMQProducerWithTraceTest { ...@@ -83,8 +83,8 @@ public class DefaultMQProducerWithTraceTest {
private String topic = "FooBar"; private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID"; private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_track_trace_topic_12345"; private String customerTraceTopic = "rmq_trace_topic_12345";
@Before @Before
public void init() throws Exception { public void init() throws Exception {
......
...@@ -54,7 +54,7 @@ public class BrokerConfig { ...@@ -54,7 +54,7 @@ public class BrokerConfig {
@ImportantField @ImportantField
private boolean autoTraceBrokerEnable = false; private boolean autoTraceBrokerEnable = false;
@ImportantField @ImportantField
private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
/** /**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1. * value is 1.
...@@ -759,11 +759,12 @@ public class BrokerConfig { ...@@ -759,11 +759,12 @@ public class BrokerConfig {
this.autoTraceBrokerEnable = autoTraceBrokerEnable; this.autoTraceBrokerEnable = autoTraceBrokerEnable;
} }
public String getMsgTrackTopicName() { public String getMsgTraceTopicName() {
return msgTrackTopicName; return msgTraceTopicName;
} }
public void setMsgTrackTopicName(String msgTrackTopicName) { public void setMsgTraceTopicName(String msgTraceTopicName) {
this.msgTrackTopicName = msgTrackTopicName; this.msgTraceTopicName = msgTraceTopicName;
} }
} }
...@@ -91,7 +91,7 @@ public class MixAll { ...@@ -91,7 +91,7 @@ public class MixAll {
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACK_TRACE_TOPIC = "RMQ_SYS_TRACK_TRACE_TOPIC"; public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
......
...@@ -37,10 +37,10 @@ public class BrokerConfigTest { ...@@ -37,10 +37,10 @@ public class BrokerConfigTest {
brokerConfig.setBrokerName("broker-a"); brokerConfig.setBrokerName("broker-a");
brokerConfig.setBrokerId(0); brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster"); brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4"); brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster"); assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4"); assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0); assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a"); assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false); assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册