From 30dbdc82645ef4a538dffb125d9bddaf063cc47e Mon Sep 17 00:00:00 2001 From: ascrutae Date: Sun, 5 Nov 2017 08:20:56 +0800 Subject: [PATCH] support rocket mq plugin --- .../trace/component/ComponentsDefine.java | 5 +- apm-sniffer/apm-sdk-plugin/pom.xml | 1 + .../rocketMQ-4.x-plugin/pom.xml | 68 ++++++++++ .../AbstractMessageConsumeInterceptor.java | 77 +++++++++++ ...MessageConcurrentlyConsumeInterceptor.java | 50 +++++++ .../MessageOrderlyConsumeInterceptor.java | 51 +++++++ .../rocketMQ/v4/MessageSendInterceptor.java | 100 ++++++++++++++ .../rocketMQ/v4/OnExceptionInterceptor.java | 61 +++++++++ .../rocketMQ/v4/OnSuccessInterceptor.java | 68 ++++++++++ ...umeMessageConcurrentlyInstrumentation.java | 68 ++++++++++ .../ConsumeMessageOrderlyInstrumentation.java | 68 ++++++++++ .../MQClientAPIImplInstrumentation.java | 73 ++++++++++ .../v4/define/SendCallBackEnhanceInfo.java | 44 ++++++ .../define/SendCallbackInstrumentation.java | 87 ++++++++++++ .../src/main/resources/skywalking-plugin.def | 4 + .../v4/MessageSendInterceptorTest.java | 126 ++++++++++++++++++ .../v4/OnExceptionInterceptorTest.java | 87 ++++++++++++ .../rocketMQ/v4/OnSuccessInterceptorTest.java | 113 ++++++++++++++++ 18 files changed, 1150 insertions(+), 1 deletion(-) create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/AbstractMessageConsumeInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageConcurrentlyConsumeInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageOrderlyConsumeInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java create mode 100644 apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java diff --git a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java index 0151346e8..9819e74c9 100644 --- a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java +++ b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java @@ -71,6 +71,8 @@ public class ComponentsDefine { public static final OfficialComponent GRPC = new OfficialComponent(23, "GRPC"); + public static final OfficialComponent ROCKET_MQ = new OfficialComponent(24, "RocketMQ"); + private static ComponentsDefine instance = new ComponentsDefine(); private String[] components; @@ -80,7 +82,7 @@ public class ComponentsDefine { } public ComponentsDefine() { - components = new String[24]; + components = new String[25]; addComponent(TOMCAT); addComponent(HTTPCLIENT); addComponent(DUBBO); @@ -104,6 +106,7 @@ public class ComponentsDefine { addComponent(SHARDING_JDBC); addComponent(POSTGRESQL); addComponent(GRPC); + addComponent(ROCKET_MQ); } private void addComponent(OfficialComponent component) { diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index bae33b837..65dcca1bd 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -52,6 +52,7 @@ h2-1.x-plugin postgresql-8.x-plugin oracle-10.x-plugin + rocketMQ-4.x-plugin pom diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml new file mode 100644 index 000000000..8ec9e4c08 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml @@ -0,0 +1,68 @@ + + + + + + apm-sdk-plugin + org.skywalking + 3.2.3-2017 + + 4.0.0 + + apm-rocketmq-4.x-plugin + rocketMQ-4.x-plugin + + + UTF-8 + + + + + org.apache.rocketmq + rocketmq-client + 4.1.0-incubating + provided + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + attach-sources + + jar + + + + + + + diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/AbstractMessageConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/AbstractMessageConsumeInterceptor.java new file mode 100644 index 000000000..04af5c96a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/AbstractMessageConsumeInterceptor.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.apache.rocketmq.common.message; + +import java.lang.reflect.Method; +import java.util.List; +import org.skywalking.apm.agent.core.context.CarrierItem; +import org.skywalking.apm.agent.core.context.ContextCarrier; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.skywalking.apm.network.trace.component.ComponentsDefine; + +/** + * {@link AbstractMessageConsumeInterceptor} create entry span when the consumeMessage in the {@link + * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link + * org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly} class. + * + * @author zhangxin + */ +public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String COMSUMER_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + List msgs = (List)allArguments[0]; + + ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0)); + AbstractSpan span = ContextManager.createEntrySpan(COMSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier); + + span.setComponent(ComponentsDefine.ROCKET_MQ); + span.setLayer(SpanLayer.MQ); + for (int i = 1; i < msgs.size(); i++) { + ContextManager.extract(getContextCarrierFromMessage(msgs.get(i))); + } + + } + + @Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private ContextCarrier getContextCarrierFromMessage(MessageExt message) { + ContextCarrier contextCarrier = new ContextCarrier(); + + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + next.setHeadValue(message.getUserProperty(next.getHeadKey())); + } + + return contextCarrier; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageConcurrentlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageConcurrentlyConsumeInterceptor.java new file mode 100644 index 000000000..fb5725228 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageConcurrentlyConsumeInterceptor.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.apache.rocketmq.common.message; + +import java.lang.reflect.Method; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.tag.Tags; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link + * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List, + * org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute. + * + * @author zhang xin + */ +public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret; + if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) { + AbstractSpan activeSpan = ContextManager.activeSpan(); + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, status.name()); + } + ContextManager.stopSpan(); + return ret; + } +} + diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageOrderlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageOrderlyConsumeInterceptor.java new file mode 100644 index 000000000..dcf94717e --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageOrderlyConsumeInterceptor.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.apache.rocketmq.common.message; + +import java.lang.reflect.Method; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.tag.Tags; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * {@link MessageOrderlyConsumeInterceptor} set the process status after the {@link + * org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List, + * org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method execute. + * + * @author zhang xin + */ +public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + + ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret; + if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) { + AbstractSpan activeSpan = ContextManager.activeSpan(); + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, status.name()); + } + ContextManager.stopSpan(); + return ret; + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java new file mode 100644 index 000000000..b85165856 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java @@ -0,0 +1,100 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4; + +import java.lang.reflect.Method; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.skywalking.apm.agent.core.context.CarrierItem; +import org.skywalking.apm.agent.core.context.ContextCarrier; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.skywalking.apm.network.trace.component.ComponentsDefine; +import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo; +import org.skywalking.apm.util.StringUtil; + +import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + +/** + * {@link MessageSendInterceptor} create exit span when the method {@link org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String, + * String, Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, long, + * org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback, + * org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance, + * int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)} + * execute. + * + * @author zhang xin + */ +public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + Message message = (Message)allArguments[2]; + ContextCarrier contextCarrier = new ContextCarrier(); + AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, (String)allArguments[0]); + span.setComponent(ComponentsDefine.ROCKET_MQ); + span.setLayer(SpanLayer.MQ); + span.tag("brokerName", (String)allArguments[1]); + span.tag("tags", message.getTags()); + span.tag("communication.mode", ((CommunicationMode)allArguments[5]).name()); + + SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3]; + StringBuilder properties = new StringBuilder(requestHeader.getProperties()); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + if (!StringUtil.isEmpty(next.getHeadValue())) { + properties.append(next.getHeadKey()); + properties.append(NAME_VALUE_SEPARATOR); + properties.append(next.getHeadValue()); + properties.append(PROPERTY_SEPARATOR); + } + } + requestHeader.setProperties(properties.toString()); + + if (allArguments[6] != null) { + ((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture())); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private String buildOperationName(String topicName) { + return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer"; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java new file mode 100644 index 000000000..067303538 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4; + +import java.lang.reflect.Method; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.skywalking.apm.network.trace.component.ComponentsDefine; +import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo; + +/** + * {@link OnExceptionInterceptor} create local span when the method {@link org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)} + * execute. + * + * @author zhang xin + */ +public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField(); + AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.ROCKET_MQ); + activeSpan.errorOccurred().log((Throwable)allArguments[0]); + ContextManager.continued(enhanceInfo.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java new file mode 100644 index 000000000..cf927d46b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4; + +import java.lang.reflect.Method; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.tag.Tags; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.skywalking.apm.network.trace.component.ComponentsDefine; +import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo; + +/** + * {@link OnSuccessInterceptor} create local span when the method {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)} + * execute. + * + * @author zhang xin + */ +public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField(); + AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.ROCKET_MQ); + SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus(); + if (sendStatus != SendStatus.SEND_OK) { + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, sendStatus.name()); + } + ContextManager.continued(enhanceInfo.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java new file mode 100644 index 000000000..225471878 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List, + * org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method by using {@link + * org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor}. + * + * @author zhang xin + */ +public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently"; + private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage"; + private static final String INTERCEPTOR_CLASS = "org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(CONSUMER_MESSAGE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java new file mode 100644 index 000000000..78984cd66 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List, + * org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method by using {@link + * org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor}. + * + * @author zhang xin + */ +public class ConsumeMessageOrderlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly"; + private static final String ENHANCE_METHOD = "consumeMessage"; + private static final String INTERCEPTOR_CLASS = "org.apache.rocketmq.common.message.MessageOrderlyConsumeInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java new file mode 100644 index 000000000..582e31134 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * {@link MQClientAPIImplInstrumentation} intercepts the {@link org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String, + * String, org.apache.rocketmq.common.message.Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, + * long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback, + * org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance, + * int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)} + * method by using {@link org.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor}. + * + * @author zhang xin + */ +public class MQClientAPIImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.MQClientAPIImpl"; + private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage"; + private static final String ASYNC_METHOD_INTERCEPTOR = "org.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(SEND_MESSAGE_METHOD_NAME).and(takesArguments(12)); + } + + @Override public String getMethodsInterceptor() { + return ASYNC_METHOD_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java new file mode 100644 index 000000000..c5fb21728 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java @@ -0,0 +1,44 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4.define; + +import org.skywalking.apm.agent.core.context.ContextSnapshot; + +/** + * {@link SendCallBackEnhanceInfo} saves the topic Id and {@link ContextSnapshot} instance for trace. + * + * @author zhang xin + */ +public class SendCallBackEnhanceInfo { + private String topicId; + private ContextSnapshot contextSnapshot; + + public SendCallBackEnhanceInfo(String topicId, ContextSnapshot contextSnapshot) { + this.topicId = topicId; + this.contextSnapshot = contextSnapshot; + } + + public String getTopicId() { + return topicId; + } + + public ContextSnapshot getContextSnapshot() { + return contextSnapshot; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java new file mode 100644 index 000000000..d63c0a3d7 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.producer.SendResult; +import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link SendCallbackInstrumentation} intercepts {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)} + * method by using {@link org.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor} and also intercepts {@link + * org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)} by using {@link + * org.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor}. + * + * @author zhang xin + */ +public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.producer.SendCallback"; + private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess"; + private static final String ON_SUCCESS_INTERCEPTOR = "org.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor"; + private static final String ON_EXCEPTION_METHOD = "onException"; + private static final String ON_EXCEPTION_INTERCEPTOR = "org.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, "org.apache.rocketmq.client.producer.SendResult")); + } + + @Override public String getMethodsInterceptor() { + return ON_SUCCESS_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable")); + } + + @Override public String getMethodsInterceptor() { + return ON_EXCEPTION_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 000000000..1accc2bff --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,4 @@ +rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageConcurrentlyInstrumentation +rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation +rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation +rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java new file mode 100644 index 000000000..8bc0a0a72 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4; + +import java.util.List; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.test.helper.SegmentHelper; +import org.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.skywalking.apm.agent.test.tools.SegmentStorage; +import org.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.skywalking.apm.agent.test.tools.SpanAssert; +import org.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.skywalking.apm.network.trace.component.ComponentsDefine; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class MessageSendInterceptorTest { + + private MessageSendInterceptor messageSendInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + private Object[] arguments; + + private Object[] argumentsWithoutCallback; + + @Mock + private Message message; + + @Mock + private SendMessageRequestHeader messageRequestHeader; + + @Mock + private EnhancedInstance callBack; + + @Before + public void setUp() { + messageSendInterceptor = new MessageSendInterceptor(); + + arguments = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, callBack}; + argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, null}; + when(messageRequestHeader.getProperties()).thenReturn(""); + when(message.getTags()).thenReturn("TagA"); + } + + @Test + public void testSendMessage() throws Throwable { + messageSendInterceptor.beforeMethod(null, null, arguments, null, null); + messageSendInterceptor.afterMethod(null, null, arguments, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan mqSpan = spans.get(0); + + SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); + SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertTag(mqSpan, 0, "test"); + SpanAssert.assertTag(mqSpan, 1, "TagA"); + verify(messageRequestHeader, times(1)).setProperties(anyString()); + verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any()); + } + + @Test + public void testSendMessageWithoutCallBack() throws Throwable { + messageSendInterceptor.beforeMethod(null, null, argumentsWithoutCallback, null, null); + messageSendInterceptor.afterMethod(null, null, argumentsWithoutCallback, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan mqSpan = spans.get(0); + + SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); + SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertTag(mqSpan, 0, "test"); + SpanAssert.assertTag(mqSpan, 1, "TagA"); + verify(messageRequestHeader, times(1)).setProperties(anyString()); + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java new file mode 100644 index 000000000..bc749493a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4; + +import java.util.List; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.skywalking.apm.agent.core.context.ContextSnapshot; +import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.test.helper.SegmentHelper; +import org.skywalking.apm.agent.test.helper.SpanHelper; +import org.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.skywalking.apm.agent.test.tools.SegmentStorage; +import org.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.skywalking.apm.agent.test.tools.SpanAssert; +import org.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class OnExceptionInterceptorTest { + + private OnExceptionInterceptor exceptionInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private ContextSnapshot contextSnapshot; + private SendCallBackEnhanceInfo enhanceInfo; + + @Mock + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + exceptionInterceptor = new OnExceptionInterceptor(); + + enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + } + + @Test + public void testOnException() throws Throwable { + exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null); + exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan exceptionSpan = spans.get(0); + SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0), RuntimeException.class); + SpanAssert.assertOccurException(exceptionSpan, true); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java new file mode 100644 index 000000000..5fcbb0cf2 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.plugin.rocketMQ.v4; + +import java.util.List; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.skywalking.apm.agent.core.context.ContextSnapshot; +import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.skywalking.apm.agent.test.helper.SegmentHelper; +import org.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.skywalking.apm.agent.test.tools.SegmentStorage; +import org.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.skywalking.apm.agent.test.tools.SpanAssert; +import org.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.skywalking.apm.network.trace.component.ComponentsDefine; +import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class OnSuccessInterceptorTest { + + private OnSuccessInterceptor successInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private ContextSnapshot contextSnapshot; + @Mock + private SendResult sendResult; + + private SendCallBackEnhanceInfo enhanceInfo; + + @Mock + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + successInterceptor = new OnSuccessInterceptor(); + + enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK); + } + + @Test + public void testOnSuccess() throws Throwable { + successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan successSpan = spans.get(0); + + SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ); + + } + + @Test + public void testOnSuccessWithErrorStatus() throws Throwable { + when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT); + successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan successSpan = spans.get(0); + + SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertOccurException(successSpan, true); + + } + +} -- GitLab