未验证 提交 e2b04b58 编写于 作者: D denis 提交者: GitHub

fix Kafka consumer subscribe topics from pattern (#4873)

上级 5038f070
...@@ -20,24 +20,32 @@ package org.apache.skywalking.apm.plugin.kafka; ...@@ -20,24 +20,32 @@ package org.apache.skywalking.apm.plugin.kafka;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.regex.Pattern;
import org.apache.skywalking.apm.agent.core.context.ContextManager; import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor { public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor {
@SuppressWarnings("unchecked")
@Override @Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable { MethodInterceptResult result) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField(); ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
if (argumentsTypes[0] == Pattern.class) {
requiredInfo.setTopics(Collections.singletonList(((Pattern) allArguments[0]).pattern()));
} else {
requiredInfo.setTopics((Collection<String>) allArguments[0]); requiredInfo.setTopics((Collection<String>) allArguments[0]);
}
objInst.setSkyWalkingDynamicField(requiredInfo); objInst.setSkyWalkingDynamicField(requiredInfo);
} }
@Override @Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable { Object ret) {
return ret; return ret;
} }
......
...@@ -47,7 +47,8 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation { ...@@ -47,7 +47,8 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches"; public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
public static final String SUBSCRIBE_METHOD = "subscribe"; public static final String SUBSCRIBE_METHOD = "subscribe";
public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener"; public static final String SUBSCRIBE_INTERCEPT_TYPE_PATTERN = "java.util.regex.Pattern";
public static final String SUBSCRIBE_INTERCEPT_TYPE_NAME = "java.util.Collection";
public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor"; public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";
@Override @Override
...@@ -89,7 +90,25 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation { ...@@ -89,7 +90,25 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
new InstanceMethodsInterceptPoint() { new InstanceMethodsInterceptPoint() {
@Override @Override
public ElementMatcher<MethodDescription> getMethodsMatcher() { public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE)); return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_NAME));
}
@Override
public String getMethodsInterceptor() {
return SUBSCRIBE_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(SUBSCRIBE_METHOD)
.and(takesArgumentWithType(0, SUBSCRIBE_INTERCEPT_TYPE_PATTERN));
} }
@Override @Override
......
...@@ -28,6 +28,7 @@ import org.mockito.runners.MockitoJUnitRunner; ...@@ -28,6 +28,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.regex.Pattern;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
...@@ -38,7 +39,9 @@ public class SubscribeMethodInterceptorTest { ...@@ -38,7 +39,9 @@ public class SubscribeMethodInterceptorTest {
@Mock @Mock
private SubscribeMethodInterceptor constructorInterceptor; private SubscribeMethodInterceptor constructorInterceptor;
private List<String> mockTopics = new ArrayList<String>(); private List<String> mockTopics = new ArrayList<>();
private Pattern mockTopicPattern = Pattern.compile("test-.*");
private EnhancedInstance enhancedInstance = new EnhancedInstance() { private EnhancedInstance enhancedInstance = new EnhancedInstance() {
ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo(); ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = new ConsumerEnhanceRequiredInfo();
...@@ -62,9 +65,16 @@ public class SubscribeMethodInterceptorTest { ...@@ -62,9 +65,16 @@ public class SubscribeMethodInterceptorTest {
} }
@Test @Test
public void testOnConsumer() throws Throwable { public void testOnConsumer() {
constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopics}, new Class[] {Collection.class}, null); constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopics}, new Class[] {Collection.class}, null);
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField(); ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(requiredInfo.getTopics(), is("test;test-1")); assertThat(requiredInfo.getTopics(), is("test;test-1"));
} }
@Test
public void testSubscribeForPattern() {
constructorInterceptor.beforeMethod(enhancedInstance, null, new Object[] {mockTopicPattern}, new Class[] {Pattern.class}, null);
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
assertThat(requiredInfo.getTopics(), is("test-.*"));
}
} }
\ No newline at end of file
...@@ -125,3 +125,24 @@ segmentItems: ...@@ -125,3 +125,24 @@ segmentItems:
parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: 'kafka-scenario', traceId: not null} null, parentService: 'kafka-scenario', traceId: not null}
skipAnalysis: 'false' skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: Kafka/test./Consumer/testGroup2
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 41
isError: false
spanType: Entry
peer: ''
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: test.}
refs:
- {parentEndpoint: /case/kafka-case, networkAddress: 'kafka-server:9092', refType: CrossProcess,
parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: 'kafka-scenario', traceId: not null}
skipAnalysis: 'false'
...@@ -21,10 +21,12 @@ package test.org.apache.skywalking.apm.testcase.kafka.controller; ...@@ -21,10 +21,12 @@ package test.org.apache.skywalking.apm.testcase.kafka.controller;
import java.util.Arrays; import java.util.Arrays;
import java.util.Properties; import java.util.Properties;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
...@@ -54,6 +56,7 @@ public class CaseController { ...@@ -54,6 +56,7 @@ public class CaseController {
private String topicName; private String topicName;
private String topicName2; private String topicName2;
private Pattern topicPattern;
private static volatile boolean KAFKA_STATUS = false; private static volatile boolean KAFKA_STATUS = false;
...@@ -61,6 +64,7 @@ public class CaseController { ...@@ -61,6 +64,7 @@ public class CaseController {
private void setUp() { private void setUp() {
topicName = "test"; topicName = "test";
topicName2 = "test2"; topicName2 = "test2";
topicPattern = Pattern.compile("test.");
new CheckKafkaProducerThread(bootstrapServers).start(); new CheckKafkaProducerThread(bootstrapServers).start();
} }
...@@ -84,10 +88,15 @@ public class CaseController { ...@@ -84,10 +88,15 @@ public class CaseController {
}; };
producer.send(record2, callback2); producer.send(record2, callback2);
}, bootstrapServers); }, bootstrapServers);
Thread thread = new ConsumerThread(); Thread thread = new ConsumerThread();
thread.start(); thread.start();
Thread thread2 = new ConsumerThread2();
thread2.start();
try { try {
thread.join(); thread.join();
thread2.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} }
...@@ -200,5 +209,44 @@ public class CaseController { ...@@ -200,5 +209,44 @@ public class CaseController {
consumer.close(); consumer.close();
} }
} }
public class ConsumerThread2 extends Thread {
@Override
public void run() {
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", bootstrapServers);
consumerProperties.put("group.id", "testGroup2");
consumerProperties.put("enable.auto.commit", "true");
consumerProperties.put("auto.commit.interval.ms", "1000");
consumerProperties.put("auto.offset.reset", "earliest");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());
int i = 0;
while (i++ <= 10) {
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
}
ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
logger.info("header: {}", new String(record.headers()
.headers("TEST")
.iterator()
.next()
.value()));
logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
}
break;
}
}
consumer.close();
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册