diff --git a/client/pom.xml b/client/pom.xml
index 164082c9b26ac7e6fc6384b275266e3e5820057b..87dc89426a211a74ec523d3acbbce6a4afafef3f 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -47,6 +47,10 @@
org.apache.commons
commons-lang3
+
+ commons-codec
+ commons-codec
+
io.opentracing
opentracing-api
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 393bda928e5da1464acb9292c1f2b73aa7017cd4..c54399aa5ed24346fc801e4d9b15050ee7a2b997 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -23,6 +23,10 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -30,10 +34,13 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
+ private final InternalLogger log = ClientLogger.getLog();
+
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
@@ -153,6 +160,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+ /**
+ * Interface of asynchronous transfer data
+ */
+ private TraceDispatcher traceDispatcher = null;
+
+ /**
+ * The flag for message trace
+ */
+ private boolean enableMsgTrace = false;
+
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ */
+ private String customizedTraceTopic;
+
/**
* Default constructor.
*/
@@ -202,13 +224,24 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void start() throws MQClientException {
+ setTraceDispatcher();
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start();
+ if (null != traceDispatcher) {
+ try {
+ traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
+ } catch (MQClientException e) {
+ log.warn("trace dispatcher start failed ", e);
+ }
+ }
}
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
+ if (null != traceDispatcher) {
+ traceDispatcher.shutdown();
+ }
}
@Override
@@ -490,4 +523,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
+
+ public TraceDispatcher getTraceDispatcher() {
+ return traceDispatcher;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.customizedTraceTopic = customizedTraceTopic;
+ }
+
+ private void setTraceDispatcher() {
+ if (isEnableMsgTrace()) {
+ try {
+ this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
+ this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
+ new ConsumeMessageTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ }
+ }
+ }
+
+ public String getCustomizedTraceTopic() {
+ return customizedTraceTopic;
+ }
+
+ public boolean isEnableMsgTrace() {
+ return enableMsgTrace;
+ }
+
+ public void setEnableMsgTrace(boolean enableMsgTrace) {
+ this.enableMsgTrace = enableMsgTrace;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 1e6b8dc99699aa9236c9e8a3dea0825606b74e2f..4e139c44ce0c25d0eee8b01bc2f87804a036c8b9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -41,12 +41,15 @@ import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
@@ -144,6 +147,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+ private final ArrayList consumeMessageHookList = new ArrayList<>();
+
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
@@ -160,6 +165,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
+ public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+ this.consumeMessageHookList.add(hook);
+ log.info("register consumeMessageHook Hook, {}", hook.hookName());
+ }
+
+ public void executeHookBefore(final ConsumeMessageContext context) {
+ if (!this.consumeMessageHookList.isEmpty()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageBefore(context);
+ } catch (Throwable e) {
+ log.error("consumeMessageHook {} executeHookBefore exception", hook.hookName(), e);
+ }
+ }
+ }
+ }
+
+ public void executeHookAfter(final ConsumeMessageContext context) {
+ if (!this.consumeMessageHookList.isEmpty()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageAfter(context);
+ } catch (Throwable e) {
+ log.error("consumeMessageHook {} executeHookAfter exception", hook.hookName(), e);
+ }
+ }
+ }
+ }
+
private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING)
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
@@ -858,6 +892,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
+ if (!this.consumeMessageHookList.isEmpty()) {
+ ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
+ consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
+ consumeMessageContext.setConsumerGroup(this.groupName());
+ consumeMessageContext.setMq(mq);
+ consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
+ consumeMessageContext.setSuccess(false);
+ this.executeHookBefore(consumeMessageContext);
+ consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+ consumeMessageContext.setSuccess(true);
+ this.executeHookAfter(consumeMessageContext);
+ }
return pullResult;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 2625d353f84447109055d63c034707ff89da6535..b65ba46a397ca24059dd2e0ccf16a9bb6d32469c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -85,7 +85,6 @@ public class TraceDataEncoder {
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
- bean.setClientHost(line[8]);
subBeforeContext.setTraceBeans(new ArrayList(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
@@ -123,10 +122,9 @@ public class TraceDataEncoder {
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
- bean.setClientHost(line[10]);
- bean.setTransactionId(line[11]);
- bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
- bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
+ bean.setTransactionId(line[10]);
+ bean.setTransactionState(LocalTransactionState.valueOf(line[11]));
+ bean.setFromTransactionCheck(Boolean.parseBoolean(line[12]));
endTransactionContext.setTraceBeans(new ArrayList(1));
endTransactionContext.getTraceBeans().add(bean);
@@ -166,8 +164,7 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);
+ .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);//
}
break;
case SubBefore: {
@@ -179,8 +176,7 @@ public class TraceDataEncoder {
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);//
+ .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
}
}
break;
@@ -211,7 +207,6 @@ public class TraceDataEncoder {
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java
index 14fc360f11bb59b4cad696673d2ce1d1acf13863..e78d37ab2cdbd4c5f0ece7d43aca814f1f44eb20 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java
@@ -17,10 +17,10 @@
package org.apache.rocketmq.client.trace;
-
-
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.codec.Charsets;
+import org.apache.rocketmq.common.message.MessageExt;
public class TraceView {
@@ -38,8 +38,9 @@ public class TraceView {
private String groupName;
private String status;
- public static List decodeFromTraceTransData(String key, String messageBody) {
+ public static List decodeFromTraceTransData(String key, MessageExt messageExt) {
List messageTraceViewList = new ArrayList();
+ String messageBody = new String(messageExt.getBody(), Charsets.UTF_8);
if (messageBody == null || messageBody.length() <= 0) {
return messageTraceViewList;
}
@@ -56,8 +57,7 @@ public class TraceView {
messageTraceView.setGroupName(context.getGroupName());
if (context.isSuccess()) {
messageTraceView.setStatus("success");
- }
- else {
+ } else {
messageTraceView.setStatus("failed");
}
messageTraceView.setKeys(traceBean.getKeys());
@@ -68,7 +68,7 @@ public class TraceView {
messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
messageTraceView.setTimeStamp(context.getTimeStamp());
messageTraceView.setStoreHost(traceBean.getStoreHost());
- messageTraceView.setClientHost(traceBean.getClientHost());
+ messageTraceView.setClientHost(messageExt.getBornHostString());
messageTraceViewList.add(messageTraceView);
}
return messageTraceViewList;
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
index 4f6f916ea972094b79786b7d095de6493bd35371..bce613987ff65753ffca83a99ef2cbc9fadc11a6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java
@@ -16,10 +16,10 @@
*/
package org.apache.rocketmq.client.trace.hook;
+import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
-import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
@@ -74,7 +74,6 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
- traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
@@ -93,7 +92,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
- // If subbefore bean is null ,skip it
+ // If subBefore bean is null ,skip it
return;
}
TraceContext subAfterContext = new TraceContext();
@@ -103,13 +102,16 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
- // Caculate the cost time for processing messages
+ // Calculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
- String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
- if (contextType != null) {
- subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
+ Map props = context.getProps();
+ if (props != null) {
+ String contextType = props.get(MixAll.CONSUME_CONTEXT_TYPE);
+ if (contextType != null) {
+ subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
+ }
}
localDispatcher.append(subAfterContext);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index 4feb276284ed04f327c81e53029c2589bfcefe7b..80c7babdaa61b8e24a576b6ac7220e5b5fc04d00 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -60,7 +60,6 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
- traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
tuxeContext.getTraceBeans().add(traceBean);
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..67ae194b880ce5a497b7d0aaac72566276fb1ec5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace;
+
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
+import org.apache.rocketmq.client.impl.consumer.RebalanceService;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQLitePullConsumerWithTraceTest {
+
+ @Spy
+ private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+
+ @Mock
+ private MQClientAPIImpl mQClientAPIImpl;
+ @Mock
+ private MQAdminImpl mQAdminImpl;
+
+ private AsyncTraceDispatcher asyncTraceDispatcher;
+ private DefaultMQProducer traceProducer;
+ private RebalanceImpl rebalanceImpl;
+ private OffsetStore offsetStore;
+ private DefaultLitePullConsumerImpl litePullConsumerImpl;
+ private String consumerGroup = "LitePullConsumerGroup";
+ private String topic = "LitePullConsumerTest";
+ private String brokerName = "BrokerA";
+ private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
+
+ private String customerTraceTopic = "rmq_trace_topic_12345";
+
+ @Before
+ public void init() throws Exception {
+ Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
+ field.setAccessible(true);
+ RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
+ field = RebalanceService.class.getDeclaredField("waitInterval");
+ field.setAccessible(true);
+ field.set(rebalanceService, 100);
+ }
+
+ @Test
+ public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithDefaultTraceTopic();
+ try {
+ Set messageQueueSet = new HashSet();
+ messageQueueSet.add(createMessageQueue());
+ litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+ litePullConsumer.setPollTimeoutMillis(20 * 1000);
+ List result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+ @Test
+ public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithCustomizedTraceTopic();
+ try {
+ Set messageQueueSet = new HashSet();
+ messageQueueSet.add(createMessageQueue());
+ litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+ litePullConsumer.setPollTimeoutMillis(20 * 1000);
+ List result = litePullConsumer.poll();
+ assertThat(result.get(0).getTopic()).isEqualTo(topic);
+ assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
+ } finally {
+ litePullConsumer.shutdown();
+ }
+ }
+
+
+ private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ litePullConsumer.setEnableMsgTrace(true);
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ litePullConsumer.subscribe(topic, "*");
+ suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ return litePullConsumer;
+ }
+
+ private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception {
+ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+ litePullConsumer.setEnableMsgTrace(true);
+ litePullConsumer.setCustomizedTraceTopic(customerTraceTopic);
+ litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ litePullConsumer.subscribe(topic, "*");
+ suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
+ return litePullConsumer;
+ }
+
+ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
+ asyncTraceDispatcher = (AsyncTraceDispatcher) litePullConsumer.getTraceDispatcher();
+ traceProducer = asyncTraceDispatcher.getTraceProducer();
+ Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
+ field.setAccessible(true);
+ litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(litePullConsumerImpl, mQClientFactory);
+
+ PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
+ field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(pullAPIWrapper, mQClientFactory);
+
+ Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
+ fieldTrace.setAccessible(true);
+ fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
+
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQClientAPIImpl);
+
+ field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQAdminImpl);
+
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
+ field.setAccessible(true);
+ rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
+ field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(rebalanceImpl, mQClientFactory);
+
+ offsetStore = spy(litePullConsumerImpl.getOffsetStore());
+ field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
+ field.setAccessible(true);
+ field.set(litePullConsumerImpl, offsetStore);
+
+ traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
+
+ when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer