From 06da1045e317de32e9c12888fcbde9ae93e4cce0 Mon Sep 17 00:00:00 2001 From: Hu Zongtang Date: Sat, 29 Dec 2018 10:35:07 +0800 Subject: [PATCH] [ISSUE#525]restructure and optimize codes for message track (#645) --- .../broker/topic/TopicConfigManager.java | 2 +- .../consumer/DefaultMQPushConsumer.java | 29 ++------ .../client/producer/DefaultMQProducer.java | 34 ++------- .../client/trace/AsyncTraceDispatcher.java | 67 +++++++++++------ .../rocketmq/client/trace/TraceConstants.java | 11 +-- .../rocketmq/client/trace/TraceContext.java | 2 +- .../client/trace/TraceDataEncoder.java | 8 +-- .../client/trace/TraceDispatcher.java | 5 +- .../client/trace/TraceProducerFactory.java | 72 ------------------- .../client/trace/TraceTransferBean.java | 2 +- .../trace/hook/SendMessageTraceHookImpl.java | 9 ++- .../trace/DefaultMQConsumerWithTraceTest.java | 4 +- .../trace/DefaultMQProducerWithTraceTest.java | 4 +- .../apache/rocketmq/common/BrokerConfig.java | 11 +-- .../org/apache/rocketmq/common/MixAll.java | 2 +- .../rocketmq/common/BrokerConfigTest.java | 4 +- 16 files changed, 88 insertions(+), 178 deletions(-) delete mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index bd5eaeea..f1ae2612 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -126,7 +126,7 @@ public class TopicConfigManager extends ConfigManager { } { if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { - String topic = this.brokerController.getBrokerConfig().getMsgTrackTopicName(); + String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(1); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index cbe22fdb..edb8cb5a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -32,9 +31,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; -import org.apache.rocketmq.client.trace.TraceConstants; import org.apache.rocketmq.client.trace.TraceDispatcher; -import org.apache.rocketmq.client.trace.TraceDispatcherType; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -288,8 +285,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param consumerGroup Consume queue. * @param rpcHook RPC hook to execute before each remoting command. * @param allocateMessageQueueStrategy message queue allocating algorithm. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) { @@ -298,21 +295,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); if (msgTraceSwitch) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000"); - tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048"); - tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100"); - tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); - tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.CONSUMER.name()); - if (!UtilAll.isBlank(traceTopicName)) { - tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName); - } else { - tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); - } - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; - this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( new ConsumeMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { @@ -334,8 +319,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Constructor specifying consumer group. * * @param consumerGroup Consumer group. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) { this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName); @@ -585,9 +570,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); - traceDispatcher.start(tempProperties); + traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 1592d22b..23cb53ca 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.client.producer; import java.util.Collection; import java.util.List; -import java.util.Properties; import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -28,12 +27,9 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; -import org.apache.rocketmq.client.trace.TraceConstants; -import org.apache.rocketmq.client.trace.TraceDispatcherType; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -158,31 +154,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); - //if client open the message track trace feature - //TODO wrap this code to TraceDispatcherFactory + //if client open the message trace feature if (msgTraceSwitch) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000"); - tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048"); - tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100"); - tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); - tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.PRODUCER.name()); - if (!UtilAll.isBlank(traceTopicName)) { - tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName); - } else { - tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); - } - AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(traceTopicName, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; - this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { @@ -204,8 +187,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Constructor specifying producer group. * * @param producerGroup Producer group, see the name-sake field. - * @param msgTraceSwitch switch flag instance for message track trace. - * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. + * @param msgTraceSwitch switch flag instance for message trace. + * @param traceTopicName the name value of message trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) { this(producerGroup, null, msgTraceSwitch, traceTopicName); @@ -235,12 +218,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); - //TODO wrap this code to TraceDispatcherFactory if (null != traceDispatcher) { try { - Properties tempProperties = new Properties(); - tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); - traceDispatcher.start(tempProperties); + traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index ce82d1bb..04ef8e09 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.trace; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; @@ -26,7 +27,9 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; @@ -35,7 +38,6 @@ import java.io.IOException; import java.util.List; import java.util.HashMap; import java.util.UUID; -import java.util.Properties; import java.util.Set; import java.util.HashSet; import java.util.ArrayList; @@ -46,11 +48,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.remoting.RPCHook; +import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; + public class AsyncTraceDispatcher implements TraceDispatcher { private final static InternalLogger log = ClientLogger.getLog(); private final int queueSize; private final int batchSize; + private final int maxMsgSize; private final DefaultMQProducer traceProducer; private final ThreadPoolExecutor traceExecuter; // the last discard number of log @@ -60,24 +65,27 @@ public class AsyncTraceDispatcher implements TraceDispatcher { private ArrayBlockingQueue appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; - private String dispatcherType; private DefaultMQProducerImpl hostProducer; private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; + private static AtomicBoolean isStarted = new AtomicBoolean(false); + - public AsyncTraceDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { - dispatcherType = properties.getProperty(TraceConstants.TRACE_DISPATCHER_TYPE); - int queueSize = Integer.parseInt(properties.getProperty(TraceConstants.ASYNC_BUFFER_SIZE, "2048")); + public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { // queueSize is greater than or equal to the n power of 2 of value - queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); - this.queueSize = queueSize; - batchSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_BATCH_NUM, "1")); + this.queueSize = 2048; + this.batchSize = 100; + this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); - traceContextQueue = new ArrayBlockingQueue(1024); - appenderQueue = new ArrayBlockingQueue(queueSize); - traceTopicName = properties.getProperty(TraceConstants.TRACE_TOPIC); + this.traceContextQueue = new ArrayBlockingQueue(1024); + this.appenderQueue = new ArrayBlockingQueue(queueSize); + if (!UtilAll.isBlank(traceTopicName)) { + this.traceTopicName = traceTopicName; + } else { + this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; + } this.traceExecuter = new ThreadPoolExecutor(// 10, // 20, // @@ -85,7 +93,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); + traceProducer = getAndCreateTraceProducer(rpcHook); } public String getTraceTopicName() { @@ -116,14 +124,31 @@ public class AsyncTraceDispatcher implements TraceDispatcher { this.hostConsumer = hostConsumer; } - public void start(Properties properties) throws MQClientException { - TraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TraceConstants.NAMESRV_ADDR)); + public void start(String nameSrvAddr) throws MQClientException { + if (isStarted.compareAndSet(false, true)) { + traceProducer.setNamesrvAddr(nameSrvAddr); + traceProducer.start(); + } this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); } + private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { + DefaultMQProducer traceProducerInstance = this.traceProducer; + if (traceProducerInstance == null) { + traceProducerInstance = new DefaultMQProducer(rpcHook); + traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); + traceProducerInstance.setSendMsgTimeout(5000); + traceProducerInstance.setInstanceName(TRACE_INSTANCE_NAME); + traceProducerInstance.setVipChannelEnabled(false); + //the max size of message is 128K + traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); + } + return traceProducerInstance; + } + @Override public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); @@ -151,7 +176,9 @@ public class AsyncTraceDispatcher implements TraceDispatcher { public void shutdown() { this.stopped = true; this.traceExecuter.shutdown(); - TraceProducerFactory.unregisterTraceDispatcher(dispatcherId); + if (isStarted.get()) { + traceProducer.shutdown(); + } this.removeShutdownHook(); } @@ -193,7 +220,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { for (int i = 0; i < batchSize; i++) { TraceContext context = null; try { - //get track trace data element from blocking Queue — traceContextQueue + //get trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } @@ -266,7 +293,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher { Set keySet = new HashSet(); for (TraceTransferBean bean : transBeanList) { - // keyset of message track trace includes msgId of or original message + // keyset of message trace includes msgId of or original message keySet.addAll(bean.getTransKey()); buffer.append(bean.getTransData()); count++; @@ -286,16 +313,16 @@ public class AsyncTraceDispatcher implements TraceDispatcher { } /** - * send message track trace data + * send message trace data * * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId) - * @param data the message track trace data in this batch + * @param data the message trace data in this batch */ private void sendTraceDataByMQ(Set keySet, final String data) { String topic = traceTopicName; final Message message = new Message(topic, data.getBytes()); - //keyset of message track trace includes msgId of or original message + //keyset of message trace includes msgId of or original message message.setKeys(keySet); try { Set traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index 970b5563..b9fd8778 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -17,16 +17,9 @@ package org.apache.rocketmq.client.trace; public class TraceConstants { - public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; - public static final String ADDRSRV_URL = "ADDRSRV_URL"; - public static final String INSTANCE_NAME = "InstanceName"; - public static final String ASYNC_BUFFER_SIZE = "AsyncBufferSize"; - public static final String MAX_BATCH_NUM = "MaxBatchNum"; - public static final String WAKE_UP_NUM = "WakeUpNum"; - public static final String MAX_MSG_SIZE = "MaxMsgSize"; + public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; - public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME"; public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; - public static final String TRACE_DISPATCHER_TYPE = "DispatcherType"; + public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java index 2370db95..f61ba888 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import java.util.List; /** - * The context of Track Trace + * The context of Trace */ public class TraceContext implements Comparable { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index 6015e27e..2ed89402 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -22,12 +22,12 @@ import java.util.ArrayList; import java.util.List; /** - * encode/decode for Track Trace Data + * encode/decode for Trace Data */ public class TraceDataEncoder { /** - * resolving traceContext list From track trace data String + * resolving traceContext list From trace data String * * @param traceData * @return @@ -101,7 +101,7 @@ public class TraceDataEncoder { } /** - * Encoding the trace context into track data strings and keyset sets + * Encoding the trace context into data strings and keyset sets * * @param ctx * @return @@ -110,7 +110,7 @@ public class TraceDataEncoder { if (ctx == null) { return null; } - //build message track trace of the transfering entity content bean + //build message trace of the transfering entity content bean TraceTransferBean transferBean = new TraceTransferBean(); StringBuilder sb = new StringBuilder(256); switch (ctx.getTraceType()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 3efef7c4..2b0f4530 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.trace; -import java.util.Properties; import org.apache.rocketmq.client.exception.MQClientException; import java.io.IOException; @@ -28,7 +27,7 @@ public interface TraceDispatcher { /** * Initialize asynchronous transfer data module */ - void start(Properties properties) throws MQClientException; + void start(String nameSrvAddr) throws MQClientException; /** * append the transfering data @@ -45,7 +44,7 @@ public interface TraceDispatcher { void flush() throws IOException; /** - * close the track trace Hook + * close the trace Hook */ void shutdown(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java deleted file mode 100644 index 6e4ed363..00000000 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.trace; - -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.common.namesrv.TopAddressing; - -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rocketmq.remoting.RPCHook; - -@Deprecated -public class TraceProducerFactory { - - private static Map dispatcherTable = new ConcurrentHashMap(); - private static AtomicBoolean isStarted = new AtomicBoolean(false); - private static DefaultMQProducer traceProducer; - - - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) { - if (traceProducer == null) { - - traceProducer = new DefaultMQProducer(rpcHook); - traceProducer.setProducerGroup(TraceConstants.GROUP_NAME); - traceProducer.setSendMsgTimeout(5000); - traceProducer.setInstanceName(properties.getProperty(TraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); - String nameSrv = properties.getProperty(TraceConstants.NAMESRV_ADDR); - if (nameSrv == null) { - TopAddressing topAddressing = new TopAddressing(properties.getProperty(TraceConstants.ADDRSRV_URL)); - nameSrv = topAddressing.fetchNSAddr(); - } - traceProducer.setNamesrvAddr(nameSrv); - traceProducer.setVipChannelEnabled(false); - //the max size of message is 128K - int maxSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_MSG_SIZE, "128000")); - traceProducer.setMaxMessageSize(maxSize - 10 * 1000); - } - return traceProducer; - } - - public static void registerTraceDispatcher(String dispatcherId, String nameSrvAddr) throws MQClientException { - dispatcherTable.put(dispatcherId, new Object()); - if (traceProducer != null && isStarted.compareAndSet(false, true)) { - traceProducer.setNamesrvAddr(nameSrvAddr); - traceProducer.start(); - } - } - - public static void unregisterTraceDispatcher(String dispatcherId) { - dispatcherTable.remove(dispatcherId); - if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) { - traceProducer.shutdown(); - } - } -} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java index d3d25c4d..2e054ee1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java @@ -20,7 +20,7 @@ import java.util.HashSet; import java.util.Set; /** - * track trace transfering bean + * trace transfering bean */ public class TraceTransferBean { private String transData; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index bfe5d7aa..20396c6d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -36,12 +36,12 @@ public class SendMessageTraceHookImpl implements SendMessageHook { @Override public String hookName() { - return "SendMessageTrackHook"; + return "SendMessageTraceHook"; } @Override public void sendMessageBefore(SendMessageContext context) { - //if it is message track trace data,then it doesn't recorded + //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { return; } @@ -51,8 +51,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(context.getProducerGroup()); - - //build the data bean object of message track trace + //build the data bean object of message trace TraceBean traceBean = new TraceBean(); traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTags(context.getMessage().getTags()); @@ -65,7 +64,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { @Override public void sendMessageAfter(SendMessageContext context) { - //if it is message track trace data,then it doesn't recorded + //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) || context.getMqTraceContext() == null) { return; diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index b3478405..b45ad028 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -87,7 +87,7 @@ import static org.mockito.Mockito.when; public class DefaultMQConsumerWithTraceTest { private String consumerGroup; private String consumerGroupNormal; - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); + private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); private String topic = "FooBar"; private String brokerName = "BrokerA"; @@ -107,7 +107,7 @@ public class DefaultMQConsumerWithTraceTest { @Mock private MQClientAPIImpl mQClientTraceAPIImpl; private DefaultMQProducer traceProducer; - private String customerTraceTopic = "rmq_track_trace_topic_12345"; + private String customerTraceTopic = "rmq_trace_topic_12345"; @Before public void init() throws Exception { diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 905efb98..6dcceeb5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -83,8 +83,8 @@ public class DefaultMQProducerWithTraceTest { private String topic = "FooBar"; private String producerGroupPrefix = "FooBar_PID"; private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); - private String customerTraceTopic = "rmq_track_trace_topic_12345"; + private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String customerTraceTopic = "rmq_trace_topic_12345"; @Before public void init() throws Exception { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9823ca04..eb1a684d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -54,7 +54,7 @@ public class BrokerConfig { @ImportantField private boolean autoTraceBrokerEnable = false; @ImportantField - private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; + private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; /** * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default * value is 1. @@ -759,11 +759,12 @@ public class BrokerConfig { this.autoTraceBrokerEnable = autoTraceBrokerEnable; } - public String getMsgTrackTopicName() { - return msgTrackTopicName; + public String getMsgTraceTopicName() { + return msgTraceTopicName; } - public void setMsgTrackTopicName(String msgTrackTopicName) { - this.msgTrackTopicName = msgTrackTopicName; + public void setMsgTraceTopicName(String msgTraceTopicName) { + this.msgTraceTopicName = msgTraceTopicName; } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 0573c762..5fdb0120 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -91,7 +91,7 @@ public class MixAll { public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType"; public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; - public static final String RMQ_SYS_TRACK_TRACE_TOPIC = "RMQ_SYS_TRACK_TRACE_TOPIC"; + public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC"; public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java index 8b10eef2..7e197522 100644 --- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java @@ -37,10 +37,10 @@ public class BrokerConfigTest { brokerConfig.setBrokerName("broker-a"); brokerConfig.setBrokerId(0); brokerConfig.setBrokerClusterName("DefaultCluster"); - brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4"); + brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4"); assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster"); assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); - assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4"); + assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4"); assertThat(brokerConfig.getBrokerId()).isEqualTo(0); assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a"); assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false); -- GitLab