提交 8de9f7d7 编写于 作者: S Stalary 提交者: wu-sheng

MOD: Kafka-plugin Compatible with KafkaTemplate (#3505)

* MOD: Compatible with KafkaTemplate

* MOD: modify test
上级 321a88f5
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
/**
* implements Callback and EnhancedInstance, for transformation kafkaTemplate.buildCallback
* @author stalary
*/
public class CallbackAdapter implements Callback, EnhancedInstance {
private Object instance;
private Callback userCallback;
public CallbackAdapter(Callback userCallback, Object instance) {
this.userCallback = userCallback;
this.instance = instance;
}
public CallbackAdapter() {
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (userCallback != null) {
userCallback.onCompletion(metadata, exception);
}
}
@Override
public Object getSkyWalkingDynamicField() {
return instance;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.instance = value;
}
public Callback getUserCallback() {
return userCallback;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
/**
* cache Callback and ContextSnapshot
* @author stalary
*/
public class CallbackCache {
private Callback callback;
private ContextSnapshot snapshot;
public Callback getCallback() {
return callback;
}
public void setCallback(Callback callback) {
this.callback = callback;
}
public ContextSnapshot getSnapshot() {
return snapshot;
}
public void setSnapshot(ContextSnapshot snapshot) {
this.snapshot = snapshot;
}
@Override
public String toString() {
return "CallbackCache{" +
"callback=" + callback +
", snapshot=" + snapshot +
'}';
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* intercept Callback set cache
* @author stalary
**/
public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Callback callback = (Callback) allArguments[0];
CallbackCache cache;
if (null != objInst.getSkyWalkingDynamicField()) {
cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
} else {
cache = new CallbackCache();
}
cache.setCallback(callback);
objInst.setSkyWalkingDynamicField(cache);
}
}
......@@ -38,27 +38,30 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
//Get the SnapshotContext
ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
if (null != contextSnapshot) {
CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
if (null != cache) {
ContextSnapshot snapshot = getSnapshot(cache);
RecordMetadata metadata = (RecordMetadata) allArguments[0];
AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
ContextManager.continued(contextSnapshot);
ContextManager.continued(snapshot);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
if (null != contextSnapshot) {
Exception exceptions = (Exception) allArguments[1];
if (exceptions != null) {
ContextManager.activeSpan().errorOccurred().log(exceptions);
CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
if (null != cache) {
ContextSnapshot snapshot = getSnapshot(cache);
if (null != snapshot) {
Exception exceptions = (Exception) allArguments[1];
if (exceptions != null) {
ContextManager.activeSpan().errorOccurred().log(exceptions);
}
ContextManager.stopSpan();
}
ContextManager.stopSpan();
}
return ret;
}
......@@ -68,4 +71,12 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
private ContextSnapshot getSnapshot(CallbackCache cache) {
ContextSnapshot snapshot = cache.getSnapshot();
if (snapshot == null) {
snapshot = ((CallbackCache) ((EnhancedInstance) cache.getCallback()).getSkyWalkingDynamicField()).getSnapshot();
}
return snapshot;
}
}
......@@ -61,12 +61,13 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
next = next.next();
record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
}
EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
if (callbackInstance != null) {
if (null != callbackInstance) {
ContextSnapshot snapshot = ContextManager.capture();
if (null != snapshot) {
callbackInstance.setSkyWalkingDynamicField(snapshot);
CallbackCache cache = new CallbackCache();
cache.setSnapshot(snapshot);
callbackInstance.setSkyWalkingDynamicField(cache);
}
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
/**
* transformation kafkaTemplate.buildCallback
* @author stalary
*/
public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst);
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
\ No newline at end of file
......@@ -28,9 +28,13 @@ import java.util.Map;
* @author stalary
*/
public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Map<String, Object> config = (Map<String, Object>) allArguments[0];
objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
// prevent errors caused by secondary interception in kafkaTemplate
if (objInst.getSkyWalkingDynamicField() == null) {
objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.define;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
/**
* Compatible with KafkaTemplate
* @author stalary
*/
public abstract class AbstractKafkaTemplateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override protected String[] witnessClasses() {
return new String[]{"org.springframework.kafka.core.KafkaTemplate"};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* InterceptorCallback wrapped CallbackAdapter, need intercept
* @author stalary
*/
public class KafkaTemplateCallbackInstrumentation extends AbstractKafkaTemplateInstrumentation {
private static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback";
private static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.producer.Callback";
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* intercept kafkaTemplate.buildCallback translation callback
*
* @author stalary
*/
public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumentation {
private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate";
private static final String ENHANCE_METHOD = "buildCallback";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
\ No newline at end of file
......@@ -17,4 +17,6 @@
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
\ No newline at end of file
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
\ No newline at end of file
......@@ -63,7 +63,9 @@ public class CallbackInterceptorTest {
private EnhancedInstance callBackInstance = new EnhancedInstance() {
@Override public Object getSkyWalkingDynamicField() {
return MockContextSnapshot.INSTANCE.mockContextSnapshot();
CallbackCache cache = new CallbackCache();
cache.setSnapshot(MockContextSnapshot.INSTANCE.mockContextSnapshot());
return cache;
}
@Override public void setSkyWalkingDynamicField(Object value) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册