未验证 提交 d9623ec4 编写于 作者: L Lu Jiajing 提交者: GitHub

Spring-Kafka 1.x support (#5879)

* support spring-kafka 1.3.x and add test

* add spring-kafka-1.3.x-scenario to github workflow

* change kafka-plugin FAQ

* add supported versions

* update CHANGES.md
上级 8d164d3d
......@@ -63,6 +63,7 @@ jobs:
- graphql-9.x-scenario
- graphql-12.x-scenario
- hbase-scenario
- spring-kafka-1.3.x-scenario
- spring-kafka-2.2.x-scenario
- spring-kafka-2.3.x-scenario
- spring-scheduled-scenario
......
......@@ -21,6 +21,7 @@ Release Notes.
* Add the plugin for mssql-jtds 1.x.
* Add the plugin for mssql-jdbc 6.x -> 9.x.
* Fix the default ignore mechanism isn't accurate enough bug.
* Add the plugin for spring-kafka 1.3.x.
#### OAP-Backend
* Add the `@SuperDataset` annotation for BrowserErrorLog.
......
......@@ -38,6 +38,7 @@
<module>mvc-annotation-commons</module>
<module>spring-commons</module>
<module>mvc-annotation-5.x-plugin</module>
<module>spring-kafka-1.x-plugin</module>
<module>spring-kafka-2.x-plugin</module>
<module>scheduled-annotation-plugin</module>
<module>spring-webflux-5.x-webclient-plugin</module>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-plugins</artifactId>
<version>8.3.0-SNAPSHOT</version>
</parent>
<artifactId>apm-spring-kafka-1.x-plugin</artifactId>
<properties>
<spring-kafka.version>1.3.11.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-kafka-commons</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.kafka.define.InterceptorMethod;
import java.lang.reflect.Method;
public class PostRunMethodInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return InterceptorMethod.afterMethod(ret);
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.kafka.define.Constants;
import org.apache.skywalking.apm.plugin.kafka.define.InterceptorMethod;
import java.lang.reflect.Method;
public class ProcessSeeksMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final String OPERATION_NAME = "/spring-kafka" + Constants.KAFKA_POLL_AND_INVOKE_OPERATION_NAME;
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
InterceptorMethod.afterMethod(null);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
InterceptorMethod.beforeMethod(OPERATION_NAME);
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
InterceptorMethod.handleMethodException(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class LegacyListenerConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("processSeeks").and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.kafka.ProcessSeeksMethodInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("run").and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.kafka.PostRunMethodInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public ClassMatch enhanceClass() {
return byName("org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer");
}
@Override
protected String[] witnessClasses() {
// This class is moved to `org.springframework.kafka.listener` package since 2.2.x,
// So it can be used as the witness class for spring-kafka below 2.2, including 2.1.x, 2.0.x and 1.3.x
return new String[]{"org.springframework.kafka.listener.config.ContainerProperties"};
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
spring-kafka-1.x=org.apache.skywalking.apm.plugin.spring.kafka.define.LegacyListenerConsumerInstrumentation
......@@ -2,7 +2,7 @@
The trace doesn't continue in kafka consumer side.
### Reason
The kafka client is pulling message from server, the plugin also just traces the pull action. As that, you need to do the manual instrument before the pull action, and include the further data process.
The kafka client is responsible for pulling messages from the brokers, and after that the data will be processed by the user-defined codes. However, only the poll action can be traced by the pluign and the subsequent data processing work is inevitably outside the scope of the trace context. Thus, in order to complete the client-side trace, manual instrument has to be done, i.e. the poll action and the processing action should be wrapped manually.
### Resolve
Use Application Toolkit libraries to do manual instrumentation. such as `@KafkaPollAndInvoke` annotation at `apm-toolkit-kafka` or OpenTracing API, Or if you're using `spring-kafka` 2.2.x or above, you can track the Consumer side without any code change.
With native kafka client, please use Application Toolkit libraries to do the manual instrumentation, with the help of `@KafkaPollAndInvoke` annotation in `apm-toolkit-kafka` or with OpenTracing API. And if you're using `spring-kafka` 1.3.x, 2.2.x or above, you can track the Consumer side without effort.
......@@ -80,6 +80,7 @@
- spring-cloud-gateway-2.1.x
- spring-concurrent-util-4.x
- spring-core-patch
- spring-kafka-1.x
- spring-kafka-2.x
- spring-mvc-annotation
- spring-mvc-annotation-3.x
......
......@@ -57,7 +57,7 @@ metrics based on the tracing data.
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 2.2.x
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
* [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x
......
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
home="$(cd "$(dirname $0)"; pwd)"
java -Dbootstrap.servers=${BOOTSTRAP_SERVERS} -jar ${agent_opts} "-Dskywalking.agent.service_name=spring-kafka-1.3.x-scenario" ${home}/../libs/spring-kafka-1.3.x-scenario.jar &
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
segmentItems:
- serviceName: spring-kafka-1.3.x-scenario
segmentSize: nq 0
segments:
- segmentId: not null
spans:
- operationName: Kafka/spring_test/Producer
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 40
isError: false
spanType: Exit
peer: kafka-server:9092
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- operationName: /case/spring-kafka-case
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-case'}
- {key: http.method, value: GET}
- segmentId: not null
spans:
- operationName: /case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping'}
- {key: http.method, value: GET}
refs:
- {parentEndpoint: 'Kafka/spring_test/Consumer/grop:spring_test', networkAddress: 'localhost:8080',
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-1.3.x-scenario,
traceId: not null}
- segmentId: not null
spans:
- operationName: /spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: not null
endTime: not null
componentId: 12
isError: false
spanType: Exit
peer: localhost:8080
skipAnalysis: false
tags:
- {key: http.method, value: GET}
- {key: url, value: 'http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping'}
- operationName: Kafka/spring_test/Consumer/grop:spring_test
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 41
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-1.3.x-scenario,
traceId: not null}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
type: jvm
entryService: http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-case
healthCheck: http://localhost:8080/spring-kafka-1.3.x-scenario/case/healthCheck
startScript: ./bin/startup.sh
environment:
- BOOTSTRAP_SERVERS=kafka-server:9092
depends_on:
- zookeeper-server
- kafka-server
dependencies:
zookeeper-server:
image: zookeeper:3.4
hostname: zookeeper-server
kafka-server:
image: bitnami/kafka:2.1.1
hostname: kafka-server
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181
- KAFKA_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
depends_on:
- zookeeper-server
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-kafka-1.3.x-scenario</artifactId>
<version>5.0.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<test.framework.version>1.3.11.RELEASE</test.framework.version>
<log4j.version>2.6.2</log4j.version>
<spring-boot-version>1.5.22.RELEASE</spring-boot-version>
<kafka-version>0.11.0.3</kafka-version>
<okhttp-version>3.0.0</okhttp-version>
</properties>
<name>skywalking-spring-kafka-1.3.x-scenario</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>${spring-boot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${test.framework.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp-version}</version>
</dependency>
</dependencies>
<build>
<finalName>spring-kafka-1.3.x-scenario</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${compiler.version}</source>
<target>${compiler.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>assemble</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
<outputDirectory>./target/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>http://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>http://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<formats>
<format>zip</format>
</formats>
<fileSets>
<fileSet>
<directory>./bin</directory>
<fileMode>0775</fileMode>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.build.directory}/spring-kafka-1.3.x-scenario.jar</source>
<outputDirectory>./libs</outputDirectory>
<fileMode>0775</fileMode>
</file>
</files>
</assembly>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test.org.apache.skywalking.apm.testcase.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Controller
@RequestMapping("/case")
@PropertySource("classpath:application.properties")
public class CaseController {
private static final String SUCCESS = "Success";
@Value("${bootstrap.servers:127.0.0.1:9092}")
private String bootstrapServers;
private String topicName;
private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
private String helloWorld = "helloWorld";
@PostConstruct
private void setUp() {
topicName = "spring_test";
setUpProvider();
setUpConsumer();
}
private void setUpProvider() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
try {
kafkaTemplate.send(topicName, "key", "ping").get();
kafkaTemplate.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
private void setUpConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "grop:" + topicName);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Deserializer<String> stringDeserializer = new StringDeserializer();
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory(configs, stringDeserializer, stringDeserializer);
ContainerProperties props = new ContainerProperties(topicName);
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
if (data.value().equals(helloWorld)) {
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-1.3.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
}
response.body().close();
acknowledgment.acknowledge();
latch.countDown();
}
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
container.start();
}
@RequestMapping("/spring-kafka-case")
@ResponseBody
public String springKafkaCase() throws Exception {
kafkaTemplate.send(topicName, "key", helloWorld).get();
latch.await();
kafkaTemplate.flush();
return SUCCESS;
}
@RequestMapping("/spring-kafka-consumer-ping")
@ResponseBody
public String springKafkaConsumerPing() {
return SUCCESS;
}
@RequestMapping("/healthCheck")
@ResponseBody
public String healthCheck() {
return SUCCESS;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
server.port=8080
server.context-path=/spring-kafka-1.3.x-scenario
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="WARN">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
1.3.9.RELEASE
1.3.10.RELEASE
1.3.11.RELEASE
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册