未验证 提交 620d02fb 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2930 from zhangjidi2016/litePullConsumer_support_msgTrace

[ISSUE #2556] Lite pull consumer support msg trace
...@@ -47,6 +47,10 @@ ...@@ -47,6 +47,10 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.opentracing</groupId> <groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId> <artifactId>opentracing-api</artifactId>
......
...@@ -23,6 +23,10 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage ...@@ -23,6 +23,10 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage
import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
...@@ -30,10 +34,13 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -30,10 +34,13 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/** /**
...@@ -153,6 +160,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -153,6 +160,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/ */
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
* Interface of asynchronous transfer data
*/
private TraceDispatcher traceDispatcher = null;
/**
* The flag for message trace
*/
private boolean enableMsgTrace = false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic;
/** /**
* Default constructor. * Default constructor.
*/ */
...@@ -202,13 +224,24 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -202,13 +224,24 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
setTraceDispatcher();
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start(); this.defaultLitePullConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
} }
@Override @Override
public void shutdown() { public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown(); this.defaultLitePullConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
} }
@Override @Override
...@@ -490,4 +523,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -490,4 +523,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumeTimestamp(String consumeTimestamp) { public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp; this.consumeTimestamp = consumeTimestamp;
} }
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
try {
this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
} }
...@@ -41,12 +41,15 @@ import org.apache.rocketmq.client.consumer.MessageQueueListener; ...@@ -41,12 +41,15 @@ import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener; import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook; import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.MQClientManager;
...@@ -144,6 +147,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -144,6 +147,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final MessageQueueLock messageQueueLock = new MessageQueueLock(); private final MessageQueueLock messageQueueLock = new MessageQueueLock();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer; this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
...@@ -160,6 +165,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -160,6 +165,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException(); this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
} }
public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
this.consumeMessageHookList.add(hook);
log.info("register consumeMessageHook Hook, {}", hook.hookName());
}
public void executeHookBefore(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
} catch (Throwable e) {
log.error("consumeMessageHook {} executeHookBefore exception", hook.hookName(), e);
}
}
}
}
public void executeHookAfter(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
} catch (Throwable e) {
log.error("consumeMessageHook {} executeHookAfter exception", hook.hookName(), e);
}
}
}
}
private void checkServiceState() { private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING) if (this.serviceState != ServiceState.RUNNING)
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
...@@ -858,6 +892,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -858,6 +892,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null null
); );
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult; return pullResult;
} }
......
...@@ -85,7 +85,6 @@ public class TraceDataEncoder { ...@@ -85,7 +85,6 @@ public class TraceDataEncoder {
bean.setMsgId(line[5]); bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6])); bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]); bean.setKeys(line[7]);
bean.setClientHost(line[8]);
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1)); subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean); subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext); resList.add(subBeforeContext);
...@@ -123,10 +122,9 @@ public class TraceDataEncoder { ...@@ -123,10 +122,9 @@ public class TraceDataEncoder {
bean.setKeys(line[7]); bean.setKeys(line[7]);
bean.setStoreHost(line[8]); bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]); bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setClientHost(line[10]); bean.setTransactionId(line[10]);
bean.setTransactionId(line[11]); bean.setTransactionState(LocalTransactionState.valueOf(line[11]));
bean.setTransactionState(LocalTransactionState.valueOf(line[12])); bean.setFromTransactionCheck(Boolean.parseBoolean(line[12]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1)); endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean); endTransactionContext.getTraceBeans().add(bean);
...@@ -166,8 +164,7 @@ public class TraceDataEncoder { ...@@ -166,8 +164,7 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);//
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);
} }
break; break;
case SubBefore: { case SubBefore: {
...@@ -179,8 +176,7 @@ public class TraceDataEncoder { ...@@ -179,8 +176,7 @@ public class TraceDataEncoder {
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);//
} }
} }
break; break;
...@@ -211,7 +207,6 @@ public class TraceDataEncoder { ...@@ -211,7 +207,6 @@ public class TraceDataEncoder {
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR); .append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
......
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.codec.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
public class TraceView { public class TraceView {
...@@ -38,8 +38,9 @@ public class TraceView { ...@@ -38,8 +38,9 @@ public class TraceView {
private String groupName; private String groupName;
private String status; private String status;
public static List<TraceView> decodeFromTraceTransData(String key, String messageBody) { public static List<TraceView> decodeFromTraceTransData(String key, MessageExt messageExt) {
List<TraceView> messageTraceViewList = new ArrayList<TraceView>(); List<TraceView> messageTraceViewList = new ArrayList<TraceView>();
String messageBody = new String(messageExt.getBody(), Charsets.UTF_8);
if (messageBody == null || messageBody.length() <= 0) { if (messageBody == null || messageBody.length() <= 0) {
return messageTraceViewList; return messageTraceViewList;
} }
...@@ -56,8 +57,7 @@ public class TraceView { ...@@ -56,8 +57,7 @@ public class TraceView {
messageTraceView.setGroupName(context.getGroupName()); messageTraceView.setGroupName(context.getGroupName());
if (context.isSuccess()) { if (context.isSuccess()) {
messageTraceView.setStatus("success"); messageTraceView.setStatus("success");
} } else {
else {
messageTraceView.setStatus("failed"); messageTraceView.setStatus("failed");
} }
messageTraceView.setKeys(traceBean.getKeys()); messageTraceView.setKeys(traceBean.getKeys());
...@@ -68,7 +68,7 @@ public class TraceView { ...@@ -68,7 +68,7 @@ public class TraceView {
messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId()); messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
messageTraceView.setTimeStamp(context.getTimeStamp()); messageTraceView.setTimeStamp(context.getTimeStamp());
messageTraceView.setStoreHost(traceBean.getStoreHost()); messageTraceView.setStoreHost(traceBean.getStoreHost());
messageTraceView.setClientHost(traceBean.getClientHost()); messageTraceView.setClientHost(messageExt.getBornHostString());
messageTraceViewList.add(messageTraceView); messageTraceViewList.add(messageTraceView);
} }
return messageTraceViewList; return messageTraceViewList;
......
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
*/ */
package org.apache.rocketmq.client.trace.hook; package org.apache.rocketmq.client.trace.hook;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean; import org.apache.rocketmq.client.trace.TraceBean;
...@@ -74,7 +74,6 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -74,7 +74,6 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
traceBean.setStoreTime(msg.getStoreTimestamp());// traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());// traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());// traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
traceContext.setRegionId(regionId);// traceContext.setRegionId(regionId);//
beans.add(traceBean); beans.add(traceBean);
} }
...@@ -93,7 +92,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -93,7 +92,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext(); TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) { if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it // If subBefore bean is null ,skip it
return; return;
} }
TraceContext subAfterContext = new TraceContext(); TraceContext subAfterContext = new TraceContext();
...@@ -103,14 +102,17 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { ...@@ -103,14 +102,17 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
subAfterContext.setRequestId(subBeforeContext.getRequestId());// subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());// subAfterContext.setSuccess(context.isSuccess());//
// Caculate the cost time for processing messages // Calculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);// subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans()); subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE); Map<String, String> props = context.getProps();
if (props != null) {
String contextType = props.get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) { if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal()); subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
} }
}
localDispatcher.append(subAfterContext); localDispatcher.append(subAfterContext);
} }
} }
...@@ -60,7 +60,6 @@ public class SendMessageTraceHookImpl implements SendMessageHook { ...@@ -60,7 +60,6 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType()); traceBean.setMsgType(context.getMsgType());
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
tuxeContext.getTraceBeans().add(traceBean); tuxeContext.getTraceBeans().add(traceBean);
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQLitePullConsumerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private MQAdminImpl mQAdminImpl;
private AsyncTraceDispatcher asyncTraceDispatcher;
private DefaultMQProducer traceProducer;
private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
private DefaultLitePullConsumerImpl litePullConsumerImpl;
private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest";
private String brokerName = "BrokerA";
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
}
@Test
public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithDefaultTraceTopic();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithCustomizedTraceTopic();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setEnableMsgTrace(true);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
}
private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setEnableMsgTrace(true);
litePullConsumer.setCustomizedTraceTopic(customerTraceTopic);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
asyncTraceDispatcher = (AsyncTraceDispatcher) litePullConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(litePullConsumerImpl, mQClientFactory);
PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pullAPIWrapper, mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQAdminImpl);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(rebalanceImpl, mQClientFactory);
offsetStore = spy(litePullConsumerImpl.getOffsetStore());
field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
field.setAccessible(true);
field.set(litePullConsumerImpl, offsetStore);
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
return pullResult;
}
});
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
return messageQueue;
}
private TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
}
MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
}
}
\ No newline at end of file
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
...@@ -50,8 +49,7 @@ public class TraceDataEncoderTest { ...@@ -50,8 +49,7 @@ public class TraceDataEncoderTest {
.append(245).append(TraceConstants.CONTENT_SPLITOR) .append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR) .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR) .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR) .append(true).append(TraceConstants.FIELD_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.toString(); .toString();
} }
...@@ -104,7 +102,6 @@ public class TraceDataEncoderTest { ...@@ -104,7 +102,6 @@ public class TraceDataEncoderTest {
traceBean.setTags("Tags"); traceBean.setTags("Tags");
traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000"); traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
traceBean.setStoreHost("127.0.0.1:10911"); traceBean.setStoreHost("127.0.0.1:10911");
traceBean.setClientHost("127.0.0.1@41700");
traceBean.setMsgType(MessageType.Trans_msg_Commit); traceBean.setMsgType(MessageType.Trans_msg_Commit);
traceBean.setTransactionId("transactionId"); traceBean.setTransactionId("transactionId");
traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE); traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
......
...@@ -17,7 +17,8 @@ ...@@ -17,7 +17,8 @@
package org.apache.rocketmq.client.trace; package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll; import org.apache.commons.codec.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageType; import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -42,16 +43,17 @@ public class TraceViewTest { ...@@ -42,16 +43,17 @@ public class TraceViewTest {
.append(245).append(TraceConstants.CONTENT_SPLITOR) .append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR) .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR) .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR) .append(true).append(TraceConstants.FIELD_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.toString(); .toString();
MessageExt message = new MessageExt();
message.setBody(messageBody.getBytes(Charsets.UTF_8));
String key = "AC1415116D1418B4AAC217FE1B4E0000"; String key = "AC1415116D1418B4AAC217FE1B4E0000";
List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, messageBody); List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, message);
Assert.assertEquals(traceViews.size(), 1); Assert.assertEquals(traceViews.size(), 1);
Assert.assertEquals(traceViews.get(0).getMsgId(), key); Assert.assertEquals(traceViews.get(0).getMsgId(), key);
key = "AD4233434334AAC217FEFFD0000"; key = "AD4233434334AAC217FEFFD0000";
traceViews = TraceView.decodeFromTraceTransData(key, messageBody); traceViews = TraceView.decodeFromTraceTransData(key, message);
Assert.assertEquals(traceViews.size(), 0); Assert.assertEquals(traceViews.size(), 0);
} }
} }
...@@ -19,7 +19,6 @@ package org.apache.rocketmq.tools.command.message; ...@@ -19,7 +19,6 @@ package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -38,7 +37,6 @@ import java.util.Iterator; ...@@ -38,7 +37,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class QueryMsgTraceByIdSubCommand implements SubCommand { public class QueryMsgTraceByIdSubCommand implements SubCommand {
@Override @Override
...@@ -80,7 +78,7 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand { ...@@ -80,7 +78,7 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
List<MessageExt> messageList = queryResult.getMessageList(); List<MessageExt> messageList = queryResult.getMessageList();
List<TraceView> traceViews = new ArrayList<>(); List<TraceView> traceViews = new ArrayList<>();
for (MessageExt message : messageList) { for (MessageExt message : messageList) {
List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, new String(message.getBody(), Charsets.UTF_8)); List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, message);
traceViews.addAll(traceView); traceViews.addAll(traceView);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册