提交 14a61d75 编写于 作者: A ascrutae

remove readme.md

上级 ed5e49d7
...@@ -56,7 +56,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto ...@@ -56,7 +56,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
Object ret) throws Throwable { Object ret) throws Throwable {
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>)ret; Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>)ret;
// //
// The entry span will create when the consumer fetch anyone message from kafka cluster, or the span will not create. // The entry span will only be created when the consumer received at least one message.
// //
if (records.size() > 0) { if (records.size() > 0) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField(); ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
......
...@@ -50,13 +50,11 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto ...@@ -50,13 +50,11 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String)objInst.getSkyWalkingDynamicField()); AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String)objInst.getSkyWalkingDynamicField());
//set tags
Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField()); Tags.MQ_BROKER.set(activeSpan, (String)objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, topicName); Tags.MQ_TOPIC.set(activeSpan, topicName);
SpanLayer.asMQ(activeSpan); SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.KAFKA); activeSpan.setComponent(ComponentsDefine.KAFKA);
// set headers
CarrierItem next = contextCarrier.items(); CarrierItem next = contextCarrier.items();
while (next.hasNext()) { while (next.hasNext()) {
next = next.next(); next = next.next();
......
...@@ -27,7 +27,6 @@ public class ProducerConstructorInterceptor implements InstanceConstructorInterc ...@@ -27,7 +27,6 @@ public class ProducerConstructorInterceptor implements InstanceConstructorInterc
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
ProducerConfig config = (ProducerConfig)allArguments[0]; ProducerConfig config = (ProducerConfig)allArguments[0];
// set the bootstrap server address
objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0]))); objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0])));
} }
} }
...@@ -24,7 +24,6 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceC ...@@ -24,7 +24,6 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceC
public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor { public class ProducerRecordConstructorInterceptor implements InstanceConstructorInterceptor {
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
String topic = (String)allArguments[0]; String topic = (String)allArguments[0];
// set the topic
objInst.setSkyWalkingDynamicField(topic); objInst.setSkyWalkingDynamicField(topic);
} }
} }
...@@ -29,7 +29,8 @@ import static net.bytebuddy.matcher.ElementMatchers.named; ...@@ -29,7 +29,8 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch; import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
/** /**
* {@link CallbackInstrumentation} intercept the method onCompletion in the class <code>org.apache.kafka.clients.producer.Callback</code>. * {@link CallbackInstrumentation} defined that {@link org.apache.skywalking.apm.plugin.kafka.v11.CallbackInterceptor}
* intercept the method onCompletion in the class <code>org.apache.kafka.clients.producer.Callback</code>.
* *
* @author zhangxin * @author zhangxin
*/ */
......
...@@ -30,34 +30,35 @@ import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentType ...@@ -30,34 +30,35 @@ import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentType
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/** /**
* {@link KafkaProducerInstrumentation} intercept the method <code>send</code> in the class * {@link KafkaProducerInstrumentation} define that {@link org.apache.skywalking.apm.plugin.kafka.v11.KafkaConsumerInterceptor}
* <code>org.apache.kafka.clients.producer.KafkaProducer</code>. Here is the intercept process steps. * intercept the method <code>send</code> in the class <code>org.apache.kafka.clients.producer.KafkaProducer</code>.
* Here is the intercept process steps.
* *
* *
* <pre> * <pre>
* 1. Record the topic when the client call <code>subscribed</code> * 1. Record the topic when the client invoke <code>subscribed</code> method
* 3. Create the entry span when the client call the method <code>pollOnce</code>. * 2. Create the entry span when the client invoke the method <code>pollOnce</code>.
* 4. Inject all the <code>Trace Context</code> by iterate all <code>ConsumerRecord</code> * 3. Extract all the <code>Trace Context</code> by iterate all <code>ConsumerRecord</code>
* 5. Stop the entry span when end the <code>pollOnce</code> method. * 4. Stop the entry span when <code>pollOnce</code> method finished.
* </pre> * </pre>
* *
* @author zhang xin * @author zhang xin
*/ */
public class KafkaConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { public class KafkaConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String CONSTRUCTOR_INTERCEPT_FLAG = "org.apache.kafka.clients.consumer.ConsumerConfig"; public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.ConsumerConstructorInterceptor"; public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.ConsumerConstructorInterceptor";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.KafkaConsumerInterceptor"; public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.v11.KafkaConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce"; public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe"; public static final String SUBSCRIBE_METHOD = "subscribe";
public static final String SUBSCRIBE_INTERCEPT_FLAG = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener"; public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() { @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] { return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() { new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() { @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_FLAG); return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
} }
@Override public String getConstructorInterceptor() { @Override public String getConstructorInterceptor() {
...@@ -84,7 +85,7 @@ public class KafkaConsumerInstrumentation extends ClassInstanceMethodsEnhancePlu ...@@ -84,7 +85,7 @@ public class KafkaConsumerInstrumentation extends ClassInstanceMethodsEnhancePlu
}, },
new InstanceMethodsInterceptPoint() { new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_FLAG)); return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE));
} }
@Override public String getMethodsInterceptor() { @Override public String getMethodsInterceptor() {
......
...@@ -30,18 +30,17 @@ import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentType ...@@ -30,18 +30,17 @@ import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentType
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/** /**
* {@link KafkaProducerInstrumentation} intercept the method <code>send</code> in the class * {@link KafkaProducerInstrumentation} define that {@link org.apache.skywalking.apm.plugin.kafka.v11.KafkaProducerInterceptor}
* <code>org.apache.kafka.clients.producer.KafkaProducer</code>. Here is the intercept process steps. * intercept the method <code>send</code> in the class <code>org.apache.kafka.clients.producer.KafkaProducer</code>.
* Here is the intercept process steps.
* *
* *
* <pre> * <pre>
* 1. Record the broker address when the client create the <code>org.apache.kafka.clients.producer.KafkaProducer</code> * 1. Record the broker address when the client create the <code>org.apache.kafka.clients.producer.KafkaProducer</code>
* instance * instance
* 2. Fetch the topic name from <code>org.apache.kafka.clients.producer.ProducerRecord</code> when the client call * 2. Create the exit span when the client invoke <code>send</code> method
* <code>send</code> method. * 3. Inject the context to {@link org.apache.kafka.clients.producer.ProducerRecord#headers}
* 3. Create the exit span when the client call <code>send</code> method * 3. Stop the exit span when <code>send</code> method finished.
* 4. Set the <code>Context</code> into the <code>org.apache.kafka.clients.producer.ProducerRecord#headers</code>
* 5. Stop the exit span when end the <code>send</code> method.
* </pre> * </pre>
* *
* @author zhang xin * @author zhang xin
......
...@@ -29,8 +29,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; ...@@ -29,8 +29,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/** /**
* {@link ProducerRecordInstrumentation} intercept the constructor in the class <code>org.apache.kafka.clients.producer.ProducerRecord</code> * {@link ProducerRecordInstrumentation} define that {@link org.apache.skywalking.apm.plugin.kafka.v11.ProducerRecordConstructorInterceptor}
* for record the topic name and propagate the <code>Context</code> of trace. * intercept the constructor in the class <code>org.apache.kafka.clients.producer.ProducerRecord</code> for record the
* topic name and propagate the <code>Context</code> of trace.
* *
* @author zhang xin * @author zhang xin
* @see org.apache.skywalking.apm.plugin.kafka.v11.define.KafkaProducerInstrumentation * @see org.apache.skywalking.apm.plugin.kafka.v11.define.KafkaProducerInstrumentation
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册