Unverified commit 2122990c authored by mlasek, committed by GitHub

Resolves #6751 - added kafka consumer assign method interception to p… (#6753)

Parent 527d7725
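
For context: the plugin previously created consumer-side spans only for KafkaConsumer instances that called subscribe(); consumers that bind partitions directly via assign() were left untraced. Below is a minimal sketch of the newly covered case. The broker address, topic name, and group id are taken from the test scenario in this change; the rest of the configuration is illustrative, and the kafka-clients API is assumed to match the version the plugin builds against. With the SkyWalking Java agent attached, poll() on such a consumer now yields an MQ entry span (e.g. Kafka/assign/Consumer/testGroup3 in the expected trace data further down).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-server:9092");
        props.put("group.id", "testGroup3");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // assign() is the method this change intercepts, alongside the existing subscribe() interception
        consumer.assign(Collections.singletonList(new TopicPartition("assign", 0)));
        ConsumerRecords<String, String> records = consumer.poll(100);
        records.forEach(r -> System.out.println(r.value()));
        consumer.close();
    }
}
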
......@@ -13,6 +13,7 @@ Release Notes.
* Add an agent plugin to support elasticsearch7.
* Add `jsonrpc4j` agent plugin.
* Add Seata in the component definition. Seata plugin hosts on Seata project.
* Extended the Kafka plugin to properly trace consumers that have topic partitions directly assigned.
#### OAP-Backend
* BugFix: filter invalid Envoy access logs whose socket address is empty.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.common.TopicPartition;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
public class AssignMethodInterceptor implements InstanceMethodsAroundInterceptor {
@SuppressWarnings("unchecked")
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
// Record the topic names of the directly assigned partitions on the enhanced
// consumer instance, so later consumer-side spans can be named after them.
Set<String> topics = new LinkedHashSet<>();
((Collection<TopicPartition>) allArguments[0]).forEach(topicPartition -> topics.add(topicPartition.topic()));
requiredInfo.setTopics(topics);
objInst.setSkyWalkingDynamicField(requiredInfo);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}
\ No newline at end of file
......@@ -50,6 +50,9 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String SUBSCRIBE_INTERCEPT_TYPE_PATTERN = "java.util.regex.Pattern";
public static final String SUBSCRIBE_INTERCEPT_TYPE_NAME = "java.util.Collection";
public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
public static final String ASSIGN_METHOD = "assign";
public static final String ASSIGN_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.AssignMethodInterceptor";
public static final String ASSIGN_INTERCEPT_TYPE_NAME = "java.util.Collection";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
......@@ -120,7 +123,24 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public boolean isOverrideArgs() {
return false;
}
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ASSIGN_METHOD)
.and(takesArgumentWithType(0, ASSIGN_INTERCEPT_TYPE_NAME));
}
@Override
public String getMethodsInterceptor() {
return ASSIGN_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
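
For reference, the matcher in the new intercept point targets the following method on org.apache.kafka.clients.consumer.KafkaConsumer (signature as it appears in the kafka-clients API; the exact client version is whatever the plugin builds against):

    public void assign(Collection<TopicPartition> partitions)

named(ASSIGN_METHOD) pins the method name, and takesArgumentWithType(0, ASSIGN_INTERCEPT_TYPE_NAME) pins the single java.util.Collection argument, analogous to the existing subscribe() intercept point.
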
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.common.TopicPartition;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Collection;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@RunWith(MockitoJUnitRunner.class)
public class AssignMethodInterceptorTest {
@Mock
private AssignMethodInterceptor interceptor;
private Collection<TopicPartition> partitions = new ArrayList<>();
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
@Override
public Object getSkyWalkingDynamicField() {
return consumerEnhanceRequiredInfo;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) value;
}
};
@Before
public void setup() {
partitions.add(new TopicPartition("test", 0));
partitions.add(new TopicPartition("test-1", 1));
interceptor = new AssignMethodInterceptor();
}
@Test
public void testOnConsumer() {
interceptor.beforeMethod(enhancedInstance, null, new Object[] {partitions}, new Class[] {Collection.class}, null);
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(requiredInfo.getTopics(), is("test;test-1"));
}
}
\ No newline at end of file
......@@ -57,6 +57,26 @@ segmentItems:
parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: 'kafka-scenario', traceId: not null}
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: Kafka/Producer/Callback
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 40
isError: false
spanType: Local
peer: ''
tags:
- { key: mq.topic, value: assign }
refs:
- { parentEndpoint: /case/kafka-case, networkAddress: '', refType: CrossThread,
parentSpanId: 4, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: 'kafka-scenario', traceId: not null }
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: Kafka/test/Producer
......@@ -104,6 +124,21 @@ segmentItems:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: test2}
skipAnalysis: 'false'
- operationName: Kafka/assign/Producer
operationId: 0
parentSpanId: 0
spanId: 4
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 40
isError: false
spanType: Exit
peer: kafka-server:9092
tags:
- { key: mq.broker, value: 'kafka-server:9092' }
- { key: mq.topic, value: assign }
skipAnalysis: 'false'
- operationName: /case/kafka-case
operationId: 0
parentSpanId: -1
......@@ -203,4 +238,26 @@ segmentItems:
- {parentEndpoint: not null, networkAddress: 'localhost:8080',
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: kafka-scenario,
traceId: not null}
\ No newline at end of file
traceId: not null}
- segmentId: not null
spans:
- operationName: Kafka/assign/Consumer/testGroup3
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 41
isError: false
spanType: Entry
peer: ''
tags:
- { key: mq.broker, value: 'kafka-server:9092' }
- { key: mq.topic, value: assign }
- { key: transmission.latency, value: not null }
refs:
- { parentEndpoint: /case/kafka-case, networkAddress: 'kafka-server:9092', refType: CrossProcess,
parentSpanId: 4, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: 'kafka-scenario', traceId: not null }
skipAnalysis: 'false'
\ No newline at end of file
......@@ -25,6 +25,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.List;
import java.util.ArrayList;
import javax.annotation.PostConstruct;
import okhttp3.OkHttpClient;
......@@ -39,6 +41,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.PartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.skywalking.apm.toolkit.kafka.KafkaPollAndInvoke;
......@@ -65,6 +69,7 @@ public class CaseController {
private String topicName;
private String topicName2;
private Pattern topicPattern;
private String topicNameForAssign;
private static volatile boolean KAFKA_STATUS = false;
......@@ -72,6 +77,7 @@ public class CaseController {
private void setUp() {
topicName = "test";
topicName2 = "test2";
topicNameForAssign = "assign";
topicPattern = Pattern.compile("test.");
new CheckKafkaProducerThread(bootstrapServers).start();
}
......@@ -110,6 +116,14 @@ public class CaseController {
}
}
}.start();
ProducerRecord<String, String> recordForAssign = new ProducerRecord<String, String>(topicNameForAssign, "testKey", Integer.toString(1));
recordForAssign.headers().add("TEST", "TEST".getBytes());
producer.send(recordForAssign, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
LOGGER.info("send success metadata={}", metadata);
}
});
}, bootstrapServers);
Thread thread = new ConsumerThread();
......@@ -117,9 +131,14 @@ public class CaseController {
Thread thread2 = new ConsumerThread2();
thread2.start();
Thread thread3 = new ConsumerAssignThread();
thread3.start();
try {
thread.join();
thread2.join();
thread3.join();
} catch (InterruptedException e) {
// ignore
}
......@@ -281,5 +300,53 @@ public class CaseController {
return false;
}
}
public class ConsumerAssignThread extends Thread {
@Override
public void run() {
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", bootstrapServers);
consumerProperties.put("group.id", "testGroup3");
consumerProperties.put("enable.auto.commit", "true");
consumerProperties.put("auto.commit.interval.ms", "1000");
consumerProperties.put("auto.offset.reset", "earliest");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
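// Look up all partitions of the "assign" topic and bind them directly via assign()
// instead of subscribe(), exercising the interception added in this change.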
List<TopicPartition> assignTopics = new ArrayList<>();
assignTopics(consumer.partitionsFor(topicNameForAssign), assignTopics);
consumer.assign(assignTopics);
int i = 0;
while (i++ <= 10) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore and keep polling
}
ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
LOGGER.info("header: {}", new String(record.headers()
.headers("TEST")
.iterator()
.next()
.value()));
LOGGER.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
}
break;
}
}
consumer.close();
}
}
private void assignTopics(List<PartitionInfo> topicInfo, List<TopicPartition> assignTopics) {
for (PartitionInfo pif : topicInfo) {
TopicPartition tp = new TopicPartition(pif.topic(), pif.partition());
assignTopics.add(tp);
}
}
}