提交 c17f955d 编写于 作者: A ascrutae

support the future stub of grpc framework

上级 b7dfd598
......@@ -19,39 +19,23 @@
package org.skywalking.apm.plugin.grpc.v1;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.grpc.v1.vo.GRPCDynamicFields;
/**
* {@link ClientCallOnNextInterceptor} create a exist span when the grpc start call. it will stop span when the method
* type is non-unary.
*
* @author zhangxin
*/
public class ClientCallStartInterceptor
implements InstanceMethodsAroundInterceptor {
public class ClientCallStartInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
GRPCDynamicFields cachedObjects = (GRPCDynamicFields)objInst.getSkyWalkingDynamicField();
final Metadata headers = (Metadata)allArguments[1];
final AbstractSpan span = ContextManager.createExitSpan(cachedObjects.getRequestMethodName(), cachedObjects.getAuthority());
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
final ContextCarrier contextCarrier = new ContextCarrier();
ContextManager.inject(contextCarrier);
CarrierItem contextItem = contextCarrier.items();
while (contextItem.hasNext()) {
contextItem = contextItem.next();
......@@ -59,6 +43,7 @@ public class ClientCallStartInterceptor
headers.put(headerKey, contextItem.getHeadValue());
}
GRPCDynamicFields cachedObjects = (GRPCDynamicFields)objInst.getSkyWalkingDynamicField();
GRPCDynamicFields listenerCachedObject = new GRPCDynamicFields();
listenerCachedObject.setSnapshot(ContextManager.capture());
listenerCachedObject.setDescriptor(cachedObjects.getDescriptor());
......@@ -68,15 +53,11 @@ public class ClientCallStartInterceptor
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
if (((GRPCDynamicFields)objInst.getSkyWalkingDynamicField()).getMethodType() != MethodDescriptor.MethodType.UNARY) {
ContextManager.stopSpan();
}
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
......@@ -18,45 +18,44 @@
package org.skywalking.apm.plugin.grpc.v1;
import io.grpc.Metadata;
import io.grpc.Status;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.grpc.v1.vo.GRPCDynamicFields;
/**
* {@link UnaryClientOnCloseInterceptor} stop the active span when the call end.
* {@link ClientCallOnNextInterceptor} create a exist span when the grpc start call. it will stop span when the method
* type is non-unary.
*
* @author zhangxin
*/
public class UnaryClientOnCloseInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
}
public class ClientCallsMethodInterceptor
implements StaticMethodsAroundInterceptor {
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
Status status = (Status)allArguments[0];
@Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
MethodInterceptResult result) {
EnhancedInstance clientCall = (EnhancedInstance)allArguments[0];
GRPCDynamicFields cachedObjects = (GRPCDynamicFields)clientCall.getSkyWalkingDynamicField();
if (status != Status.OK) {
activeSpan.errorOccurred().log(status.asRuntimeException((Metadata)allArguments[1]));
Tags.STATUS_CODE.set(activeSpan, status.getCode().toString());
}
final AbstractSpan span = ContextManager.createExitSpan(cachedObjects.getRequestMethodName(), cachedObjects.getAuthority());
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
}
@Override public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Object ret) {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.plugin.grpc.v1;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* {@link UnaryStreamToFutureConstructorInterceptor} stop the active span when the call end.
*
* @author zhangxin
*/
public class UnaryStreamToFutureConstructorInterceptor implements InstanceConstructorInterceptor {
@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
}
}
......@@ -31,7 +31,7 @@ import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link ClientCallInstrumentation} presents that skywalking intercept the <code>start</code> method in
* <code>io.grpc.internal.ClientCallImpl</code> class by <code>org.skywalking.apm.plugin.grpc.v1.ClientCallStartInterceptor</code>
* <code>io.grpc.internal.ClientCallImpl</code> class by <code>org.skywalking.apm.plugin.grpc.v1.ClientCallsMethodInterceptor</code>
* and the constructor in <code>io.grpc.internal.ClientCallImpl</code> by <code>org.skywalking.apm.plugin.grpc.v1.ClientCallIConstructorInterceptor</code>
*
* @author zhangxin
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.plugin.grpc.v1.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class ClientCallsInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls";
@Override protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[] {
new StaticMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return (named("asyncUnaryRequestCall").and(takesArgumentWithType(2,"io.grpc.ClientCall$Listener")))
.or(named("asyncStreamingRequestCall"))
.or(named("blockingUnaryCall"))
.or(named("futureUnaryCall"));
}
@Override public String getMethodsInterceptor() {
return "org.skywalking.apm.plugin.grpc.v1.ClientCallsMethodInterceptor";
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
......@@ -25,42 +25,37 @@ import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIntercept
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link UnaryClientCallListenerInstrumentation} indicates that skywalking enhance the <code>onClose</code> method in
* <code>io.grpc.stub.ClientCalls$UnaryStreamToFuture</code> class by <code>org.skywalking.apm.plugin.grpc.v1.UnaryClientOnCloseInterceptor</code>
* <code>io.grpc.stub.ClientCalls$UnaryStreamToFuture</code> class by <code>org.skywalking.apm.plugin.grpc.v1.UnaryStreamToFutureConstructorInterceptor</code>
*
* @author zhangxin
*/
public class UnaryClientCallListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls$UnaryStreamToFuture";
private static final String ENHANCE_METHOD = "onClose";
public static final String INTERCEPT_CLASS = "org.skywalking.apm.plugin.grpc.v1.UnaryClientOnCloseInterceptor";
public static final String INTERCEPT_CLASS = "org.skywalking.apm.plugin.grpc.v1.UnaryStreamToFutureConstructorInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override public String getMethodsInterceptor() {
@Override public String getConstructorInterceptor() {
return INTERCEPT_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
......
......@@ -2,6 +2,7 @@ grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.ClientCallInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.UnaryClientCallListenerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.UnaryServerCallListenerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.UnaryServerCallHandlerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.ClientCallsInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.ManagedChannelInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.StreamingServerCallHandlerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.StreamingServerCallListenerInstrumentation
......
......@@ -18,10 +18,7 @@
package org.skywalking.apm.plugin.grpc.v1;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
......@@ -34,7 +31,6 @@ import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.test.helper.SegmentHelper;
import org.skywalking.apm.agent.test.helper.SpanHelper;
import org.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.test.tools.SegmentStorage;
import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
......@@ -49,7 +45,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class ClientCallStartInterceptorTest {
public class ClientCallsMethodInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
......@@ -57,8 +53,7 @@ public class ClientCallStartInterceptorTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
private ClientCallStartInterceptor clientCallStartInterceptor;
private UnaryClientOnCloseInterceptor unaryClientOnCloseInterceptor;
private ClientCallsMethodInterceptor clientCallStartInterceptor;
@Mock
private EnhancedInstance clientCallImpl;
......@@ -72,8 +67,6 @@ public class ClientCallStartInterceptorTest {
@Mock
private GRPCDynamicFields streamCachedObjects;
private Status exceptionStatus = Status.NOT_FOUND.withCause(new RuntimeException());
private Object[] arguments;
private Class[] argumentTypes;
......@@ -87,20 +80,18 @@ public class ClientCallStartInterceptorTest {
when(streamCachedObjects.getAuthority()).thenReturn("localhost:500051");
when(streamCachedObjects.getMethodType()).thenReturn(MethodDescriptor.MethodType.SERVER_STREAMING);
arguments = new Object[] {clientCallListener, new Metadata()};
argumentTypes = new Class[] {clientCallListener.getClass(), Metadata.class};
arguments = new Object[] {clientCallImpl, clientCallListener};
argumentTypes = new Class[] {clientCallImpl.getClass(), clientCallListener.getClass()};
clientCallStartInterceptor = new ClientCallStartInterceptor();
unaryClientOnCloseInterceptor = new UnaryClientOnCloseInterceptor();
clientCallStartInterceptor = new ClientCallsMethodInterceptor();
}
@Test
public void testNormalUnaryCallStart() throws Throwable {
when(clientCallImpl.getSkyWalkingDynamicField()).thenReturn(unaryCachedObjects);
clientCallStartInterceptor.beforeMethod(clientCallImpl, null, arguments, argumentTypes, null);
clientCallStartInterceptor.afterMethod(clientCallImpl, null, arguments, argumentTypes, null);
unaryClientOnCloseInterceptor.afterMethod(null, null, new Object[] {Status.OK, new Metadata()}, null, null);
clientCallStartInterceptor.beforeMethod(null, null, arguments, argumentTypes, null);
clientCallStartInterceptor.afterMethod(null, null, arguments, argumentTypes, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
......@@ -111,30 +102,12 @@ public class ClientCallStartInterceptorTest {
SpanAssert.assertOccurException(abstractTracingSpan, false);
}
@Test
public void testUnaryCallStartWithException() throws Throwable {
when(clientCallImpl.getSkyWalkingDynamicField()).thenReturn(unaryCachedObjects);
clientCallStartInterceptor.beforeMethod(clientCallImpl, null, arguments, argumentTypes, null);
clientCallStartInterceptor.afterMethod(clientCallImpl, null, arguments, argumentTypes, null);
unaryClientOnCloseInterceptor.afterMethod(null, null, new Object[] {exceptionStatus, new Metadata()}, null, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
assertThat(SegmentHelper.getSpans(traceSegment).size(), is(1));
AbstractTracingSpan abstractTracingSpan = SegmentHelper.getSpans(traceSegment).get(0);
SpanAssert.assertComponent(abstractTracingSpan, ComponentsDefine.GRPC);
SpanAssert.assertLayer(abstractTracingSpan, SpanLayer.RPC_FRAMEWORK);
SpanAssert.assertOccurException(abstractTracingSpan, true);
SpanAssert.assertException(SpanHelper.getLogs(abstractTracingSpan).get(0), StatusRuntimeException.class, "NOT_FOUND");
}
@Test
public void testNormalStreamCallStart() throws Throwable {
when(clientCallImpl.getSkyWalkingDynamicField()).thenReturn(streamCachedObjects);
clientCallStartInterceptor.beforeMethod(clientCallImpl, null, arguments, argumentTypes, null);
clientCallStartInterceptor.afterMethod(clientCallImpl, null, arguments, argumentTypes, null);
clientCallStartInterceptor.beforeMethod(null, null, arguments, argumentTypes, null);
clientCallStartInterceptor.afterMethod(null, null, arguments, argumentTypes, null);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册