提交 22ead46d 编写于 作者: A ascrutae

[Agent] Support kafka framework

上级 6554aec1
......@@ -78,6 +78,8 @@ public class ComponentsDefine {
public static final OfficialComponent HTTP_ASYNC_CLIENT = new OfficialComponent(26, "httpasyncclient");
public static final OfficialComponent KAFKA = new OfficialComponent(27, "Kafka");
private static ComponentsDefine INSTANCE = new ComponentsDefine();
private String[] components;
......@@ -87,7 +89,7 @@ public class ComponentsDefine {
}
public ComponentsDefine() {
components = new String[27];
components = new String[28];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
......@@ -114,6 +116,7 @@ public class ComponentsDefine {
addComponent(ELASTIC_JOB);
addComponent(ROCKET_MQ);
addComponent(HTTP_ASYNC_CLIENT);
addComponent(KAFKA);
}
private void addComponent(OfficialComponent component) {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-kafka-0.11.x-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
AbstractSpan abstractSpan = ContextManager.createLocalSpan("Producer/Callback");
//Get the SnapshotContext
ContextSnapshot contextSnapshot = (ContextSnapshot)objInst.getSkyWalkingDynamicField();
ContextManager.continued(contextSnapshot);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Exception exceptions = (Exception)allArguments[1];
if (exceptions != null) {
ContextManager.activeSpan().errorOccurred().log(exceptions);
}
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ConsumerConfig config = (ConsumerConfig)allArguments[0];
// set the bootstrap server address
ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
objInst.setSkyWalkingDynamicField(requiredInfo);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import java.util.Collection;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
public class ConsumerEnhanceRequiredInfo {
private String brokerServers;
private String topics;
public void setBrokerServers(List<String> brokerServers) {
this.brokerServers = StringUtil.join(',', brokerServers.toArray(new String[0]));
}
public void setTopics(Collection<String> topics) {
this.topics = StringUtil.join(',', topics.toArray(new String[0]));
}
public String getBrokerServers() {
return brokerServers;
}
public String getTopics() {
return topics;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* @autor zhang xin
*/
public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
AbstractSpan activeSpan = ContextManager.createEntrySpan("Kafka/Consumer/Poll", null);
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
activeSpan.setComponent(ComponentsDefine.KAFKA);
SpanLayer.asMQ(activeSpan);
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>)ret;
for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) {
for (ConsumerRecord<?, ?> record : consumerRecords) {
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
Iterator<Header> iterator = record.headers().headers(next.getHeadKey()).iterator();
if (iterator.hasNext()) {
next.setHeadValue(new String(iterator.next().value()));
}
}
ContextManager.extract(contextCarrier);
}
}
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import java.lang.reflect.Method;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
/**
* @author zhang xin
*/
public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan activeSpan = ContextManager.createExitSpan("Kafka/Producer/send", contextCarrier, (String)objInst.getSkyWalkingDynamicField());
//set tags
ProducerRecord record = (ProducerRecord)allArguments[0];
Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, (String)((EnhancedInstance)record).getSkyWalkingDynamicField());
// set headers
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
}
EnhancedInstance callbackInstance = (EnhancedInstance)allArguments[1];
if (callbackInstance != null) {
callbackInstance.setSkyWalkingDynamicField(ContextManager.capture());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.util.StringUtil;
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ProducerConfig config = (ProducerConfig)allArguments[0];
// set the bootstrap server address
objInst.setSkyWalkingDynamicField(StringUtil.join(',', config.getList("bootstrap.servers").toArray(new String[0])));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor {
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
String topic = (String)allArguments[0];
// set the topic
objInst.setSkyWalkingDynamicField(topic);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11;
import java.lang.reflect.Method;
import java.util.Collection;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
requiredInfo.setTopics((Collection<String>)allArguments[0]);
objInst.setSkyWalkingDynamicField(requiredInfo);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
/**
* {@link CallbackInstrumentation} intercept the method onCompletion in the class <code>org.apache.kafka.clients.producer.Callback</code>.
*
* @author zhangxin
*/
public class CallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback";
public static final String ENHANCE_METHOD = "onCompletion";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.CallbackInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {ENHANCE_CLASS});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link KafkaProducerInstrumentation} intercept the method <code>send</code> in the class
* <code>org.apache.kafka.clients.producer.KafkaProducer</code>. Here is the intercept process steps.
*
*
* <pre>
* 1. Record the topic when the client call <code>subscribed</code>
* 3. Create the entry span when the client call the method <code>pollOnce</code>.
* 4. Inject all the <code>Trace Context</code> by iterate all <code>ConsumerRecord</code>
* 5. Stop the entry span when end the <code>pollOnce</code> method.
* </pre>
*
* @author zhang xin
*/
public class KafkaConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String CONSTRUCTOR_INTERCEPT_FLAG = "org.apache.kafka.clients.consumer.ConsumerConfig";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.ConsumerConstructorInterceptor";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.KafkaConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe";
public static final String SUBSCRIBE_INTERCEPT_FLAG = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_FLAG);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_FLAG));
}
@Override public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.kafka.v11.SubscribeMethodInterceptor";
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link KafkaProducerInstrumentation} intercept the method <code>send</code> in the class
* <code>org.apache.kafka.clients.producer.KafkaProducer</code>. Here is the intercept process steps.
*
*
* <pre>
* 1. Record the broker address when the client create the <code>org.apache.kafka.clients.producer.KafkaProducer</code>
* instance
* 2. Fetch the topic name from <code>org.apache.kafka.clients.producer.ProducerRecord</code> when the client call
* <code>send</code> method.
* 3. Create the exit span when the client call <code>send</code> method
* 4. Set the <code>Context</code> into the <code>org.apache.kafka.clients.producer.ProducerRecord#headers</code>
* 5. Stop the exit span when end the <code>send</code> method.
* </pre>
*
* @author zhang xin
*/
public class KafkaProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.KafkaProducerInterceptor";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.ProducerConstructorInterceptor";
public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("doSend");
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka.v11.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link ProducerRecordInstrumentation} intercept the constructor in the class <code>org.apache.kafka.clients.producer.ProducerRecord</code>
* for record the topic name and propagate the <code>Context</code> of trace.
*
* @author zhang xin
* @see org.apache.skywalking.apm.plugin.kafka.v11.define.KafkaProducerInstrumentation
*/
public class ProducerRecordInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.ProducerRecordConstructorInterceptor";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.ProducerRecord";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArguments(6);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v11.define.CallbackInstrumentation
kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v11.define.KafkaConsumerInstrumentation
kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v11.define.KafkaProducerInstrumentation
kafka-0.11.x=org.apache.skywalking.apm.plugin.kafka.v11.define.ProducerRecordInstrumentation
......@@ -53,6 +53,7 @@
<module>elastic-job-2.x-plugin</module>
<module>mongodb-2.x-plugin</module>
<module>httpasyncclient-4.x-plugin</module>
<module>kafka-0.11.x-plugin</module>
</modules>
<packaging>pom</packaging>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册