未验证 提交 806666b4 编写于 作者: V vcjmhg 提交者: GitHub

add webflux webclient plugin (#5493)

上级 b560bae3
......@@ -178,4 +178,6 @@ public class ComponentsDefine {
public static final OfficialComponent QUARTZ_SCHEDULER = new OfficialComponent(97, "quartz-scheduler");
public static final OfficialComponent XXL_JOB = new OfficialComponent(98, "xxl-job");
}
public static final OfficialComponent SPRING_WEBCLIENT = new OfficialComponent(99, "spring-webflux-webclient");
}
\ No newline at end of file
......@@ -40,6 +40,7 @@
<module>mvc-annotation-5.x-plugin</module>
<module>spring-kafka-2.x-plugin</module>
<module>scheduled-annotation-plugin</module>
<module>spring-webflux-5.x-webclient-plugin</module>
</modules>
<packaging>pom</packaging>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-plugins</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-webflux-5.x-webclient-plugin</artifactId>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.0.0.RELEASE</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<compiler.version>1.8</compiler.version>
</properties>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.springframework.http.client.reactive.ClientHttpRequest;
import java.lang.reflect.Method;
public class BodyInserterRequestInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ClientHttpRequest clientHttpRequest = (ClientHttpRequest) allArguments[0];
ContextCarrier contextCarrier = (ContextCarrier) objInst.getSkyWalkingDynamicField();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
clientHttpRequest.getHeaders().set(next.getHeadKey(), next.getHeadValue());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.publisher.Mono;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.function.BiConsumer;
public class WebFluxWebClientInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
if (allArguments[0] == null) {
//illegal args,can't trace ignore
return;
}
ClientRequest request = (ClientRequest) allArguments[0];
final ContextCarrier contextCarrier = new ContextCarrier();
URI uri = request.url();
final String requestURIString = getRequestURIString(uri);
final String operationName = requestURIString;
final String remotePeer = getIPAndPort(uri);
AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);
//set componet name
span.setComponent(ComponentsDefine.SPRING_WEBCLIENT);
Tags.URL.set(span, uri.toString());
Tags.HTTP.METHOD.set(span, request.method().toString());
SpanLayer.asHttp(span);
if (request instanceof EnhancedInstance) {
((EnhancedInstance) request).setSkyWalkingDynamicField(contextCarrier);
}
//user async interface
span.prepareForAsync();
ContextManager.stopSpan();
objInst.setSkyWalkingDynamicField(span);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
// fix the problem that allArgument[0] may be null
if (allArguments[0] == null) {
return ret;
}
Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
AbstractSpan span = (AbstractSpan) objInst.getSkyWalkingDynamicField();
return ret1.doAfterSuccessOrError(new BiConsumer<ClientResponse, Throwable>() {
@Override
public void accept(ClientResponse clientResponse, Throwable throwable) {
HttpStatus httpStatus = clientResponse.statusCode();
if (httpStatus != null) {
Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value()));
if (httpStatus.isError()) {
span.errorOccurred();
}
}
}
}).doOnError(error -> {
span.log(error);
}).doFinally(s -> {
span.asyncFinish();
});
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
private String getRequestURIString(URI uri) {
String requestPath = uri.getPath();
return requestPath != null && requestPath.length() > 0 ? requestPath : "/";
}
// return ip:port
private String getIPAndPort(URI uri) {
return uri.getHost() + ":" + uri.getPort();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class BodyInserterRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("writeTo").and(takesArgumentWithType(0, "org.springframework.http.client.reactive.ClientHttpRequest"));
}
@Override
public String getMethodsInterceptor() {
return "org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.BodyInserterRequestInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName("org.springframework.web.reactive.function.client.DefaultClientRequestBuilder$BodyInserterRequest");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
public class WebFluxWebClientInstrumentation extends ClassEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction";
private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.WebFluxWebClientInterceptor";
@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("exchange");
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}
}
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
spring-webflux-5.x-webclient=org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.BodyInserterRequestInstrumentation
spring-webflux-5.x-webclient=org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.WebFluxWebClientInstrumentation
......@@ -87,6 +87,7 @@
- spring-scheduled-annotation
- spring-tx
- spring-webflux-5.x
- spring-webflux-5.x-webclient
- spymemcached-2.x
- struts2-2.x
- tomcat-7.x/8.x
......
......@@ -326,6 +326,9 @@ quartz-scheduler:
xxl-job:
id: 98
languages: Java
spring-webflux-webclient:
id: 99
languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
......
Subproject commit b2f381e63702bc43216ef5576637195102302c6e
Subproject commit 9933e2d17078c2bf07cd1c8d5ef36d52b5cbb917
......@@ -193,6 +193,28 @@ segmentItems:
parentSpanId: 8, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null}
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: /testcase/webclient/server
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 67
isError: false
spanType: Entry
peer: ''
tags:
- {key: url, value: 'http://localhost:18080/testcase/webclient/server'}
- {key: http.method, value: GET}
- {key: status_code, value: '200'}
refs:
- {parentEndpoint: /projectA/testcase, networkAddress: 'localhost:18080', refType: CrossProcess,
parentSpanId: 9, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null}
skipAnalysis: 'false'
- serviceName: webflux-projectA-scenario
segmentSize: nq 0
segments:
......@@ -321,6 +343,22 @@ segmentItems:
- {key: url, value: not null}
- {key: http.method, value: GET}
skipAnalysis: 'false'
- operationName: /testcase/webclient/server
operationId: 0
parentSpanId: 0
spanId: 9
spanLayer: Http
startTime: not null
endTime: not null
componentId: 99
isError: false
spanType: Exit
peer: not null
tags:
- {key: url, value: not null}
- {key: http.method, value: GET}
- {key: status_code, value:'200'}
skipAnalysis: 'false'
- operationName: /projectA/testcase
operationId: 0
parentSpanId: -1
......
......@@ -16,5 +16,6 @@
# 2.0.0-2.1.0 are supported, but due to the status code bug(https://github.com/spring-projects/spring-framework/issues/21901)
# we don’t test them
2.1.7.RELEASE
2.1.17.RELEASE
2.2.10.RELEASE
2.3.4.RELEASE
......@@ -39,6 +39,16 @@
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.2.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
</dependencies>
<build>
......
......@@ -17,12 +17,15 @@
package org.apache.skywalking.apm.testcase.sc.webflux.projectA.controller;
import java.io.IOException;
import org.apache.skywalking.apm.testcase.sc.webflux.projectA.utils.HttpUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.io.IOException;
@RestController
public class TestController {
......@@ -43,6 +46,7 @@ public class TestController {
visit("http://" + hostBAddress + "/testcase/route/error");
visit("http://" + hostBAddress + "/notFound");
visit("http://" + hostBAddress + "/testcase/annotation/mono/hello");
testGet("http://" + hostBAddress + "/testcase/webclient/server");
return "test";
}
......@@ -59,4 +63,17 @@ public class TestController {
}
}
/**
* test webflux webclient plugin
*/
private void testGet(String remoteUri) {
Mono<String> response = WebClient
.create()
.get()
.uri(remoteUri)
.retrieve()
.bodyToMono(String.class);
response.subscribe();
}
}
......@@ -44,6 +44,11 @@ public class TestAnnotationController {
}
return "1";
}
@RequestMapping("/testcase/webclient/server")
public String webclientServer(){
return "success";
}
@GetMapping("/testcase/annotation/{test}")
public Mono<String> urlPattern(@PathVariable("test") String var) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册