未验证 提交 c65bcf11 编写于 作者: L Lu Jiajing 提交者: GitHub

Check bootstrapServers type before casting for kafka-clients >= 2.1 (#5510)

上级 b51dba5c
......@@ -22,16 +22,24 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedI
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.util.StringUtil;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Map<String, Object> config = (Map<String, Object>) allArguments[0];
// prevent errors caused by secondary interception in kafkaTemplate
if (objInst.getSkyWalkingDynamicField() == null) {
objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
Object bootstrapServers = config.get("bootstrap.servers");
if (bootstrapServers instanceof List) {
objInst.setSkyWalkingDynamicField(String.join(";", (List<String>) bootstrapServers));
} else {
objInst.setSkyWalkingDynamicField(StringUtil.join(';', COMMA_WITH_WHITESPACE.split((String) bootstrapServers, -1)));
}
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@RunWith(MockitoJUnitRunner.class)
public class ProducerConstructorMapInterceptorTest {
private static Map<String, Object> PRODUCER_CONFIG_WITH_LIST_BOOTSTRAP_SERVERS;
private static Map<String, Object> PRODUCER_CONFIG_WITH_STRING_BOOTSTRAP_SERVERS;
@Mock
private ProducerConstructorMapInterceptor constructorMapInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private String brokerServers;
@Override
public Object getSkyWalkingDynamicField() {
return brokerServers;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
brokerServers = (String) value;
}
};
@BeforeClass
public static void setup() {
List<String> mockBootstrapServers = new ArrayList<String>();
mockBootstrapServers.add("localhost:9092");
mockBootstrapServers.add("localhost:19092");
PRODUCER_CONFIG_WITH_LIST_BOOTSTRAP_SERVERS = new HashMap<String, Object>() {{
put("bootstrap.servers", mockBootstrapServers);
}};
PRODUCER_CONFIG_WITH_STRING_BOOTSTRAP_SERVERS = new HashMap<String, Object>() {{
// deliberately add whitespaces
put("bootstrap.servers", String.join(" , ", mockBootstrapServers));
}};
}
@Before
public void setUpForEach() {
constructorMapInterceptor = new ProducerConstructorMapInterceptor();
}
@Test
public void givenListTypeBootstrapServers_whenConstructProducer_thenServersSaves() {
constructorMapInterceptor.onConstruct(enhancedInstance, new Object[]{PRODUCER_CONFIG_WITH_LIST_BOOTSTRAP_SERVERS});
assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("localhost:9092;localhost:19092"));
}
@Test
public void givenStringTypeBootstrapServers_whenConstructProducer_thenServersSaves() {
constructorMapInterceptor.onConstruct(enhancedInstance, new Object[]{PRODUCER_CONFIG_WITH_STRING_BOOTSTRAP_SERVERS});
assertThat(enhancedInstance.getSkyWalkingDynamicField().toString(), is("localhost:9092;localhost:19092"));
}
}
\ No newline at end of file
......@@ -34,6 +34,21 @@ segmentItems:
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- operationName: Kafka/spring_test/Producer
operationId: 0
parentSpanId: 0
spanId: 2
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 40
isError: false
spanType: Exit
peer: kafka-server:9092
skipAnalysis: false
tags:
- { key: mq.broker, value: 'kafka-server:9092' }
- { key: mq.topic, value: spring_test }
- operationName: /case/spring-kafka-case
operationId: 0
parentSpanId: -1
......@@ -71,6 +86,28 @@ segmentItems:
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.3.x-scenario,
traceId: not null}
- segmentId: not null
spans:
- operationName: /case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping'}
- {key: http.method, value: GET}
refs:
- {parentEndpoint: 'Kafka/spring_test/Consumer/grop:spring_test', networkAddress: 'localhost:8080',
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.3.x-scenario,
traceId: not null}
- segmentId: not null
spans:
- operationName: /spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping
......@@ -108,3 +145,40 @@ segmentItems:
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.3.x-scenario,
traceId: not null}
- segmentId: not null
spans:
- operationName: /spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: not null
endTime: not null
componentId: 12
isError: false
spanType: Exit
peer: localhost:8080
skipAnalysis: false
tags:
- {key: http.method, value: GET}
- {key: url, value: 'http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping'}
- operationName: Kafka/spring_test/Consumer/grop:spring_test
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 41
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.3.x-scenario,
traceId: not null}
\ No newline at end of file
......@@ -41,6 +41,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
......@@ -57,14 +58,16 @@ public class CaseController {
private String bootstrapServers;
private String topicName;
private KafkaTemplate<String, String> kafkaTemplate;
private KafkaTemplate<String, String> kafkaTemplate2;
private CountDownLatch latch = new CountDownLatch(1);
private CountDownLatch latch;
private String helloWorld = "helloWorld";
@PostConstruct
private void setUp() {
topicName = "spring_test";
setUpProvider();
setUpAnotherProvider();
setUpConsumer();
}
......@@ -82,6 +85,21 @@ public class CaseController {
}
}
private void setUpAnotherProvider() {
Map<String, Object> props = new HashMap<>();
// use list type here
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(bootstrapServers.split(",")));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate2 = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
try {
kafkaTemplate2.send(topicName, "key", "ping").get();
kafkaTemplate2.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
private void setUpConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
......@@ -115,9 +133,14 @@ public class CaseController {
@RequestMapping("/spring-kafka-case")
@ResponseBody
public String springKafkaCase() throws Exception {
this.latch = new CountDownLatch(1);
kafkaTemplate.send(topicName, "key", helloWorld).get();
latch.await();
this.latch.await();
kafkaTemplate.flush();
this.latch = new CountDownLatch(1);
kafkaTemplate2.send(topicName, "key", helloWorld).get();
this.latch.await();
kafkaTemplate2.flush();
return SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册