未验证 提交 5251cc72 编写于 作者: Y yoje 提交者: GitHub

fix finagle: process NoopSpan (#4544)

fix KafkaProducer CallbackInterceptor npe
Co-authored-by: Nhuangyongjie <huangyongjie@tigerbrokers>
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
上级 1c7529a5
......@@ -19,7 +19,6 @@
package org.apache.skywalking.apm.plugin.finagle;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
......@@ -66,7 +65,7 @@ public class AnnotationInterceptor {
* which comes from client.
*/
span.setOperationName(rpc);
tryInjectContext((ExitSpan) span);
tryInjectContext(span);
}
}
}
......
......@@ -20,7 +20,6 @@ package org.apache.skywalking.apm.plugin.finagle;
import com.twitter.finagle.Address;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
......@@ -47,7 +46,7 @@ public class ClientDestTracingFilterInterceptor extends AbstractInterceptor {
public void beforeMethodImpl(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, MethodInterceptResult methodInterceptResult) throws Throwable {
String peer = (String) enhancedInstance.getSkyWalkingDynamicField();
getLocalContextHolder().let(FinagleCtxs.PEER_HOST, peer);
tryInjectContext((ExitSpan) getSpan());
tryInjectContext(getSpan());
}
@Override
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.StringUtil;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
......@@ -33,6 +34,8 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import static org.apache.skywalking.apm.plugin.finagle.Constants.EMPTY_SWCONTEXTCARRIER;
public class CodecUtils {
static ILog LOGGER = LogManager.getLogger(CodecUtils.class);
......@@ -68,21 +71,24 @@ public class CodecUtils {
* @return
*/
static Buf encode(SWContextCarrier swContextCarrier) {
ByteArrayOutputStream bos = getBos();
try (DataOutputStream dos = new DataOutputStream(bos)) {
putString(dos, swContextCarrier.getOperationName());
CarrierItem next = swContextCarrier.getCarrier().items();
while (next.hasNext()) {
next = next.next();
if (next.getHeadKey() != null && next.getHeadValue() != null) {
putString(dos, next.getHeadKey());
putString(dos, next.getHeadValue());
if (StringUtil.isNotEmpty(swContextCarrier.getOperationName())
&& swContextCarrier.getCarrier() != null) {
ByteArrayOutputStream bos = getBos();
try (DataOutputStream dos = new DataOutputStream(bos)) {
putString(dos, swContextCarrier.getOperationName());
CarrierItem next = swContextCarrier.getCarrier().items();
while (next.hasNext()) {
next = next.next();
if (next.getHeadKey() != null && next.getHeadValue() != null) {
putString(dos, next.getHeadKey());
putString(dos, next.getHeadValue());
}
}
bos.flush();
return Bufs.ownedBuf(bos.toByteArray());
} catch (Exception e) {
LOGGER.error("encode swContextCarrier exception.", e);
}
bos.flush();
return Bufs.ownedBuf(bos.toByteArray());
} catch (Exception e) {
LOGGER.error("encode swContextCarrier exception.", e);
}
return Bufs.EMPTY;
}
......@@ -102,23 +108,32 @@ public class CodecUtils {
* @return
*/
static SWContextCarrier decode(Buf buf) {
ContextCarrier contextCarrier = new ContextCarrier();
SWContextCarrier swContextCarrier = new SWContextCarrier();
swContextCarrier.setContextCarrier(contextCarrier);
ByteBuffer byteBuffer = ByteBuffer.wrap(Bufs.ownedByteArray(buf));
String operationName = getNextString(byteBuffer);
if (operationName != null) {
swContextCarrier.setOperationName(operationName);
}
try {
byte[] bytes = Bufs.ownedByteArray(buf);
if (bytes == null || bytes.length == 0) {
return EMPTY_SWCONTEXTCARRIER;
}
ContextCarrier contextCarrier = new ContextCarrier();
SWContextCarrier swContextCarrier = new SWContextCarrier();
swContextCarrier.setContextCarrier(contextCarrier);
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
String operationName = getNextString(byteBuffer);
if (operationName != null) {
swContextCarrier.setOperationName(operationName);
}
Map<String, String> data = readToMap(byteBuffer);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(data.get(next.getHeadKey()));
Map<String, String> data = readToMap(byteBuffer);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(data.get(next.getHeadKey()));
}
return swContextCarrier;
} catch (Exception e) {
LOGGER.error("decode swContextCarrier exception.", e);
}
return swContextCarrier;
return EMPTY_SWCONTEXTCARRIER;
}
private static void putString(DataOutputStream dos, String value) throws IOException {
......
......@@ -21,4 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle;
public class Constants {
public static final String PENDING_OP_NAME = "pending";
public static final SWContextCarrier EMPTY_SWCONTEXTCARRIER = new SWContextCarrier();
}
......@@ -19,7 +19,9 @@
package org.apache.skywalking.apm.plugin.finagle;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitTypeSpan;
import static org.apache.skywalking.apm.plugin.finagle.Constants.PENDING_OP_NAME;
import static org.apache.skywalking.apm.plugin.finagle.FinagleCtxs.getPeerHost;
......@@ -34,23 +36,28 @@ class ContextCarrierHelper {
* interceptor, we check if the op name and peer information are exists in LocalContext, if it exists, we set it
* to span and inject to contextCarrier.
*/
static void tryInjectContext(ExitSpan span) {
String operationName = span.getOperationName();
if (PENDING_OP_NAME.equals(operationName)) {
return;
}
String peer = getPeerHost();
if (peer == null) {
return;
}
span.setPeer(peer);
static void tryInjectContext(AbstractSpan span) {
/*
* this may be a {@link NoopSpan}.
*/
if (span != null && span.isExit()) {
String operationName = span.getOperationName();
if (PENDING_OP_NAME.equals(operationName)) {
return;
}
String peer = getPeerHost();
if (peer == null) {
return;
}
span.setPeer(peer);
ContextCarrier contextCarrier = new ContextCarrier();
span.inject(contextCarrier);
ContextCarrier contextCarrier = new ContextCarrier();
((ExitTypeSpan) span).inject(contextCarrier);
SWContextCarrier swContextCarrier = getSWContextCarrier();
// we can ensure swContextCarrier is not null here
swContextCarrier.setContextCarrier(contextCarrier);
swContextCarrier.setOperationName(operationName);
SWContextCarrier swContextCarrier = getSWContextCarrier();
// we can ensure swContextCarrier is not null here
swContextCarrier.setContextCarrier(contextCarrier);
swContextCarrier.setOperationName(operationName);
}
}
}
......@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle;
import com.twitter.finagle.context.Contexts;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
......@@ -49,7 +48,7 @@ public class ServerTracingFilterInterceptor extends AbstractInterceptor {
SWContextCarrier swContextCarrier = Contexts.broadcast().apply(SWContextCarrier$.MODULE$);
span = ContextManager.createEntrySpan(swContextCarrier.getOperationName(), swContextCarrier.getCarrier());
} else {
span = ContextManager.createEntrySpan("unknown", new ContextCarrier());
span = ContextManager.createEntrySpan("unknown", null);
}
span.setComponent(FINAGLE);
......
......@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.UUID;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
public class CodecUtilsTest {
......@@ -43,9 +44,7 @@ public class CodecUtilsTest {
swContextCarrier = makeSWContextCarrier();
assertSwContextCarrier(swContextCarrier, CodecUtils.decode(CodecUtils.encode(swContextCarrier)));
ContextCarrier contextCarrier = new ContextCarrier();
swContextCarrier = new SWContextCarrier();
swContextCarrier.setContextCarrier(contextCarrier);
assertSwContextCarrier(swContextCarrier, CodecUtils.decode(Bufs.EMPTY));
}
......@@ -65,15 +64,20 @@ public class CodecUtilsTest {
private void assertSwContextCarrier(SWContextCarrier expected, SWContextCarrier actual) {
assertThat(expected.getOperationName(), is(actual.getOperationName()));
Map<String, String> data = new HashMap<>();
CarrierItem next = expected.getCarrier().items();
while (next.hasNext()) {
next = next.next();
data.put(next.getHeadKey(), next.getHeadValue());
}
next = actual.getCarrier().items();
while (next.hasNext()) {
next = next.next();
assertThat(next.getHeadValue(), is(data.get(next.getHeadKey())));
if (actual.getCarrier() == null) {
assertNull(expected.getCarrier());
} else {
CarrierItem next = expected.getCarrier().items();
while (next.hasNext()) {
next = next.next();
data.put(next.getHeadKey(), next.getHeadValue());
}
next = actual.getCarrier().items();
while (next.hasNext()) {
next = next.next();
assertThat(next.getHeadValue(), is(data.get(next.getHeadKey())));
}
}
}
}
......@@ -44,7 +44,10 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
RecordMetadata metadata = (RecordMetadata) allArguments[0];
AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
if (metadata != null) {
// Null if an error occurred during processing of this record
Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
}
ContextManager.continued(snapshot);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册