未验证 提交 e37fc2e4 编写于 作者: 于玉桔 提交者: GitHub

Support spring-kafka (#5254)

上级 2df3c683
......@@ -53,6 +53,8 @@ jobs:
- { name: 'graphql-9.x-scenario', title: 'graphql-9.x 9.0-11.0 (3)' }
- { name: 'graphql-12.x-scenario', title: 'graphql-12.x 12.0-15.0 (4)' }
- { name: 'hbase-scenario', title: 'hbase-scenario (5)' }
- { name: 'spring-kafka-2.2.x-scenario', title: 'Spring-Kafka 2.2.x (7)' }
- { name: 'spring-kafka-2.3.x-scenario', title: 'Spring-Kafka 2.3.x (7)' }
steps:
- uses: actions/checkout@v2
with:
......
......@@ -170,4 +170,6 @@ public class ComponentsDefine {
public static final OfficialComponent SPRING_ANNOTATION = new OfficialComponent(93, "spring-annotation");
public static final OfficialComponent HBASE = new OfficialComponent(94, "HBase");
public static final OfficialComponent SPRING_KAFKA_CONSUMER = new OfficialComponent(95, "spring-kafka-consumer");
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-sdk-plugin</artifactId>
<version>8.2.0-SNAPSHOT</version>
</parent>
<artifactId>apm-kafka-commons</artifactId>
<name>kafka-commons</name>
<properties>
<kafka-clients.version>2.0.1</kafka-clients.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
/**
* implements Callback and EnhancedInstance, for transformation kafkaTemplate.buildCallback
*/
public class CallbackAdapter implements Callback, EnhancedInstance {
private Object instance;
private Callback userCallback;
public CallbackAdapter(Callback userCallback, Object instance) {
this.userCallback = userCallback;
this.instance = instance;
}
public CallbackAdapter() {
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (userCallback != null) {
userCallback.onCompletion(metadata, exception);
}
}
@Override
public Object getSkyWalkingDynamicField() {
return instance;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.instance = value;
}
public Callback getUserCallback() {
return userCallback;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.apache.skywalking.apm.plugin.kafka.define;
public class Constants {
public static final String SPRING_KAFKA_FLAG = "SW_SPRING_KAFKA_FLAG";
public static final String SPRING_KAFKA_POLL_AND_INVOKE_OPERATION_NAME = "/spring-kafka/pollAndInvoke";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.apache.skywalking.apm.plugin.kafka.define;
public class SpringKafkaContext {
public SpringKafkaContext() {
needStop = false;
}
private boolean needStop;
public boolean isNeedStop() {
return needStop;
}
public void setNeedStop(boolean needStop) {
this.needStop = needStop;
}
}
......@@ -17,7 +17,8 @@
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
......@@ -28,10 +29,16 @@
<artifactId>apm-kafka-plugin</artifactId>
<properties>
<kafka-clients.version>0.11.0.0</kafka-clients.version>
<kafka-clients.version>2.0.1</kafka-clients.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-kafka-commons</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
......
......@@ -18,10 +18,6 @@
package org.apache.skywalking.apm.plugin.kafka;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
......@@ -35,6 +31,13 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedI
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.kafka.define.Constants;
import org.apache.skywalking.apm.plugin.kafka.define.SpringKafkaContext;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
......@@ -43,14 +46,14 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
MethodInterceptResult result) throws Throwable {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
requiredInfo.setStartTime(System.currentTimeMillis());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Object ret) throws Throwable {
/*
* If the intercepted method throws exception, the ret will be null
*/
......@@ -63,8 +66,12 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
//
if (records.size() > 0) {
ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst.getSkyWalkingDynamicField();
AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo
.getGroupId(), null).start(requiredInfo.getStartTime());
if (ContextManager.getRuntimeContext().get(Constants.SPRING_KAFKA_FLAG) != null) {
ContextManager.createEntrySpan(Constants.SPRING_KAFKA_POLL_AND_INVOKE_OPERATION_NAME, null);
((SpringKafkaContext) ContextManager.getRuntimeContext().get(Constants.SPRING_KAFKA_FLAG)).setNeedStop(true);
}
String operationName = OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId();
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, null).start(requiredInfo.getStartTime());
activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
SpanLayer.asMQ(activeSpan);
......@@ -93,7 +100,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
Class<?>[] argumentsTypes, Throwable t) {
/*
* The entry span is created in {@link #afterMethod}, but {@link #handleMethodException} is called before
* {@link #afterMethod}, before the creation of entry span, we can not ensure there is an active span
......
......@@ -18,5 +18,4 @@ kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstr
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation
\ No newline at end of file
......@@ -63,6 +63,7 @@
<module>elastic-job-3.x-plugin</module>
<module>mongodb-2.x-plugin</module>
<module>httpasyncclient-4.x-plugin</module>
<module>kafka-commons</module>
<module>kafka-plugin</module>
<module>servicecomb-plugin</module>
<module>hystrix-1.x-plugin</module>
......
......@@ -39,6 +39,7 @@
<module>spring-commons</module>
<module>mvc-annotation-5.x-plugin</module>
<module>spring-webflux-5.x-plugin</module>
<module>spring-kafka-2.x-plugin</module>
</modules>
<packaging>pom</packaging>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-plugins</artifactId>
<version>8.2.0-SNAPSHOT</version>
</parent>
<artifactId>apm-spring-kafka-2.x-plugin</artifactId>
<properties>
<spring-kafka.version>2.2.9.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-kafka-commons</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -16,11 +16,12 @@
*
*/
package org.apache.skywalking.apm.plugin.kafka;
package org.apache.skywalking.apm.plugin.spring.kafka;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.kafka.CallbackAdapter;
import java.lang.reflect.Method;
......@@ -30,19 +31,18 @@ import java.lang.reflect.Method;
public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Object ret) throws Throwable {
return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst);
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
Class<?>[] argumentsTypes, Throwable t) {
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.kafka;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.kafka.define.Constants;
import org.apache.skywalking.apm.plugin.kafka.define.SpringKafkaContext;
import java.lang.reflect.Method;
public class PollAndInvokeMethodInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextManager.getRuntimeContext().put(Constants.SPRING_KAFKA_FLAG, new SpringKafkaContext());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
SpringKafkaContext context = (SpringKafkaContext) ContextManager.getRuntimeContext().get(Constants.SPRING_KAFKA_FLAG);
if (context == null) {
return ret;
}
if (context.isNeedStop()) {
ContextManager.stopSpan();
} else {
ContextManager.getRuntimeContext().remove(Constants.SPRING_KAFKA_FLAG);
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
......@@ -16,13 +16,14 @@
*
*/
package org.apache.skywalking.apm.plugin.kafka.define;
package org.apache.skywalking.apm.plugin.spring.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.plugin.kafka.define.AbstractKafkaTemplateInstrumentation;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
......@@ -34,7 +35,7 @@ public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumen
private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate";
private static final String ENHANCE_METHOD = "buildCallback";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.spring.kafka.KafkaTemplateCallbackInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
......@@ -43,7 +44,7 @@ public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumen
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.kafka.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class ListenerConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("pollAndInvoke").and(takesArguments(0));
}
@Override
public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.kafka.PollAndInvokeMethodInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public ClassMatch enhanceClass() {
return byName("org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer");
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
spring-kafka-2.x=org.apache.skywalking.apm.plugin.spring.kafka.define.KafkaTemplateInstrumentation
spring-kafka-2.x=org.apache.skywalking.apm.plugin.spring.kafka.define.ListenerConsumerInstrumentation
......@@ -5,4 +5,4 @@ The trace doesn't continue in kafka consumer side.
The kafka client is pulling message from server, the plugin also just traces the pull action. As that, you need to do the manual instrument before the pull action, and include the further data process.
### Resolve
Use Application Toolkit libraries to do manual instrumentation. such as `@Trace` annotation or OpenTracing API.
Use Application Toolkit libraries to do manual instrumentation. such as `@Trace` annotation or OpenTracing API, Or if you're using `spring-kafka` 2.2.x or above, you can track the Consumer side without any code change.
......@@ -49,6 +49,7 @@
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 1.0
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 2.2.x
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
* [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.4.x
......
......@@ -314,6 +314,9 @@ spring-annotation:
HBase:
id: 94
languages: Java
spring-kafka-consumer:
id: 95
languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
......@@ -496,4 +499,5 @@ Component-Server-Mappings:
Mysqli: Mysql
influxdb-java: InfluxDB
Predis: Redis
PyMysql: Mysql
\ No newline at end of file
PyMysql: Mysql
spring-kafka-consumer: kafka-consumer
\ No newline at end of file
......@@ -35,7 +35,7 @@
<packaging>pom</packaging>
<properties>
<agent-test-tools.version>08f6fc9466d892e25794c4a9561750816bd06d28</agent-test-tools.version>
<agent-test-tools.version>2eb5cb96c36fd720a013c70dba8c5717ddfbe870</agent-test-tools.version>
<agent-test-tools.workingDirectory>${project.basedir}/target/agent-test-tools</agent-test-tools.workingDirectory>
<agent-test-tools.repos>https://github.com/apache/skywalking-agent-test-tool.git</agent-test-tools.repos>
</properties>
......
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
home="$(cd "$(dirname $0)"; pwd)"
java -Dbootstrap.servers=${BOOTSTRAP_SERVERS} -jar ${agent_opts} "-Dskywalking.agent.service_name=spring-kafka-2.2.x-scenario" ${home}/../libs/spring-kafka-2.2.x-scenario.jar &
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
segmentItems:
- serviceName: spring-kafka-2.2.x-scenario
segmentSize: nq 0
segments:
- segmentId: not null
spans:
- operationName: Kafka/spring_test/Producer
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 40
isError: false
spanType: Exit
peer: kafka-server:9092
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- operationName: /case/spring-kafka-case
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-case'}
- {key: http.method, value: GET}
- segmentId: not null
spans:
- operationName: /case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping'}
- {key: http.method, value: GET}
refs:
- {parentEndpoint: 'Kafka/spring_test/Consumer/grop:spring_test', networkAddress: 'localhost:8080',
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.2.x-scenario,
traceId: not null}
- segmentId: not null
spans:
- operationName: /spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: not null
endTime: not null
componentId: 12
isError: false
spanType: Exit
peer: localhost:8080
skipAnalysis: false
tags:
- {key: http.method, value: GET}
- {key: url, value: 'http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping'}
- operationName: Kafka/spring_test/Consumer/grop:spring_test
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 41
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.2.x-scenario,
traceId: not null}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
type: jvm
entryService: http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-case
healthCheck: http://localhost:8080/spring-kafka-2.2.x-scenario/case/healthCheck
startScript: ./bin/startup.sh
environment:
- BOOTSTRAP_SERVERS=kafka-server:9092
depends_on:
- zookeeper-server
- kafka-server
dependencies:
zookeeper-server:
image: zookeeper:3.4
hostname: zookeeper-server
kafka-server:
image: bitnami/kafka:2.1.1
hostname: kafka-server
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181
- KAFKA_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
depends_on:
- zookeeper-server
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-kafka-2.2.x-scenario</artifactId>
<version>5.0.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<test.framework.version>2.2.9.RELEASE</test.framework.version>
<log4j.version>2.6.2</log4j.version>
<spring-boot-version>2.1.9.RELEASE</spring-boot-version>
<kafka-version>2.0.1</kafka-version>
<okhttp-version>3.0.0</okhttp-version>
</properties>
<name>skywalking-spring-kafka-2.2.x-scenario</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>${spring-boot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${test.framework.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp-version}</version>
</dependency>
</dependencies>
<build>
<finalName>spring-kafka-2.2.x-scenario</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${compiler.version}</source>
<target>${compiler.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>assemble</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
<outputDirectory>./target/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>http://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>http://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<formats>
<format>zip</format>
</formats>
<fileSets>
<fileSet>
<directory>./bin</directory>
<fileMode>0775</fileMode>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.build.directory}/spring-kafka-2.2.x-scenario.jar</source>
<outputDirectory>./libs</outputDirectory>
<fileMode>0775</fileMode>
</file>
</files>
</assembly>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test.org.apache.skywalking.apm.testcase.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("/case")
@PropertySource("classpath:application.properties")
public class CaseController {
private static final String SUCCESS = "Success";
@Value("${bootstrap.servers:127.0.0.1:9092}")
private String bootstrapServers;
private String topicName;
private KafkaTemplate<String, String> kafkaTemplate;
@PostConstruct
private void setUp() {
topicName = "spring_test";
setUpProvider();
setUpConsumer();
}
private void setUpProvider() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
}
private void setUpConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "grop:" + topicName);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Deserializer<String> stringDeserializer = new StringDeserializer();
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory(configs, stringDeserializer, stringDeserializer);
ContainerProperties props = new ContainerProperties(topicName);
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
System.out.println(data);
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
}
response.body().close();
acknowledgment.acknowledge();
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
container.start();
}
@RequestMapping("/spring-kafka-case")
@ResponseBody
public String springKafkaCase() {
try {
kafkaTemplate.send(topicName, "key", "helloWorld").get();
Thread.sleep(2000L);
} catch (Exception e) {
e.printStackTrace();
}
return SUCCESS;
}
@RequestMapping("/spring-kafka-consumer-ping")
@ResponseBody
public String springKafkaConsumerPing() {
return SUCCESS;
}
@RequestMapping("/healthCheck")
@ResponseBody
public String healthCheck() {
return SUCCESS;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
server.port=8080
server.servlet.context-path=/spring-kafka-2.2.x-scenario
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="WARN">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2.2.14.RELEASE
2.2.13.RELEASE
2.2.12.RELEASE
2.2.11.RELEASE
2.2.10.RELEASE
2.2.9.RELEASE
2.2.8.RELEASE
\ No newline at end of file
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
home="$(cd "$(dirname $0)"; pwd)"
java -Dbootstrap.servers=${BOOTSTRAP_SERVERS} -jar ${agent_opts} "-Dskywalking.agent.service_name=spring-kafka-2.3.x-scenario" ${home}/../libs/spring-kafka-2.3.x-scenario.jar &
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
segmentItems:
- serviceName: spring-kafka-2.3.x-scenario
segmentSize: nq 0
segments:
- segmentId: not null
spans:
- operationName: Kafka/spring_test/Producer
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 40
isError: false
spanType: Exit
peer: kafka-server:9092
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- operationName: /case/spring-kafka-case
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-case'}
- {key: http.method, value: GET}
- segmentId: not null
spans:
- operationName: /case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 14
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping'}
- {key: http.method, value: GET}
refs:
- {parentEndpoint: 'Kafka/spring_test/Consumer/grop:spring_test', networkAddress: 'localhost:8080',
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.3.x-scenario,
traceId: not null}
- segmentId: not null
spans:
- operationName: /spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: not null
endTime: not null
componentId: 12
isError: false
spanType: Exit
peer: localhost:8080
skipAnalysis: false
tags:
- {key: http.method, value: GET}
- {key: url, value: 'http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping'}
- operationName: Kafka/spring_test/Consumer/grop:spring_test
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: not null
endTime: not null
componentId: 41
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: spring-kafka-2.3.x-scenario,
traceId: not null}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
type: jvm
entryService: http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-case
healthCheck: http://localhost:8080/spring-kafka-2.3.x-scenario/case/healthCheck
startScript: ./bin/startup.sh
environment:
- BOOTSTRAP_SERVERS=kafka-server:9092
depends_on:
- zookeeper-server
- kafka-server
dependencies:
zookeeper-server:
image: zookeeper:3.4
hostname: zookeeper-server
kafka-server:
image: bitnami/kafka:2.1.1
hostname: kafka-server
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181
- KAFKA_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
depends_on:
- zookeeper-server
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-kafka-2.3.x-scenario</artifactId>
<version>5.0.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<test.framework.version>2.3.3.RELEASE</test.framework.version>
<log4j.version>2.6.2</log4j.version>
<spring-boot-version>2.3.2.RELEASE</spring-boot-version>
<kafka-version>2.3.1</kafka-version>
<okhttp-version>3.0.0</okhttp-version>
</properties>
<name>skywalking-spring-kafka-2.3.x-scenario</name>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${test.framework.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp-version}</version>
</dependency>
</dependencies>
<build>
<finalName>spring-kafka-2.3.x-scenario</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${compiler.version}</source>
<target>${compiler.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>assemble</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
<outputDirectory>./target/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<url>http://repo.spring.io/snapshot</url>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<url>http://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<formats>
<format>zip</format>
</formats>
<fileSets>
<fileSet>
<directory>./bin</directory>
<fileMode>0775</fileMode>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.build.directory}/spring-kafka-2.3.x-scenario.jar</source>
<outputDirectory>./libs</outputDirectory>
<fileMode>0775</fileMode>
</file>
</files>
</assembly>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test.org.apache.skywalking.apm.testcase.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("/case")
@PropertySource("classpath:application.properties")
public class CaseController {
private static final String SUCCESS = "Success";
@Value("${bootstrap.servers:127.0.0.1:9092}")
private String bootstrapServers;
private String topicName;
private KafkaTemplate<String, String> kafkaTemplate;
@PostConstruct
private void setUp() {
topicName = "spring_test";
setUpProvider();
setUpConsumer();
}
private void setUpProvider() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
}
private void setUpConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "grop:" + topicName);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Deserializer<String> stringDeserializer = new StringDeserializer();
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory(configs, stringDeserializer, stringDeserializer);
ContainerProperties props = new ContainerProperties(topicName);
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
System.out.println(data);
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
}
response.body().close();
acknowledgment.acknowledge();
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
container.start();
}
@RequestMapping("/spring-kafka-case")
@ResponseBody
public String springKafkaCase() {
try {
kafkaTemplate.send(topicName, "key", "helloWorld").get();
Thread.sleep(2000L);
} catch (Exception e) {
e.printStackTrace();
}
return SUCCESS;
}
@RequestMapping("/spring-kafka-consumer-ping")
@ResponseBody
public String springKafkaConsumerPing() {
return SUCCESS;
}
@RequestMapping("/healthCheck")
@ResponseBody
public String healthCheck() {
return SUCCESS;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
server.port=8080
server.servlet.context-path=/spring-kafka-2.3.x-scenario
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="WARN">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2.3.10.RELEASE
2.3.9.RELEASE
2.3.8.RELEASE
2.3.7.RELEASE
2.3.6.RELEASE
2.3.5.RELEASE
2.3.4.RELEASE
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册