提交 f244ee35 编写于 作者: Z zhangjidi2016

[ISSUE #2556] LitePullConsumer support message trace.

上级 db05f817
......@@ -47,6 +47,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
......
......@@ -23,6 +23,10 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -30,10 +34,13 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
......@@ -153,6 +160,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
* Interface of asynchronous transfer data
*/
private TraceDispatcher traceDispatcher = null;
/**
* Default constructor.
*/
......@@ -188,6 +200,57 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this(null, consumerGroup, rpcHook);
}
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(null, consumerGroup, null, enableMsgTrace, null);
}
/**
* Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultLitePullConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(null, consumerGroup, null, enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying namespace, consumer group, RPC hook, enabled msg trace flag and customized trace topic
* name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying namespace, consumer group and RPC hook.
*
......@@ -204,11 +267,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
@Override
......@@ -490,4 +563,8 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
}
......@@ -41,12 +41,15 @@ import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
......@@ -142,6 +145,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
......@@ -158,6 +163,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
this.consumeMessageHookList.add(hook);
log.info("register consumeMessageHook Hook, {}", hook.hookName());
}
public void executeHookBefore(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
} catch (Throwable e) {
log.error("consumeMessageHook {} executeHookBefore exception", hook.hookName(), e);
}
}
}
}
public void executeHookAfter(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
} catch (Throwable e) {
log.error("consumeMessageHook {} executeHookAfter exception", hook.hookName(), e);
}
}
}
}
private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING)
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
......@@ -848,6 +882,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
......
......@@ -85,7 +85,6 @@ public class TraceDataEncoder {
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
bean.setClientHost(line[8]);
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
......@@ -123,10 +122,9 @@ public class TraceDataEncoder {
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setClientHost(line[10]);
bean.setTransactionId(line[11]);
bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
bean.setTransactionId(line[10]);
bean.setTransactionState(LocalTransactionState.valueOf(line[11]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[12]));
endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean);
......@@ -166,8 +164,7 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);
.append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);//
}
break;
case SubBefore: {
......@@ -179,8 +176,7 @@ public class TraceDataEncoder {
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);//
.append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
}
}
break;
......@@ -211,7 +207,6 @@ public class TraceDataEncoder {
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
......
......@@ -17,10 +17,10 @@
package org.apache.rocketmq.client.trace;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
public class TraceView {
......@@ -38,8 +38,9 @@ public class TraceView {
private String groupName;
private String status;
public static List<TraceView> decodeFromTraceTransData(String key, String messageBody) {
public static List<TraceView> decodeFromTraceTransData(String key, MessageExt messageExt) {
List<TraceView> messageTraceViewList = new ArrayList<TraceView>();
String messageBody = new String(messageExt.getBody(), Charsets.UTF_8);
if (messageBody == null || messageBody.length() <= 0) {
return messageTraceViewList;
}
......@@ -56,8 +57,7 @@ public class TraceView {
messageTraceView.setGroupName(context.getGroupName());
if (context.isSuccess()) {
messageTraceView.setStatus("success");
}
else {
} else {
messageTraceView.setStatus("failed");
}
messageTraceView.setKeys(traceBean.getKeys());
......@@ -68,7 +68,7 @@ public class TraceView {
messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
messageTraceView.setTimeStamp(context.getTimeStamp());
messageTraceView.setStoreHost(traceBean.getStoreHost());
messageTraceView.setClientHost(traceBean.getClientHost());
messageTraceView.setClientHost(messageExt.getBornHostString());
messageTraceViewList.add(messageTraceView);
}
return messageTraceViewList;
......
......@@ -16,10 +16,10 @@
*/
package org.apache.rocketmq.client.trace.hook;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
......@@ -74,7 +74,6 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
......@@ -93,7 +92,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it
// If subBefore bean is null ,skip it
return;
}
TraceContext subAfterContext = new TraceContext();
......@@ -103,13 +102,16 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
// Caculate the cost time for processing messages
// Calculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
Map<String, String> props = context.getProps();
if (props != null) {
String contextType = props.get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
}
localDispatcher.append(subAfterContext);
}
......
......@@ -60,7 +60,6 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
tuxeContext.getTraceBeans().add(traceBean);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQLitePullConsumerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private MQAdminImpl mQAdminImpl;
private AsyncTraceDispatcher asyncTraceDispatcher;
private DefaultMQProducer traceProducer;
private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
private DefaultLitePullConsumerImpl litePullConsumerImpl;
private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest";
private String brokerName = "BrokerA";
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
}
@Test
public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithDefaultTraceTopic();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithCustomizedTraceTopic();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
}
private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis(), true, customerTraceTopic);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
asyncTraceDispatcher = (AsyncTraceDispatcher) litePullConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(litePullConsumerImpl, mQClientFactory);
PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pullAPIWrapper, mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQAdminImpl);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(rebalanceImpl, mQClientFactory);
offsetStore = spy(litePullConsumerImpl.getOffsetStore());
field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
field.setAccessible(true);
field.set(litePullConsumerImpl, offsetStore);
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
return pullResult;
}
});
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
return messageQueue;
}
private TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
}
MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
}
}
\ No newline at end of file
......@@ -18,7 +18,6 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
import org.junit.Before;
......@@ -50,8 +49,7 @@ public class TraceDataEncoderTest {
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.append(true).append(TraceConstants.FIELD_SPLITOR)
.toString();
}
......@@ -104,7 +102,6 @@ public class TraceDataEncoderTest {
traceBean.setTags("Tags");
traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
traceBean.setStoreHost("127.0.0.1:10911");
traceBean.setClientHost("127.0.0.1@41700");
traceBean.setMsgType(MessageType.Trans_msg_Commit);
traceBean.setTransactionId("transactionId");
traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
......
......@@ -17,7 +17,8 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll;
import org.apache.commons.codec.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
import org.junit.Test;
......@@ -29,29 +30,30 @@ public class TraceViewTest {
@Test
public void testDecodeFromTraceTransData() {
String messageBody = new StringBuilder()
.append("Pub").append(TraceConstants.CONTENT_SPLITOR)
.append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR)
.append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
.append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
.append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
.append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
.append("Tags").append(TraceConstants.CONTENT_SPLITOR)
.append("Keys").append(TraceConstants.CONTENT_SPLITOR)
.append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
.append(26).append(TraceConstants.CONTENT_SPLITOR)
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.toString();
.append("Pub").append(TraceConstants.CONTENT_SPLITOR)
.append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR)
.append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
.append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
.append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
.append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
.append("Tags").append(TraceConstants.CONTENT_SPLITOR)
.append("Keys").append(TraceConstants.CONTENT_SPLITOR)
.append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
.append(26).append(TraceConstants.CONTENT_SPLITOR)
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.FIELD_SPLITOR)
.toString();
MessageExt message = new MessageExt();
message.setBody(messageBody.getBytes(Charsets.UTF_8));
String key = "AC1415116D1418B4AAC217FE1B4E0000";
List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, message);
Assert.assertEquals(traceViews.size(), 1);
Assert.assertEquals(traceViews.get(0).getMsgId(), key);
key = "AD4233434334AAC217FEFFD0000";
traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
traceViews = TraceView.decodeFromTraceTransData(key, message);
Assert.assertEquals(traceViews.size(), 0);
}
}
......@@ -19,7 +19,6 @@ package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -38,7 +37,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class QueryMsgTraceByIdSubCommand implements SubCommand {
@Override
......@@ -74,13 +72,13 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
}
private void queryTraceByMsgId(final DefaultMQAdminExt admin, String msgId)
throws MQClientException, InterruptedException {
throws MQClientException, InterruptedException {
admin.start();
QueryResult queryResult = admin.queryMessage(TopicValidator.RMQ_SYS_TRACE_TOPIC, msgId, 64, 0, System.currentTimeMillis());
List<MessageExt> messageList = queryResult.getMessageList();
List<TraceView> traceViews = new ArrayList<>();
for (MessageExt message : messageList) {
List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, new String(message.getBody(), Charsets.UTF_8));
List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, message);
traceViews.addAll(traceView);
}
......@@ -92,20 +90,20 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
for (TraceView traceView : traceViews) {
if (traceView.getMsgType().equals(TraceType.Pub.name())) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"#Type",
"#ProducerGroup",
"#ClientHost",
"#SendTime",
"#CostTimes",
"#Status"
"#Type",
"#ProducerGroup",
"#ClientHost",
"#SendTime",
"#CostTimes",
"#Status"
);
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"Pub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
"Pub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
);
System.out.printf("\n");
}
......@@ -124,22 +122,22 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
Iterator<String> consumers = consumerTraceMap.keySet().iterator();
while (consumers.hasNext()) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"#Type",
"#ConsumerGroup",
"#ClientHost",
"#ConsumerTime",
"#CostTimes",
"#Status"
"#Type",
"#ConsumerGroup",
"#ClientHost",
"#ConsumerTime",
"#CostTimes",
"#Status"
);
List<TraceView> consumerTraces = consumerTraceMap.get(consumers.next());
for (TraceView traceView : consumerTraces) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"Sub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
"Sub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
);
}
System.out.printf("\n");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册