diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 2149d6756819bd9eb9ef957e948a46cfc4b65f87..cbe22fdb357d59952d5376ef016157bd484e44c3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -31,11 +31,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; -import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType; -import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; -import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; -import org.apache.rocketmq.client.trace.core.hook.ConsumeMessageTraceHookImpl; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceConstants; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.TraceDispatcherType; +import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -259,7 +259,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Interface of asynchronous transfer data */ - private AsyncDispatcher traceDispatcher = null; + private TraceDispatcher traceDispatcher = null; /** * Default constructor. @@ -299,17 +299,17 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume if (msgTraceSwitch) { try { Properties tempProperties = new Properties(); - tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000"); - tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"); - tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); - tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); - tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name()); + tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000"); + tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048"); + tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100"); + tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.CONSUMER.name()); if (!UtilAll.isBlank(traceTopicName)) { - tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName); + tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName); } else { - tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); + tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); } - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; @@ -586,7 +586,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume if (null != traceDispatcher) { try { Properties tempProperties = new Properties(); - tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); + tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); traceDispatcher.start(tempProperties); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); @@ -772,11 +772,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume this.consumeTimeout = consumeTimeout; } - public AsyncDispatcher getTraceDispatcher() { + public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } - public void setTraceDispatcher(AsyncDispatcher traceDispatcher) { + public void setTraceDispatcher(TraceDispatcher traceDispatcher) { this.traceDispatcher = traceDispatcher; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 22c676055fce1e76c0b237cecb41e1e7065677e0..1592d22b249be76765626d992156760d11bbed17 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -27,11 +27,11 @@ import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; -import org.apache.rocketmq.client.trace.core.common.TrackTraceDispatcherType; -import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; -import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; -import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceConstants; +import org.apache.rocketmq.client.trace.TraceDispatcherType; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; @@ -133,7 +133,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * Interface of asynchronous transfer data */ - private AsyncDispatcher traceDispatcher = null; + private TraceDispatcher traceDispatcher = null; /** * Default constructor. @@ -165,25 +165,26 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message track trace feature + //TODO wrap this code to TraceDispatcherFactory if (msgTraceSwitch) { try { Properties tempProperties = new Properties(); - tempProperties.put(TrackTraceConstants.MAX_MSG_SIZE, "128000"); - tempProperties.put(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"); - tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); - tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); - tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name()); + tempProperties.put(TraceConstants.MAX_MSG_SIZE, "128000"); + tempProperties.put(TraceConstants.ASYNC_BUFFER_SIZE, "2048"); + tempProperties.put(TraceConstants.MAX_BATCH_NUM, "100"); + tempProperties.put(TraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); + tempProperties.put(TraceConstants.TRACE_DISPATCHER_TYPE, TraceDispatcherType.PRODUCER.name()); if (!UtilAll.isBlank(traceTopicName)) { - tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName); + tempProperties.put(TraceConstants.TRACE_TOPIC, traceTopicName); } else { - tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); + tempProperties.put(TraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); } - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); + AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(tempProperties, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( - new SendMessageTrackHookImpl(traceDispatcher)); + new SendMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } @@ -234,10 +235,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); + //TODO wrap this code to TraceDispatcherFactory if (null != traceDispatcher) { try { Properties tempProperties = new Properties(); - tempProperties.put(TrackTraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); + tempProperties.put(TraceConstants.NAMESRV_ADDR, this.getNamesrvAddr()); traceDispatcher.start(tempProperties); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); @@ -864,11 +866,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; } - public AsyncDispatcher getTraceDispatcher() { + public TraceDispatcher getTraceDispatcher() { return traceDispatcher; } - public void setTraceDispatcher(AsyncDispatcher traceDispatcher) { + public void setTraceDispatcher(TraceDispatcher traceDispatcher) { this.traceDispatcher = traceDispatcher; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java similarity index 82% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java rename to client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java index 496d290f5cf70eac0b03cc4c29e640335e0f5914..ce82d1bb1c6dad12593e5659e5a99d9275ed1105 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.dispatch.impl; +package org.apache.rocketmq.client.trace; import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQClientException; @@ -26,11 +26,6 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; -import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; -import org.apache.rocketmq.client.trace.core.common.TrackTraceDataEncoder; -import org.apache.rocketmq.client.trace.core.common.TrackTraceTransferBean; -import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; @@ -51,10 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.remoting.RPCHook; -/** - * Created by zongtanghu on 2018/11/6. - */ -public class AsyncArrayDispatcher implements AsyncDispatcher { +public class AsyncTraceDispatcher implements TraceDispatcher { private final static InternalLogger log = ClientLogger.getLog(); private final int queueSize; @@ -64,7 +56,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { // the last discard number of log private AtomicLong discardCount; private Thread worker; - private ArrayBlockingQueue traceContextQueue; + private ArrayBlockingQueue traceContextQueue; private ArrayBlockingQueue appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; @@ -75,17 +67,17 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; - public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { - dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); - int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048")); + public AsyncTraceDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { + dispatcherType = properties.getProperty(TraceConstants.TRACE_DISPATCHER_TYPE); + int queueSize = Integer.parseInt(properties.getProperty(TraceConstants.ASYNC_BUFFER_SIZE, "2048")); // queueSize is greater than or equal to the n power of 2 of value queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1)); this.queueSize = queueSize; - batchSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_BATCH_NUM, "1")); + batchSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_BATCH_NUM, "1")); this.discardCount = new AtomicLong(0L); - traceContextQueue = new ArrayBlockingQueue(1024); + traceContextQueue = new ArrayBlockingQueue(1024); appenderQueue = new ArrayBlockingQueue(queueSize); - traceTopicName = properties.getProperty(TrackTraceConstants.TRACE_TOPIC); + traceTopicName = properties.getProperty(TraceConstants.TRACE_TOPIC); this.traceExecuter = new ThreadPoolExecutor(// 10, // 20, // @@ -93,7 +85,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); } public String getTraceTopicName() { @@ -125,8 +117,8 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { } public void start(Properties properties) throws MQClientException { - TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TrackTraceConstants.NAMESRV_ADDR)); - this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId); + TraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TraceConstants.NAMESRV_ADDR)); + this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); @@ -134,7 +126,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { @Override public boolean append(final Object ctx) { - boolean result = traceContextQueue.offer((TrackTraceContext) ctx); + boolean result = traceContextQueue.offer((TraceContext) ctx); if (!result) { log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); } @@ -159,7 +151,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { public void shutdown() { this.stopped = true; this.traceExecuter.shutdown(); - TrackTraceProducerFactory.unregisterTraceDispatcher(dispatcherId); + TraceProducerFactory.unregisterTraceDispatcher(dispatcherId); this.removeShutdownHook(); } @@ -197,9 +189,9 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { @Override public void run() { while (!stopped) { - List contexts = new ArrayList(batchSize); + List contexts = new ArrayList(batchSize); for (int i = 0; i < batchSize; i++) { - TrackTraceContext context = null; + TraceContext context = null; try { //get track trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); @@ -214,7 +206,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { if (contexts.size() > 0) { AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); traceExecuter.submit(request); - } else if (AsyncArrayDispatcher.this.stopped) { + } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } } @@ -223,13 +215,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { } class AsyncAppenderRequest implements Runnable { - List contextList; + List contextList; - public AsyncAppenderRequest(final List contextList) { + public AsyncAppenderRequest(final List contextList) { if (contextList != null) { this.contextList = contextList; } else { - this.contextList = new ArrayList(1); + this.contextList = new ArrayList(1); } } @@ -238,9 +230,9 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { sendTraceData(contextList); } - public void sendTraceData(List contextList) { - Map> transBeanMap = new HashMap>(); - for (TrackTraceContext context : contextList) { + public void sendTraceData(List contextList) { + Map> transBeanMap = new HashMap>(); + for (TraceContext context : contextList) { if (context.getTraceBeans().isEmpty()) { continue; } @@ -248,15 +240,15 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { String topic = context.getTraceBeans().get(0).getTopic(); //2.use original message entity's topic as key String key = topic; - List transBeanList = transBeanMap.get(key); + List transBeanList = transBeanMap.get(key); if (transBeanList == null) { - transBeanList = new ArrayList(); + transBeanList = new ArrayList(); transBeanMap.put(key, transBeanList); } - TrackTraceTransferBean traceData = TrackTraceDataEncoder.encoderFromContextBean(context); + TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); transBeanList.add(traceData); } - for (Map.Entry> entry : transBeanMap.entrySet()) { + for (Map.Entry> entry : transBeanMap.entrySet()) { flushData(entry.getValue()); } } @@ -264,7 +256,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { /** * batch sending data actually */ - private void flushData(List transBeanList) { + private void flushData(List transBeanList) { if (transBeanList.size() == 0) { return; } @@ -273,7 +265,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { int count = 0; Set keySet = new HashSet(); - for (TrackTraceTransferBean bean : transBeanList) { + for (TraceTransferBean bean : transBeanList) { // keyset of message track trace includes msgId of or original message keySet.addAll(bean.getTransKey()); buffer.append(bean.getTransData()); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java similarity index 97% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java index 220a128115c5ee66c11a6b63c6767e55c08f73ff..f93aa38b8293712b56a9af289c15d3586f67e76e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageType; -public class TrackTraceBean { +public class TraceBean { private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP()); private String topic = ""; private String msgId = ""; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java similarity index 94% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index a8868614b5b6fe55221f010013c18ecab1fb33c1..970b556350be2883d0a05134ce1a04db9980d03b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; -public class TrackTraceConstants { +public class TraceConstants { public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; public static final String ADDRSRV_URL = "ADDRSRV_URL"; public static final String INSTANCE_NAME = "InstanceName"; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java similarity index 84% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java index a6374a6d58e5a16a9a83dcb48806251faac0dc49..2370db955b16fdf76499ddcc62d67d6d7ea05c10 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceContext.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceContext.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -23,9 +23,9 @@ import java.util.List; /** * The context of Track Trace */ -public class TrackTraceContext implements Comparable { +public class TraceContext implements Comparable { - private TrackTraceType traceType; + private TraceType traceType; private long timeStamp = System.currentTimeMillis(); private String regionId = ""; private String regionName = ""; @@ -34,7 +34,7 @@ public class TrackTraceContext implements Comparable { private boolean isSuccess = true; private String requestId = MessageClientIDSetter.createUniqID(); private int contextCode = 0; - private List traceBeans; + private List traceBeans; public int getContextCode() { return contextCode; @@ -44,11 +44,11 @@ public class TrackTraceContext implements Comparable { this.contextCode = contextCode; } - public List getTraceBeans() { + public List getTraceBeans() { return traceBeans; } - public void setTraceBeans(List traceBeans) { + public void setTraceBeans(List traceBeans) { this.traceBeans = traceBeans; } @@ -60,11 +60,11 @@ public class TrackTraceContext implements Comparable { this.regionId = regionId; } - public TrackTraceType getTraceType() { + public TraceType getTraceType() { return traceType; } - public void setTraceType(TrackTraceType traceType) { + public void setTraceType(TraceType traceType) { this.traceType = traceType; } @@ -117,7 +117,7 @@ public class TrackTraceContext implements Comparable { } @Override - public int compareTo(TrackTraceContext o) { + public int compareTo(TraceContext o) { return (int) (this.timeStamp - o.getTimeStamp()); } @@ -127,10 +127,10 @@ public class TrackTraceContext implements Comparable { sb.append(traceType).append("_").append(groupName) .append("_").append(regionId).append("_").append(isSuccess).append("_"); if (traceBeans != null && traceBeans.size() > 0) { - for (TrackTraceBean bean : traceBeans) { + for (TraceBean bean : traceBeans) { sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_"); } } - return "TrackTraceContext{" + sb.toString() + '}'; + return "TraceContext{" + sb.toString() + '}'; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java similarity index 54% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index 3362106e6259b5dba22f6dc60740cf814223bad3..6015e27e686718d1257949e4db806133d2682db8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -14,19 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; import org.apache.rocketmq.common.message.MessageType; import java.util.ArrayList; import java.util.List; -import static org.apache.rocketmq.client.trace.core.common.TrackTraceType.Pub; - /** * encode/decode for Track Trace Data */ -public class TrackTraceDataEncoder { +public class TraceDataEncoder { /** * resolving traceContext list From track trace data String @@ -34,21 +32,21 @@ public class TrackTraceDataEncoder { * @param traceData * @return */ - public static List decoderFromTraceDataString(String traceData) { - List resList = new ArrayList(); + public static List decoderFromTraceDataString(String traceData) { + List resList = new ArrayList(); if (traceData == null || traceData.length() <= 0) { return resList; } - String[] contextList = traceData.split(String.valueOf(TrackTraceConstants.FIELD_SPLITOR)); + String[] contextList = traceData.split(String.valueOf(TraceConstants.FIELD_SPLITOR)); for (String context : contextList) { - String[] line = context.split(String.valueOf(TrackTraceConstants.CONTENT_SPLITOR)); - if (line[0].equals(Pub.name())) { - TrackTraceContext pubContext = new TrackTraceContext(); - pubContext.setTraceType(Pub); + String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR)); + if (line[0].equals(TraceType.Pub.name())) { + TraceContext pubContext = new TraceContext(); + pubContext.setTraceType(TraceType.Pub); pubContext.setTimeStamp(Long.parseLong(line[1])); pubContext.setRegionId(line[2]); pubContext.setGroupName(line[3]); - TrackTraceBean bean = new TrackTraceBean(); + TraceBean bean = new TraceBean(); bean.setTopic(line[4]); bean.setMsgId(line[5]); bean.setTags(line[6]); @@ -64,31 +62,31 @@ public class TrackTraceDataEncoder { bean.setOffsetMsgId(line[12]); pubContext.setSuccess(Boolean.parseBoolean(line[13])); } - pubContext.setTraceBeans(new ArrayList(1)); + pubContext.setTraceBeans(new ArrayList(1)); pubContext.getTraceBeans().add(bean); resList.add(pubContext); - } else if (line[0].equals(TrackTraceType.SubBefore.name())) { - TrackTraceContext subBeforeContext = new TrackTraceContext(); - subBeforeContext.setTraceType(TrackTraceType.SubBefore); + } else if (line[0].equals(TraceType.SubBefore.name())) { + TraceContext subBeforeContext = new TraceContext(); + subBeforeContext.setTraceType(TraceType.SubBefore); subBeforeContext.setTimeStamp(Long.parseLong(line[1])); subBeforeContext.setRegionId(line[2]); subBeforeContext.setGroupName(line[3]); subBeforeContext.setRequestId(line[4]); - TrackTraceBean bean = new TrackTraceBean(); + TraceBean bean = new TraceBean(); bean.setMsgId(line[5]); bean.setRetryTimes(Integer.parseInt(line[6])); bean.setKeys(line[7]); - subBeforeContext.setTraceBeans(new ArrayList(1)); + subBeforeContext.setTraceBeans(new ArrayList(1)); subBeforeContext.getTraceBeans().add(bean); resList.add(subBeforeContext); - } else if (line[0].equals(TrackTraceType.SubAfter.name())) { - TrackTraceContext subAfterContext = new TrackTraceContext(); - subAfterContext.setTraceType(TrackTraceType.SubAfter); + } else if (line[0].equals(TraceType.SubAfter.name())) { + TraceContext subAfterContext = new TraceContext(); + subAfterContext.setTraceType(TraceType.SubAfter); subAfterContext.setRequestId(line[1]); - TrackTraceBean bean = new TrackTraceBean(); + TraceBean bean = new TraceBean(); bean.setMsgId(line[2]); bean.setKeys(line[5]); - subAfterContext.setTraceBeans(new ArrayList(1)); + subAfterContext.setTraceBeans(new ArrayList(1)); subAfterContext.getTraceBeans().add(bean); subAfterContext.setCostTime(Integer.parseInt(line[3])); subAfterContext.setSuccess(Boolean.parseBoolean(line[4])); @@ -108,62 +106,62 @@ public class TrackTraceDataEncoder { * @param ctx * @return */ - public static TrackTraceTransferBean encoderFromContextBean(TrackTraceContext ctx) { + public static TraceTransferBean encoderFromContextBean(TraceContext ctx) { if (ctx == null) { return null; } //build message track trace of the transfering entity content bean - TrackTraceTransferBean transferBean = new TrackTraceTransferBean(); + TraceTransferBean transferBean = new TraceTransferBean(); StringBuilder sb = new StringBuilder(256); switch (ctx.getTraceType()) { case Pub: { - TrackTraceBean bean = ctx.getTraceBeans().get(0); + TraceBean bean = ctx.getTraceBeans().get(0); //append the content of context and traceBean to transferBean's TransData - sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getTopic()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getTags()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getStoreHost()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getBodyLength()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getMsgType().ordinal()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getOffsetMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.isSuccess()).append(TrackTraceConstants.FIELD_SPLITOR); + sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR); } break; case SubBefore: { - for (TrackTraceBean bean : ctx.getTraceBeans()) { - sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getTimeStamp()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getRegionId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getGroupName()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getRetryTimes()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getKeys()).append(TrackTraceConstants.FIELD_SPLITOR);// + for (TraceBean bean : ctx.getTraceBeans()) { + sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// } } break; case SubAfter: { - for (TrackTraceBean bean : ctx.getTraceBeans()) { - sb.append(ctx.getTraceType()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getRequestId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getMsgId()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getCostTime()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.isSuccess()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(bean.getKeys()).append(TrackTraceConstants.CONTENT_SPLITOR)// - .append(ctx.getContextCode()).append(TrackTraceConstants.FIELD_SPLITOR); + for (TraceBean bean : ctx.getTraceBeans()) { + sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR); } } break; default: } transferBean.setTransData(sb.toString()); - for (TrackTraceBean bean : ctx.getTraceBeans()) { + for (TraceBean bean : ctx.getTraceBeans()) { transferBean.getTransKey().add(bean.getMsgId()); if (bean.getKeys() != null && bean.getKeys().length() > 0) { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java similarity index 94% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java index 9b282bb0b1bfd3491f206301b2140a2340333e01..3efef7c46263d26b48ba4cc6b6379ea4b90f4e13 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/AsyncDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.dispatch; +package org.apache.rocketmq.client.trace; import java.util.Properties; import org.apache.rocketmq.client.exception.MQClientException; @@ -23,7 +23,7 @@ import java.io.IOException; /** * Interface of asynchronous transfer data */ -public interface AsyncDispatcher { +public interface TraceDispatcher { /** * Initialize asynchronous transfer data module diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java similarity index 89% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java index a22198e81bd48f2cf60c7c950898930c1628fe5e..f09c9b8db4bd3356fc51bf531e696336f7c2820b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceDispatcherType.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcherType.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; -public enum TrackTraceDispatcherType { +public enum TraceDispatcherType { PRODUCER, CONSUMER } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java similarity index 84% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java index 37c39a14e552a03412a9a9b36b0a274c4fcd0b9f..6e4ed363a856a635fe04462d43f2ab48813a37ea 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceProducerFactory.java @@ -14,11 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.dispatch.impl; +package org.apache.rocketmq.client.trace; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; import org.apache.rocketmq.common.namesrv.TopAddressing; import java.util.Map; @@ -27,7 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.remoting.RPCHook; -public class TrackTraceProducerFactory { +@Deprecated +public class TraceProducerFactory { private static Map dispatcherTable = new ConcurrentHashMap(); private static AtomicBoolean isStarted = new AtomicBoolean(false); @@ -38,18 +38,18 @@ public class TrackTraceProducerFactory { if (traceProducer == null) { traceProducer = new DefaultMQProducer(rpcHook); - traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME); + traceProducer.setProducerGroup(TraceConstants.GROUP_NAME); traceProducer.setSendMsgTimeout(5000); - traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); - String nameSrv = properties.getProperty(TrackTraceConstants.NAMESRV_ADDR); + traceProducer.setInstanceName(properties.getProperty(TraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); + String nameSrv = properties.getProperty(TraceConstants.NAMESRV_ADDR); if (nameSrv == null) { - TopAddressing topAddressing = new TopAddressing(properties.getProperty(TrackTraceConstants.ADDRSRV_URL)); + TopAddressing topAddressing = new TopAddressing(properties.getProperty(TraceConstants.ADDRSRV_URL)); nameSrv = topAddressing.fetchNSAddr(); } traceProducer.setNamesrvAddr(nameSrv); traceProducer.setVipChannelEnabled(false); //the max size of message is 128K - int maxSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.MAX_MSG_SIZE, "128000")); + int maxSize = Integer.parseInt(properties.getProperty(TraceConstants.MAX_MSG_SIZE, "128000")); traceProducer.setMaxMessageSize(maxSize - 10 * 1000); } return traceProducer; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java similarity index 93% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java index 9535b5d472a55975aa79b4c107e9d88cc4f6ff54..d3d25c4d4a039c16dabe5fd8cf427474252dfeef 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceTransferBean.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceTransferBean.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; import java.util.HashSet; import java.util.Set; @@ -22,7 +22,7 @@ import java.util.Set; /** * track trace transfering bean */ -public class TrackTraceTransferBean { +public class TraceTransferBean { private String transData; private Set transKey = new HashSet(); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java similarity index 91% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java rename to client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java index 72fa630554ee702c8bb6fb9e657c2230c870c535..79b19c17e4e53e4213a456fb4862089d6a0a0920 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.common; +package org.apache.rocketmq.client.trace; -public enum TrackTraceType { +public enum TraceType { Pub, SubBefore, SubAfter, diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java similarity index 81% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java rename to client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java index df88ce2d57699c6322a5451513d729ddf0c3340f..7fbad369798a85435069a31651464469fe19a45c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/ConsumeMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java @@ -14,15 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.hook; +package org.apache.rocketmq.client.trace.hook; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageHook; -import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; -import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; -import org.apache.rocketmq.client.trace.core.common.TrackTraceType; -import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.TraceBean; +import org.apache.rocketmq.client.trace.TraceType; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; @@ -32,9 +32,9 @@ import java.util.List; public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { - private AsyncDispatcher localDispatcher; + private TraceDispatcher localDispatcher; - public ConsumeMessageTraceHookImpl(AsyncDispatcher localDispatcher) { + public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) { this.localDispatcher = localDispatcher; } @@ -48,11 +48,11 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { return; } - TrackTraceContext traceContext = new TrackTraceContext(); + TraceContext traceContext = new TraceContext(); context.setMqTraceContext(traceContext); - traceContext.setTraceType(TrackTraceType.SubBefore);// + traceContext.setTraceType(TraceType.SubBefore);// traceContext.setGroupName(context.getConsumerGroup());// - List beans = new ArrayList(); + List beans = new ArrayList(); for (MessageExt msg : context.getMsgList()) { if (msg == null) { continue; @@ -64,7 +64,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { // if trace switch is false ,skip it continue; } - TrackTraceBean traceBean = new TrackTraceBean(); + TraceBean traceBean = new TraceBean(); traceBean.setTopic(msg.getTopic());// traceBean.setMsgId(msg.getMsgId());// traceBean.setTags(msg.getTags());// @@ -87,14 +87,14 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { return; } - TrackTraceContext subBeforeContext = (TrackTraceContext) context.getMqTraceContext(); + TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext(); if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { // if subbefore bean is null ,skip it return; } - TrackTraceContext subAfterContext = new TrackTraceContext(); - subAfterContext.setTraceType(TrackTraceType.SubAfter);// + TraceContext subAfterContext = new TraceContext(); + subAfterContext.setTraceType(TraceType.SubAfter);// subAfterContext.setRegionId(subBeforeContext.getRegionId());// subAfterContext.setGroupName(subBeforeContext.getGroupName());// subAfterContext.setRequestId(subBeforeContext.getRequestId());// diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java similarity index 74% rename from client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java rename to client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index c174f462de3ad0730f6fa5363f91d7a9b2426779..bfe5d7aa35fadf61da76049e11a0dc2aaf812f61 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -14,23 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.client.trace.core.hook; +package org.apache.rocketmq.client.trace.hook; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; -import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; -import org.apache.rocketmq.client.trace.core.common.TrackTraceType; -import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; -import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.TraceBean; +import org.apache.rocketmq.client.trace.TraceType; import java.util.ArrayList; -public class SendMessageTrackHookImpl implements SendMessageHook { +public class SendMessageTraceHookImpl implements SendMessageHook { - private AsyncDispatcher localDispatcher; + private TraceDispatcher localDispatcher; - public SendMessageTrackHookImpl(AsyncDispatcher localDispatcher) { + public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) { this.localDispatcher = localDispatcher; } @@ -42,18 +42,18 @@ public class SendMessageTrackHookImpl implements SendMessageHook { @Override public void sendMessageBefore(SendMessageContext context) { //if it is message track trace data,then it doesn't recorded - if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())) { + if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { return; } //build the context content of TuxeTraceContext - TrackTraceContext tuxeContext = new TrackTraceContext(); - tuxeContext.setTraceBeans(new ArrayList(1)); + TraceContext tuxeContext = new TraceContext(); + tuxeContext.setTraceBeans(new ArrayList(1)); context.setMqTraceContext(tuxeContext); - tuxeContext.setTraceType(TrackTraceType.Pub); + tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(context.getProducerGroup()); //build the data bean object of message track trace - TrackTraceBean traceBean = new TrackTraceBean(); + TraceBean traceBean = new TraceBean(); traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); @@ -66,7 +66,7 @@ public class SendMessageTrackHookImpl implements SendMessageHook { @Override public void sendMessageAfter(SendMessageContext context) { //if it is message track trace data,then it doesn't recorded - if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName()) + if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) || context.getMqTraceContext() == null) { return; } @@ -80,8 +80,8 @@ public class SendMessageTrackHookImpl implements SendMessageHook { return; } - TrackTraceContext tuxeContext = (TrackTraceContext) context.getMqTraceContext(); - TrackTraceBean traceBean = tuxeContext.getTraceBeans().get(0); + TraceContext tuxeContext = (TraceContext) context.getMqTraceContext(); + TraceBean traceBean = tuxeContext.getTraceBeans().get(0); int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); tuxeContext.setCostTime(costTime); if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 08382dfe8ef6a3c5e9b9dc6de297a8094d49f42e..b34784055fe76dd49a56ac14313be0180463993c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -53,7 +53,6 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageDecoder; @@ -103,7 +102,7 @@ public class DefaultMQConsumerWithTraceTest { private DefaultMQPushConsumer customTraceTopicpushConsumer; - private AsyncArrayDispatcher asyncArrayDispatcher; + private AsyncTraceDispatcher asyncTraceDispatcher; private MQClientInstance mQClientTraceFactory; @Mock private MQClientAPIImpl mQClientTraceAPIImpl; @@ -120,8 +119,8 @@ public class DefaultMQConsumerWithTraceTest { pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); - asyncArrayDispatcher = (AsyncArrayDispatcher)pushConsumer.getTraceDispatcher(); - traceProducer = asyncArrayDispatcher.getTraceProducer(); + asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 9460d6834ed0e741989a5ae8ebd3e4c5f0285b26..905efb9830a3d42f72397adda450d7ffd19700e3 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -37,7 +37,6 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; @@ -73,7 +72,7 @@ public class DefaultMQProducerWithTraceTest { @Mock private MQClientAPIImpl mQClientTraceAPIImpl; - private AsyncArrayDispatcher asyncArrayDispatcher; + private AsyncTraceDispatcher asyncTraceDispatcher; private DefaultMQProducer producer; private DefaultMQProducer customTraceTopicproducer; @@ -97,11 +96,11 @@ public class DefaultMQProducerWithTraceTest { normalProducer.setNamesrvAddr("127.0.0.1:9877"); customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); message = new Message(topic, new byte[] {'a', 'b' ,'c'}); - asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher(); - asyncArrayDispatcher.setTraceTopicName(customerTraceTopic); - asyncArrayDispatcher.getHostProducer(); - asyncArrayDispatcher.getHostConsumer(); - traceProducer = asyncArrayDispatcher.getTraceProducer(); + asyncTraceDispatcher = (AsyncTraceDispatcher)producer.getTraceDispatcher(); + asyncTraceDispatcher.setTraceTopicName(customerTraceTopic); + asyncTraceDispatcher.getHostProducer(); + asyncTraceDispatcher.getHostConsumer(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); producer.start(); diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java index 898051704bfcea84bc4091e0d2c0de176c266325..f4cb4e4b378e157e81be3e76de0422e89ade8b53 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java @@ -40,14 +40,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; -/** - * - * 1. view the /conf/plain_acl.yml file under the distribution module, pay attention to the accessKey,secretKey, - * globalWhiteRemoteAddresses and whiteRemoteAddress and some other attributes. - * - * 2. Modify ACL_ACCESS_KEY and ACL_SECRET_KEY to the corresponding accessKey and secretKey in plain_acl.yml - * - */ + public class AclClient { private static final Map OFFSE_TABLE = new HashMap();