diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index 6a4f1fffe55404f2ae0feb4823a2ccc9479d2fd1..3a4f95a3ffd021fcb74625ac5402d203fd84ba30 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -49,6 +49,7 @@ mysql-5.x-plugin h2-1.x-plugin postgresql-8.x-plugin + rocketMQ-3.x-plugin rocketMQ-4.x-plugin elastic-job-2.x-plugin mongodb-2.x-plugin diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..b01c17209cd228ee310500e98ce55f76d49998fa --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml @@ -0,0 +1,68 @@ + + + + + + apm-sdk-plugin + org.apache.skywalking + 5.0.0-alpha-SNAPSHOT + + 4.0.0 + + apm-rocketmq-3.x-plugin + rocketMQ-3.x-plugin + + + UTF-8 + + + + + com.alibaba.rocketmq + rocketmq-client + 3.6.2.Final + provided + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + attach-sources + + jar + + + + + + + diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..0a28b1b0a08a8158892a1c907f5254a46c346155 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/AbstractMessageConsumeInterceptor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import java.util.List; +import com.alibaba.rocketmq.common.message.MessageExt; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +/** + * {@link AbstractMessageConsumeInterceptor} create entry span when the consumeMessage in the {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly} class. + * + * @author carlvine500 + */ +public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + List msgs = (List)allArguments[0]; + + ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0)); + AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier); + + span.setComponent(ComponentsDefine.ROCKET_MQ); + span.setLayer(SpanLayer.MQ); + for (int i = 1; i < msgs.size(); i++) { + ContextManager.extract(getContextCarrierFromMessage(msgs.get(i))); + } + + } + + @Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private ContextCarrier getContextCarrierFromMessage(MessageExt message) { + ContextCarrier contextCarrier = new ContextCarrier(); + + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + next.setHeadValue(message.getUserProperty(next.getHeadKey())); + } + + return contextCarrier; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..06e1eaf0cc39317731237b85627b1e281b176c07 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageConcurrentlyConsumeInterceptor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute. + * + * @author carlvine500 + */ +public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret; + if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) { + AbstractSpan activeSpan = ContextManager.activeSpan(); + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, status.name()); + } + ContextManager.stopSpan(); + return ret; + } +} + diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..f24ee47c89bf0296361cc24c02520d191abe318c --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageOrderlyConsumeInterceptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * {@link MessageOrderlyConsumeInterceptor} set the process status after the {@link + * com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method execute. + * + * @author carlvine500 + */ +public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor { + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + + ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret; + if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) { + AbstractSpan activeSpan = ContextManager.activeSpan(); + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, status.name()); + } + ContextManager.stopSpan(); + return ret; + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..2c6e6e1f2c8cb1086e9d7a3b5622face2e4ac393 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.CarrierItem; +import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; +import org.apache.skywalking.apm.util.StringUtil; + +import static com.alibaba.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR; +import static com.alibaba.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR; + +/** + * {@link MessageSendInterceptor} create exit span when the method {@link com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String, + * String, Message, com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, long, + * com.alibaba.rocketmq.client.impl.CommunicationMode, com.alibaba.rocketmq.client.producer.SendCallback, + * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, com.alibaba.rocketmq.client.impl.factory.MQClientInstance, + * int, com.alibaba.rocketmq.client.hook.SendMessageContext, com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)} + * execute. + * + * @author carlvine500 + */ +public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor { + + private static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + Message message = (Message)allArguments[2]; + ContextCarrier contextCarrier = new ContextCarrier(); + String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField()); + AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress); + span.setComponent(ComponentsDefine.ROCKET_MQ); + Tags.MQ_BROKER.set(span, (String)allArguments[0]); + Tags.MQ_TOPIC.set(span, message.getTopic()); + SpanLayer.asMQ(span); + + SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3]; + StringBuilder properties = new StringBuilder(requestHeader.getProperties()); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + if (!StringUtil.isEmpty(next.getHeadValue())) { + properties.append(next.getHeadKey()); + properties.append(NAME_VALUE_SEPARATOR); + properties.append(next.getHeadValue()); + properties.append(PROPERTY_SEPARATOR); + } + } + requestHeader.setProperties(properties.toString()); + + if (allArguments[6] != null) { + ((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture())); + } + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } + + private String buildOperationName(String topicName) { + return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer"; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..04a2203132227e32203573a91af8fde7137003e5 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; + +/** + * {@link OnExceptionInterceptor} create local span when the method {@link com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)} + * execute. + * + * @author carlvine500 + */ +public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor { + + private static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField(); + AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.ROCKET_MQ); + activeSpan.errorOccurred().log((Throwable)allArguments[0]); + ContextManager.continued(enhanceInfo.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..01c9b76b29d79caabaaad72f3783309c843a2012 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.client.producer.SendStatus; +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; + +/** + * {@link OnSuccessInterceptor} create local span when the method {@link com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(SendResult)} + * execute. + * + * @author carlvine500 + */ +public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor { + + public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/"; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField(); + AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.ROCKET_MQ); + SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus(); + if (sendStatus != SendStatus.SEND_OK) { + activeSpan.errorOccurred(); + Tags.STATUS_CODE.set(activeSpan, sendStatus.name()); + } + ContextManager.continued(enhanceInfo.getContextSnapshot()); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + ContextManager.stopSpan(); + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + ContextManager.activeSpan().errorOccurred().log(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..d09e211f18bcca2fb882e34c9df7fa592b46f7ea --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/UpdateNameServerInterceptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.lang.reflect.Method; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +public class UpdateNameServerInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + objInst.setSkyWalkingDynamicField(allArguments[0]); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + return ret; + } + + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..ab184c96a4afae93e6ad2bdf1f9e3785241c393d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageConcurrentlyInstrumentation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method by using {@link + * org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}. + * + * @author carlvine500 + */ +public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently"; + private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage"; + private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(CONSUMER_MESSAGE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..a14e12d1df093a0cf828104848b1b347353c5188 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/ConsumeMessageOrderlyInstrumentation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List, + * com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method by using {@link + * org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}. + * + * @author carlvine500 + */ +public class ConsumeMessageOrderlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly"; + private static final String ENHANCE_METHOD = "consumeMessage"; + private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageOrderlyConsumeInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD); + } + + @Override public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..0f9538c2f5b49ec7d2e612361fd6e5ff3eea7929 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/MQClientAPIImplInstrumentation.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * {@link MQClientAPIImplInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String, + * String, com.alibaba.rocketmq.common.message.Message, com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, + * long, com.alibaba.rocketmq.client.impl.CommunicationMode, com.alibaba.rocketmq.client.producer.SendCallback, + * com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, com.alibaba.rocketmq.client.impl.factory.MQClientInstance, + * int, com.alibaba.rocketmq.client.hook.SendMessageContext, com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)} + * method by using {@link org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor}. + * + * @author carlvine500 + */ +public class MQClientAPIImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.impl.MQClientAPIImpl"; + private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage"; + private static final String ASYNC_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor"; + private static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.UpdateNameServerInterceptor"; + private static final String UPDATE_NAME_SERVER_METHOD_NAME = "updateNameServerAddressList"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(SEND_MESSAGE_METHOD_NAME).and(takesArgumentWithType(6, "com.alibaba.rocketmq.client.producer.SendCallback")); + } + + @Override public String getMethodsInterceptor() { + return ASYNC_METHOD_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(UPDATE_NAME_SERVER_METHOD_NAME); + } + + @Override public String getMethodsInterceptor() { + return UPDATE_NAME_SERVER_INTERCEPT_CLASS; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..82c73b3ec7a1405286d170d5a1521c8b5b8183bb --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallBackEnhanceInfo.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; + +/** + * {@link SendCallBackEnhanceInfo} saves the topic Id and {@link ContextSnapshot} instance for trace. + * + * @author carlvine500 + */ +public class SendCallBackEnhanceInfo { + private String topicId; + private ContextSnapshot contextSnapshot; + + public SendCallBackEnhanceInfo(String topicId, ContextSnapshot contextSnapshot) { + this.topicId = topicId; + this.contextSnapshot = contextSnapshot; + } + + public String getTopicId() { + return topicId; + } + + public ContextSnapshot getContextSnapshot() { + return contextSnapshot; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..1955d12fbcfe1a9e4222e29616bba798c53527c9 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/define/SendCallbackInstrumentation.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.rocketMQ.v3.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; + +/** + * {@link SendCallbackInstrumentation} intercepts {@link com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(com.alibaba.rocketmq.client.producer.SendResult sendResult)} + * method by using {@link org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor} and also intercepts {@link + * com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)} by using {@link + * org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor}. + * + * @author carlvine500 + */ +public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.producer.SendCallback"; + private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess"; + private static final String ON_SUCCESS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor"; + private static final String ON_EXCEPTION_METHOD = "onException"; + private static final String ON_EXCEPTION_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor"; + + @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[] { + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, "com.alibaba.rocketmq.client.producer.SendResult")); + } + + @Override public String getMethodsInterceptor() { + return ON_SUCCESS_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override public ElementMatcher getMethodsMatcher() { + return named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable")); + } + + @Override public String getMethodsInterceptor() { + return ON_EXCEPTION_INTERCEPTOR; + } + + @Override public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override protected ClassMatch enhanceClass() { + return byHierarchyMatch(new String[] {ENHANCE_CLASS}); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 0000000000000000000000000000000000000000..8a0f31712cf1809aeb2a499a76206dfb92ef93cd --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,4 @@ +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageConcurrentlyInstrumentation +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageOrderlyInstrumentation +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.MQClientAPIImplInstrumentation +rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallbackInstrumentation diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6e68a5b925a11991b616f7ee27fcc015e95e803d --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptorTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.util.List; +import com.alibaba.rocketmq.client.impl.CommunicationMode; +import com.alibaba.rocketmq.common.message.Message; +import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class MessageSendInterceptorTest { + + private MessageSendInterceptor messageSendInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + private Object[] arguments; + + private Object[] argumentsWithoutCallback; + + @Mock + private Message message; + + @Mock + private SendMessageRequestHeader messageRequestHeader; + + @Mock + private EnhancedInstance callBack; + + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + messageSendInterceptor = new MessageSendInterceptor(); + enhancedInstance = new EnhancedInstance() { + @Override public Object getSkyWalkingDynamicField() { + return "127.0.0.1:6543"; + } + + @Override public void setSkyWalkingDynamicField(Object value) { + + } + }; + + arguments = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, callBack}; + argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, null}; + when(messageRequestHeader.getProperties()).thenReturn(""); + when(message.getTags()).thenReturn("TagA"); + } + + @Test + public void testSendMessage() throws Throwable { + messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null); + messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan mqSpan = spans.get(0); + + SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); + SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); + verify(messageRequestHeader, times(1)).setProperties(anyString()); + verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any()); + } + + @Test + public void testSendMessageWithoutCallBack() throws Throwable { + messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null); + messageSendInterceptor.afterMethod(enhancedInstance, null, argumentsWithoutCallback, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan mqSpan = spans.get(0); + + SpanAssert.assertLayer(mqSpan, SpanLayer.MQ); + SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertTag(mqSpan, 0, "127.0.0.1"); + verify(messageRequestHeader, times(1)).setProperties(anyString()); + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a66d5e4b18e587c32d70ae95e632052f21aa38cd --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnExceptionInterceptorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.util.List; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.helper.SpanHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class OnExceptionInterceptorTest { + + private OnExceptionInterceptor exceptionInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private ContextSnapshot contextSnapshot; + private SendCallBackEnhanceInfo enhanceInfo; + + @Mock + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + exceptionInterceptor = new OnExceptionInterceptor(); + + enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + } + + @Test + public void testOnException() throws Throwable { + exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null); + exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan exceptionSpan = spans.get(0); + SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0), RuntimeException.class); + SpanAssert.assertOccurException(exceptionSpan, true); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ef8e939865138d441d1ea6d77f7e4b8f79286e88 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/OnSuccessInterceptorTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +package org.apache.skywalking.apm.plugin.rocketMQ.v3; + +import java.util.List; +import com.alibaba.rocketmq.client.producer.SendResult; +import com.alibaba.rocketmq.client.producer.SendStatus; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; +import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; +import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; +import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; +import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; +import org.apache.skywalking.apm.agent.test.tools.SpanAssert; +import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.modules.junit4.PowerMockRunnerDelegate; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PowerMockRunnerDelegate(TracingSegmentRunner.class) +public class OnSuccessInterceptorTest { + + private OnSuccessInterceptor successInterceptor; + + @SegmentStoragePoint + private SegmentStorage segmentStorage; + + @Rule + public AgentServiceRule serviceRule = new AgentServiceRule(); + + @Mock + private ContextSnapshot contextSnapshot; + @Mock + private SendResult sendResult; + + private SendCallBackEnhanceInfo enhanceInfo; + + @Mock + private EnhancedInstance enhancedInstance; + + @Before + public void setUp() { + successInterceptor = new OnSuccessInterceptor(); + + enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot); + when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo); + when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK); + } + + @Test + public void testOnSuccess() throws Throwable { + successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan successSpan = spans.get(0); + + SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ); + + } + + @Test + public void testOnSuccessWithErrorStatus() throws Throwable { + when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT); + successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null); + + assertThat(segmentStorage.getTraceSegments().size(), is(1)); + TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0); + List spans = SegmentHelper.getSpans(traceSegment); + assertThat(spans.size(), is(1)); + + AbstractTracingSpan successSpan = spans.get(0); + + SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ); + SpanAssert.assertOccurException(successSpan, true); + + } + +}