diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java index 54f660ed01ad87067aa95d75d9474bd6cab4e57a..e8802d472446e0f81495a05195ae670dd12893dd 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/tag/Tags.java @@ -59,6 +59,16 @@ public final class Tags { */ public static final StringTag DB_BIND_VARIABLES = new StringTag("db.bind_vars"); + /** + * MQ_BROKER records the broker address of message-middleware + */ + public static final StringTag MQ_BROKER = new StringTag("mq.broker"); + + /** + * MQ_TOPIC records the topic name of message-middleware + */ + public static final StringTag MQ_TOPIC = new StringTag("mq.topic"); + public static final class HTTP { public static final StringTag METHOD = new StringTag("http.method"); } diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java index 05eadfc52ea6d3626874727f93b53811ff896081..b392fcb65c1ae84d3b3c8394a9dd8c0ad51146d4 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java @@ -20,12 +20,12 @@ package org.apache.skywalking.apm.plugin.rocketMQ.v4; import java.lang.reflect.Method; -import org.apache.rocketmq.client.impl.CommunicationMode; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.skywalking.apm.agent.core.context.CarrierItem; import org.apache.skywalking.apm.agent.core.context.ContextCarrier; import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; @@ -60,10 +60,9 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField()); AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress); span.setComponent(ComponentsDefine.ROCKET_MQ); + Tags.MQ_BROKER.set(span, (String)allArguments[0]); + Tags.MQ_TOPIC.set(span, message.getTopic()); SpanLayer.asMQ(span); - span.tag("brokerName", (String)allArguments[1]); - span.tag("tags", message.getTags()); - span.tag("communication.mode", ((CommunicationMode)allArguments[5]).name()); SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3]; StringBuilder properties = new StringBuilder(requestHeader.getProperties()); diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java index c9abeb57bb4d157d1778ffdb8a1c8576942e8047..52e32963b9dca946ebb1deaad0a95a96d2947a7a 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java @@ -110,8 +110,7 @@ public class MessageSendInterceptorTest { SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); - SpanAssert.assertTag(mqSpan, 0, "test"); - SpanAssert.assertTag(mqSpan, 1, "TagA"); + SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); verify(messageRequestHeader, times(1)).setProperties(anyString()); verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any()); } @@ -130,8 +129,7 @@ public class MessageSendInterceptorTest { SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); - SpanAssert.assertTag(mqSpan, 0, "test"); - SpanAssert.assertTag(mqSpan, 1, "TagA"); + SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); verify(messageRequestHeader, times(1)).setProperties(anyString()); }