From 2a8ba5a79935ab0a23646455427647ebd57d9e07 Mon Sep 17 00:00:00 2001 From: ChaosYjh <763900954@qq.com> Date: Tue, 15 Sep 2020 15:07:47 +0800 Subject: [PATCH] [ISSUE #1473] Trace message`s clientHost was wrong (#1474) --- .../rocketmq/client/trace/TraceDataEncoder.java | 15 +++++++++++++-- .../trace/hook/ConsumeMessageTraceHookImpl.java | 2 ++ .../trace/hook/SendMessageTraceHookImpl.java | 1 + 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index 5a1afaf3..9569cc02 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -62,6 +62,14 @@ public class TraceDataEncoder { bean.setOffsetMsgId(line[12]); pubContext.setSuccess(Boolean.parseBoolean(line[13])); } + + // compatible with the old version + if (line.length >= 15) { + bean.setOffsetMsgId(line[12]); + pubContext.setSuccess(Boolean.parseBoolean(line[13])); + bean.setClientHost(line[14]); + } + pubContext.setTraceBeans(new ArrayList(1)); pubContext.getTraceBeans().add(bean); resList.add(pubContext); @@ -76,6 +84,7 @@ public class TraceDataEncoder { bean.setMsgId(line[5]); bean.setRetryTimes(Integer.parseInt(line[6])); bean.setKeys(line[7]); + bean.setClientHost(line[8]); subBeforeContext.setTraceBeans(new ArrayList(1)); subBeforeContext.getTraceBeans().add(bean); resList.add(subBeforeContext); @@ -130,7 +139,8 @@ public class TraceDataEncoder { .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// - .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR); + .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR); } break; case SubBefore: { @@ -142,7 +152,8 @@ public class TraceDataEncoder { .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// - .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// + .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);// } } break; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java index f30b1211..4f6f916e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageTraceHookImpl.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.client.trace.hook; import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType; import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceContext; import org.apache.rocketmq.client.trace.TraceDispatcher; import org.apache.rocketmq.client.trace.TraceBean; @@ -73,6 +74,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook { traceBean.setStoreTime(msg.getStoreTimestamp());// traceBean.setBodyLength(msg.getStoreSize());// traceBean.setRetryTimes(msg.getReconsumeTimes());// + traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId()); traceContext.setRegionId(regionId);// beans.add(traceBean); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index 80c7babd..4feb2762 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -60,6 +60,7 @@ public class SendMessageTraceHookImpl implements SendMessageHook { traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setMsgType(context.getMsgType()); + traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId()); tuxeContext.getTraceBeans().add(traceBean); } -- GitLab