From 05d9c0a93a1d7e2decdf985d36bffa437f23a922 Mon Sep 17 00:00:00 2001 From: Stalary Date: Wed, 4 Sep 2019 09:40:33 +0800 Subject: [PATCH] MOD: Modify compatibility of kafka plugin and expand operationName (#3390) * ADD: add operationName length threshold * MOD:move operationName threshold to Config, simplified code * MOD:update agent set-up document, clean code * MOD:add agent.operation_name_threshold conf prefix * MOD: Modify compatibility of kafka plugin and expand operationName * MOD: Delete localSpan layer, modify consumer operationName, delete producer key * MOD: Modify operationName, reduce String operation * FIX: Fix callback break, the new version of kafka callback is intercepted twice and needs to be judged * FIX: Fix different problems with KafkaProducer constructor in old and new versions * MOD: separate code --- .../{kafka-v1-plugin => kafka-plugin}/pom.xml | 2 +- .../plugin/kafka}/CallbackInterceptor.java | 40 +++++++++---- .../ConsumerConstructorInterceptor.java | 6 +- .../kafka}/ConsumerEnhanceRequiredInfo.java | 11 +++- .../kafka}/KafkaConsumerInterceptor.java | 8 +-- .../kafka}/KafkaProducerInterceptor.java | 33 +++++----- .../ProducerConstructorInterceptor.java | 7 ++- .../ProducerConstructorMapInterceptor.java} | 19 ++++-- .../kafka}/SubscribeMethodInterceptor.java | 2 +- .../define/AbstractKafkaInstrumentation.java | 2 +- .../define/CallbackInstrumentation.java | 4 +- .../define/KafkaConsumerInstrumentation.java | 13 ++-- .../define/KafkaProducerInstrumentation.java | 39 +++++++----- .../KafkaProducerMapInstrumentation.java} | 37 +++++++----- .../src/main/resources/skywalking-plugin.def | 8 +-- .../kafka}/CallbackInterceptorTest.java | 15 ++--- .../ConsumerConstructorInterceptorTest.java | 5 +- .../kafka}/KafkaConsumerInterceptorTest.java | 20 +++---- .../kafka}/KafkaProducerInterceptorTest.java | 14 ++--- .../ProducerConstructorInterceptorTest.java | 8 +-- .../SubscribeMethodInterceptorTest.java | 11 ++-- ...ducerRecordConstructorInterceptorTest.java | 60 ------------------- apm-sniffer/apm-sdk-plugin/pom.xml | 2 +- 23 files changed, 175 insertions(+), 191 deletions(-) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin => kafka-plugin}/pom.xml (96%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/CallbackInterceptor.java (58%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/ConsumerConstructorInterceptor.java (91%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/ConsumerEnhanceRequiredInfo.java (87%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/KafkaConsumerInterceptor.java (94%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/KafkaProducerInterceptor.java (73%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/ProducerConstructorInterceptor.java (86%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java} (65%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/SubscribeMethodInterceptor.java (97%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/define/AbstractKafkaInstrumentation.java (95%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/define/CallbackInstrumentation.java (95%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/define/KafkaConsumerInstrumentation.java (90%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1 => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka}/define/KafkaProducerInstrumentation.java (71%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java => kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java} (60%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin => kafka-plugin}/src/main/resources/skywalking-plugin.def (67%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11 => kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka}/CallbackInterceptorTest.java (89%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11 => kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka}/ConsumerConstructorInterceptorTest.java (92%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11 => kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka}/KafkaConsumerInterceptorTest.java (91%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11 => kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka}/KafkaProducerInterceptorTest.java (88%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11 => kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka}/ProducerConstructorInterceptorTest.java (94%) rename apm-sniffer/apm-sdk-plugin/{kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11 => kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka}/SubscribeMethodInterceptorTest.java (92%) delete mode 100644 apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml similarity index 96% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml index 6287dce509..0c830f251e 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml @@ -25,7 +25,7 @@ 4.0.0 - apm-kafka-v1-plugin + apm-kafka-plugin 0.11.0.0 diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java similarity index 58% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java index 8e38bd943b..cda65f244b 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/CallbackInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java @@ -16,42 +16,56 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; -import java.lang.reflect.Method; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.skywalking.apm.agent.core.context.ContextManager; import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.tag.Tags; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; + +import java.lang.reflect.Method; +/** + * @author zhang xin, stalary + **/ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor { + @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - MethodInterceptResult result) throws Throwable { - AbstractSpan abstractSpan = ContextManager.createLocalSpan("Producer/Callback"); - + MethodInterceptResult result) throws Throwable { //Get the SnapshotContext - ContextSnapshot contextSnapshot = (ContextSnapshot)objInst.getSkyWalkingDynamicField(); + ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); if (null != contextSnapshot) { + RecordMetadata metadata = (RecordMetadata) allArguments[0]; + AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback"); + activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER); + Tags.MQ_TOPIC.set(activeSpan, metadata.topic()); ContextManager.continued(contextSnapshot); } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - Object ret) throws Throwable { - Exception exceptions = (Exception)allArguments[1]; - if (exceptions != null) { - ContextManager.activeSpan().errorOccurred().log(exceptions); + Object ret) throws Throwable { + ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); + if (null != contextSnapshot) { + Exception exceptions = (Exception) allArguments[1]; + if (exceptions != null) { + ContextManager.activeSpan().errorOccurred().log(exceptions); + } + ContextManager.stopSpan(); } - ContextManager.stopSpan(); return ret; } - @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, - Class[] argumentsTypes, Throwable t) { + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { ContextManager.activeSpan().errorOccurred().log(t); } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java similarity index 91% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java index 77880a7ce9..35124a3864 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerConstructorInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java @@ -16,12 +16,15 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; +/** + * @author zhang xin, stalary + **/ public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { @@ -29,6 +32,7 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc // set the bootstrap server address ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo(); requiredInfo.setBrokerServers(config.getList("bootstrap.servers")); + requiredInfo.setGroupId(config.getString("group.id")); objInst.setSkyWalkingDynamicField(requiredInfo); } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java similarity index 87% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java index 62018beb63..88556232bc 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ConsumerEnhanceRequiredInfo.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerEnhanceRequiredInfo.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; import java.util.Collection; import java.util.List; @@ -25,6 +25,7 @@ import org.apache.skywalking.apm.util.StringUtil; public class ConsumerEnhanceRequiredInfo { private String brokerServers; private String topics; + private String groupId; private long startTime; public void setBrokerServers(List brokerServers) { @@ -35,6 +36,10 @@ public class ConsumerEnhanceRequiredInfo { this.topics = StringUtil.join(';', topics.toArray(new String[0])); } + public void setGroupId(String groupId) { + this.groupId = groupId; + } + public String getBrokerServers() { return brokerServers; } @@ -43,6 +48,10 @@ public class ConsumerEnhanceRequiredInfo { return topics; } + public String getGroupId() { + return groupId; + } + public void setStartTime(long startTime) { this.startTime = startTime; } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java similarity index 94% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java index 9c0c5d2f62..99753c847e 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaConsumerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; import java.lang.reflect.Method; import java.util.Iterator; @@ -37,12 +37,12 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInt import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; /** - * @author zhang xin + * @author zhang xin, stalary */ public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor { public static final String OPERATE_NAME_PREFIX = "Kafka/"; - public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer"; + public static final String CONSUMER_OPERATE_NAME = "/Consumer/"; @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, @@ -60,7 +60,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto // if (records.size() > 0) { ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField(); - AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(requiredInfo.getStartTime()); + AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(), null).start(requiredInfo.getStartTime()); activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER); SpanLayer.asMQ(activeSpan); diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java similarity index 73% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java index 5880a1a6e9..e6b23a8b01 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/KafkaProducerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java @@ -16,13 +16,13 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; -import java.lang.reflect.Method; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.skywalking.apm.agent.core.context.CarrierItem; import org.apache.skywalking.apm.agent.core.context.ContextCarrier; import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; import org.apache.skywalking.apm.agent.core.context.tag.Tags; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; @@ -31,8 +31,10 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceM import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; +import java.lang.reflect.Method; + /** - * @author zhang xin + * @author zhang xin, stalary */ public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor { @@ -41,16 +43,15 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - MethodInterceptResult result) throws Throwable { + MethodInterceptResult result) throws Throwable { ContextCarrier contextCarrier = new ContextCarrier(); - ProducerRecord record = (ProducerRecord)allArguments[0]; - String topicName = (String)((EnhancedInstance)record).getSkyWalkingDynamicField(); + ProducerRecord record = (ProducerRecord) allArguments[0]; + String topicName = record.topic(); + AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst.getSkyWalkingDynamicField()); - AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String)objInst.getSkyWalkingDynamicField()); - - Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField()); + Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField()); Tags.MQ_TOPIC.set(activeSpan, topicName); SpanLayer.asMQ(activeSpan); activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER); @@ -61,21 +62,25 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes()); } - EnhancedInstance callbackInstance = (EnhancedInstance)allArguments[1]; + EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1]; if (callbackInstance != null) { - callbackInstance.setSkyWalkingDynamicField(ContextManager.capture()); + ContextSnapshot snapshot = ContextManager.capture(); + if (null != snapshot) { + callbackInstance.setSkyWalkingDynamicField(snapshot); + } } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, - Object ret) throws Throwable { + Object ret) throws Throwable { ContextManager.stopSpan(); return ret; } - @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, - Class[] argumentsTypes, Throwable t) { + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java similarity index 86% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java index 604f080c83..c9c0d517ba 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerConstructorInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; @@ -25,8 +25,9 @@ import org.apache.skywalking.apm.util.StringUtil; public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor { - @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { - ProducerConfig config = (ProducerConfig)allArguments[0]; + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + ProducerConfig config = (ProducerConfig) allArguments[0]; objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0]))); } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java similarity index 65% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java index 1a2d5e3ffd..8695995917 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/ProducerRecordConstructorInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java @@ -16,14 +16,21 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; +import org.apache.skywalking.apm.util.StringUtil; -public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor { - @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { - String topic = (String)allArguments[0]; - objInst.setSkyWalkingDynamicField(topic); +import java.util.Map; + +/** + * @author stalary + */ +public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor { + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + Map config = (Map) allArguments[0]; + objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(","))); } -} +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java similarity index 97% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java index 7c4e65598d..0453a5ed1d 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/SubscribeMethodInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1; +package org.apache.skywalking.apm.plugin.kafka; import java.lang.reflect.Method; import java.util.Collection; diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java similarity index 95% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java index e737441b0e..cf867118d9 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/AbstractKafkaInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1.define; +package org.apache.skywalking.apm.plugin.kafka.define; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java similarity index 95% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java index 922968cdf1..7bf7b40149 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/CallbackInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1.define; +package org.apache.skywalking.apm.plugin.kafka.define; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -31,7 +31,7 @@ public class CallbackInstrumentation extends AbstractKafkaInstrumentation { public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback"; public static final String ENHANCE_METHOD = "onCompletion"; - public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor"; + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor"; @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java similarity index 90% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java index 0aef949197..cfc9471c3b 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaConsumerInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1.define; +package org.apache.skywalking.apm.plugin.kafka.define; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -38,18 +38,19 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName * 4. Stop the entry span when pollOnce method finished. * * - * @author zhang xin + * @author zhang xin,stalary */ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation { public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig"; - public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor"; - public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor"; + public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor"; + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor"; public static final String ENHANCE_METHOD = "pollOnce"; + public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches"; public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; public static final String SUBSCRIBE_METHOD = "subscribe"; public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener"; - public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor"; + public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor"; @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[] { @@ -69,7 +70,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation { return new InstanceMethodsInterceptPoint[] { new InstanceMethodsInterceptPoint() { @Override public ElementMatcher getMethodsMatcher() { - return named(ENHANCE_METHOD); + return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD)); } @Override public String getMethodsInterceptor() { diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java similarity index 71% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java index fa702c8be4..9fe0d3b6df 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/KafkaProducerInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v1.define; +package org.apache.skywalking.apm.plugin.kafka.define; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -39,48 +39,57 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName * 3. Stop the exit span when send method finished. * * - * @author zhang xin + * @author zhang xin, stalary */ public class KafkaProducerInstrumentation extends AbstractKafkaInstrumentation { - public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor"; + public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor"; public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer"; - public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor"; + public static final String ENHANCE_METHOD = "doSend"; + public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor"; public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig"; - @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { - return new ConstructorInterceptPoint[] { + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ new ConstructorInterceptPoint() { - @Override public ElementMatcher getConstructorMatcher() { + @Override + public ElementMatcher getConstructorMatcher() { return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG); } - @Override public String getConstructorInterceptor() { + @Override + public String getConstructorInterceptor() { return CONSTRUCTOR_INTERCEPTOR_CLASS; } } }; } - @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { - return new InstanceMethodsInterceptPoint[] { + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ new InstanceMethodsInterceptPoint() { - @Override public ElementMatcher getMethodsMatcher() { - return named("doSend"); + @Override + public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD); } - @Override public String getMethodsInterceptor() { + @Override + public String getMethodsInterceptor() { return INTERCEPTOR_CLASS; } - @Override public boolean isOverrideArgs() { + @Override + public boolean isOverrideArgs() { return false; } } }; } - @Override protected ClassMatch enhanceClass() { + @Override + protected ClassMatch enhanceClass() { return byName(ENHANCE_CLASS); } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java similarity index 60% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java index e40f14bb27..8888c80ad2 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/v1/define/ProducerRecordInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java @@ -15,8 +15,7 @@ * limitations under the License. * */ - -package org.apache.skywalking.apm.plugin.kafka.v1.define; +package org.apache.skywalking.apm.plugin.kafka.define; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -24,33 +23,43 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; -public class ProducerRecordInstrumentation extends AbstractKafkaInstrumentation { +/** + * after version 2.1.0 use Map to config + * @author stalary + */ +public class KafkaProducerMapInstrumentation extends AbstractKafkaInstrumentation { - public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor"; - public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.ProducerRecord"; + public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer"; + public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor"; + public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "java.util.Map"; - @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { - return new ConstructorInterceptPoint[] { + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ new ConstructorInterceptPoint() { - @Override public ElementMatcher getConstructorMatcher() { - return takesArguments(6); + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG); } - @Override public String getConstructorInterceptor() { + @Override + public String getConstructorInterceptor() { return CONSTRUCTOR_INTERCEPTOR_CLASS; } } }; } - @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[0]; } - @Override protected ClassMatch enhanceClass() { + @Override + protected ClassMatch enhanceClass() { return byName(ENHANCE_CLASS); } -} +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def similarity index 67% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def index 59e564aa78..1a2fb5e76f 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.CallbackInstrumentation -kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaConsumerInstrumentation -kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.KafkaProducerInstrumentation -kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v1.define.ProducerRecordInstrumentation +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java similarity index 89% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java index 0629b204ab..ac6c206c22 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/CallbackInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java @@ -16,9 +16,8 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v11; +package org.apache.skywalking.apm.plugin.kafka; -import java.util.List; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.skywalking.apm.agent.core.context.MockContextSnapshot; import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; @@ -27,13 +26,7 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; import org.apache.skywalking.apm.agent.test.helper.SpanHelper; -import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; -import org.apache.skywalking.apm.agent.test.tools.SegmentRefAssert; -import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; -import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; -import org.apache.skywalking.apm.agent.test.tools.SpanAssert; -import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; -import org.apache.skywalking.apm.plugin.kafka.v1.CallbackInterceptor; +import org.apache.skywalking.apm.agent.test.tools.*; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -43,6 +36,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import java.util.List; + import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -143,6 +138,6 @@ public class CallbackInterceptorTest { } private void assertCallbackSpan(AbstractTracingSpan span) { - assertThat(span.getOperationName(), is("Producer/Callback")); + assertThat(span.getOperationName(), is("Kafka/Producer/Callback")); } } \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java similarity index 92% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java index ad642dd843..dff640094d 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ConsumerConstructorInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptorTest.java @@ -16,17 +16,16 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v11; +package org.apache.skywalking.apm.plugin.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerConstructorInterceptor; -import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; + import java.util.ArrayList; import java.util.List; diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java similarity index 91% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java index dc8a832999..aa3543b354 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaConsumerInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptorTest.java @@ -16,12 +16,8 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v11; +package org.apache.skywalking.apm.plugin.kafka; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.skywalking.apm.agent.core.conf.Config; @@ -32,13 +28,7 @@ import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; import org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper; -import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; -import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; -import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; -import org.apache.skywalking.apm.agent.test.tools.SpanAssert; -import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; -import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo; -import org.apache.skywalking.apm.plugin.kafka.v1.KafkaConsumerInterceptor; +import org.apache.skywalking.apm.agent.test.tools.*; import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; @@ -48,6 +38,11 @@ import org.junit.runner.RunWith; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_CONSUMER; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -92,6 +87,7 @@ public class KafkaConsumerInterceptorTest { brokers.add("localhost:9092"); brokers.add("localhost:19092"); consumerEnhanceRequiredInfo.setBrokerServers(brokers); + consumerEnhanceRequiredInfo.setGroupId("test"); messages = new HashMap>(); TopicPartition topicPartition = new TopicPartition("test", 1); diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java similarity index 88% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java index 0ca6fd7efb..4dbde023e7 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/KafkaProducerInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptorTest.java @@ -16,21 +16,15 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v11; +package org.apache.skywalking.apm.plugin.kafka; -import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.test.helper.SegmentHelper; -import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule; -import org.apache.skywalking.apm.agent.test.tools.SegmentStorage; -import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint; -import org.apache.skywalking.apm.agent.test.tools.SpanAssert; -import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner; -import org.apache.skywalking.apm.plugin.kafka.v1.KafkaProducerInterceptor; +import org.apache.skywalking.apm.agent.test.tools.*; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -38,6 +32,8 @@ import org.junit.runner.RunWith; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunnerDelegate; +import java.util.List; + import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.KAFKA_PRODUCER; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -71,7 +67,7 @@ public class KafkaProducerInterceptorTest { private class MockProducerMessage extends ProducerRecord implements EnhancedInstance { public MockProducerMessage() { - super("test", ""); + super("test", "key1", ""); } @Override public Object getSkyWalkingDynamicField() { diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java similarity index 94% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java index 6eee953f5b..cdc3b02b00 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerConstructorInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptorTest.java @@ -16,19 +16,19 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v11; +package org.apache.skywalking.apm.plugin.kafka; -import java.util.ArrayList; -import java.util.List; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.plugin.kafka.v1.ProducerConstructorInterceptor; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import java.util.ArrayList; +import java.util.List; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.mockito.Mockito.when; diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java similarity index 92% rename from apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java rename to apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java index 66a0842934..a4336d9820 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/SubscribeMethodInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptorTest.java @@ -16,20 +16,19 @@ * */ -package org.apache.skywalking.apm.plugin.kafka.v11; +package org.apache.skywalking.apm.plugin.kafka; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.plugin.kafka.v1.ConsumerEnhanceRequiredInfo; -import org.apache.skywalking.apm.plugin.kafka.v1.SubscribeMethodInterceptor; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; diff --git a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java deleted file mode 100644 index 5a1a9d6774..0000000000 --- a/apm-sniffer/apm-sdk-plugin/kafka-v1-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/v11/ProducerRecordConstructorInterceptorTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.apm.plugin.kafka.v11; - -import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; -import org.apache.skywalking.apm.plugin.kafka.v1.ProducerRecordConstructorInterceptor; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -@RunWith(MockitoJUnitRunner.class) -public class ProducerRecordConstructorInterceptorTest { - - @Mock - private ProducerRecordConstructorInterceptor constructorInterceptor; - - private EnhancedInstance enhancedInstance = new EnhancedInstance() { - private String brokerServers; - - @Override public Object getSkyWalkingDynamicField() { - return brokerServers; - } - - @Override public void setSkyWalkingDynamicField(Object value) { - brokerServers = (String)value; - } - }; - - @Before - public void setUp() { - constructorInterceptor = new ProducerRecordConstructorInterceptor(); - } - - @Test - public void testOnConsumer() { - constructorInterceptor.onConstruct(enhancedInstance, new Object[] {"test"}); - assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("test")); - } -} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml index 96509d5762..f5669cd2bc 100644 --- a/apm-sniffer/apm-sdk-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/pom.xml @@ -59,7 +59,7 @@ elastic-job-2.x-plugin mongodb-2.x-plugin httpasyncclient-4.x-plugin - kafka-v1-plugin + kafka-plugin servicecomb-plugin hystrix-1.x-plugin sofarpc-plugin -- GitLab