diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..f4a8142795d57f6dbbb0b524c2d5233e544cf229 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackAdapter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.skywalking.apm.plugin.kafka; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; + +/** + * implements Callback and EnhancedInstance, for transformation kafkaTemplate.buildCallback + * @author stalary + */ +public class CallbackAdapter implements Callback, EnhancedInstance { + + private Object instance; + + private Callback userCallback; + + public CallbackAdapter(Callback userCallback, Object instance) { + this.userCallback = userCallback; + this.instance = instance; + } + + public CallbackAdapter() { + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (userCallback != null) { + userCallback.onCompletion(metadata, exception); + } + } + + @Override + public Object getSkyWalkingDynamicField() { + return instance; + } + + @Override + public void setSkyWalkingDynamicField(Object value) { + this.instance = value; + } + + public Callback getUserCallback() { + return userCallback; + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java new file mode 100644 index 0000000000000000000000000000000000000000..6e48f8356ddf4548d40479dd3d1f2f97715699e3 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackCache.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.skywalking.apm.plugin.kafka; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; + +/** + * cache Callback and ContextSnapshot + * @author stalary + */ +public class CallbackCache { + + private Callback callback; + + private ContextSnapshot snapshot; + + public Callback getCallback() { + return callback; + } + + public void setCallback(Callback callback) { + this.callback = callback; + } + + public ContextSnapshot getSnapshot() { + return snapshot; + } + + public void setSnapshot(ContextSnapshot snapshot) { + this.snapshot = snapshot; + } + + @Override + public String toString() { + return "CallbackCache{" + + "callback=" + callback + + ", snapshot=" + snapshot + + '}'; + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..9fb123abb7671f01573456e1520061e3b56fa5ba --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; + +/** + * intercept Callback set cache + * @author stalary + **/ +public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor { + + @Override + public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { + Callback callback = (Callback) allArguments[0]; + CallbackCache cache; + if (null != objInst.getSkyWalkingDynamicField()) { + cache = (CallbackCache) objInst.getSkyWalkingDynamicField(); + } else { + cache = new CallbackCache(); + } + cache.setCallback(callback); + objInst.setSkyWalkingDynamicField(cache); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java index cda65f244b30d660086744fd66b482b115bab26a..68b41d9d038e17d61b7e92e0dcc60efbad41bf3f 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java @@ -38,27 +38,30 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { - //Get the SnapshotContext - ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); - if (null != contextSnapshot) { + CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField(); + if (null != cache) { + ContextSnapshot snapshot = getSnapshot(cache); RecordMetadata metadata = (RecordMetadata) allArguments[0]; AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback"); activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER); Tags.MQ_TOPIC.set(activeSpan, metadata.topic()); - ContextManager.continued(contextSnapshot); + ContextManager.continued(snapshot); } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { - ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField(); - if (null != contextSnapshot) { - Exception exceptions = (Exception) allArguments[1]; - if (exceptions != null) { - ContextManager.activeSpan().errorOccurred().log(exceptions); + CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField(); + if (null != cache) { + ContextSnapshot snapshot = getSnapshot(cache); + if (null != snapshot) { + Exception exceptions = (Exception) allArguments[1]; + if (exceptions != null) { + ContextManager.activeSpan().errorOccurred().log(exceptions); + } + ContextManager.stopSpan(); } - ContextManager.stopSpan(); } return ret; } @@ -68,4 +71,12 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor { Class[] argumentsTypes, Throwable t) { ContextManager.activeSpan().errorOccurred().log(t); } + + private ContextSnapshot getSnapshot(CallbackCache cache) { + ContextSnapshot snapshot = cache.getSnapshot(); + if (snapshot == null) { + snapshot = ((CallbackCache) ((EnhancedInstance) cache.getCallback()).getSkyWalkingDynamicField()).getSnapshot(); + } + return snapshot; + } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java index e6b23a8b01a04961d1052d4fcca4facd1565a259..5cc5abcde9c3b726fbf88b61ed351cb2eaa65836 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java @@ -61,12 +61,13 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto next = next.next(); record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes()); } - EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1]; - if (callbackInstance != null) { + if (null != callbackInstance) { ContextSnapshot snapshot = ContextManager.capture(); if (null != snapshot) { - callbackInstance.setSkyWalkingDynamicField(snapshot); + CallbackCache cache = new CallbackCache(); + cache.setSnapshot(snapshot); + callbackInstance.setSkyWalkingDynamicField(cache); } } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..a23b83a042dffb19ba39c388200105c9c0ef621e --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.skywalking.apm.plugin.kafka; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; + +import java.lang.reflect.Method; + +/** + * transformation kafkaTemplate.buildCallback + * @author stalary + */ +public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst); + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java index 8695995917c401b7c966c1a5075f2bf6686ffeb8..c9c0f541f46eb9d3a61b376cd916a19f5af8fb23 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java @@ -28,9 +28,13 @@ import java.util.Map; * @author stalary */ public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor { + @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { Map config = (Map) allArguments[0]; - objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(","))); + // prevent errors caused by secondary interception in kafkaTemplate + if (objInst.getSkyWalkingDynamicField() == null) { + objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(","))); + } } } \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..8f797d0d7737e7723be377d6c79717a1cfac5dd4 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka.define; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; + +/** + * Compatible with KafkaTemplate + * @author stalary + */ +public abstract class AbstractKafkaTemplateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + @Override protected String[] witnessClasses() { + return new String[]{"org.springframework.kafka.core.KafkaTemplate"}; + } +} diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..e8481dd6db5f9936f0e61f1a721a97534c5cd0ed --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * InterceptorCallback wrapped CallbackAdapter, need intercept + * @author stalary + */ +public class KafkaTemplateCallbackInstrumentation extends AbstractKafkaTemplateInstrumentation { + + private static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback"; + private static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.producer.Callback"; + private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor"; + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[]{ + new ConstructorInterceptPoint() { + @Override + public ElementMatcher getConstructorMatcher() { + return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE); + } + + @Override + public String getConstructorInterceptor() { + return CONSTRUCTOR_INTERCEPTOR_CLASS; + } + } + }; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[0]; + } + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..9b480a11ccbd07d3cc374707c1d3c617dfa24592 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.skywalking.apm.plugin.kafka.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +/** + * intercept kafkaTemplate.buildCallback translation callback + * + * @author stalary + */ +public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumentation { + + private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate"; + private static final String ENHANCE_METHOD = "buildCallback"; + private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor"; + + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named(ENHANCE_METHOD); + } + + @Override + public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override + protected ClassMatch enhanceClass() { + return byName(ENHANCE_CLASS); + } +} \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def index 1a2fb5e76fe97f5a2a40e56e95383131001a315b..64f3d72fc7d4d2a332cfe9557acaddaeb3bf2b4d 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def @@ -17,4 +17,6 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation -kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation \ No newline at end of file +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation +kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java index ac6c206c22e94cf64799b656255b03c46d05f6ec..818a52ab47d71191fd5cd749d1db092f6e18f513 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/test/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptorTest.java @@ -63,7 +63,9 @@ public class CallbackInterceptorTest { private EnhancedInstance callBackInstance = new EnhancedInstance() { @Override public Object getSkyWalkingDynamicField() { - return MockContextSnapshot.INSTANCE.mockContextSnapshot(); + CallbackCache cache = new CallbackCache(); + cache.setSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot()); + return cache; } @Override public void setSkyWalkingDynamicField(Object value) {