提交 f50c5b1c 编写于 作者: C carlvine

add rocketMQ-3.x-plugin

上级 3298c383
......@@ -49,6 +49,7 @@
<module>mysql-5.x-plugin</module>
<module>h2-1.x-plugin</module>
<module>postgresql-8.x-plugin</module>
<module>rocketMQ-3.x-plugin</module>
<module>rocketMQ-4.x-plugin</module>
<module>elastic-job-2.x-plugin</module>
<module>mongodb-2.x-plugin</module>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-rocketmq-3.x-plugin</artifactId>
<name>rocketMQ-3.x-plugin</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.6.2.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import java.lang.reflect.Method;
import java.util.List;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
* com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link
* com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
*
* @author carlvine500
*/
public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {
public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
@Override
public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
List<MessageExt> msgs = (List<MessageExt>)allArguments[0];
ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);
span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
for (int i = 1; i < msgs.size(); i++) {
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
}
}
@Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(message.getUserProperty(next.getHeadKey()));
}
return contextCarrier;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
/**
* {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link
* com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
* com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute.
*
* @author carlvine500
*/
public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret;
if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
Tags.STATUS_CODE.set(activeSpan, status.name());
}
ContextManager.stopSpan();
return ret;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import java.lang.reflect.Method;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
/**
* {@link MessageOrderlyConsumeInterceptor} set the process status after the {@link
* com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
* com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method execute.
*
* @author carlvine500
*/
public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret;
if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
Tags.STATUS_CODE.set(activeSpan, status.name());
}
ContextManager.stopSpan();
return ret;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
import org.apache.skywalking.apm.util.StringUtil;
import static com.alibaba.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static com.alibaba.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
/**
* {@link MessageSendInterceptor} create exit span when the method {@link com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
* String, Message, com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader, long,
* com.alibaba.rocketmq.client.impl.CommunicationMode, com.alibaba.rocketmq.client.producer.SendCallback,
* com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, com.alibaba.rocketmq.client.impl.factory.MQClientInstance,
* int, com.alibaba.rocketmq.client.hook.SendMessageContext, com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
* execute.
*
* @author carlvine500
*/
public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {
private static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Message message = (Message)allArguments[2];
ContextCarrier contextCarrier = new ContextCarrier();
String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());
AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
span.tag("brokerName", (String)allArguments[1]);
span.tag("tags", message.getTags());
span.tag("communication.mode", ((CommunicationMode)allArguments[5]).name());
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3];
StringBuilder properties = new StringBuilder(requestHeader.getProperties());
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
properties.append(next.getHeadKey());
properties.append(NAME_VALUE_SEPARATOR);
properties.append(next.getHeadValue());
properties.append(PROPERTY_SEPARATOR);
}
}
requestHeader.setProperties(properties.toString());
if (allArguments[6] != null) {
((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture()));
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
private String buildOperationName(String topicName) {
return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
/**
* {@link OnExceptionInterceptor} create local span when the method {@link com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)}
* execute.
*
* @author carlvine500
*/
public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor {
private static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
activeSpan.errorOccurred().log((Throwable)allArguments[0]);
ContextManager.continued(enhanceInfo.getContextSnapshot());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
/**
* {@link OnSuccessInterceptor} create local span when the method {@link com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
* execute.
*
* @author carlvine500
*/
public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor {
public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus();
if (sendStatus != SendStatus.SEND_OK) {
activeSpan.errorOccurred();
Tags.STATUS_CODE.set(activeSpan, sendStatus.name());
}
ContextManager.continued(enhanceInfo.getContextSnapshot());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class UpdateNameServerInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
objInst.setSkyWalkingDynamicField(allArguments[0]);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
/**
* copy from {@link com.alibaba.rocketmq.common.MQVersion.Version}(rocketmq-client:3.6.2.Final) ,
* to make sure all version exists in low-version rocketMQ-client-3.x
*/
public enum Version {
V3_0_0_SNAPSHOT,
V3_0_0_ALPHA1,
V3_0_0_BETA1,
V3_0_0_BETA2,
V3_0_0_BETA3,
V3_0_0_BETA4,
V3_0_0_BETA5,
V3_0_0_BETA6_SNAPSHOT,
V3_0_0_BETA6,
V3_0_0_BETA7_SNAPSHOT,
V3_0_0_BETA7,
V3_0_0_BETA8_SNAPSHOT,
V3_0_0_BETA8,
V3_0_0_BETA9_SNAPSHOT,
V3_0_0_BETA9,
V3_0_0_FINAL,
V3_0_1_SNAPSHOT,
V3_0_1,
V3_0_2_SNAPSHOT,
V3_0_2,
V3_0_3_SNAPSHOT,
V3_0_3,
V3_0_4_SNAPSHOT,
V3_0_4,
V3_0_5_SNAPSHOT,
V3_0_5,
V3_0_6_SNAPSHOT,
V3_0_6,
V3_0_7_SNAPSHOT,
V3_0_7,
V3_0_8_SNAPSHOT,
V3_0_8,
V3_0_9_SNAPSHOT,
V3_0_9,
V3_0_10_SNAPSHOT,
V3_0_10,
V3_0_11_SNAPSHOT,
V3_0_11,
V3_0_12_SNAPSHOT,
V3_0_12,
V3_0_13_SNAPSHOT,
V3_0_13,
V3_0_14_SNAPSHOT,
V3_0_14,
V3_0_15_SNAPSHOT,
V3_0_15,
V3_1_0_SNAPSHOT,
V3_1_0,
V3_1_1_SNAPSHOT,
V3_1_1,
V3_1_2_SNAPSHOT,
V3_1_2,
V3_1_3_SNAPSHOT,
V3_1_3,
V3_1_4_SNAPSHOT,
V3_1_4,
V3_1_5_SNAPSHOT,
V3_1_5,
V3_1_6_SNAPSHOT,
V3_1_6,
V3_1_7_SNAPSHOT,
V3_1_7,
V3_1_8_SNAPSHOT,
V3_1_8,
V3_1_9_SNAPSHOT,
V3_1_9,
V3_2_0_SNAPSHOT,
V3_2_0,
V3_2_1_SNAPSHOT,
V3_2_1,
V3_2_2_SNAPSHOT,
V3_2_2,
V3_2_3_SNAPSHOT,
V3_2_3,
V3_2_4_SNAPSHOT,
V3_2_4,
V3_2_5_SNAPSHOT,
V3_2_5,
V3_2_6_SNAPSHOT,
V3_2_6,
V3_2_7_SNAPSHOT,
V3_2_7,
V3_2_8_SNAPSHOT,
V3_2_8,
V3_2_9_SNAPSHOT,
V3_2_9,
V3_3_1_SNAPSHOT,
V3_3_1,
V3_3_2_SNAPSHOT,
V3_3_2,
V3_3_3_SNAPSHOT,
V3_3_3,
V3_3_4_SNAPSHOT,
V3_3_4,
V3_3_5_SNAPSHOT,
V3_3_5,
V3_3_6_SNAPSHOT,
V3_3_6,
V3_3_7_SNAPSHOT,
V3_3_7,
V3_3_8_SNAPSHOT,
V3_3_8,
V3_3_9_SNAPSHOT,
V3_3_9,
V3_4_1_SNAPSHOT,
V3_4_1,
V3_4_2_SNAPSHOT,
V3_4_2,
V3_4_3_SNAPSHOT,
V3_4_3,
V3_4_4_SNAPSHOT,
V3_4_4,
V3_4_5_SNAPSHOT,
V3_4_5,
V3_4_6_SNAPSHOT,
V3_4_6,
V3_4_7_SNAPSHOT,
V3_4_7,
V3_4_8_SNAPSHOT,
V3_4_8,
V3_4_9_SNAPSHOT,
V3_4_9,
V3_5_1_SNAPSHOT,
V3_5_1,
V3_5_2_SNAPSHOT,
V3_5_2,
V3_5_3_SNAPSHOT,
V3_5_3,
V3_5_4_SNAPSHOT,
V3_5_4,
V3_5_5_SNAPSHOT,
V3_5_5,
V3_5_6_SNAPSHOT,
V3_5_6,
V3_5_7_SNAPSHOT,
V3_5_7,
V3_5_8_SNAPSHOT,
V3_5_8,
V3_5_9_SNAPSHOT,
V3_5_9,
V3_6_1_SNAPSHOT,
V3_6_1,
V3_6_2_SNAPSHOT,
V3_6_2,
V3_6_3_SNAPSHOT,
V3_6_3,
V3_6_4_SNAPSHOT,
V3_6_4,
V3_6_5_SNAPSHOT,
V3_6_5,
V3_6_6_SNAPSHOT,
V3_6_6,
V3_6_7_SNAPSHOT,
V3_6_7,
V3_6_8_SNAPSHOT,
V3_6_8,
V3_6_9_SNAPSHOT,
V3_6_9,
V3_7_1_SNAPSHOT,
V3_7_1,
V3_7_2_SNAPSHOT,
V3_7_2,
V3_7_3_SNAPSHOT,
V3_7_3,
V3_7_4_SNAPSHOT,
V3_7_4,
V3_7_5_SNAPSHOT,
V3_7_5,
V3_7_6_SNAPSHOT,
V3_7_6,
V3_7_7_SNAPSHOT,
V3_7_7,
V3_7_8_SNAPSHOT,
V3_7_8,
V3_7_9_SNAPSHOT,
V3_7_9,
V3_8_1_SNAPSHOT,
V3_8_1,
V3_8_2_SNAPSHOT,
V3_8_2,
V3_8_3_SNAPSHOT,
V3_8_3,
V3_8_4_SNAPSHOT,
V3_8_4,
V3_8_5_SNAPSHOT,
V3_8_5,
V3_8_6_SNAPSHOT,
V3_8_6,
V3_8_7_SNAPSHOT,
V3_8_7,
V3_8_8_SNAPSHOT,
V3_8_8,
V3_8_9_SNAPSHOT,
V3_8_9,
V3_9_1_SNAPSHOT,
V3_9_1,
V3_9_2_SNAPSHOT,
V3_9_2,
V3_9_3_SNAPSHOT,
V3_9_3,
V3_9_4_SNAPSHOT,
V3_9_4,
V3_9_5_SNAPSHOT,
V3_9_5,
V3_9_6_SNAPSHOT,
V3_9_6,
V3_9_7_SNAPSHOT,
V3_9_7,
V3_9_8_SNAPSHOT,
V3_9_8,
V3_9_9_SNAPSHOT,
V3_9_9;
private Version() {
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
/**
* {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
* com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method by using {@link
* org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}.
*
* @author carlvine500
*/
public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently";
private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(CONSUMER_MESSAGE_METHOD);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {ENHANCE_CLASS});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
/**
* {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
* com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method by using {@link
* org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageConcurrentlyConsumeInterceptor}.
*
* @author carlvine500
*/
public class ConsumeMessageOrderlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly";
private static final String ENHANCE_METHOD = "consumeMessage";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageOrderlyConsumeInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {ENHANCE_CLASS});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
import com.alibaba.rocketmq.common.MQVersion;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.Version;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link MQClientAPIImplInstrumentation} intercepts the {@link com.alibaba.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
* String, com.alibaba.rocketmq.common.message.Message, com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader,
* long, com.alibaba.rocketmq.client.impl.CommunicationMode, com.alibaba.rocketmq.client.producer.SendCallback,
* com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo, com.alibaba.rocketmq.client.impl.factory.MQClientInstance,
* int, com.alibaba.rocketmq.client.hook.SendMessageContext, com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
* method by using {@link MessageSendInterceptor}.
*
* @author carlvine500
*/
public class MQClientAPIImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.impl.MQClientAPIImpl";
private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";
private static final String ASYNC_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.MessageSendInterceptor";
private static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v3.UpdateNameServerInterceptor";
private static final String UPDATE_NAME_SERVER_METHOD_NAME = "updateNameServerAddressList";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
int argumentsLength = getArgumentsLength();
return named(SEND_MESSAGE_METHOD_NAME).and(takesArguments(argumentsLength));
}
@Override public String getMethodsInterceptor() {
return ASYNC_METHOD_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(UPDATE_NAME_SERVER_METHOD_NAME);
}
@Override public String getMethodsInterceptor() {
return UPDATE_NAME_SERVER_INTERCEPT_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
private int getArgumentsLength() {
if (MQVersion.CurrentVersion <= Version.V3_4_6.ordinal()) {
return 7;
} else if (MQVersion.CurrentVersion <= Version.V3_5_5.ordinal()) {
return 11;
}
return 12;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
/**
* {@link SendCallBackEnhanceInfo} saves the topic Id and {@link ContextSnapshot} instance for trace.
*
* @author carlvine500
*/
public class SendCallBackEnhanceInfo {
private String topicId;
private ContextSnapshot contextSnapshot;
public SendCallBackEnhanceInfo(String topicId, ContextSnapshot contextSnapshot) {
this.topicId = topicId;
this.contextSnapshot = contextSnapshot;
}
public String getTopicId() {
return topicId;
}
public ContextSnapshot getContextSnapshot() {
return contextSnapshot;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
/**
* {@link SendCallbackInstrumentation} intercepts {@link com.alibaba.rocketmq.client.producer.SendCallback#onSuccess(com.alibaba.rocketmq.client.producer.SendResult sendResult)}
* method by using {@link OnSuccessInterceptor} and also intercepts {@link
* com.alibaba.rocketmq.client.producer.SendCallback#onException(Throwable)} by using {@link
* org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor}.
*
* @author carlvine500
*/
public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.alibaba.rocketmq.client.producer.SendCallback";
private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";
private static final String ON_SUCCESS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.OnSuccessInterceptor";
private static final String ON_EXCEPTION_METHOD = "onException";
private static final String ON_EXCEPTION_INTERCEPTOR = "org.apache.skywalking.apm.plugin.rocketMQ.v3.OnExceptionInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, "com.alibaba.rocketmq.client.producer.SendResult"));
}
@Override public String getMethodsInterceptor() {
return ON_SUCCESS_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable"));
}
@Override public String getMethodsInterceptor() {
return ON_EXCEPTION_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {ENHANCE_CLASS});
}
}
rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageConcurrentlyInstrumentation
rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.ConsumeMessageOrderlyInstrumentation
rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.MQClientAPIImplInstrumentation
rocketMQ-3.x=org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallbackInstrumentation
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import java.util.List;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class MessageSendInterceptorTest {
private MessageSendInterceptor messageSendInterceptor;
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private Object[] arguments;
private Object[] argumentsWithoutCallback;
@Mock
private Message message;
@Mock
private SendMessageRequestHeader messageRequestHeader;
@Mock
private EnhancedInstance callBack;
private EnhancedInstance enhancedInstance;
@Before
public void setUp() {
messageSendInterceptor = new MessageSendInterceptor();
enhancedInstance = new EnhancedInstance() {
@Override public Object getSkyWalkingDynamicField() {
return "127.0.0.1:6543";
}
@Override public void setSkyWalkingDynamicField(Object value) {
}
};
arguments = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, callBack};
argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, null};
when(messageRequestHeader.getProperties()).thenReturn("");
when(message.getTags()).thenReturn("TagA");
}
@Test
public void testSendMessage() throws Throwable {
messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(1));
AbstractTracingSpan mqSpan = spans.get(0);
SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
SpanAssert.assertTag(mqSpan, 0, "test");
SpanAssert.assertTag(mqSpan, 1, "TagA");
verify(messageRequestHeader, times(1)).setProperties(anyString());
verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any());
}
@Test
public void testSendMessageWithoutCallBack() throws Throwable {
messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
messageSendInterceptor.afterMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(1));
AbstractTracingSpan mqSpan = spans.get(0);
SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
SpanAssert.assertTag(mqSpan, 0, "test");
SpanAssert.assertTag(mqSpan, 1, "TagA");
verify(messageRequestHeader, times(1)).setProperties(anyString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import java.util.List;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class OnExceptionInterceptorTest {
private OnExceptionInterceptor exceptionInterceptor;
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
@Mock
private ContextSnapshot contextSnapshot;
private SendCallBackEnhanceInfo enhanceInfo;
@Mock
private EnhancedInstance enhancedInstance;
@Before
public void setUp() {
exceptionInterceptor = new OnExceptionInterceptor();
enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
}
@Test
public void testOnException() throws Throwable {
exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null);
exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(1));
AbstractTracingSpan exceptionSpan = spans.get(0);
SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0), RuntimeException.class);
SpanAssert.assertOccurException(exceptionSpan, true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rocketMQ.v3;
import java.util.List;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import org.apache.skywalking.apm.plugin.rocketMQ.v3.define.SendCallBackEnhanceInfo;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class OnSuccessInterceptorTest {
private OnSuccessInterceptor successInterceptor;
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
@Mock
private ContextSnapshot contextSnapshot;
@Mock
private SendResult sendResult;
private SendCallBackEnhanceInfo enhanceInfo;
@Mock
private EnhancedInstance enhancedInstance;
@Before
public void setUp() {
successInterceptor = new OnSuccessInterceptor();
enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK);
}
@Test
public void testOnSuccess() throws Throwable {
successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(1));
AbstractTracingSpan successSpan = spans.get(0);
SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
}
@Test
public void testOnSuccessWithErrorStatus() throws Throwable {
when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT);
successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(1));
AbstractTracingSpan successSpan = spans.get(0);
SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
SpanAssert.assertOccurException(successSpan, true);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册