提交 b8cb82f4 编写于 作者: D dongeforever

Rename tracktrace to trace, and polish the package hierarchy

上级 5f664247
...@@ -31,11 +31,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException; ...@@ -31,11 +31,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType; import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcherType;
import org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
...@@ -259,7 +259,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -259,7 +259,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/** /**
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
private AsyncDispatcher traceDispatcher = null; private TraceDispatcher traceDispatcher = null;
/** /**
* Default constructor. * Default constructor.
...@@ -299,17 +299,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -299,17 +299,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
if (msgTraceSwitch) { if (msgTraceSwitch) {
try { try {
Properties tempProperties = new Properties(); Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000"); tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"); tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name()); tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.CONSUMER.name());
if (!UtilAll.isBlank(traceTopicName)) { if (!UtilAll.isBlank(traceTopicName)) {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName); tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName);
} else { } else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
} }
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
...@@ -586,7 +586,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -586,7 +586,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
Properties tempProperties = new Properties(); Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties); traceDispatcher.start(tempProperties);
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
...@@ -772,11 +772,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -772,11 +772,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeTimeout = consumeTimeout; this.consumeTimeout = consumeTimeout;
} }
public AsyncDispatcher getTraceDispatcher() { public TraceDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
public void setTraceDispatcher(AsyncDispatcher traceDispatcher) { public void setTraceDispatcher(TraceDispatcher traceDispatcher) {
this.traceDispatcher = traceDispatcher; this.traceDispatcher = traceDispatcher;
} }
} }
...@@ -27,11 +27,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException; ...@@ -27,11 +27,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType; import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcherType;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl; import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
...@@ -133,7 +133,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -133,7 +133,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/** /**
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
private AsyncDispatcher traceDispatcher = null; private TraceDispatcher traceDispatcher = null;
/** /**
* Default constructor. * Default constructor.
...@@ -165,25 +165,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -165,25 +165,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.producerGroup = producerGroup; this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message track trace feature //if client open the message track trace feature
//TODO wrap this code to TraceDispatcherFactory
if (msgTraceSwitch) { if (msgTraceSwitch) {
try { try {
Properties tempProperties = new Properties(); Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000"); tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000");
tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"); tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048");
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name()); tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.PRODUCER.name());
if (!UtilAll.isBlank(traceTopicName)) { if (!UtilAll.isBlank(traceTopicName)) {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName); tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName);
} else { } else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
} }
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher; traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook( this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTrackHookImpl(traceDispatcher)); new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) { } catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
} }
...@@ -234,10 +235,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -234,10 +235,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
this.defaultMQProducerImpl.start(); this.defaultMQProducerImpl.start();
//TODO wrap this code to TraceDispatcherFactory
if (null != traceDispatcher) { if (null != traceDispatcher) {
try { try {
Properties tempProperties = new Properties(); Properties tempProperties = new Properties();
tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr());
traceDispatcher.start(tempProperties); traceDispatcher.start(tempProperties);
} catch (MQClientException e) { } catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e); log.warn("trace dispatcher start failed ", e);
...@@ -864,11 +866,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -864,11 +866,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
} }
public AsyncDispatcher getTraceDispatcher() { public TraceDispatcher getTraceDispatcher() {
return traceDispatcher; return traceDispatcher;
} }
public void setTraceDispatcher(AsyncDispatcher traceDispatcher) { public void setTraceDispatcher(TraceDispatcher traceDispatcher) {
this.traceDispatcher = traceDispatcher; this.traceDispatcher = traceDispatcher;
} }
} }
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.dispatch.impl; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -26,11 +26,6 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; ...@@ -26,11 +26,6 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceDataEncoder;
import org.apache.rocketmq.client.trace.core.common.TrackTraceTransferBean;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
...@@ -51,10 +46,7 @@ import java.util.concurrent.TimeUnit; ...@@ -51,10 +46,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
/** public class AsyncTraceDispatcher implements TraceDispatcher {
* Created by zongtanghu on 2018/11/6.
*/
public class AsyncArrayDispatcher implements AsyncDispatcher {
private final static InternalLogger log = ClientLogger.getLog(); private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize; private final int queueSize;
...@@ -64,7 +56,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -64,7 +56,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
// the last discard number of log // the last discard number of log
private AtomicLong discardCount; private AtomicLong discardCount;
private Thread worker; private Thread worker;
private ArrayBlockingQueue<TrackTraceContext> traceContextQueue; private ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue; private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook; private volatile Thread shutDownHook;
private volatile boolean stopped = false; private volatile boolean stopped = false;
...@@ -75,17 +67,17 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -75,17 +67,17 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
private String dispatcherId = UUID.randomUUID().toString(); private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName; private String traceTopicName;
public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { public AsyncTraceDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); dispatcherType = properties.getProperty(TraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048")); int queueSize = Integer.parseInt(properties.getProperty(TraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value // queueSize is greater than or equal to the n power of 2 of value
queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
this.queueSize = queueSize; this.queueSize = queueSize;
batchSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_BATCH_NUM, "1")); batchSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_BATCH_NUM, "1"));
this.discardCount = new AtomicLong(0L); this.discardCount = new AtomicLong(0L);
traceContextQueue = new ArrayBlockingQueue<TrackTraceContext>(1024); traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
traceTopicName = properties.getProperty(TrackTraceConstants.TRACE_TOPIC); traceTopicName = properties.getProperty(TraceConstants.TRACE_TOPIC);
this.traceExecuter = new ThreadPoolExecutor(// this.traceExecuter = new ThreadPoolExecutor(//
10, // 10, //
20, // 20, //
...@@ -93,7 +85,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -93,7 +85,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
TimeUnit.MILLISECONDS, // TimeUnit.MILLISECONDS, //
this.appenderQueue, // this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_")); new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
} }
public String getTraceTopicName() { public String getTraceTopicName() {
...@@ -125,8 +117,8 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -125,8 +117,8 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
} }
public void start(Properties properties) throws MQClientException { public void start(Properties properties) throws MQClientException {
TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TrackTraceConstants.NAMESRV_ADDR)); TraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TraceConstants.NAMESRV_ADDR));
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId); this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true); this.worker.setDaemon(true);
this.worker.start(); this.worker.start();
this.registerShutDownHook(); this.registerShutDownHook();
...@@ -134,7 +126,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -134,7 +126,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
@Override @Override
public boolean append(final Object ctx) { public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TrackTraceContext) ctx); boolean result = traceContextQueue.offer((TraceContext) ctx);
if (!result) { if (!result) {
log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
} }
...@@ -159,7 +151,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -159,7 +151,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
public void shutdown() { public void shutdown() {
this.stopped = true; this.stopped = true;
this.traceExecuter.shutdown(); this.traceExecuter.shutdown();
TrackTraceProducerFactory.unregisterTraceDispatcher(dispatcherId); TraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
this.removeShutdownHook(); this.removeShutdownHook();
} }
...@@ -197,9 +189,9 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -197,9 +189,9 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
@Override @Override
public void run() { public void run() {
while (!stopped) { while (!stopped) {
List<TrackTraceContext> contexts = new ArrayList<TrackTraceContext>(batchSize); List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
TrackTraceContext context = null; TraceContext context = null;
try { try {
//get track trace data element from blocking Queue — traceContextQueue //get track trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
...@@ -214,7 +206,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -214,7 +206,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
if (contexts.size() > 0) { if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecuter.submit(request); traceExecuter.submit(request);
} else if (AsyncArrayDispatcher.this.stopped) { } else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true; this.stopped = true;
} }
} }
...@@ -223,13 +215,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -223,13 +215,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
} }
class AsyncAppenderRequest implements Runnable { class AsyncAppenderRequest implements Runnable {
List<TrackTraceContext> contextList; List<TraceContext> contextList;
public AsyncAppenderRequest(final List<TrackTraceContext> contextList) { public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) { if (contextList != null) {
this.contextList = contextList; this.contextList = contextList;
} else { } else {
this.contextList = new ArrayList<TrackTraceContext>(1); this.contextList = new ArrayList<TraceContext>(1);
} }
} }
...@@ -238,9 +230,9 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -238,9 +230,9 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
sendTraceData(contextList); sendTraceData(contextList);
} }
public void sendTraceData(List<TrackTraceContext> contextList) { public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TrackTraceTransferBean>> transBeanMap = new HashMap<String, List<TrackTraceTransferBean>>(); Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TrackTraceContext context : contextList) { for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) { if (context.getTraceBeans().isEmpty()) {
continue; continue;
} }
...@@ -248,15 +240,15 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -248,15 +240,15 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
String topic = context.getTraceBeans().get(0).getTopic(); String topic = context.getTraceBeans().get(0).getTopic();
//2.use original message entity's topic as key //2.use original message entity's topic as key
String key = topic; String key = topic;
List<TrackTraceTransferBean> transBeanList = transBeanMap.get(key); List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) { if (transBeanList == null) {
transBeanList = new ArrayList<TrackTraceTransferBean>(); transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList); transBeanMap.put(key, transBeanList);
} }
TrackTraceTransferBean traceData = TrackTraceDataEncoder.encoderFromContextBean(context); TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData); transBeanList.add(traceData);
} }
for (Map.Entry<String, List<TrackTraceTransferBean>> entry : transBeanMap.entrySet()) { for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
flushData(entry.getValue()); flushData(entry.getValue());
} }
} }
...@@ -264,7 +256,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -264,7 +256,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
/** /**
* batch sending data actually * batch sending data actually
*/ */
private void flushData(List<TrackTraceTransferBean> transBeanList) { private void flushData(List<TraceTransferBean> transBeanList) {
if (transBeanList.size() == 0) { if (transBeanList.size() == 0) {
return; return;
} }
...@@ -273,7 +265,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { ...@@ -273,7 +265,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
int count = 0; int count = 0;
Set<String> keySet = new HashSet<String>(); Set<String> keySet = new HashSet<String>();
for (TrackTraceTransferBean bean : transBeanList) { for (TraceTransferBean bean : transBeanList) {
// keyset of message track trace includes msgId of or original message // keyset of message track trace includes msgId of or original message
keySet.addAll(bean.getTransKey()); keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData()); buffer.append(bean.getTransData());
......
...@@ -14,12 +14,12 @@ ...@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.message.MessageType;
public class TrackTraceBean { public class TraceBean {
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP()); private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = ""; private String topic = "";
private String msgId = ""; private String msgId = "";
......
...@@ -14,9 +14,9 @@ ...@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
public class TrackTraceConstants { public class TraceConstants {
public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
public static final String ADDRSRV_URL = "ADDRSRV_URL"; public static final String ADDRSRV_URL = "ADDRSRV_URL";
public static final String INSTANCE_NAME = "InstanceName"; public static final String INSTANCE_NAME = "InstanceName";
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageClientIDSetter;
...@@ -23,9 +23,9 @@ import java.util.List; ...@@ -23,9 +23,9 @@ import java.util.List;
/** /**
* The context of Track Trace * The context of Track Trace
*/ */
public class TrackTraceContext implements Comparable<TrackTraceContext> { public class TraceContext implements Comparable<TraceContext> {
private TrackTraceType traceType; private TraceType traceType;
private long timeStamp = System.currentTimeMillis(); private long timeStamp = System.currentTimeMillis();
private String regionId = ""; private String regionId = "";
private String regionName = ""; private String regionName = "";
...@@ -34,7 +34,7 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> { ...@@ -34,7 +34,7 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> {
private boolean isSuccess = true; private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID(); private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0; private int contextCode = 0;
private List<TrackTraceBean> traceBeans; private List<TraceBean> traceBeans;
public int getContextCode() { public int getContextCode() {
return contextCode; return contextCode;
...@@ -44,11 +44,11 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> { ...@@ -44,11 +44,11 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> {
this.contextCode = contextCode; this.contextCode = contextCode;
} }
public List<TrackTraceBean> getTraceBeans() { public List<TraceBean> getTraceBeans() {
return traceBeans; return traceBeans;
} }
public void setTraceBeans(List<TrackTraceBean> traceBeans) { public void setTraceBeans(List<TraceBean> traceBeans) {
this.traceBeans = traceBeans; this.traceBeans = traceBeans;
} }
...@@ -60,11 +60,11 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> { ...@@ -60,11 +60,11 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> {
this.regionId = regionId; this.regionId = regionId;
} }
public TrackTraceType getTraceType() { public TraceType getTraceType() {
return traceType; return traceType;
} }
public void setTraceType(TrackTraceType traceType) { public void setTraceType(TraceType traceType) {
this.traceType = traceType; this.traceType = traceType;
} }
...@@ -117,7 +117,7 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> { ...@@ -117,7 +117,7 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> {
} }
@Override @Override
public int compareTo(TrackTraceContext o) { public int compareTo(TraceContext o) {
return (int) (this.timeStamp - o.getTimeStamp()); return (int) (this.timeStamp - o.getTimeStamp());
} }
...@@ -127,10 +127,10 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> { ...@@ -127,10 +127,10 @@ public class TrackTraceContext implements Comparable<TrackTraceContext> {
sb.append(traceType).append("_").append(groupName) sb.append(traceType).append("_").append(groupName)
.append("_").append(regionId).append("_").append(isSuccess).append("_"); .append("_").append(regionId).append("_").append(isSuccess).append("_");
if (traceBeans != null && traceBeans.size() > 0) { if (traceBeans != null && traceBeans.size() > 0) {
for (TrackTraceBean bean : traceBeans) { for (TraceBean bean : traceBeans) {
sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_"); sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
} }
} }
return "TrackTraceContext{" + sb.toString() + '}'; return "TraceContext{" + sb.toString() + '}';
} }
} }
...@@ -14,19 +14,17 @@ ...@@ -14,19 +14,17 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.message.MessageType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.apache.rocketmq.client.trace.core.common.TrackTraceType.Pub;
/** /**
* encode/decode for Track Trace Data * encode/decode for Track Trace Data
*/ */
public class TrackTraceDataEncoder { public class TraceDataEncoder {
/** /**
* resolving traceContext list From track trace data String * resolving traceContext list From track trace data String
...@@ -34,21 +32,21 @@ public class TrackTraceDataEncoder { ...@@ -34,21 +32,21 @@ public class TrackTraceDataEncoder {
* @param traceData * @param traceData
* @return * @return
*/ */
public static List<TrackTraceContext> decoderFromTraceDataString(String traceData) { public static List<TraceContext> decoderFromTraceDataString(String traceData) {
List<TrackTraceContext> resList = new ArrayList<TrackTraceContext>(); List<TraceContext> resList = new ArrayList<TraceContext>();
if (traceData == null || traceData.length() <= 0) { if (traceData == null || traceData.length() <= 0) {
return resList; return resList;
} }
String[] contextList = traceData.split(String.valueOf(TrackTraceConstants.FIELD_SPLITOR)); String[] contextList = traceData.split(String.valueOf(TraceConstants.FIELD_SPLITOR));
for (String context : contextList) { for (String context : contextList) {
String[] line = context.split(String.valueOf(TrackTraceConstants.CONTENT_SPLITOR)); String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
if (line[0].equals(Pub.name())) { if (line[0].equals(TraceType.Pub.name())) {
TrackTraceContext pubContext = new TrackTraceContext(); TraceContext pubContext = new TraceContext();
pubContext.setTraceType(Pub); pubContext.setTraceType(TraceType.Pub);
pubContext.setTimeStamp(Long.parseLong(line[1])); pubContext.setTimeStamp(Long.parseLong(line[1]));
pubContext.setRegionId(line[2]); pubContext.setRegionId(line[2]);
pubContext.setGroupName(line[3]); pubContext.setGroupName(line[3]);
TrackTraceBean bean = new TrackTraceBean(); TraceBean bean = new TraceBean();
bean.setTopic(line[4]); bean.setTopic(line[4]);
bean.setMsgId(line[5]); bean.setMsgId(line[5]);
bean.setTags(line[6]); bean.setTags(line[6]);
...@@ -64,31 +62,31 @@ public class TrackTraceDataEncoder { ...@@ -64,31 +62,31 @@ public class TrackTraceDataEncoder {
bean.setOffsetMsgId(line[12]); bean.setOffsetMsgId(line[12]);
pubContext.setSuccess(Boolean.parseBoolean(line[13])); pubContext.setSuccess(Boolean.parseBoolean(line[13]));
} }
pubContext.setTraceBeans(new ArrayList<TrackTraceBean>(1)); pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
pubContext.getTraceBeans().add(bean); pubContext.getTraceBeans().add(bean);
resList.add(pubContext); resList.add(pubContext);
} else if (line[0].equals(TrackTraceType.SubBefore.name())) { } else if (line[0].equals(TraceType.SubBefore.name())) {
TrackTraceContext subBeforeContext = new TrackTraceContext(); TraceContext subBeforeContext = new TraceContext();
subBeforeContext.setTraceType(TrackTraceType.SubBefore); subBeforeContext.setTraceType(TraceType.SubBefore);
subBeforeContext.setTimeStamp(Long.parseLong(line[1])); subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
subBeforeContext.setRegionId(line[2]); subBeforeContext.setRegionId(line[2]);
subBeforeContext.setGroupName(line[3]); subBeforeContext.setGroupName(line[3]);
subBeforeContext.setRequestId(line[4]); subBeforeContext.setRequestId(line[4]);
TrackTraceBean bean = new TrackTraceBean(); TraceBean bean = new TraceBean();
bean.setMsgId(line[5]); bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6])); bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]); bean.setKeys(line[7]);
subBeforeContext.setTraceBeans(new ArrayList<TrackTraceBean>(1)); subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean); subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext); resList.add(subBeforeContext);
} else if (line[0].equals(TrackTraceType.SubAfter.name())) { } else if (line[0].equals(TraceType.SubAfter.name())) {
TrackTraceContext subAfterContext = new TrackTraceContext(); TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TrackTraceType.SubAfter); subAfterContext.setTraceType(TraceType.SubAfter);
subAfterContext.setRequestId(line[1]); subAfterContext.setRequestId(line[1]);
TrackTraceBean bean = new TrackTraceBean(); TraceBean bean = new TraceBean();
bean.setMsgId(line[2]); bean.setMsgId(line[2]);
bean.setKeys(line[5]); bean.setKeys(line[5]);
subAfterContext.setTraceBeans(new ArrayList<TrackTraceBean>(1)); subAfterContext.setTraceBeans(new ArrayList<TraceBean>(1));
subAfterContext.getTraceBeans().add(bean); subAfterContext.getTraceBeans().add(bean);
subAfterContext.setCostTime(Integer.parseInt(line[3])); subAfterContext.setCostTime(Integer.parseInt(line[3]));
subAfterContext.setSuccess(Boolean.parseBoolean(line[4])); subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
...@@ -108,62 +106,62 @@ public class TrackTraceDataEncoder { ...@@ -108,62 +106,62 @@ public class TrackTraceDataEncoder {
* @param ctx * @param ctx
* @return * @return
*/ */
public static TrackTraceTransferBean encoderFromContextBean(TrackTraceContext ctx) { public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
if (ctx == null) { if (ctx == null) {
return null; return null;
} }
//build message track trace of the transfering entity content bean //build message track trace of the transfering entity content bean
TrackTraceTransferBean transferBean = new TrackTraceTransferBean(); TraceTransferBean transferBean = new TraceTransferBean();
StringBuilder sb = new StringBuilder(256); StringBuilder sb = new StringBuilder(256);
switch (ctx.getTraceType()) { switch (ctx.getTraceType()) {
case Pub: { case Pub: {
TrackTraceBean bean = ctx.getTraceBeans().get(0); TraceBean bean = ctx.getTraceBeans().get(0);
//append the content of context and traceBean to transferBean's TransData //append the content of context and traceBean to transferBean's TransData
sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getBodyLength()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TrackTraceConstants.FIELD_SPLITOR); .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
} }
break; break;
case SubBefore: { case SubBefore: {
for (TrackTraceBean bean : ctx.getTraceBeans()) { for (TraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TrackTraceConstants.FIELD_SPLITOR);// .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
} }
} }
break; break;
case SubAfter: { case SubAfter: {
for (TrackTraceBean bean : ctx.getTraceBeans()) { for (TraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getContextCode()).append(TrackTraceConstants.FIELD_SPLITOR); .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
} }
} }
break; break;
default: default:
} }
transferBean.setTransData(sb.toString()); transferBean.setTransData(sb.toString());
for (TrackTraceBean bean : ctx.getTraceBeans()) { for (TraceBean bean : ctx.getTraceBeans()) {
transferBean.getTransKey().add(bean.getMsgId()); transferBean.getTransKey().add(bean.getMsgId());
if (bean.getKeys() != null && bean.getKeys().length() > 0) { if (bean.getKeys() != null && bean.getKeys().length() > 0) {
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.dispatch; package org.apache.rocketmq.client.trace;
import java.util.Properties; import java.util.Properties;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -23,7 +23,7 @@ import java.io.IOException; ...@@ -23,7 +23,7 @@ import java.io.IOException;
/** /**
* Interface of asynchronous transfer data * Interface of asynchronous transfer data
*/ */
public interface AsyncDispatcher { public interface TraceDispatcher {
/** /**
* Initialize asynchronous transfer data module * Initialize asynchronous transfer data module
......
...@@ -14,9 +14,9 @@ ...@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
public enum TrackTraceDispatcherType { public enum TraceDispatcherType {
PRODUCER, PRODUCER,
CONSUMER CONSUMER
} }
...@@ -14,11 +14,10 @@ ...@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.dispatch.impl; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.namesrv.TopAddressing;
import java.util.Map; import java.util.Map;
...@@ -27,7 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -27,7 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
public class TrackTraceProducerFactory { @Deprecated
public class TraceProducerFactory {
private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>(); private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>();
private static AtomicBoolean isStarted = new AtomicBoolean(false); private static AtomicBoolean isStarted = new AtomicBoolean(false);
...@@ -38,18 +38,18 @@ public class TrackTraceProducerFactory { ...@@ -38,18 +38,18 @@ public class TrackTraceProducerFactory {
if (traceProducer == null) { if (traceProducer == null) {
traceProducer = new DefaultMQProducer(rpcHook); traceProducer = new DefaultMQProducer(rpcHook);
traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME); traceProducer.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000); traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); traceProducer.setInstanceName(properties.getProperty(TraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
String nameSrv = properties.getProperty(TrackTraceConstants.NAMESRV_ADDR); String nameSrv = properties.getProperty(TraceConstants.NAMESRV_ADDR);
if (nameSrv == null) { if (nameSrv == null) {
TopAddressing topAddressing = new TopAddressing(properties.getProperty(TrackTraceConstants.ADDRSRV_URL)); TopAddressing topAddressing = new TopAddressing(properties.getProperty(TraceConstants.ADDRSRV_URL));
nameSrv = topAddressing.fetchNSAddr(); nameSrv = topAddressing.fetchNSAddr();
} }
traceProducer.setNamesrvAddr(nameSrv); traceProducer.setNamesrvAddr(nameSrv);
traceProducer.setVipChannelEnabled(false); traceProducer.setVipChannelEnabled(false);
//the max size of message is 128K //the max size of message is 128K
int maxSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_MSG_SIZE, "128000")); int maxSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_MSG_SIZE, "128000"));
traceProducer.setMaxMessageSize(maxSize - 10 * 1000); traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
} }
return traceProducer; return traceProducer;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
...@@ -22,7 +22,7 @@ import java.util.Set; ...@@ -22,7 +22,7 @@ import java.util.Set;
/** /**
* track trace transfering bean * track trace transfering bean
*/ */
public class TrackTraceTransferBean { public class TraceTransferBean {
private String transData; private String transData;
private Set<String> transKey = new HashSet<String>(); private Set<String> transKey = new HashSet<String>();
......
...@@ -14,9 +14,9 @@ ...@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.common; package org.apache.rocketmq.client.trace;
public enum TrackTraceType { public enum TraceType {
Pub, Pub,
SubBefore, SubBefore,
SubAfter, SubAfter,
......
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.hook; package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.core.common.TrackTraceType; import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -32,9 +32,9 @@ import java.util.List; ...@@ -32,9 +32,9 @@ import java.util.List;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
private AsyncDispatcher localDispatcher; private TraceDispatcher localDispatcher;
public ConsumeMessageTraceHookImpl(AsyncDispatcher localDispatcher) { public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher; this.localDispatcher = localDispatcher;
} }
...@@ -48,11 +48,11 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -48,11 +48,11 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return; return;
} }
TrackTraceContext traceContext = new TrackTraceContext(); TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext); context.setMqTraceContext(traceContext);
traceContext.setTraceType(TrackTraceType.SubBefore);// traceContext.setTraceType(TraceType.SubBefore);//
traceContext.setGroupName(context.getConsumerGroup());// traceContext.setGroupName(context.getConsumerGroup());//
List<TrackTraceBean> beans = new ArrayList<TrackTraceBean>(); List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) { for (MessageExt msg : context.getMsgList()) {
if (msg == null) { if (msg == null) {
continue; continue;
...@@ -64,7 +64,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -64,7 +64,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
// if trace switch is false ,skip it // if trace switch is false ,skip it
continue; continue;
} }
TrackTraceBean traceBean = new TrackTraceBean(); TraceBean traceBean = new TraceBean();
traceBean.setTopic(msg.getTopic());// traceBean.setTopic(msg.getTopic());//
traceBean.setMsgId(msg.getMsgId());// traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());// traceBean.setTags(msg.getTags());//
...@@ -87,14 +87,14 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -87,14 +87,14 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return; return;
} }
TrackTraceContext subBeforeContext = (TrackTraceContext) context.getMqTraceContext(); TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// if subbefore bean is null ,skip it // if subbefore bean is null ,skip it
return; return;
} }
TrackTraceContext subAfterContext = new TrackTraceContext(); TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TrackTraceType.SubAfter);// subAfterContext.setTraceType(TraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());// subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(subBeforeContext.getGroupName());// subAfterContext.setGroupName(subBeforeContext.getGroupName());//
subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setRequestId(subBeforeContext.getRequestId());//
......
...@@ -14,23 +14,23 @@ ...@@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.client.trace.core.hook; package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceType; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import org.apache.rocketmq.client.trace.TraceType;
import java.util.ArrayList; import java.util.ArrayList;
public class SendMessageTrackHookImpl implements SendMessageHook { public class SendMessageTraceHookImpl implements SendMessageHook {
private AsyncDispatcher localDispatcher; private TraceDispatcher localDispatcher;
public SendMessageTrackHookImpl(AsyncDispatcher localDispatcher) { public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher; this.localDispatcher = localDispatcher;
} }
...@@ -42,18 +42,18 @@ public class SendMessageTrackHookImpl implements SendMessageHook { ...@@ -42,18 +42,18 @@ public class SendMessageTrackHookImpl implements SendMessageHook {
@Override @Override
public void sendMessageBefore(SendMessageContext context) { public void sendMessageBefore(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded //if it is message track trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())) { if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return; return;
} }
//build the context content of TuxeTraceContext //build the context content of TuxeTraceContext
TrackTraceContext tuxeContext = new TrackTraceContext(); TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TrackTraceBean>(1)); tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext); context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TrackTraceType.Pub); tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup()); tuxeContext.setGroupName(context.getProducerGroup());
//build the data bean object of message track trace //build the data bean object of message track trace
TrackTraceBean traceBean = new TrackTraceBean(); TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags()); traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys()); traceBean.setKeys(context.getMessage().getKeys());
...@@ -66,7 +66,7 @@ public class SendMessageTrackHookImpl implements SendMessageHook { ...@@ -66,7 +66,7 @@ public class SendMessageTrackHookImpl implements SendMessageHook {
@Override @Override
public void sendMessageAfter(SendMessageContext context) { public void sendMessageAfter(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded //if it is message track trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName()) if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) { || context.getMqTraceContext() == null) {
return; return;
} }
...@@ -80,8 +80,8 @@ public class SendMessageTrackHookImpl implements SendMessageHook { ...@@ -80,8 +80,8 @@ public class SendMessageTrackHookImpl implements SendMessageHook {
return; return;
} }
TrackTraceContext tuxeContext = (TrackTraceContext) context.getMqTraceContext(); TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TrackTraceBean traceBean = tuxeContext.getTraceBeans().get(0); TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime); tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
......
...@@ -53,7 +53,6 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; ...@@ -53,7 +53,6 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
...@@ -103,7 +102,7 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -103,7 +102,7 @@ public class DefaultMQConsumerWithTraceTest {
private DefaultMQPushConsumer customTraceTopicpushConsumer; private DefaultMQPushConsumer customTraceTopicpushConsumer;
private AsyncArrayDispatcher asyncArrayDispatcher; private AsyncTraceDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory; private MQClientInstance mQClientTraceFactory;
@Mock @Mock
private MQClientAPIImpl mQClientTraceAPIImpl; private MQClientAPIImpl mQClientTraceAPIImpl;
...@@ -120,8 +119,8 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -120,8 +119,8 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000); pushConsumer.setPullInterval(60 * 1000);
asyncArrayDispatcher = (AsyncArrayDispatcher)pushConsumer.getTraceDispatcher(); asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher();
traceProducer = asyncArrayDispatcher.getTraceProducer(); traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() { pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
......
...@@ -37,7 +37,6 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; ...@@ -37,7 +37,6 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
...@@ -73,7 +72,7 @@ public class DefaultMQProducerWithTraceTest { ...@@ -73,7 +72,7 @@ public class DefaultMQProducerWithTraceTest {
@Mock @Mock
private MQClientAPIImpl mQClientTraceAPIImpl; private MQClientAPIImpl mQClientTraceAPIImpl;
private AsyncArrayDispatcher asyncArrayDispatcher; private AsyncTraceDispatcher asyncTraceDispatcher;
private DefaultMQProducer producer; private DefaultMQProducer producer;
private DefaultMQProducer customTraceTopicproducer; private DefaultMQProducer customTraceTopicproducer;
...@@ -97,11 +96,11 @@ public class DefaultMQProducerWithTraceTest { ...@@ -97,11 +96,11 @@ public class DefaultMQProducerWithTraceTest {
normalProducer.setNamesrvAddr("127.0.0.1:9877"); normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b' ,'c'}); message = new Message(topic, new byte[] {'a', 'b' ,'c'});
asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher(); asyncTraceDispatcher = (AsyncTraceDispatcher)producer.getTraceDispatcher();
asyncArrayDispatcher.setTraceTopicName(customerTraceTopic); asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
asyncArrayDispatcher.getHostProducer(); asyncTraceDispatcher.getHostProducer();
asyncArrayDispatcher.getHostConsumer(); asyncTraceDispatcher.getHostConsumer();
traceProducer = asyncArrayDispatcher.getTraceProducer(); traceProducer = asyncTraceDispatcher.getTraceProducer();
producer.start(); producer.start();
......
...@@ -40,14 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -40,14 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
*
* 1. view the /conf/plain_acl.yml file under the distribution module, pay attention to the accessKey,secretKey,
* globalWhiteRemoteAddresses and whiteRemoteAddress and some other attributes.
*
* 2. Modify ACL_ACCESS_KEY and ACL_SECRET_KEY to the corresponding accessKey and secretKey in plain_acl.yml
*
*/
public class AclClient { public class AclClient {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册