提交 b431f414 编写于 作者: L lican 提交者: wu-sheng

Rewrite Async http client plugin (#1217)

* rewrite http-async-client

* add unit test and document

* fix check style issue

* update test

* update doc fix ci

* fix review
上级 4a20c288
......@@ -41,12 +41,6 @@
<version>4.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient-cache</artifactId>
<version>4.1.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* Create local span :httpasyncclient/SocketChannel, to showcase the ability to connect to the remote host.
*
* @author liyuntao
*/
public class DefaultConnectingIOReactorIterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Object[] cacheValue = (Object[])objInst.getSkyWalkingDynamicField();
final ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan span = ContextManager.createExitSpan("httpasyncclient/" + method.getName(), contextCarrier, cacheValue[1].toString());
ContextManager.continued((ContextSnapshot)cacheValue[0]);
span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT).setLayer(SpanLayer.HTTP);
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
/**
* Pass ref accross thread by SessionRequest.
*
* @author liyuntao
*/
public class ConnectIterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
((EnhancedInstance)ret).setSkyWalkingDynamicField(ContextManager.capture());
InetSocketAddress remoteAddress = (InetSocketAddress)allArguments[0];
String peer = remoteAddress.toString().substring(1);
Object[] cacheValue = new Object[3];
cacheValue[0] = ContextManager.capture();
cacheValue[1] = peer;
objInst.setSkyWalkingDynamicField(cacheValue);
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.httpasyncclient.v4.wrapper.FutureCallbackWrapper;
import org.apache.skywalking.apm.plugin.httpasyncclient.v4.wrapper.HttpAsyncResponseConsumerWrapper;
import java.lang.reflect.Method;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
/**
* in main thread,hold the context in thread local so we can read in the same thread.
*
* @author lican
*/
public class HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
HttpAsyncResponseConsumer consumer = (HttpAsyncResponseConsumer) allArguments[1];
HttpContext context = (HttpContext) allArguments[2];
FutureCallback callback = (FutureCallback) allArguments[3];
allArguments[1] = new HttpAsyncResponseConsumerWrapper(consumer);
allArguments[3] = new FutureCallbackWrapper(callback);
CONTEXT_LOCAL.set(context);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* Create exit span of httpasyncclient.
*
* @author liyuntao
*/
public class StateInterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
if (null == allArguments[0]) {
return;
}
HttpRequestWrapper httpRequest = (HttpRequestWrapper)allArguments[0];
String uri = httpRequest.getOriginal().getRequestLine().getUri();
AbstractSpan span = null;
final ContextCarrier contextCarrier = new ContextCarrier();
try {
URL url = new URL(httpRequest.getOriginal().getRequestLine().getUri());
String remotePeer = url.getHost() + ":" + url.getPort();
span = ContextManager.createExitSpan(url.getPath(), contextCarrier, remotePeer);
} catch (MalformedURLException e) {
throw e;
}
span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT);
Tags.URL.set(span, uri);
Tags.HTTP.METHOD.set(span, httpRequest.getOriginal().getRequestLine().getMethod());
SpanLayer.asHttp(span);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
httpRequest.setHeader(next.getHeadKey(), next.getHeadValue());
}
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import org.apache.http.HttpHost;
import org.apache.http.RequestLine;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.protocol.HttpContext;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
import java.net.URL;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
/**
* the actual point request begin fetch the request from thread local .
* @author lican
*/
public class HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
HttpContext context = CONTEXT_LOCAL.get();
CONTEXT_LOCAL.remove();
if (context == null) {
return;
}
final ContextCarrier contextCarrier = new ContextCarrier();
HttpRequestWrapper requestWrapper = (HttpRequestWrapper) context.getAttribute(HttpClientContext.HTTP_REQUEST);
HttpHost httpHost = (HttpHost) context.getAttribute(HttpClientContext.HTTP_TARGET_HOST);
RequestLine requestLine = requestWrapper.getRequestLine();
String uri = requestLine.getUri();
String operationName = uri.startsWith("http") ? new URL(uri).getPath() : uri;
int port = httpHost.getPort();
AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, httpHost.getHostName() + ":" + (port == -1 ? 80 : port));
span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT);
Tags.URL.set(span, requestWrapper.getOriginal().getRequestLine().getUri());
Tags.HTTP.METHOD.set(span, requestLine.getMethod());
SpanLayer.asHttp(span);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
requestWrapper.setHeader(next.getHeadKey(), next.getHeadValue());
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* End a local span for {@link org.apache.http.impl.nio.client.CloseableHttpAsyncClient#execute} called by
* application.
*
* @author liyuntao
*/
public class HttpAsyncResponseConsumerInterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
HttpAsyncRequestProducer producer = (HttpAsyncRequestProducer)allArguments[0];
String uri = producer.generateRequest().getRequestLine().getUri();
String requestMethod = producer.generateRequest().getRequestLine().getMethod();
AbstractSpan span = ContextManager.createLocalSpan("httpasyncclient/" + method.getName());
Tags.HTTP.METHOD.set(span, requestMethod);
span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT).setLayer(SpanLayer.HTTP);
Tags.URL.set(span, uri);
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* End a local span for {@link org.apache.http.impl.nio.client.CloseableHttpAsyncClient#execute} called by
* application.
*
* @author liyuntao
*/
public class HttpHostInterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
HttpHost producer = (HttpHost)allArguments[0];
String uri = producer.toURI();
AbstractSpan span = ContextManager.createLocalSpan("httpasyncclient/" + method.getName());
span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT).setLayer(SpanLayer.HTTP);
Tags.HTTP.METHOD.set(span, ((HttpRequest)allArguments[1]).getRequestLine().getMethod());
Tags.URL.set(span, uri);
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* 1.End exit span.
* 2.Create a local span of callback.
* 3.End local span:AsyncThread/execute.
*
* @author liyuntao
*/
public class ProcessResponseInterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
String uri = activeSpan.getOperationName();
//stop exitSpan
ContextManager.stopSpan();
AbstractSpan localSpan = ContextManager.createLocalSpan("callback:" + uri);
localSpan.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT).setLayer(SpanLayer.HTTP);
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
//stop local span:callback
ContextManager.stopSpan();
//stop local span:AsyncThread/execute
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.http.HttpResponse;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
/**
* End exit span and create a local span of future/Callback.
*
* @author liyuntao
*/
public class SetResponseInterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
if (null == allArguments[0]) {
return;
}
AbstractSpan span = ContextManager.activeSpan();
int statusCode = ((HttpResponse)allArguments[0]).getStatusLine().getStatusCode();
if (statusCode >= 400) {
span.errorOccurred();
Tags.STATUS_CODE.set(span, Integer.toString(statusCode));
}
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import org.apache.http.protocol.HttpContext;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
/**
* request ready(completed) so we can start our local thread span;
*
* @author lican
*/
public class SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor {
public static ThreadLocal<HttpContext> CONTEXT_LOCAL = new ThreadLocal<HttpContext>();
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Object[] array = (Object[]) objInst.getSkyWalkingDynamicField();
if (array == null || array.length == 0) {
return;
}
ContextSnapshot snapshot = (ContextSnapshot) array[0];
ContextManager.createLocalSpan("httpasyncclient/local");
if (snapshot != null) {
ContextManager.continued(snapshot);
}
CONTEXT_LOCAL.set((HttpContext) array[1]);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* Create a local sapn and passing ref accross thread by SessionRequest.
*
* @author liyuntao
*/
public class SuccessInterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
SessionRequest request = (SessionRequest)allArguments[0];
AbstractSpan localSpan = ContextManager.createLocalSpan("AsyncThread/execute");
localSpan.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT).setLayer(SpanLayer.HTTP);
Object cacheValue = ((EnhancedInstance)request).getSkyWalkingDynamicField();
ContextManager.continued((ContextSnapshot)cacheValue);
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
/**
* hold the snapshot in SkyWalkingDynamicField
* @author lican
*/
public class SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
if (ContextManager.isActive()) {
ContextSnapshot snapshot = ContextManager.capture();
objInst.setSkyWalkingDynamicField(new Object[]{snapshot, CONTEXT_LOCAL.get()});
}
CONTEXT_LOCAL.remove();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.lang.reflect.Method;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
/**
* Set local span false When connect to the remote host failed .
*
* @author liyuntao
*/
public class SessionRequestImplIterceptor implements InstanceMethodsAroundInterceptor {
@Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
}
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.errorOccurred();
activeSpan.log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import java.lang.reflect.Method;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
/**
* when request fail to ready we should remove thread local in case of memory leak;
*
* @author lican
*/
public class SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
//this means actual request will not started. so the span has not been created,we cannot log the status.
CONTEXT_LOCAL.remove();
objInst.setSkyWalkingDynamicField(null);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link AbstractNIOConnPoolInstrumentation} presents that skywalking intercept
* org.apache.http.nio.protocol.AbstractNIOConnPool#requestCompleted
*
* @author liyuntao
*/
public class AbstractNIOConnPoolInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.nio.pool.AbstractNIOConnPool";
private static final String START_LOCAL_SUCCESS_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SuccessInterceptor";
@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("requestCompleted");
}
@Override
public String getMethodsInterceptor() {
return START_LOCAL_SUCCESS_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link ExecuteInstrumentation} presents that skywalking intercepts org.apache.http.impl.nio.client.CloseableHttpAsyncClient#execute
*
* @author liyuntao
*/
public class ExecuteInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.client.CloseableHttpAsyncClient";
private static final String CONSUMER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncResponseConsumerInterceptor";
private static final String HOST_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpHostInterceptor";
@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("execute").and(takesArgumentWithType(0, "org.apache.http.nio.protocol.HttpAsyncRequestProducer")).and(takesArguments(3));
}
@Override
public String getMethodsInterceptor() {
return CONSUMER_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("execute").and(takesArguments(4)).and(takesArgumentWithType(0, "org.apache.http.HttpHost"));
}
@Override
public String getMethodsInterceptor() {
return HOST_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import java.io.IOException;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link SessionRequestImplInstrumentation} presents that skywalking intercepts
* {@link org.apache.http.impl.nio.reactor.SessionRequestImpl#failed(IOException)} }
*
* @author liyuntao
*/
public class SessionRequestImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl";
private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestImplIterceptor";
@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("failed");
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.*;
/**
* {@link HttpAsyncClientInstrumentation} indicates that the execute method in both org.apache.http.impl.nio.client.MinimalHttpAsyncClient#execute(HttpAsyncRequestProducer, HttpAsyncResponseConsumer, HttpContext, FutureCallback)
* and InternalHttpAsyncClient#execute(HttpAsyncRequestProducer, HttpAsyncResponseConsumer, HttpContext, FutureCallback) can be instrumented for single request.pipeline is not support now for some
* complex situation.this is run in main thread.
*
* @author lican
*/
public class HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS_MINIMAL = "org.apache.http.impl.nio.client.MinimalHttpAsyncClient";
private static final String ENHANCE_CLASS_INTERNAL = "org.apache.http.impl.nio.client.InternalHttpAsyncClient";
private static final String METHOD = "execute";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor";
private static final String FIRST_ARG_TYPE = "org.apache.http.nio.protocol.HttpAsyncRequestProducer";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(METHOD).and(takesArguments(4)
.and(takesArgument(0, named(FIRST_ARG_TYPE))));
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return true;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_MINIMAL, ENHANCE_CLASS_INTERNAL);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link ProcessResponseInstrumentation} presents that skywalking intercept
* org.apache.http.nio.protocol.HttpAsyncRequestExecutor#processResponse and #connected
*
* @author liyuntao
*/
public class ProcessResponseInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor";
private static final String END_EXIT_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.ProcessResponseInterceptor";
@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("processResponse");
}
@Override
public String getMethodsInterceptor() {
return END_EXIT_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link HttpAsyncRequestExecutorInstrumentation} indicates the real request start location in method requestReady
*
* @author lican
*/
public class HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor";
private static final String METHOD = "requestReady";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link DefaultConnectingIOReactorInstrumentation} presents that skywalking intercepts
* org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processEvent
*
* @author liyuntao
*/
public class DefaultConnectingIOReactorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor";
private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.DefaultConnectingIOReactorIterceptor";
private static final String LOCAL_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.ConnectIterceptor";
@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("processEvent");
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("connect");
}
@Override
public String getMethodsInterceptor() {
return LOCAL_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* this is a bridge for main thread and real request thread which mean hold the {@link org.apache.skywalking.apm.agent.core.context.ContextSnapshot} object to be continued
* in "completed" method.that is mean the request is ready to submit
*
* @author lican
*/
public class SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor";
private static final String COMPLETED_METHOD = "completed";
private static final String TIMEOUT_METHOD = "timeout";
private static final String FAILED_METHOD = "failed";
private static final String CANCEL_METHOD = "cancel";
private static final String SUCCESS_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor";
private static final String FAIL_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor";
private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(COMPLETED_METHOD);
}
@Override
public String getMethodsInterceptor() {
return SUCCESS_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(TIMEOUT_METHOD).or(named(FAILED_METHOD).or(named(CANCEL_METHOD)));
}
@Override
public String getMethodsInterceptor() {
return FAIL_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link StateInstrumentation} presents that skywalking intercept org.apache.http.nio.protocol.HttpAsyncRequestExecutor$State#setRequest
*
* @author liyuntao
*/
public class StateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor$State";
private static final String START_EXIT_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.StateInterceptor";
private static final String END_EXIT_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SetResponseInterceptor";
@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("setRequest");
}
@Override
public String getMethodsInterceptor() {
return START_EXIT_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("setResponse");
}
@Override
public String getMethodsInterceptor() {
return END_EXIT_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.wrapper;
import org.apache.http.concurrent.FutureCallback;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
/**
* a wrapper for {@link FutureCallback} so we can be notified when the hold response
* (when one or more request fails the pipeline mode may not callback though we haven't support pipeline)
* received whether it fails or completed or canceled.
* @author lican
*/
public class FutureCallbackWrapper<T> implements FutureCallback<T> {
private FutureCallback<T> callback;
public FutureCallbackWrapper(FutureCallback<T> callback) {
this.callback = callback;
}
@Override
public void completed(T o) {
if (ContextManager.isActive()) {
ContextManager.stopSpan();
}
if (callback != null) {
callback.completed(o);
}
}
@Override
public void failed(Exception e) {
CONTEXT_LOCAL.remove();
if (ContextManager.isActive()) {
ContextManager.activeSpan().errorOccurred().log(e);
ContextManager.stopSpan();
}
if (callback != null) {
callback.failed(e);
}
}
@Override
public void cancelled() {
CONTEXT_LOCAL.remove();
if (ContextManager.isActive()) {
ContextManager.activeSpan().errorOccurred();
ContextManager.stopSpan();
}
if (callback != null) {
callback.cancelled();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4.wrapper;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import java.io.IOException;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
/**
* a wrapper for {@link HttpAsyncResponseConsumer} so we can be notified when the
* current response(every response will callback the wrapper) received maybe completed or canceled,or failed.
*
* @author lican
*/
public class HttpAsyncResponseConsumerWrapper<T> implements HttpAsyncResponseConsumer<T> {
private HttpAsyncResponseConsumer<T> consumer;
public HttpAsyncResponseConsumerWrapper(HttpAsyncResponseConsumer<T> consumer) {
this.consumer = consumer;
}
@Override
public void responseReceived(HttpResponse response) throws IOException, HttpException {
if (ContextManager.isActive()) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode > 400) {
AbstractSpan span = ContextManager.activeSpan();
Tags.STATUS_CODE.set(span, String.valueOf(statusCode));
}
ContextManager.stopSpan();
}
consumer.responseReceived(response);
}
@Override
public void consumeContent(ContentDecoder decoder, IOControl ioctrl) throws IOException {
consumer.consumeContent(decoder, ioctrl);
}
@Override
public void responseCompleted(HttpContext context) {
consumer.responseCompleted(context);
}
@Override
public void failed(Exception ex) {
CONTEXT_LOCAL.remove();
if (ContextManager.isActive()) {
ContextManager.activeSpan().errorOccurred().log(ex);
ContextManager.stopSpan();
}
consumer.failed(ex);
}
@Override
public Exception getException() {
return consumer.getException();
}
@Override
public T getResult() {
return consumer.getResult();
}
@Override
public boolean isDone() {
return consumer.isDone();
}
@Override
public void close() throws IOException {
consumer.close();
}
@Override
public boolean cancel() {
CONTEXT_LOCAL.remove();
if (ContextManager.isActive()) {
ContextManager.activeSpan().errorOccurred();
ContextManager.stopSpan();
}
return consumer.cancel();
}
}
......@@ -14,11 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.ExecuteInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.DefaultConnectingIOReactorInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestImplInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.AbstractNIOConnPoolInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.StateInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.ProcessResponseInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncClientInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncRequestExecutorInstrumentation
......@@ -13,54 +13,60 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.util.List;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import org.apache.http.*;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.plugin.httpasyncclient.v4.wrapper.FutureCallbackWrapper;
import org.apache.skywalking.apm.plugin.httpasyncclient.v4.wrapper.HttpAsyncResponseConsumerWrapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.List;
import static org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor.CONTEXT_LOCAL;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
/**
* @author lican
*/
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
@PrepareForTest(HttpHost.class)
public class StateInterceptorTest {
public class HttpAsyncClientInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
......@@ -68,42 +74,54 @@ public class StateInterceptorTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
private StateInterceptor stateInterceptor;
private EnhancedInstance enhancedInstance;
private HttpAsyncClientInterceptor httpAsyncClientInterceptor;
private HttpAsyncRequestExecutorInterceptor requestExecutorInterceptor;
private SessionRequestConstructorInterceptor sessionRequestConstructorInterceptor;
private SetResponseInterceptor setResponseInterceptor;
private SessionRequestCompleteInterceptor completeInterceptor;
private ProcessResponseInterceptor processResponseInterceptor;
@Mock
private HttpHost httpHost;
private HttpAsyncRequestProducer producer;
@Mock
private HttpRequestWrapper request;
private HttpAsyncResponseConsumer consumer;
@Mock
private HttpRequest httpRequest;
private HttpContext httpContext;
@Mock
private HttpResponse httpResponse;
private FutureCallback callback;
@Mock
private StatusLine statusLine;
private HttpRequestWrapper requestWrapper;
private Object[] allArguments;
private Class[] argumentsType;
@Mock
private HttpHost httpHost;
@Mock
private EnhancedInstance enhancedInstance;
private HttpResponse response;
@Before
public void setUp() throws Exception {
ServiceManager.INSTANCE.boot();
stateInterceptor = new StateInterceptor();
setResponseInterceptor = new SetResponseInterceptor();
processResponseInterceptor = new ProcessResponseInterceptor();
httpAsyncClientInterceptor = new HttpAsyncClientInterceptor();
requestExecutorInterceptor = new HttpAsyncRequestExecutorInterceptor();
sessionRequestConstructorInterceptor = new SessionRequestConstructorInterceptor();
completeInterceptor = new SessionRequestCompleteInterceptor();
PowerMockito.mock(HttpHost.class);
when(statusLine.getStatusCode()).thenReturn(200);
when(httpResponse.getStatusLine()).thenReturn(statusLine);
httpContext = new BasicHttpContext();
httpContext.setAttribute(HttpClientContext.HTTP_REQUEST, requestWrapper);
httpContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, httpHost);
CONTEXT_LOCAL.set(httpContext);
when(httpHost.getHostName()).thenReturn("127.0.0.1");
when(httpHost.getSchemeName()).thenReturn("http");
when(request.getOriginal()).thenReturn(httpRequest);
when(httpRequest.getRequestLine()).thenReturn(new RequestLine() {
final RequestLine requestLine = new RequestLine() {
@Override
public String getMethod() {
return "GET";
......@@ -116,64 +134,142 @@ public class StateInterceptorTest {
@Override
public String getUri() {
return "http://127.0.0.1:8080/test-web/httpasync";
return "http://127.0.0.1:8080/test-web/test";
}
};
when(response.getStatusLine()).thenReturn(new StatusLine() {
@Override
public ProtocolVersion getProtocolVersion() {
return new ProtocolVersion("http", 1, 1);
}
@Override
public int getStatusCode() {
return 200;
}
@Override
public String getReasonPhrase() {
return null;
}
});
when(requestWrapper.getRequestLine()).thenReturn(requestLine);
when(requestWrapper.getOriginal()).thenReturn(new HttpGet("http://localhost:8081/original/test"));
when(httpHost.getPort()).thenReturn(8080);
allArguments = new Object[] {request};
argumentsType = new Class[] {request.getClass()};
enhancedInstance = new EnhancedInstance() {
private Object object;
@Override
public Object getSkyWalkingDynamicField() {
return object;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
this.object = value;
}
};
}
@Test
public void testHttpClient() throws Throwable {
AbstractSpan span = ContextManager.createLocalSpan("httpasyncclient/HttpAsyncRequestExecutor:");
stateInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
stateInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentsType, httpResponse);
processResponseInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
processResponseInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentsType, httpResponse);
Assert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
public void testSuccess() throws Throwable {
//mock active span;
ContextManager.createEntrySpan("mock-test", new ContextCarrier());
Thread thread = baseTest();
ContextManager.stopSpan();
thread.join();
Assert.assertThat(segmentStorage.getTraceSegments().size(), is(2));
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(findNeedSegemnt());
assertHttpSpan(spans.get(0));
verify(request, times(1)).setHeader(anyString(), anyString());
verify(requestWrapper, times(1)).setHeader(anyString(), anyString());
}
@Test
public void testStatusCodeNotEquals200() throws Throwable {
when(statusLine.getStatusCode()).thenReturn(500);
AbstractSpan span = ContextManager.createLocalSpan("httpasyncclient/HttpAsyncRequestExecutor:");
stateInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
stateInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentsType, httpResponse);
allArguments = new Object[] {httpResponse};
setResponseInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
setResponseInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentsType, httpResponse);
processResponseInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
processResponseInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentsType, httpResponse);
Assert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(3));
List<KeyValuePair> tags = SpanHelper.getTags(spans.get(0));
assertThat(tags.size(), is(3));
assertThat(tags.get(2).getValue(), is("500"));
public void testNoContext() throws Throwable {
assertHttpSpan(spans.get(0));
assertThat(SpanHelper.getErrorOccurred(spans.get(0)), is(true));
verify(request, times(1)).setHeader(anyString(), anyString());
Thread thread = baseTest();
thread.join();
Assert.assertThat(segmentStorage.getTraceSegments().size(), is(0));
verify(requestWrapper, times(0)).setHeader(anyString(), anyString());
}
private Thread baseTest() throws Throwable {
Object[] allArguments = new Object[]{producer, consumer, httpContext, callback};
Class[] types = new Class[]{HttpAsyncRequestProducer.class, HttpAsyncResponseConsumer.class, HttpContext.class, FutureCallback.class};
httpAsyncClientInterceptor.beforeMethod(enhancedInstance, null, allArguments, types, null);
Assert.assertEquals(CONTEXT_LOCAL.get(), httpContext);
Assert.assertTrue(allArguments[1] instanceof HttpAsyncResponseConsumerWrapper);
Assert.assertTrue(allArguments[3] instanceof FutureCallbackWrapper);
sessionRequestConstructorInterceptor.onConstruct(enhancedInstance, null);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
//start local
completeInterceptor.beforeMethod(enhancedInstance, null, null, null, null);
//start request
requestExecutorInterceptor.beforeMethod(enhancedInstance, null, null, null, null);
HttpAsyncResponseConsumerWrapper consumerWrapper = new HttpAsyncResponseConsumerWrapper(consumer);
consumerWrapper.responseReceived(response);
new FutureCallbackWrapper(callback).completed(null);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
});
thread.start();
return thread;
}
private TraceSegment findNeedSegemnt() {
for (TraceSegment traceSegment : segmentStorage.getTraceSegments()) {
if (SegmentHelper.getSpans(traceSegment).size() > 1) {
return traceSegment;
}
}
return null;
}
private void assertHttpSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("/test-web/httpasync"));
assertThat(span.getOperationName(), is("/test-web/test"));
assertThat(SpanHelper.getComponentId(span), is(26));
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("http://127.0.0.1:8080/test-web/httpasync"));
assertThat(tags.get(0).getValue(), is("http://localhost:8081/original/test"));
assertThat(tags.get(1).getValue(), is("GET"));
assertThat(span.isExit(), is(true));
}
}
@Test
public void afterMethod() throws Throwable {
baseCompleteTest(completeInterceptor);
baseCompleteTest(httpAsyncClientInterceptor);
baseCompleteTest(requestExecutorInterceptor);
}
private void baseCompleteTest(InstanceMethodsAroundInterceptor instanceMethodsAroundInterceptor) throws Throwable {
Object ret = new Object();
Object result = instanceMethodsAroundInterceptor.afterMethod(enhancedInstance, null, null, null, ret);
Assert.assertEquals(ret, result);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.httpasyncclient.v4;
import java.util.List;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.LogDataEntity;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import static junit.framework.TestCase.assertNotNull;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @auther lytscu
*/
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
@PrepareForTest(HttpHost.class)
public class TestException {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
private StateInterceptor stateInterceptor;
private SetResponseInterceptor setResponseInterceptor;
private ProcessResponseInterceptor processResponseInterceptor;
@Mock
private HttpHost httpHost;
@Mock
private HttpRequestWrapper request;
@Mock
private HttpRequest httpRequest;
@Mock
private HttpResponse httpResponse;
@Mock
private StatusLine statusLine;
private Object[] allArguments, setResponseInterceptorArguments;
private Class[] argumentsType;
@Mock
private EnhancedInstance enhancedInstance;
@Before
public void setUp() throws Exception {
ServiceManager.INSTANCE.boot();
stateInterceptor = new StateInterceptor();
setResponseInterceptor = new SetResponseInterceptor();
processResponseInterceptor = new ProcessResponseInterceptor();
PowerMockito.mock(HttpHost.class);
when(statusLine.getStatusCode()).thenReturn(200);
when(httpResponse.getStatusLine()).thenReturn(statusLine);
when(httpHost.getHostName()).thenReturn("127.0.0.1");
when(httpHost.getSchemeName()).thenReturn("http");
when(request.getOriginal()).thenReturn(httpRequest);
when(httpRequest.getRequestLine()).thenReturn(new RequestLine() {
@Override
public String getMethod() {
return "GET";
}
@Override
public ProtocolVersion getProtocolVersion() {
return new ProtocolVersion("http", 1, 1);
}
@Override
public String getUri() {
return "http://127.0.0.1:8080/test-web/httpasync";
}
});
when(httpHost.getPort()).thenReturn(8080);
allArguments = new Object[] {request};
setResponseInterceptorArguments = new Object[] {httpResponse};
argumentsType = new Class[] {request.getClass()};
}
@Test
public void testHttpClientWithException() throws Throwable {
AbstractSpan localSpan = ContextManager.createLocalSpan("httpasyncclient/HttpAsyncRequestExecutor:");
stateInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
stateInterceptor.handleMethodException(enhancedInstance, null, allArguments, argumentsType, new RuntimeException("testException"));
processResponseInterceptor.beforeMethod(enhancedInstance, null, allArguments, argumentsType, null);
processResponseInterceptor.afterMethod(enhancedInstance, null, allArguments, argumentsType, httpResponse);
Assert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertThat(spans.size(), is(3));
AbstractTracingSpan span = spans.get(0);
assertThat(SpanHelper.getErrorOccurred(span), is(true));
assertHttpSpanErrorLog(SpanHelper.getLogs(span));
verify(request, times(1)).setHeader(anyString(), anyString());
}
private void assertHttpSpanErrorLog(List<LogDataEntity> logs) {
assertThat(logs.size(), is(1));
LogDataEntity logData = logs.get(0);
Assert.assertThat(logData.getLogs().size(), is(4));
Assert.assertThat(logData.getLogs().get(0).getValue(), CoreMatchers.<Object>is("error"));
Assert.assertThat(logData.getLogs().get(1).getValue(), CoreMatchers.<Object>is(RuntimeException.class.getName()));
Assert.assertThat(logData.getLogs().get(2).getValue(), is("testException"));
assertNotNull(logData.getLogs().get(3).getValue());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册