From 4acbca00539a360b05efb4a886d04fed2fd581e7 Mon Sep 17 00:00:00 2001 From: Ax1an Date: Fri, 8 Jan 2021 00:27:10 +0800 Subject: [PATCH] Fix bug that rocketmq-plugin set the wrong tag. (#6144) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix bug that rocketmq-plugin set the wrong tag. * Update CHANGES.md Co-authored-by: 吴晟 Wu Sheng --- CHANGES.md | 1 + .../skywalking/apm/agent/core/context/tag/Tags.java | 11 ++++++++--- .../v3/MessageConcurrentlyConsumeInterceptor.java | 2 +- .../rocketMQ/v3/MessageOrderlyConsumeInterceptor.java | 2 +- .../apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java | 2 +- .../v4/MessageConcurrentlyConsumeInterceptor.java | 2 +- .../rocketMQ/v4/MessageOrderlyConsumeInterceptor.java | 2 +- .../apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java | 2 +- 8 files changed, 15 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d63f0cfb2c..982c9bdf0f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -26,6 +26,7 @@ Release Notes. * Add Dolphinscheduler plugin definition. * Make sampling still works when the trace ignores plug-in activation. * Fix mssql-plugin occur ClassCastException when call the method of return generate key. +* Fix bug that rocketmq-plugin set the wrong tag. #### OAP-Backend * Make meter receiver support MAL. diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java index 291dfae611..a95d4dbf6b 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java @@ -63,20 +63,25 @@ public final class Tags { public static final StringTag DB_BIND_VARIABLES = new StringTag(6, "db.bind_vars"); /** - * MQ_QUEUE records the queue name of message-middleware + * MQ_QUEUE records the queue name of message-middleware. */ public static final StringTag MQ_QUEUE = new StringTag(7, "mq.queue"); /** - * MQ_BROKER records the broker address of message-middleware + * MQ_BROKER records the broker address of message-middleware. */ public static final StringTag MQ_BROKER = new StringTag(8, "mq.broker"); /** - * MQ_TOPIC records the topic name of message-middleware + * MQ_TOPIC records the topic name of message-middleware. */ public static final StringTag MQ_TOPIC = new StringTag(9, "mq.topic"); + /** + * MQ_STATUS records the send/consume message status of message-middleware. + */ + public static final StringTag MQ_STATUS = new StringTag(16, "mq_status"); + /** * The latency of transmission. When there are more than one downstream parent/segment-ref(s), multiple tags will be * recorded, such as a batch consumption in MQ. diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java index 3188e3a4a6..deab82eb1c 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java @@ -39,7 +39,7 @@ public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsum if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) { AbstractSpan activeSpan = ContextManager.activeSpan(); activeSpan.errorOccurred(); - Tags.STATUS_CODE.set(activeSpan, status.name()); + Tags.MQ_STATUS.set(activeSpan, status.name()); } ContextManager.stopSpan(); return ret; diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java index f2abdfe597..503c000461 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java @@ -40,7 +40,7 @@ public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInte if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) { AbstractSpan activeSpan = ContextManager.activeSpan(); activeSpan.errorOccurred(); - Tags.STATUS_CODE.set(activeSpan, status.name()); + Tags.MQ_STATUS.set(activeSpan, status.name()); } ContextManager.stopSpan(); return ret; diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java index 968955177a..6b66e46263 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java @@ -47,7 +47,7 @@ public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor { SendStatus sendStatus = ((SendResult) allArguments[0]).getSendStatus(); if (sendStatus != SendStatus.SEND_OK) { activeSpan.errorOccurred(); - Tags.STATUS_CODE.set(activeSpan, sendStatus.name()); + Tags.MQ_STATUS.set(activeSpan, sendStatus.name()); } ContextManager.continued(enhanceInfo.getContextSnapshot()); } diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java index 64693580db..23b3ca39b0 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java @@ -39,7 +39,7 @@ public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsum if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) { AbstractSpan activeSpan = ContextManager.activeSpan(); activeSpan.errorOccurred(); - Tags.STATUS_CODE.set(activeSpan, status.name()); + Tags.MQ_STATUS.set(activeSpan, status.name()); } ContextManager.stopSpan(); return ret; diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java index 70b1150996..68a39cbb23 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java @@ -40,7 +40,7 @@ public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInte if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) { AbstractSpan activeSpan = ContextManager.activeSpan(); activeSpan.errorOccurred(); - Tags.STATUS_CODE.set(activeSpan, status.name()); + Tags.MQ_STATUS.set(activeSpan, status.name()); } ContextManager.stopSpan(); return ret; diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java index 1fc80c44c8..0b37226a1f 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java @@ -47,7 +47,7 @@ public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor { SendStatus sendStatus = ((SendResult) allArguments[0]).getSendStatus(); if (sendStatus != SendStatus.SEND_OK) { activeSpan.errorOccurred(); - Tags.STATUS_CODE.set(activeSpan, sendStatus.name()); + Tags.MQ_STATUS.set(activeSpan, sendStatus.name()); } ContextManager.continued(enhanceInfo.getContextSnapshot()); } -- GitLab