提交 a8742514 编写于 作者: L lipenghui 提交者: wu-sheng

Add pulsar apm plugin (#3476)

* Add pulsar apm plugin.

* Fix check style

* Fix pulsar consumer component define.

* Add pulsar to component-libraries.yml

* Fix error interceptor class.

* Add pulsar to agent support list.

* Add Pulsar to ComponentsDefine and component-libraries.yml

* Move create entry span log of consumer from after method to before method

* Fix send callback issue when exception cause.

* Fix test issues

* Move pulsar plugin to optional plugins

* Add none messages tests for interceptor of producer and consumer.

* Remove unused comments.

* Move pulsar plugin back to the apm-sdk-plugin

* Fix comments

* remove set startTime for entry span(default is set by System.currentTimeMillis)

* Fix comments
上级 632e6b75
......@@ -133,7 +133,10 @@ public class ComponentsDefine {
public static final OfficialComponent PLAY = new OfficialComponent(68, "Play");
public static final OfficialComponent CASSANDRA_JAVA_DRIVER = new OfficialComponent(69, "cassandra-java-driver");
public static final OfficialComponent LIGHT_4J = new OfficialComponent(71, "Light4J");
public static final OfficialComponent PULSAR_PRODUCER = new OfficialComponent(73, "pulsar-producer");
public static final OfficialComponent PULSAR_CONSUMER = new OfficialComponent(74, "pulsar-consumer");
}
......@@ -76,6 +76,7 @@
<module>solrj-7.x-plugin</module>
<module>cassandra-java-driver-3.x-plugin</module>
<module>light4j-plugins</module>
<module>pulsar-plugin</module>
</modules>
<packaging>pom</packaging>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-pulsar-plugin</artifactId>
<properties>
<pulsar.version>2.4.0</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* Interceptor of pulsar consumer constructor.
*
* The interceptor create {@link ConsumerEnhanceRequiredInfo} which is required by instance method interceptor,
* So use it to update the skywalking dynamic field of pulsar consumer enhanced instance.
* So that the instance methods can get the {@link ConsumerEnhanceRequiredInfo}
*
* @author penghui
*/
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
String topic = (String) allArguments[1];
ConsumerConfigurationData consumerConfigurationData = (ConsumerConfigurationData) allArguments[2];
ConsumerEnhanceRequiredInfo requireInfo = new ConsumerEnhanceRequiredInfo();
/*
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
* can handle the service url provider which use a dynamic service url
*/
requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
requireInfo.setTopic(topic);
requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
objInst.setSkyWalkingDynamicField(requireInfo);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
/**
* Pulsar consumer enhance required info is required by consumer enhanced object method interceptor
*
* @author penghui
*/
public class ConsumerEnhanceRequiredInfo {
/**
* service url of the consumer
*/
private String serviceUrl;
/**
* topic name of the consumer
*/
private String topic;
/**
* subscription name of the consumer
*/
private String subscriptionName;
public String getServiceUrl() {
return serviceUrl;
}
public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getSubscriptionName() {
return subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* Interceptor of pulsar producer constructor.
*
* The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor,
* So use it to update the skywalking dynamic field of pulsar producer enhanced instance.
* So that the instance methods can get the {@link ProducerEnhanceRequiredInfo}
*
* @author penghui
*/
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
String topic = (String) allArguments[1];
ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo();
producerEnhanceRequiredInfo.setTopic(topic);
/*
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
* can handle the service url provider which use a dynamic service url
*/
producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
/**
* Pulsar producer enhance required info is required by producer enhanced object method interceptor
*
* @author penghui
*/
public class ProducerEnhanceRequiredInfo {
/**
* service url of the pulsar producer
*/
private String serviceUrl;
/**
* topic name of the pulsar producer
*/
private String topic;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getServiceUrl() {
return serviceUrl;
}
public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.api.Message;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
/**
* Interceptor for pulsar consumer enhanced instance
*
* Here is the intercept process steps:
*
* <pre>
* 1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
* 2. Create the entry span when call <code>messageProcessed</code> method
* 3. Extract all the <code>Trace Context</code> when call <code>messageProcessed</code> method
* 4. Stop the entry span when <code>messageProcessed</code> method finished.
* </pre>
*
* @author penghui
*/
public class PulsarConsumerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "Pulsar/";
public static final String CONSUMER_OPERATE_NAME = "/Consumer/";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
if (allArguments[0] != null) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
Message msg = (Message) allArguments[0];
ContextCarrier carrier = new ContextCarrier();
CarrierItem next = carrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(msg.getProperty(next.getHeadKey()));
}
AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX +
requiredInfo.getTopic() + CONSUMER_OPERATE_NAME + requiredInfo.getSubscriptionName(), carrier);
activeSpan.setComponent(ComponentsDefine.PULSAR_CONSUMER);
SpanLayer.asMQ(activeSpan);
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
if (allArguments[0] != null) {
ContextManager.stopSpan();
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
if (allArguments[0] != null) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
/**
* Interceptor for pulsar producer enhanced instance.
*
* Here is the intercept process steps:
*
* <pre>
* 1. Get the {@link ProducerEnhanceRequiredInfo} and record the service url, topic name
* 2. Create the exit span when the producer invoke <code>sendAsync</code> method
* 3. Inject the context to {@link Message#getProperties()}
* 4. Create {@link SendCallbackEnhanceRequiredInfo} with <code>ContextManager.capture()</code> and set the
* callback enhanced instance skywalking dynamic field to the created required info.
* 5. Stop the exit span when <code>sendAsync</code> method finished.
* </pre>
*
* @author penghui
*/
public class PulsarProducerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "Pulsar/";
public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
if (allArguments[0] != null) {
ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
ContextCarrier contextCarrier = new ContextCarrier();
String topicName = requiredInfo.getTopic();
AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName +
PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, requiredInfo.getServiceUrl());
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
Tags.MQ_TOPIC.set(activeSpan, topicName);
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
CarrierItem next = contextCarrier.items();
MessageImpl msg = (MessageImpl) allArguments[0];
while (next.hasNext()) {
next = next.next();
msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder()
.setKey(next.getHeadKey())
.setValue(next.getHeadValue()));
}
if (allArguments.length > 1) {
EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
if (callbackInstance != null) {
ContextSnapshot snapshot = ContextManager.capture();
if (null != snapshot) {
SendCallbackEnhanceRequiredInfo callbackRequiredInfo = new SendCallbackEnhanceRequiredInfo();
callbackRequiredInfo.setTopic(topicName);
callbackRequiredInfo.setContextSnapshot(snapshot);
callbackInstance.setSkyWalkingDynamicField(callbackRequiredInfo);
}
}
}
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
if (allArguments[0] != null) {
ContextManager.stopSpan();
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
if (allArguments[0] != null) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
/**
* Pulsar {@link org.apache.pulsar.client.impl.SendCallback} enhance required info is required by
* <code>SendCallback</code> enhanced object method interceptor
*
* @author penghui
*/
public class SendCallbackEnhanceRequiredInfo {
/**
* topic name of the producer
*/
private String topic;
/**
* context snapshot
*/
ContextSnapshot contextSnapshot;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public ContextSnapshot getContextSnapshot() {
return contextSnapshot;
}
public void setContextSnapshot(ContextSnapshot contextSnapshot) {
this.contextSnapshot = contextSnapshot;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
/**
* Interceptor for send callback enhanced instance.
*
* Here is the intercept process steps:
*
* <pre>
* 1. Get the @{@link SendCallbackEnhanceRequiredInfo} and record the service url, context snapshot
* 2. Create the local span when the callback invoke <code>sendComplete</code> method
* 3. Stop the local span when <code>sendComplete</code> method finished.
* </pre>
*
* @author penghui
*/
public class SendCallbackInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME = "Pulsar/Producer/Callback";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
if (null != requiredInfo.getContextSnapshot()) {
AbstractSpan activeSpan = ContextManager.createLocalSpan(OPERATION_NAME);
activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopic());
SpanLayer.asMQ(activeSpan);
ContextManager.continued(requiredInfo.getContextSnapshot());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
if (null != requiredInfo.getContextSnapshot()) {
Exception exceptions = (Exception) allArguments[0];
if (exceptions != null) {
ContextManager.activeSpan().errorOccurred().log(exceptions);
}
ContextManager.stopSpan();
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
SendCallbackEnhanceRequiredInfo requiredInfo = (SendCallbackEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
if (null != requiredInfo.getContextSnapshot()) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* The pulsar consumer instrumentation use {@link org.apache.pulsar.client.impl.ConsumerImpl} as an enhanced class.
* {@link org.apache.pulsar.client.api.Consumer} is a user-oriented interface and the implementations are
* {@link org.apache.pulsar.client.impl.ConsumerImpl} and {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl}
*
* The MultiTopicsConsumerImpl is a complex type with multiple ConsumerImpl to support uses receive messages from
* multiple topics. As each ConsumerImpl has it's own topic name and it is the initial unit of a single topic
* to receiving messages, so use ConsumerImpl as an enhanced class is an effective way.
*
* Use <code>messageProcessed</code> as the enhanced method since pulsar
* consumer has multiple ways to receiving messages such as sync method, async method and listeners.
* Method messageProcessed is a basic unit of ConsumerImpl, no matter which way uses uses, messageProcessed will always
* record the message receiving.
*
* @author penghui
*/
public class PulsarConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.pulsar.client.impl.PulsarClientImpl";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ConsumerConstructorInterceptor";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarConsumerInterceptor";
public static final String ENHANCE_METHOD = "messageProcessed";
public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.api.Message";
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ConsumerImpl";
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ENHANCE_METHOD_TYPE));
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Pulsar producer instrumentation.
*
* The pulsar producer instrumentation use {@link org.apache.pulsar.client.impl.ProducerImpl} as an enhanced class.
* {@link org.apache.pulsar.client.api.Producer} is a user-oriented interface and the implementations of the Producer
* are {@link org.apache.pulsar.client.impl.PartitionedProducerImpl} and {@link org.apache.pulsar.client.impl.ProducerImpl}.
*
* And the PartitionedProducerImpl is a complex type with multiple ProducerImpl to support uses send messages to
* multiple partitions. As each ProducerImpl has it's own topic name and it is the initial unit of a single topic
* to send messages, so use ProducerImpl as an enhanced class is an effective way.
*
* About the enhanced methods, currently use {@link org.apache.pulsar.client.impl.ProducerImpl#sendAsync(
* org.apache.pulsar.client.api.Message, org.apache.pulsar.client.impl.SendCallback)} as the enhanced method.
* Pulsar provides users with two kinds of methods for sending messages sync methods and async methods. The async method
* use {@link java.util.concurrent.CompletableFuture as the method result}, if use this method as the enhanced method
* is hard to pass the snapshot of span, because can't ensure that the CompletableFuture is completed after the skywalking
* dynamic field was updated. But execution time of sync method will be inaccurate.
*
* @author penghui
*/
public class PulsarProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor";
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.ProducerImpl";
public static final String ENHANCE_METHOD = "sendAsync";
public static final String ENHANCE_METHOD_TYPE = "org.apache.pulsar.client.impl.SendCallback";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.ProducerConstructorInterceptor";
public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.pulsar.client.impl.PulsarClientImpl";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).and(takesArgumentWithType(1, ENHANCE_METHOD_TYPE));
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
/**
* Pulsar producer send callback instrumentation.
*
* The send callback enhanced object will use {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackEnhanceRequiredInfo}
* which {@link org.apache.skywalking.apm.plugin.pulsar.PulsarProducerInterceptor} set by skywalking dynamic field of
* enhanced object.
*
* When a callback is complete, {@link org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor} will continue
* the {@link SendCallbackEnhanceRequiredInfo#getContextSnapshot()}.
*
* @author penghui
*/
public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.SendCallback";
public static final String ENHANCE_METHOD = "sendComplete";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.SendCallbackInterceptor";
@Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {ENHANCE_CLASS});
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConsumerConstructorInterceptorTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic";
private static final String SUBSCRIPTION_NAME = "my-sub";
@Mock
private PulsarClientImpl pulsarClient;
@Mock
private LookupService lookupService;
@Mock
private ConsumerConfigurationData consumerConfigurationData;
private ConsumerConstructorInterceptor constructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
@Override public Object getSkyWalkingDynamicField() {
return consumerEnhanceRequiredInfo;
}
@Override public void setSkyWalkingDynamicField(Object value) {
consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
}
};
@Before
public void setUp() {
when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
when(pulsarClient.getLookup()).thenReturn(lookupService);
when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
constructorInterceptor = new ConsumerConstructorInterceptor();
}
@Test
public void testOnConsumer() {
constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME, consumerConfigurationData});
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class MockMessage extends MessageImpl {
private PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
private transient Map<String, String> properties;
public MockMessage() {
this(null, "1:1", new HashMap(), null, null);
}
public MockMessage(String topic, String msgId, Map properties, ByteBuf payload, Schema schema) {
super(topic, msgId, properties, payload, schema);
}
@Override
public PulsarApi.MessageMetadata.Builder getMessageBuilder() {
return msgMetadataBuilder;
}
public synchronized Map<String, String> getProperties() {
if (this.properties == null) {
if (this.msgMetadataBuilder.getPropertiesCount() > 0) {
Map<String, String> internalProperties = new HashMap<String, String>();
for (int i = 0; i < this.msgMetadataBuilder.getPropertiesCount(); i++) {
PulsarApi.KeyValue kv = this.msgMetadataBuilder.getProperties(i);
internalProperties.put(kv.getKey(), kv.getValue());
}
this.properties = Collections.unmodifiableMap(internalProperties);
} else {
this.properties = Collections.emptyMap();
}
}
return this.properties;
}
@Override
public String getProperty(String name) {
return this.getProperties().get(name);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ProducerConstructorInterceptorTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "persistent://my-tenant/my-ns/my-topic";
@Mock
private PulsarClientImpl pulsarClient;
@Mock
private LookupService lookupService;
private ProducerConstructorInterceptor constructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private ProducerEnhanceRequiredInfo requiredInfo;
@Override public Object getSkyWalkingDynamicField() {
return requiredInfo;
}
@Override public void setSkyWalkingDynamicField(Object value) {
this.requiredInfo = (ProducerEnhanceRequiredInfo)value;
}
};
@Before
public void setUp() {
when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
when(pulsarClient.getLookup()).thenReturn(lookupService);
constructorInterceptor = new ProducerConstructorInterceptor();
}
@Test
public void testOnConsumer() {
constructorInterceptor.onConstruct(enhancedInstance, new Object[] {pulsarClient, TOPIC_NAME});
ProducerEnhanceRequiredInfo requiredInfo = (ProducerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.List;
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_CONSUMER;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class PulsarConsumerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo;
private PulsarConsumerInterceptor consumerInterceptor;
private MockMessage msg;
private EnhancedInstance consumerInstance = new EnhancedInstance() {
@Override public Object getSkyWalkingDynamicField() {
return consumerEnhanceRequiredInfo;
}
@Override public void setSkyWalkingDynamicField(Object value) {
consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo)value;
}
};
@Before
public void setUp() {
Config.Agent.ACTIVE_V1_HEADER = true;
consumerInterceptor = new PulsarConsumerInterceptor();
consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
msg = new MockMessage();
msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder()
.setKey("sw3")
.setValue("1.234.111|3|1|1|#192.168.1.8:18002|#/portal/|#testEntrySpan|#AQA*#AQA*Et0We0tQNQA*"));
}
@After
public void clear() {
Config.Agent.ACTIVE_V1_HEADER = false;
}
@Test
public void testConsumerWithNullMessage() throws Throwable {
consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{null}, new Class[0], null);
consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{null}, new Class[0], null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
assertThat(traceSegments.size(), is(0));
}
@Test
public void testConsumerWithMessage() throws Throwable {
consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
assertThat(traceSegments.size(), is(1));
TraceSegment traceSegment = traceSegments.get(0);
List<TraceSegmentRef> refs = traceSegment.getRefs();
assertThat(refs.size(), is(1));
assertTraceSegmentRef(refs.get(0));
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(1));
assertConsumerSpan(spans.get(0));
}
private void assertConsumerSpan(AbstractTracingSpan span) {
SpanAssert.assertLayer(span, SpanLayer.MQ);
SpanAssert.assertComponent(span, PULSAR_CONSUMER);
SpanAssert.assertTagSize(span, 2);
SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
}
private void assertTraceSegmentRef(TraceSegmentRef ref) {
MatcherAssert.assertThat(SegmentRefHelper.getEntryServiceInstanceId(ref), is(1));
MatcherAssert.assertThat(SegmentRefHelper.getSpanId(ref), is(3));
MatcherAssert.assertThat(SegmentRefHelper.getTraceSegmentId(ref).toString(), is("1.234.111"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.List;
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.PULSAR_PRODUCER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class PulsarProducerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private PulsarProducerInterceptor producerInterceptor;
private Object[] arguments;
private Class[] argumentType;
private EnhancedInstance pulsarProducerInstance = new EnhancedInstance() {
@Override public Object getSkyWalkingDynamicField() {
ProducerEnhanceRequiredInfo requiredInfo = new ProducerEnhanceRequiredInfo();
requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
requiredInfo.setServiceUrl("pulsar://localhost:6650");
return requiredInfo;
}
@Override public void setSkyWalkingDynamicField(Object value) {
}
};
private MessageImpl msg = new MockMessage();
@Before
public void setUp() {
producerInterceptor = new PulsarProducerInterceptor();
arguments = new Object[] {msg, null};
argumentType = new Class[] {MessageImpl.class};
}
@Test
public void testSendMessage() throws Throwable {
producerInterceptor.beforeMethod(pulsarProducerInstance, null, arguments, argumentType, null);
producerInterceptor.afterMethod(pulsarProducerInstance, null, arguments, argumentType, null);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
assertThat(traceSegmentList.size(), is(1));
TraceSegment segment = traceSegmentList.get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertThat(spans.size(), is(1));
assertMessageSpan(spans.get(0));
}
@Test
public void testSendWithNullMessage() throws Throwable {
producerInterceptor.beforeMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null);
producerInterceptor.afterMethod(pulsarProducerInstance, null, new Object[]{null}, argumentType, null);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
assertThat(traceSegmentList.size(), is(0));
}
private void assertMessageSpan(AbstractTracingSpan span) {
SpanAssert.assertTag(span, 0, "pulsar://localhost:6650");
SpanAssert.assertTag(span, 1, "persistent://my-tenant/my-ns/my-topic");
SpanAssert.assertComponent(span, PULSAR_PRODUCER);
SpanAssert.assertLayer(span, SpanLayer.MQ);
assertThat(span.getOperationName(), is("Pulsar/persistent://my-tenant/my-ns/my-topic/Producer"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.SpanAssert;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.List;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class SendCallbackInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private SendCallbackInterceptor callbackInterceptor;
private Object[] arguments;
private Object[] argumentsWithException;
private Class[] argumentTypes;
private EnhancedInstance callBackInstance = new EnhancedInstance() {
@Override public Object getSkyWalkingDynamicField() {
SendCallbackEnhanceRequiredInfo requiredInfo = new SendCallbackEnhanceRequiredInfo();
requiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
requiredInfo.setContextSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot());
return requiredInfo;
}
@Override public void setSkyWalkingDynamicField(Object value) {
}
};
@Before
public void setUp() {
callbackInterceptor = new SendCallbackInterceptor();
arguments = new Object[] {
null
};
argumentsWithException = new Object[] {
new RuntimeException()
};
argumentTypes = new Class[] {
Exception.class
};
}
@Test
public void testCallbackWithoutException() throws Throwable {
callbackInterceptor.beforeMethod(callBackInstance, null, arguments, argumentTypes, null);
callbackInterceptor.afterMethod(callBackInstance, null, arguments, argumentTypes, null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
assertThat(traceSegments.size(), is(1));
TraceSegment traceSegment = traceSegments.get(0);
List<AbstractTracingSpan> abstractSpans = SegmentHelper.getSpans(traceSegment);
assertThat(abstractSpans.size(), is(1));
assertCallbackSpan(abstractSpans.get(0));
assertCallbackSegmentRef(traceSegment.getRefs());
}
@Test
public void testCallbackWithException() throws Throwable {
callbackInterceptor.beforeMethod(callBackInstance, null, argumentsWithException, argumentTypes, null);
callbackInterceptor.afterMethod(callBackInstance, null, argumentsWithException, argumentTypes, null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
assertThat(traceSegments.size(), is(1));
TraceSegment traceSegment = traceSegments.get(0);
List<AbstractTracingSpan> abstractSpans = SegmentHelper.getSpans(traceSegment);
assertThat(abstractSpans.size(), is(1));
assertCallbackSpanWithException(abstractSpans.get(0));
assertCallbackSegmentRef(traceSegment.getRefs());
}
private void assertCallbackSpanWithException(AbstractTracingSpan span) {
assertCallbackSpan(span);
SpanAssert.assertException(SpanHelper.getLogs(span).get(0), RuntimeException.class);
assertThat(SpanHelper.getErrorOccurred(span), is(true));
}
private void assertCallbackSegmentRef(List<TraceSegmentRef> refs) {
assertThat(refs.size(), is(1));
TraceSegmentRef segmentRef = refs.get(0);
SegmentRefAssert.assertSpanId(segmentRef, 1);
assertThat(segmentRef.getEntryEndpointName(), is("/for-test-entryOperationName"));
}
private void assertCallbackSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("Pulsar/Producer/Callback"));
}
}
......@@ -44,6 +44,7 @@
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0
* [ActiveMQ](https://github.com/apache/activemq) 5.x
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
* [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x
* NoSQL
* Redis
* [Jedis](https://github.com/xetorthio/jedis) 2.x
......
......@@ -242,7 +242,15 @@ Cassandra:
Light4J:
id: 71
languages: Java
Pulsar:
id: 72
languages: Java
pulsar-producer:
id: 73
languages: Java
pulsar-consumer:
id: 74
languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
......@@ -343,4 +351,6 @@ Component-Server-Mappings:
Npgsql.EntityFrameworkCore.PostgreSQL: PostgreSQL
transport-client: Elasticsearch
SolrJ: Solr
cassandra-java-driver: Cassandra
\ No newline at end of file
cassandra-java-driver: Cassandra
pulsar-producer: Pulsar
pulsar-consumer: Pulsar
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册