diff --git a/.github/workflows/plugins-test.3.yaml b/.github/workflows/plugins-test.3.yaml index 136f90897403d309606cf256855a4c2b1be5c72d..155937177a957ddcb33f90cdead5c74c2aadb7a2 100644 --- a/.github/workflows/plugins-test.3.yaml +++ b/.github/workflows/plugins-test.3.yaml @@ -63,6 +63,7 @@ jobs: - graphql-9.x-scenario - graphql-12.x-scenario - hbase-scenario + - spring-kafka-1.3.x-scenario - spring-kafka-2.2.x-scenario - spring-kafka-2.3.x-scenario - spring-scheduled-scenario diff --git a/CHANGES.md b/CHANGES.md index 91a19a68a074b2bc2c6cf3e3a74456c8028bcbbc..48a1d2cb14d6130523571ebcfa988ab3d210e638 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,6 +21,7 @@ Release Notes. * Add the plugin for mssql-jtds 1.x. * Add the plugin for mssql-jdbc 6.x -> 9.x. * Fix the default ignore mechanism isn't accurate enough bug. +* Add the plugin for spring-kafka 1.3.x. #### OAP-Backend * Add the `@SuperDataset` annotation for BrowserErrorLog. diff --git a/apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml b/apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml index b0ea296acdeb3c9e2827442ed833446473e1ab6a..c7eaf5d3e968392e7a7d812f2be94ccdf530a461 100644 --- a/apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml @@ -38,6 +38,7 @@ mvc-annotation-commons spring-commons mvc-annotation-5.x-plugin + spring-kafka-1.x-plugin spring-kafka-2.x-plugin scheduled-annotation-plugin spring-webflux-5.x-webclient-plugin diff --git a/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..5bedd43323c907150e3b4396f64e9daaeff28e1b --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/pom.xml @@ -0,0 +1,48 @@ + + + + 4.0.0 + + + org.apache.skywalking + spring-plugins + 8.3.0-SNAPSHOT + + + apm-spring-kafka-1.x-plugin + + + 1.3.11.RELEASE + + + + + org.springframework.kafka + spring-kafka + ${spring-kafka.version} + provided + + + org.apache.skywalking + apm-kafka-commons + ${project.version} + provided + + + \ No newline at end of file diff --git a/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/PostRunMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/PostRunMethodInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..256deac8fdb633860127d0d3d35d1f5302820df6 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/PostRunMethodInterceptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.spring.kafka; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.plugin.kafka.define.InterceptorMethod; + +import java.lang.reflect.Method; + +public class PostRunMethodInterceptor implements InstanceMethodsAroundInterceptor { + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, MethodInterceptResult result) throws Throwable { + + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Object ret) throws Throwable { + return InterceptorMethod.afterMethod(ret); + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { + + } +} diff --git a/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/ProcessSeeksMethodInterceptor.java b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/ProcessSeeksMethodInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..8dbf706ecf146420569cf0ca2a60f07da2d69347 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/ProcessSeeksMethodInterceptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.spring.kafka; + +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; +import org.apache.skywalking.apm.plugin.kafka.define.Constants; +import org.apache.skywalking.apm.plugin.kafka.define.InterceptorMethod; + +import java.lang.reflect.Method; + +public class ProcessSeeksMethodInterceptor implements InstanceMethodsAroundInterceptor { + + private static final String OPERATION_NAME = "/spring-kafka" + Constants.KAFKA_POLL_AND_INVOKE_OPERATION_NAME; + + @Override + public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + MethodInterceptResult result) throws Throwable { + InterceptorMethod.afterMethod(null); + } + + @Override + public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, + Object ret) throws Throwable { + InterceptorMethod.beforeMethod(OPERATION_NAME); + return ret; + } + + @Override + public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, + Class[] argumentsTypes, Throwable t) { + InterceptorMethod.handleMethodException(t); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/define/LegacyListenerConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/define/LegacyListenerConsumerInstrumentation.java new file mode 100644 index 0000000000000000000000000000000000000000..9f3a31dfcb92708279dce7880c5aba94e95fc60a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/kafka/define/LegacyListenerConsumerInstrumentation.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.spring.kafka.define; + +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; +import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine; +import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; + +public class LegacyListenerConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { + + @Override + public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { + return new ConstructorInterceptPoint[0]; + } + + @Override + public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { + return new InstanceMethodsInterceptPoint[]{ + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("processSeeks").and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return "org.apache.skywalking.apm.plugin.spring.kafka.ProcessSeeksMethodInterceptor"; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + }, + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + return named("run").and(takesArguments(0)); + } + + @Override + public String getMethodsInterceptor() { + return "org.apache.skywalking.apm.plugin.spring.kafka.PostRunMethodInterceptor"; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + } + }; + } + + @Override + public ClassMatch enhanceClass() { + return byName("org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer"); + } + + @Override + protected String[] witnessClasses() { + // This class is moved to `org.springframework.kafka.listener` package since 2.2.x, + // So it can be used as the witness class for spring-kafka below 2.2, including 2.1.x, 2.0.x and 1.3.x + return new String[]{"org.springframework.kafka.listener.config.ContainerProperties"}; + } + +} diff --git a/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/resources/skywalking-plugin.def new file mode 100644 index 0000000000000000000000000000000000000000..56e1908db5f8ecb361cfa58b082e9baad0a51d2a --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/spring-plugins/spring-kafka-1.x-plugin/src/main/resources/skywalking-plugin.def @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +spring-kafka-1.x=org.apache.skywalking.apm.plugin.spring.kafka.define.LegacyListenerConsumerInstrumentation diff --git a/docs/en/FAQ/kafka-plugin.md b/docs/en/FAQ/kafka-plugin.md index cf46aed60197ae279acd01f3247e85ce805bdd8a..8aff1098d916a137e04fc6369d32109ca1c60319 100644 --- a/docs/en/FAQ/kafka-plugin.md +++ b/docs/en/FAQ/kafka-plugin.md @@ -2,7 +2,7 @@ The trace doesn't continue in kafka consumer side. ### Reason -The kafka client is pulling message from server, the plugin also just traces the pull action. As that, you need to do the manual instrument before the pull action, and include the further data process. +The kafka client is responsible for pulling messages from the brokers, and after that the data will be processed by the user-defined codes. However, only the poll action can be traced by the pluign and the subsequent data processing work is inevitably outside the scope of the trace context. Thus, in order to complete the client-side trace, manual instrument has to be done, i.e. the poll action and the processing action should be wrapped manually. ### Resolve -Use Application Toolkit libraries to do manual instrumentation. such as `@KafkaPollAndInvoke` annotation at `apm-toolkit-kafka` or OpenTracing API, Or if you're using `spring-kafka` 2.2.x or above, you can track the Consumer side without any code change. +With native kafka client, please use Application Toolkit libraries to do the manual instrumentation, with the help of `@KafkaPollAndInvoke` annotation in `apm-toolkit-kafka` or with OpenTracing API. And if you're using `spring-kafka` 1.3.x, 2.2.x or above, you can track the Consumer side without effort. diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md b/docs/en/setup/service-agent/java-agent/Plugin-list.md index a57defd71c671398dba80e3b41d5144c387779a2..5702dcf4e00ec69c80891324ad63f81c7ab3cb11 100644 --- a/docs/en/setup/service-agent/java-agent/Plugin-list.md +++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md @@ -80,6 +80,7 @@ - spring-cloud-gateway-2.1.x - spring-concurrent-util-4.x - spring-core-patch +- spring-kafka-1.x - spring-kafka-2.x - spring-mvc-annotation - spring-mvc-annotation-3.x diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md index 9f5b1a24e4cc510fae7beff71a3d40fc518d7e94..e9a2386aed4ed929bf293935ce4314303830c816 100644 --- a/docs/en/setup/service-agent/java-agent/Supported-list.md +++ b/docs/en/setup/service-agent/java-agent/Supported-list.md @@ -57,7 +57,7 @@ metrics based on the tracing data. * MQ * [RocketMQ](https://github.com/apache/rocketmq) 4.x * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0 - * [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 2.2.x + * [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka)) * [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4 * [RabbitMQ](https://www.rabbitmq.com/) 5.x * [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/bin/startup.sh b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/bin/startup.sh new file mode 100644 index 0000000000000000000000000000000000000000..ddc79ad29c9176843d1e53bfc7174afccf0e064a --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/bin/startup.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +home="$(cd "$(dirname $0)"; pwd)" + +java -Dbootstrap.servers=${BOOTSTRAP_SERVERS} -jar ${agent_opts} "-Dskywalking.agent.service_name=spring-kafka-1.3.x-scenario" ${home}/../libs/spring-kafka-1.3.x-scenario.jar & \ No newline at end of file diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/config/expectedData.yaml b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/config/expectedData.yaml new file mode 100644 index 0000000000000000000000000000000000000000..9ec9618cdaa798c3c7099a0951049720e514ea37 --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/config/expectedData.yaml @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +segmentItems: + - serviceName: spring-kafka-1.3.x-scenario + segmentSize: nq 0 + segments: + - segmentId: not null + spans: + - operationName: Kafka/spring_test/Producer + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: not null + endTime: not null + componentId: 40 + isError: false + spanType: Exit + peer: kafka-server:9092 + skipAnalysis: false + tags: + - {key: mq.broker, value: 'kafka-server:9092'} + - {key: mq.topic, value: spring_test} + - operationName: /case/spring-kafka-case + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: not null + endTime: not null + componentId: 14 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-case'} + - {key: http.method, value: GET} + - segmentId: not null + spans: + - operationName: /case/spring-kafka-consumer-ping + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: not null + endTime: not null + componentId: 14 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: url, value: 'http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping'} + - {key: http.method, value: GET} + refs: + - {parentEndpoint: 'Kafka/spring_test/Consumer/grop:spring_test', networkAddress: 'localhost:8080', + refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: spring-kafka-1.3.x-scenario, + traceId: not null} + - segmentId: not null + spans: + - operationName: /spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: not null + endTime: not null + componentId: 12 + isError: false + spanType: Exit + peer: localhost:8080 + skipAnalysis: false + tags: + - {key: http.method, value: GET} + - {key: url, value: 'http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping'} + - operationName: Kafka/spring_test/Consumer/grop:spring_test + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + startTime: not null + endTime: not null + componentId: 41 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - {key: mq.broker, value: 'kafka-server:9092'} + - {key: mq.topic, value: spring_test} + - {key: transmission.latency, value: not null} + refs: + - {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092', + refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: spring-kafka-1.3.x-scenario, + traceId: not null} diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/configuration.yml b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/configuration.yml new file mode 100644 index 0000000000000000000000000000000000000000..f60884707823a42f02401aedd77bbd6ee5a915ad --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/configuration.yml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +type: jvm +entryService: http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-case +healthCheck: http://localhost:8080/spring-kafka-1.3.x-scenario/case/healthCheck +startScript: ./bin/startup.sh +environment: + - BOOTSTRAP_SERVERS=kafka-server:9092 +depends_on: + - zookeeper-server + - kafka-server +dependencies: + zookeeper-server: + image: zookeeper:3.4 + hostname: zookeeper-server + kafka-server: + image: bitnami/kafka:2.1.1 + hostname: kafka-server + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 + - KAFKA_BROKER_ID=1 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 + depends_on: + - zookeeper-server diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/pom.xml b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..61f0ed084750c584a7c5b4c33e6b0c26b0f90bb5 --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/pom.xml @@ -0,0 +1,134 @@ + + + + 4.0.0 + + org.apache.skywalking + spring-kafka-1.3.x-scenario + 5.0.0 + + + UTF-8 + 1.8 + 1.3.11.RELEASE + 2.6.2 + 1.5.22.RELEASE + 0.11.0.3 + 3.0.0 + + + skywalking-spring-kafka-1.3.x-scenario + + + + org.springframework.boot + spring-boot-starter-web + ${spring-boot-version} + + + org.springframework.boot + spring-boot-starter-jdbc + ${spring-boot-version} + + + org.springframework.kafka + spring-kafka + ${test.framework.version} + + + org.slf4j + * + + + + + org.apache.kafka + kafka-clients + ${kafka-version} + + + slf4j-api + * + + + + + com.squareup.okhttp3 + okhttp + ${okhttp-version} + + + + + spring-kafka-1.3.x-scenario + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + maven-compiler-plugin + + ${compiler.version} + ${compiler.version} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-assembly-plugin + + + assemble + package + + single + + + + src/main/assembly/assembly.xml + + ./target/ + + + + + + + + + + spring-snapshots + http://repo.spring.io/snapshot + + + spring-milestones + http://repo.spring.io/milestone + + + \ No newline at end of file diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/assembly/assembly.xml b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/assembly/assembly.xml new file mode 100644 index 0000000000000000000000000000000000000000..3987c1a9ac71029796f7f0dc731602ea3acd496c --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/assembly/assembly.xml @@ -0,0 +1,41 @@ + + + + + zip + + + + + ./bin + 0775 + + + + + + ${project.build.directory}/spring-kafka-1.3.x-scenario.jar + ./libs + 0775 + + + diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/Application.java b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/Application.java new file mode 100644 index 0000000000000000000000000000000000000000..b35127e7c2341a9fbfb65d0e4a74f76584f1dd9b --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/Application.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test.org.apache.skywalking.apm.testcase.spring.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; + +@SpringBootApplication(exclude={DataSourceAutoConfiguration.class}) +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java new file mode 100644 index 0000000000000000000000000000000000000000..d2569e7a9c68f70d19ee6564c3936e03a1090d07 --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test.org.apache.skywalking.apm.testcase.spring.kafka.controller; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.PropertySource; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AcknowledgingMessageListener; +import org.springframework.kafka.listener.config.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; + +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +@Controller +@RequestMapping("/case") +@PropertySource("classpath:application.properties") +public class CaseController { + + private static final String SUCCESS = "Success"; + + @Value("${bootstrap.servers:127.0.0.1:9092}") + private String bootstrapServers; + private String topicName; + private KafkaTemplate kafkaTemplate; + + private CountDownLatch latch = new CountDownLatch(1); + private String helloWorld = "helloWorld"; + + @PostConstruct + private void setUp() { + topicName = "spring_test"; + setUpProvider(); + setUpConsumer(); + } + + private void setUpProvider() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + kafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<>(props)); + try { + kafkaTemplate.send(topicName, "key", "ping").get(); + kafkaTemplate.flush(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void setUpConsumer() { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, "grop:" + topicName); + configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + Deserializer stringDeserializer = new StringDeserializer(); + DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(configs, stringDeserializer, stringDeserializer); + ContainerProperties props = new ContainerProperties(topicName); + props.setMessageListener(new AcknowledgingMessageListener() { + @Override + public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { + if (data.value().equals(helloWorld)) { + OkHttpClient client = new OkHttpClient.Builder().build(); + Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping").build(); + Response response = null; + try { + response = client.newCall(request).execute(); + } catch (IOException e) { + } + response.body().close(); + acknowledgment.acknowledge(); + latch.countDown(); + } + } + }); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(factory, props); + container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); + container.start(); + } + + @RequestMapping("/spring-kafka-case") + @ResponseBody + public String springKafkaCase() throws Exception { + kafkaTemplate.send(topicName, "key", helloWorld).get(); + latch.await(); + kafkaTemplate.flush(); + return SUCCESS; + } + + @RequestMapping("/spring-kafka-consumer-ping") + @ResponseBody + public String springKafkaConsumerPing() { + return SUCCESS; + } + + @RequestMapping("/healthCheck") + @ResponseBody + public String healthCheck() { + return SUCCESS; + } +} + diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/resources/application.properties b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/resources/application.properties new file mode 100644 index 0000000000000000000000000000000000000000..fe256d643f71a45e7451b2427bf6a27554da12be --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/resources/application.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +server.port=8080 +server.context-path=/spring-kafka-1.3.x-scenario \ No newline at end of file diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/resources/log4j2.xml b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/resources/log4j2.xml new file mode 100644 index 0000000000000000000000000000000000000000..b5cda5ae8ad528f3272b269b1c9f0d733c52ef5f --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/src/main/resources/log4j2.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + diff --git a/test/plugin/scenarios/spring-kafka-1.3.x-scenario/support-version.list b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/support-version.list new file mode 100644 index 0000000000000000000000000000000000000000..5d6463f9b36a05749fcef993b8e9d4538b050ca3 --- /dev/null +++ b/test/plugin/scenarios/spring-kafka-1.3.x-scenario/support-version.list @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +1.3.9.RELEASE +1.3.10.RELEASE +1.3.11.RELEASE \ No newline at end of file