diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index bd5eaeeab2e7429cc299818b60970cd1c337e200..f1ae2612aa02c52452c0dc6c1a7447b9d69ebceb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -126,7 +126,7 @@ public class TopicConfigManager extends ConfigManager { } { if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { - String topic = this.brokerController.getBrokerConfig().getMsgTrackTopicName(); + String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(1); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index cbe22fdb357d59952d5376ef016157bd484e44c3..edb8cb5abf21db84105ec8693173f6f2d965ef40 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -32,9 +31,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; -import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceDispatcher; -import org.apache.rocketmq.client.trace.TraceDispatcherType; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -288,8 +285,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. * @param allocateMessageQueueStrategy message queue allocating algorithm. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) { @@ -298,21 +295,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); if (msgTraceSwitch) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000"); - tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048"); - tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100"); - tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); - tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.CONSUMER.name()); - if (!UtilAll.isBlank(traceTopicName)) { - tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName); - } else { - tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); - } - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; - this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( new ConsumeMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { @@ -334,8 +319,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Constructor specifying consumer group. * * @param consumerGroup Consumer group. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) { this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName); @@ -585,9 +570,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); - traceDispatcher.start(tempProperties); + traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 1592d22b249be76765626d992156760d11bbed17..23cb53ca1f001b55ff33e72baad56f15b28a5f9e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.client.producer; import java.util.Collection; import java.util.List; -import java.util.Properties; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -28,12 +27,9 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; -import org.apache.rocketmq.client.trace.TraceConstants; -import org.apache.rocketmq.client.trace.TraceDispatcherType; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -158,31 +154,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - //if client open the message track trace feature - //TODO wrap this code to TraceDispatcherFactory + //if client open the message trace feature if (msgTraceSwitch) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000"); - tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048"); - tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100"); - tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); - tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.PRODUCER.name()); - if (!UtilAll.isBlank(traceTopicName)) { - tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName); - } else { - tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); - } - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; - this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { @@ -204,8 +187,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Constructor specifying producer group. * * @param producerGroup Producer group, see the name-sake field. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) { this(producerGroup, null, msgTraceSwitch, traceTopicName); @@ -235,12 +218,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); - //TODO wrap this code to TraceDispatcherFactory if (null != traceDispatcher) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); - traceDispatcher.start(tempProperties); + traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index ce82d1bb1c6dad12593e5659e5a99d9275ed1105..04ef8e09a16e707f4db939363c9770a5797bafa7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.trace; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -26,7 +27,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; @@ -35,7 +38,6 @@ import java.io.IOException; import java.util.List; import java.util.HashMap; import java.util.UUID; -import java.util.Properties; import java.util.Set; import java.util.HashSet; import java.util.ArrayList; @@ -46,11 +48,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.remoting.RPCHook; +import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; + public class AsyncTraceDispatcher implements TraceDispatcher { private final static InternalLogger log = ClientLogger.getLog(); private final int queueSize; private final int batchSize; + private final int maxMsgSize; private final DefaultMQProducer traceProducer; private final ThreadPoolExecutor traceExecuter; // the last discard number of log @@ -60,24 +65,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private ArrayBlockingQueue appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; - private String dispatcherType; private DefaultMQProducerImpl hostProducer; private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; + private static AtomicBoolean isStarted = new AtomicBoolean(false); + - public AsyncTraceDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { - dispatcherType = properties.getProperty(TraceConstants.TRACE_DISPATCHER_TYPE); - int queueSize = Integer.parseInt(properties.getProperty(TraceConstants.ASYNC_BUFFER_SIZE, "2048")); + public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { // queueSize is greater than or equal to the n power of 2 of value - queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); - this.queueSize = queueSize; - batchSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_BATCH_NUM, "1")); + this.queueSize = 2048; + this.batchSize = 100; + this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); - traceContextQueue = new ArrayBlockingQueue(1024); - appenderQueue = new ArrayBlockingQueue(queueSize); - traceTopicName = properties.getProperty(TraceConstants.TRACE_TOPIC); + this.traceContextQueue = new ArrayBlockingQueue(1024); + this.appenderQueue = new ArrayBlockingQueue(queueSize); + if (!UtilAll.isBlank(traceTopicName)) { + this.traceTopicName = traceTopicName; + } else { + this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; + } this.traceExecuter = new ThreadPoolExecutor(// 10, // 20, // @@ -85,7 +93,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); + traceProducer = getAndCreateTraceProducer(rpcHook); } public String getTraceTopicName() { @@ -116,14 +124,31 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.hostConsumer = hostConsumer; } - public void start(Properties properties) throws MQClientException { - TraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TraceConstants.NAMESRV_ADDR)); + public void start(String nameSrvAddr) throws MQClientException { + if (isStarted.compareAndSet(false, true)) { + traceProducer.setNamesrvAddr(nameSrvAddr); + traceProducer.start(); + } this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); } + private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { + DefaultMQProducer traceProducerInstance = this.traceProducer; + if (traceProducerInstance == null) { + traceProducerInstance = new DefaultMQProducer(rpcHook); + traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); + traceProducerInstance.setSendMsgTimeout(5000); + traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME); + traceProducerInstance.setVipChannelEnabled(false); + //the max size of message is 128K + traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); + } + return traceProducerInstance; + } + @Override public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); @@ -151,7 +176,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void shutdown() { this.stopped = true; this.traceExecuter.shutdown(); - TraceProducerFactory.unregisterTraceDispatcher(dispatcherId); + if (isStarted.get()) { + traceProducer.shutdown(); + } this.removeShutdownHook(); } @@ -193,7 +220,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { for (int i = 0; i < batchSize; i++) { TraceContext context = null; try { - //get track trace data element from blocking Queue — traceContextQueue + //get trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } @@ -266,7 +293,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { Set keySet = new HashSet(); for (TraceTransferBean bean : transBeanList) { - // keyset of message track trace includes msgId of or original message + // keyset of message trace includes msgId of or original message keySet.addAll(bean.getTransKey()); buffer.append(bean.getTransData()); count++; @@ -286,16 +313,16 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } /** - * send message track trace data + * send message trace data * * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) - * @param data the message track trace data in this batch + * @param data the message trace data in this batch */ private void sendTraceDataByMQ(Set keySet, final String data) { String topic = traceTopicName; final Message message = new Message(topic, data.getBytes()); - //keyset of message track trace includes msgId of or original message + //keyset of message trace includes msgId of or original message message.setKeys(keySet); try { Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index 970b556350be2883d0a05134ce1a04db9980d03b..b9fd8778eabc3f0e0e1f6d3898f6099f84ded48d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -17,16 +17,9 @@ package org.apache.rocketmq.client.trace; public class TraceConstants { - public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; - public static final String ADDRSRV_URL = "ADDRSRV_URL"; - public static final String INSTANCE_NAME = "InstanceName"; - public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize"; - public static final String MAX_BATCH_NUM = "MaxBatchNum"; - public static final String WAKE_UP_NUM = "WakeUpNum"; - public static final String MAX_MSG_SIZE = "MaxMsgSize"; + public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; - public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME"; public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; - public static final String TRACE_DISPATCHER_TYPE = "DispatcherType"; + public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java index 2370db955b16fdf76499ddcc62d67d6d7ea05c10..f61ba888cb3320fa26dc8c742fb2bdf586fad45a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import java.util.List; /** - * The context of Track Trace + * The context of Trace */ public class TraceContext implements Comparable { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index 6015e27e686718d1257949e4db806133d2682db8..2ed894024829674f4fed15599ee7c88fe4f95773 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -22,12 +22,12 @@ import java.util.ArrayList; import java.util.List; /** - * encode/decode for Track Trace Data + * encode/decode for Trace Data */ public class TraceDataEncoder { /** - * resolving traceContext list From track trace data String + * resolving traceContext list From trace data String * * @param traceData * @return @@ -101,7 +101,7 @@ public class TraceDataEncoder { } /** - * Encoding the trace context into track data strings and keyset sets + * Encoding the trace context into data strings and keyset sets * * @param ctx * @return @@ -110,7 +110,7 @@ public class TraceDataEncoder { if (ctx == null) { return null; } - //build message track trace of the transfering entity content bean + //build message trace of the transfering entity content bean TraceTransferBean transferBean = new TraceTransferBean(); StringBuilder sb = new StringBuilder(256); switch (ctx.getTraceType()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 3efef7c46263d26b48ba4cc6b6379ea4b90f4e13..2b0f453090692c18cb9f5658208c39b26ef0a0c0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.trace; -import java.util.Properties; import org.apache.rocketmq.client.exception.MQClientException; import java.io.IOException; @@ -28,7 +27,7 @@ public interface TraceDispatcher { /** * Initialize asynchronous transfer data module */ - void start(Properties properties) throws MQClientException; + void start(String nameSrvAddr) throws MQClientException; /** * append the transfering data @@ -45,7 +44,7 @@ public interface TraceDispatcher { void flush() throws IOException; /** - * close the track trace Hook + * close the trace Hook */ void shutdown(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java deleted file mode 100644 index 6e4ed363a856a635fe04462d43f2ab48813a37ea..0000000000000000000000000000000000000000 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.trace; - -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.common.namesrv.TopAddressing; - -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rocketmq.remoting.RPCHook; - -@Deprecated -public class TraceProducerFactory { - - private static Map dispatcherTable = new ConcurrentHashMap(); - private static AtomicBoolean isStarted = new AtomicBoolean(false); - private static DefaultMQProducer traceProducer; - - - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) { - if (traceProducer == null) { - - traceProducer = new DefaultMQProducer(rpcHook); - traceProducer.setProducerGroup(TraceConstants.GROUP_NAME); - traceProducer.setSendMsgTimeout(5000); - traceProducer.setInstanceName(properties.getProperty(TraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); - String nameSrv = properties.getProperty(TraceConstants.NAMESRV_ADDR); - if (nameSrv == null) { - TopAddressing topAddressing = new TopAddressing(properties.getProperty(TraceConstants.ADDRSRV_URL)); - nameSrv = topAddressing.fetchNSAddr(); - } - traceProducer.setNamesrvAddr(nameSrv); - traceProducer.setVipChannelEnabled(false); - //the max size of message is 128K - int maxSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_MSG_SIZE, "128000")); - traceProducer.setMaxMessageSize(maxSize - 10 * 1000); - } - return traceProducer; - } - - public static void registerTraceDispatcher(String dispatcherId, String nameSrvAddr) throws MQClientException { - dispatcherTable.put(dispatcherId, new Object()); - if (traceProducer != null && isStarted.compareAndSet(false, true)) { - traceProducer.setNamesrvAddr(nameSrvAddr); - traceProducer.start(); - } - } - - public static void unregisterTraceDispatcher(String dispatcherId) { - dispatcherTable.remove(dispatcherId); - if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) { - traceProducer.shutdown(); - } - } -} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java index d3d25c4d4a039c16dabe5fd8cf427474252dfeef..2e054ee1eb5da87dbbc12359d642523792f4d67a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java @@ -20,7 +20,7 @@ import java.util.HashSet; import java.util.Set; /** - * track trace transfering bean + * trace transfering bean */ public class TraceTransferBean { private String transData; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index bfe5d7aa35fadf61da76049e11a0dc2aaf812f61..20396c6ddcc82d63d964e47f38b1d079de2a9c2c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -36,12 +36,12 @@ public class SendMessageTraceHookImpl implements SendMessageHook { @Override public String hookName() { - return "SendMessageTrackHook"; + return "SendMessageTraceHook"; } @Override public void sendMessageBefore(SendMessageContext context) { - //if it is message track trace data,then it doesn't recorded + //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { return; } @@ -51,8 +51,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(context.getProducerGroup()); - - //build the data bean object of message track trace + //build the data bean object of message trace TraceBean traceBean = new TraceBean(); traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTags(context.getMessage().getTags()); @@ -65,7 +64,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { @Override public void sendMessageAfter(SendMessageContext context) { - //if it is message track trace data,then it doesn't recorded + //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) || context.getMqTraceContext() == null) { return; diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index b34784055fe76dd49a56ac14313be0180463993c..b45ad02818594a414783a3708da5d8668decab63 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -87,7 +87,7 @@ import static org.mockito.Mockito.when; public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); + private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); private String topic = "FooBar"; private String brokerName = "BrokerA"; @@ -107,7 +107,7 @@ public class DefaultMQConsumerWithTraceTest { @Mock private MQClientAPIImpl mQClientTraceAPIImpl; private DefaultMQProducer traceProducer; - private String customerTraceTopic = "rmq_track_trace_topic_12345"; + private String customerTraceTopic = "rmq_trace_topic_12345"; @Before public void init() throws Exception { diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 905efb9830a3d42f72397adda450d7ffd19700e3..6dcceeb5c0d7e2526aee0fe7cc96cadc7f68d63d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -83,8 +83,8 @@ public class DefaultMQProducerWithTraceTest { private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); - private String customerTraceTopic = "rmq_track_trace_topic_12345"; + private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String customerTraceTopic = "rmq_trace_topic_12345"; @Before public void init() throws Exception { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9823ca047999f3568571c0368ae440a28f11b7f7..eb1a684dfc19086b8e6d8a93f507f13eb4936e84 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -54,7 +54,7 @@ public class BrokerConfig { @ImportantField private boolean autoTraceBrokerEnable = false; @ImportantField - private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; + private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; /** * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default * value is 1. @@ -759,11 +759,12 @@ public class BrokerConfig { this.autoTraceBrokerEnable = autoTraceBrokerEnable; } - public String getMsgTrackTopicName() { - return msgTrackTopicName; + public String getMsgTraceTopicName() { + return msgTraceTopicName; } - public void setMsgTrackTopicName(String msgTrackTopicName) { - this.msgTrackTopicName = msgTrackTopicName; + public void setMsgTraceTopicName(String msgTraceTopicName) { + this.msgTraceTopicName = msgTraceTopicName; } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 0573c762abeecfd61c65504604b83e5f847614ec..5fdb0120f45c77184e4853b59eac12ec08bcc792 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -91,7 +91,7 @@ public class MixAll { public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; - public static final String RMQ_SYS_TRACK_TRACE_TOPIC = "RMQ_SYS_TRACK_TRACE_TOPIC"; + public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java index 8b10eef2a13dad76da9233e494a1178250399fef..7e197522f30d05ee58bb6e0f6b5aea5a2751f1c3 100644 --- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java @@ -37,10 +37,10 @@ public class BrokerConfigTest { brokerConfig.setBrokerName("broker-a"); brokerConfig.setBrokerId(0); brokerConfig.setBrokerClusterName("DefaultCluster"); - brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4"); + brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4"); assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster"); assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); - assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4"); + assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4"); assertThat(brokerConfig.getBrokerId()).isEqualTo(0); assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a"); assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);