diff --git a/client/pom.xml b/client/pom.xml index 164082c9b26ac7e6fc6384b275266e3e5820057b..87dc89426a211a74ec523d3acbbce6a4afafef3f 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -47,6 +47,10 @@ org.apache.commons commons-lang3 + + commons-codec + commons-codec + io.opentracing opentracing-api diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 393bda928e5da1464acb9292c1f2b73aa7017cd4..a33652d6b7bd3e8d577bd3f42f2b8bde5bcaa654 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -23,6 +23,10 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; @@ -30,10 +34,13 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { + private final InternalLogger log = ClientLogger.getLog(); + private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; /** @@ -153,6 +160,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon */ private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + /** + * Interface of asynchronous transfer data + */ + private TraceDispatcher traceDispatcher = null; + /** * Default constructor. */ @@ -188,6 +200,57 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon this(null, consumerGroup, rpcHook); } + /** + * Constructor specifying consumer group and enabled msg trace flag. + * + * @param consumerGroup Consumer group. + * @param enableMsgTrace Switch flag instance for message trace. + */ + public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace) { + this(null, consumerGroup, null, enableMsgTrace, null); + } + + /** + * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name. + * + * @param consumerGroup Consumer group. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace, + final String customizedTraceTopic) { + this(null, consumerGroup, null, enableMsgTrace, customizedTraceTopic); + } + + /** + * Constructor specifying namespace, consumer group, RPC hook, enabled msg trace flag and customized trace topic + * name. + * + * @param namespace Namespace for this MQ Producer instance. + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param enableMsgTrace Switch flag instance for message trace. + * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, + boolean enableMsgTrace, final String customizedTraceTopic) { + this.namespace = namespace; + this.consumerGroup = consumerGroup; + defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); + if (enableMsgTrace) { + try { + this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook); + this.defaultLitePullConsumerImpl.registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + + /** * Constructor specifying namespace, consumer group and RPC hook. * @@ -204,11 +267,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void start() throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultLitePullConsumerImpl.start(); + if (null != traceDispatcher) { + try { + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); + } catch (MQClientException e) { + log.warn("trace dispatcher start failed ", e); + } + } } @Override public void shutdown() { this.defaultLitePullConsumerImpl.shutdown(); + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } } @Override @@ -490,4 +563,8 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon public void setConsumeTimestamp(String consumeTimestamp) { this.consumeTimestamp = consumeTimestamp; } + + public TraceDispatcher getTraceDispatcher() { + return traceDispatcher; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 280de46ff2d6d8a09e202083e48948f38ce10b9c..621f5c20ca9ca74382d34b6f5bfff2420877145a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -41,12 +41,15 @@ import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.FilterMessageHook; import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.MQClientManager; @@ -142,6 +145,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { private final MessageQueueLock messageQueueLock = new MessageQueueLock(); + private final ArrayList consumeMessageHookList = new ArrayList<>(); + public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { this.defaultLitePullConsumer = defaultLitePullConsumer; this.rpcHook = rpcHook; @@ -158,6 +163,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException(); } + public void registerConsumeMessageHook(final ConsumeMessageHook hook) { + this.consumeMessageHookList.add(hook); + log.info("register consumeMessageHook Hook, {}", hook.hookName()); + } + + public void executeHookBefore(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageBefore(context); + } catch (Throwable e) { + log.error("consumeMessageHook {} executeHookBefore exception", hook.hookName(), e); + } + } + } + } + + public void executeHookAfter(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageAfter(context); + } catch (Throwable e) { + log.error("consumeMessageHook {} executeHookAfter exception", hook.hookName(), e); + } + } + } + } + private void checkServiceState() { if (this.serviceState != ServiceState.RUNNING) throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); @@ -848,6 +882,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); + if (!this.consumeMessageHookList.isEmpty()) { + ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext(); + consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace()); + consumeMessageContext.setConsumerGroup(this.groupName()); + consumeMessageContext.setMq(mq); + consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); + consumeMessageContext.setSuccess(false); + this.executeHookBefore(consumeMessageContext); + consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); + consumeMessageContext.setSuccess(true); + this.executeHookAfter(consumeMessageContext); + } return pullResult; } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index 2625d353f84447109055d63c034707ff89da6535..b65ba46a397ca24059dd2e0ccf16a9bb6d32469c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -85,7 +85,6 @@ public class TraceDataEncoder { bean.setMsgId(line[5]); bean.setRetryTimes(Integer.parseInt(line[6])); bean.setKeys(line[7]); - bean.setClientHost(line[8]); subBeforeContext.setTraceBeans(new ArrayList(1)); subBeforeContext.getTraceBeans().add(bean); resList.add(subBeforeContext); @@ -123,10 +122,9 @@ public class TraceDataEncoder { bean.setKeys(line[7]); bean.setStoreHost(line[8]); bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]); - bean.setClientHost(line[10]); - bean.setTransactionId(line[11]); - bean.setTransactionState(LocalTransactionState.valueOf(line[12])); - bean.setFromTransactionCheck(Boolean.parseBoolean(line[13])); + bean.setTransactionId(line[10]); + bean.setTransactionState(LocalTransactionState.valueOf(line[11])); + bean.setFromTransactionCheck(Boolean.parseBoolean(line[12])); endTransactionContext.setTraceBeans(new ArrayList(1)); endTransactionContext.getTraceBeans().add(bean); @@ -166,8 +164,7 @@ public class TraceDataEncoder { .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// - .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// - .append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR); + .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);// } break; case SubBefore: { @@ -179,8 +176,7 @@ public class TraceDataEncoder { .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// - .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// - .append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);// + .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// } } break; @@ -211,7 +207,6 @@ public class TraceDataEncoder { .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// - .append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java index 14fc360f11bb59b4cad696673d2ce1d1acf13863..e78d37ab2cdbd4c5f0ece7d43aca814f1f44eb20 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java @@ -17,10 +17,10 @@ package org.apache.rocketmq.client.trace; - - import java.util.ArrayList; import java.util.List; +import org.apache.commons.codec.Charsets; +import org.apache.rocketmq.common.message.MessageExt; public class TraceView { @@ -38,8 +38,9 @@ public class TraceView { private String groupName; private String status; - public static List decodeFromTraceTransData(String key, String messageBody) { + public static List decodeFromTraceTransData(String key, MessageExt messageExt) { List messageTraceViewList = new ArrayList(); + String messageBody = new String(messageExt.getBody(), Charsets.UTF_8); if (messageBody == null || messageBody.length() <= 0) { return messageTraceViewList; } @@ -56,8 +57,7 @@ public class TraceView { messageTraceView.setGroupName(context.getGroupName()); if (context.isSuccess()) { messageTraceView.setStatus("success"); - } - else { + } else { messageTraceView.setStatus("failed"); } messageTraceView.setKeys(traceBean.getKeys()); @@ -68,7 +68,7 @@ public class TraceView { messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId()); messageTraceView.setTimeStamp(context.getTimeStamp()); messageTraceView.setStoreHost(traceBean.getStoreHost()); - messageTraceView.setClientHost(traceBean.getClientHost()); + messageTraceView.setClientHost(messageExt.getBornHostString()); messageTraceViewList.add(messageTraceView); } return messageTraceViewList; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java index 4f6f916ea972094b79786b7d095de6493bd35371..bce613987ff65753ffca83a99ef2cbc9fadc11a6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java @@ -16,10 +16,10 @@ */ package org.apache.rocketmq.client.trace.hook; +import java.util.Map; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageHook; -import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceBean; @@ -74,7 +74,6 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { traceBean.setStoreTime(msg.getStoreTimestamp());// traceBean.setBodyLength(msg.getStoreSize());// traceBean.setRetryTimes(msg.getReconsumeTimes());// - traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId()); traceContext.setRegionId(regionId);// beans.add(traceBean); } @@ -93,7 +92,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext(); if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { - // If subbefore bean is null ,skip it + // If subBefore bean is null ,skip it return; } TraceContext subAfterContext = new TraceContext(); @@ -103,13 +102,16 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setSuccess(context.isSuccess());// - // Caculate the cost time for processing messages + // Calculate the cost time for processing messages int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); subAfterContext.setCostTime(costTime);// subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); - String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE); - if (contextType != null) { - subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); + Map props = context.getProps(); + if (props != null) { + String contextType = props.get(MixAll.CONSUME_CONTEXT_TYPE); + if (contextType != null) { + subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); + } } localDispatcher.append(subAfterContext); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index 4feb276284ed04f327c81e53029c2589bfcefe7b..80c7babdaa61b8e24a576b6ac7220e5b5fc04d00 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -60,7 +60,6 @@ public class SendMessageTraceHookImpl implements SendMessageHook { traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setMsgType(context.getMsgType()); - traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId()); tuxeContext.getTraceBeans().add(traceBean); } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d79790d19cd6f8875b30fd454e06a0c0d845458f --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace; + +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQAdminImpl; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.consumer.RebalanceService; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.RPCHook; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultMQLitePullConsumerWithTraceTest { + + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + + @Mock + private MQClientAPIImpl mQClientAPIImpl; + @Mock + private MQAdminImpl mQAdminImpl; + + private AsyncTraceDispatcher asyncTraceDispatcher; + private DefaultMQProducer traceProducer; + private RebalanceImpl rebalanceImpl; + private OffsetStore offsetStore; + private DefaultLitePullConsumerImpl litePullConsumerImpl; + private String consumerGroup = "LitePullConsumerGroup"; + private String topic = "LitePullConsumerTest"; + private String brokerName = "BrokerA"; + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + + private String customerTraceTopic = "rmq_trace_topic_12345"; + + @Before + public void init() throws Exception { + Field field = MQClientInstance.class.getDeclaredField("rebalanceService"); + field.setAccessible(true); + RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory); + field = RebalanceService.class.getDeclaredField("waitInterval"); + field.setAccessible(true); + field.set(rebalanceService, 100); + } + + @Test + public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exception { + DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithDefaultTraceTopic(); + try { + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createMessageQueue()); + litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + litePullConsumer.setPollTimeoutMillis(20 * 1000); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + + @Test + public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws Exception { + DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithCustomizedTraceTopic(); + try { + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createMessageQueue()); + litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + litePullConsumer.setPollTimeoutMillis(20 * 1000); + List result = litePullConsumer.poll(); + assertThat(result.get(0).getTopic()).isEqualTo(topic); + assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'}); + } finally { + litePullConsumer.shutdown(); + } + } + + + private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + litePullConsumer.subscribe(topic, "*"); + suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); + litePullConsumer.start(); + initDefaultLitePullConsumer(litePullConsumer); + return litePullConsumer; + } + + private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception { + DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true, customerTraceTopic); + litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); + litePullConsumer.subscribe(topic, "*"); + suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer); + litePullConsumer.start(); + initDefaultLitePullConsumer(litePullConsumer); + return litePullConsumer; + } + + private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception { + asyncTraceDispatcher = (AsyncTraceDispatcher) litePullConsumer.getTraceDispatcher(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); + Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl"); + field.setAccessible(true); + litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(litePullConsumerImpl, mQClientFactory); + + PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper(); + field = PullAPIWrapper.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(pullAPIWrapper, mQClientFactory); + + Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + fieldTrace.setAccessible(true); + fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + field = MQClientInstance.class.getDeclaredField("mQAdminImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQAdminImpl); + + field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl"); + field.setAccessible(true); + rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl); + field = RebalanceImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(rebalanceImpl, mQClientFactory); + + offsetStore = spy(litePullConsumerImpl.getOffsetStore()); + field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore"); + field.setAccessible(true); + field.set(litePullConsumerImpl, offsetStore); + + traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); + + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + return pullResult; + } + }); + + when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false)); + + doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString()); + + doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class)); + + } + + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List messageExtList) throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (MessageExt messageExt : messageExtList) { + outputStream.write(MessageDecoder.encode(messageExt, false)); + } + return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); + } + + private MessageQueue createMessageQueue() { + MessageQueue messageQueue = new MessageQueue(); + messageQueue.setBrokerName(brokerName); + messageQueue.setQueueId(0); + messageQueue.setTopic(topic); + return messageQueue; + } + + private TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("BrokerA"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("BrokerA"); + queueData.setPerm(6); + queueData.setReadQueueNums(3); + queueData.setWriteQueueNums(4); + queueData.setTopicSysFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + sendResult.setOffsetMsgId("123"); + sendResult.setQueueOffset(456); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + return sendResult; + } + + private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException { + DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true); + if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { + litePullConsumer.changeInstanceNameToPID(); + } + MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true))); + ConcurrentMap factoryTable = (ConcurrentMap) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true); + factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory); + doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); + } + +} \ No newline at end of file diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java index bac12ea0ced4eb08b5ba1542a704d962d4291b09..af5a7053957929e62e3d934299a3a6558742b5bf 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.client.trace; import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageType; import org.junit.Assert; import org.junit.Before; @@ -50,8 +49,7 @@ public class TraceDataEncoderTest { .append(245).append(TraceConstants.CONTENT_SPLITOR) .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR) .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR) - .append(true).append(TraceConstants.CONTENT_SPLITOR) - .append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR) + .append(true).append(TraceConstants.FIELD_SPLITOR) .toString(); } @@ -104,7 +102,6 @@ public class TraceDataEncoderTest { traceBean.setTags("Tags"); traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000"); traceBean.setStoreHost("127.0.0.1:10911"); - traceBean.setClientHost("127.0.0.1@41700"); traceBean.setMsgType(MessageType.Trans_msg_Commit); traceBean.setTransactionId("transactionId"); traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java index 51a1543ee5e1235dfc3f98fb87361cff0ad7c9df..b1fdbaf965ad1d6fe30e35ea9fdcc672a52b29e2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java @@ -17,7 +17,8 @@ package org.apache.rocketmq.client.trace; -import org.apache.rocketmq.common.UtilAll; +import org.apache.commons.codec.Charsets; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageType; import org.junit.Assert; import org.junit.Test; @@ -29,29 +30,30 @@ public class TraceViewTest { @Test public void testDecodeFromTraceTransData() { String messageBody = new StringBuilder() - .append("Pub").append(TraceConstants.CONTENT_SPLITOR) - .append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR) - .append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR) - .append("PID-test").append(TraceConstants.CONTENT_SPLITOR) - .append("topic-test").append(TraceConstants.CONTENT_SPLITOR) - .append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR) - .append("Tags").append(TraceConstants.CONTENT_SPLITOR) - .append("Keys").append(TraceConstants.CONTENT_SPLITOR) - .append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR) - .append(26).append(TraceConstants.CONTENT_SPLITOR) - .append(245).append(TraceConstants.CONTENT_SPLITOR) - .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR) - .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR) - .append(true).append(TraceConstants.CONTENT_SPLITOR) - .append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR) - .toString(); + .append("Pub").append(TraceConstants.CONTENT_SPLITOR) + .append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR) + .append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR) + .append("PID-test").append(TraceConstants.CONTENT_SPLITOR) + .append("topic-test").append(TraceConstants.CONTENT_SPLITOR) + .append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR) + .append("Tags").append(TraceConstants.CONTENT_SPLITOR) + .append("Keys").append(TraceConstants.CONTENT_SPLITOR) + .append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR) + .append(26).append(TraceConstants.CONTENT_SPLITOR) + .append(245).append(TraceConstants.CONTENT_SPLITOR) + .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR) + .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR) + .append(true).append(TraceConstants.FIELD_SPLITOR) + .toString(); + MessageExt message = new MessageExt(); + message.setBody(messageBody.getBytes(Charsets.UTF_8)); String key = "AC1415116D1418B4AAC217FE1B4E0000"; - List traceViews = TraceView.decodeFromTraceTransData(key, messageBody); + List traceViews = TraceView.decodeFromTraceTransData(key, message); Assert.assertEquals(traceViews.size(), 1); Assert.assertEquals(traceViews.get(0).getMsgId(), key); key = "AD4233434334AAC217FEFFD0000"; - traceViews = TraceView.decodeFromTraceTransData(key, messageBody); + traceViews = TraceView.decodeFromTraceTransData(key, message); Assert.assertEquals(traceViews.size(), 0); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java index bed2763dadcde8e06c9bababb8e9538f3b48e478..7c2b51fe864bcd53bd339ebcc5d06157e2a6e031 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.tools.command.message; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.commons.codec.Charsets; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQClientException; @@ -38,7 +37,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; - public class QueryMsgTraceByIdSubCommand implements SubCommand { @Override @@ -74,13 +72,13 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand { } private void queryTraceByMsgId(final DefaultMQAdminExt admin, String msgId) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { admin.start(); QueryResult queryResult = admin.queryMessage(TopicValidator.RMQ_SYS_TRACE_TOPIC, msgId, 64, 0, System.currentTimeMillis()); List messageList = queryResult.getMessageList(); List traceViews = new ArrayList<>(); for (MessageExt message : messageList) { - List traceView = TraceView.decodeFromTraceTransData(msgId, new String(message.getBody(), Charsets.UTF_8)); + List traceView = TraceView.decodeFromTraceTransData(msgId, message); traceViews.addAll(traceView); } @@ -92,20 +90,20 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand { for (TraceView traceView : traceViews) { if (traceView.getMsgType().equals(TraceType.Pub.name())) { System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n", - "#Type", - "#ProducerGroup", - "#ClientHost", - "#SendTime", - "#CostTimes", - "#Status" + "#Type", + "#ProducerGroup", + "#ClientHost", + "#SendTime", + "#CostTimes", + "#Status" ); System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n", - "Pub", - traceView.getGroupName(), - traceView.getClientHost(), - DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"), - traceView.getCostTime() + "ms", - traceView.getStatus() + "Pub", + traceView.getGroupName(), + traceView.getClientHost(), + DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"), + traceView.getCostTime() + "ms", + traceView.getStatus() ); System.out.printf("\n"); } @@ -124,22 +122,22 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand { Iterator consumers = consumerTraceMap.keySet().iterator(); while (consumers.hasNext()) { System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n", - "#Type", - "#ConsumerGroup", - "#ClientHost", - "#ConsumerTime", - "#CostTimes", - "#Status" + "#Type", + "#ConsumerGroup", + "#ClientHost", + "#ConsumerTime", + "#CostTimes", + "#Status" ); List consumerTraces = consumerTraceMap.get(consumers.next()); for (TraceView traceView : consumerTraces) { System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n", - "Sub", - traceView.getGroupName(), - traceView.getClientHost(), - DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"), - traceView.getCostTime() + "ms", - traceView.getStatus() + "Sub", + traceView.getGroupName(), + traceView.getClientHost(), + DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"), + traceView.getCostTime() + "ms", + traceView.getStatus() ); } System.out.printf("\n");