未验证 提交 b6cc9a59 编写于 作者: Y yoje 提交者: GitHub

Fix npe in afterMethod/handleMethodException of kafka/finagle plugins (#4712)

* Fix npe in afterMethod/handleMethodException of kafka/finagle plugins
上级 071708a4
......@@ -36,7 +36,7 @@ public interface InstanceMethodsAroundInterceptor {
/**
* called after target method invocation. Even method's invocation triggers an exception.
*
* @param ret the method's original return value.
* @param ret the method's original return value. May be null if the method triggers an exception.
* @return the method's actual return value.
*/
Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
......
......@@ -58,7 +58,13 @@ public class ClientDestTracingFilterInterceptor extends AbstractInterceptor {
@Override
public void handleMethodExceptionImpl(EnhancedInstance enhancedInstance, Method method, Object[] objects,
Class<?>[] classes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
/*
* Current thread may not be the same thread that execute ClientTracingFilterInterceptor, we can not ensure
* there is an active span
*/
if (ContextManager.isActive()) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
private String getRemote(Object[] objects) {
......
......@@ -66,22 +66,29 @@ public class ClientTracingFilterInterceptor extends AbstractInterceptor {
getLocalContextHolder().remove(SW_SPAN);
getMarshalledContextHolder().remove(SWContextCarrier$.MODULE$);
finagleSpan.prepareForAsync();
ContextManager.stopSpan(finagleSpan);
((Future<?>) ret).addEventListener(new FutureEventListener<Object>() {
@Override
public void onSuccess(Object value) {
finagleSpan.asyncFinish();
}
@Override
public void onFailure(Throwable cause) {
finagleSpan.errorOccurred();
finagleSpan.log(cause);
finagleSpan.asyncFinish();
}
});
/*
* If the intercepted method throws exception, the ret will be null
*/
if (ret == null) {
ContextManager.stopSpan(finagleSpan);
} else {
finagleSpan.prepareForAsync();
ContextManager.stopSpan(finagleSpan);
((Future<?>) ret).addEventListener(new FutureEventListener<Object>() {
@Override
public void onSuccess(Object value) {
finagleSpan.asyncFinish();
}
@Override
public void onFailure(Throwable cause) {
finagleSpan.errorOccurred();
finagleSpan.log(cause);
finagleSpan.asyncFinish();
}
});
}
return ret;
}
......
......@@ -61,21 +61,29 @@ public class ServerTracingFilterInterceptor extends AbstractInterceptor {
public Object afterMethodImpl(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, Object ret) throws Throwable {
final AbstractSpan finagleSpan = getSpan();
getLocalContextHolder().remove(FinagleCtxs.SW_SPAN);
finagleSpan.prepareForAsync();
ContextManager.stopSpan(finagleSpan);
((Future<?>) ret).addEventListener(new FutureEventListener<Object>() {
@Override
public void onSuccess(Object value) {
finagleSpan.asyncFinish();
}
@Override
public void onFailure(Throwable cause) {
finagleSpan.errorOccurred();
finagleSpan.log(cause);
finagleSpan.asyncFinish();
}
});
/*
* If the intercepted method throws exception, the ret will be null
*/
if (ret == null) {
ContextManager.stopSpan(finagleSpan);
} else {
finagleSpan.prepareForAsync();
ContextManager.stopSpan(finagleSpan);
((Future<?>) ret).addEventListener(new FutureEventListener<Object>() {
@Override
public void onSuccess(Object value) {
finagleSpan.asyncFinish();
}
@Override
public void onFailure(Throwable cause) {
finagleSpan.errorOccurred();
finagleSpan.log(cause);
finagleSpan.asyncFinish();
}
});
}
return ret;
}
......
......@@ -51,6 +51,12 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
/*
* If the intercepted method throws exception, the ret will be null
*/
if (ret == null) {
return ret;
}
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) ret;
//
// The entry span will only be created when the consumer received at least one message.
......@@ -88,6 +94,12 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
/*
* The entry span is created in {@link #afterMethod}, but {@link #handleMethodException} is called before
* {@link #afterMethod}, before the creation of entry span, we can not ensure there is an active span
*/
if (ContextManager.isActive()) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册