diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java index 51492e2732730477c303be5767edf34c7ae9727c..7cc25f54862144649629947717149567c8a8cd94 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/AnnotationInterceptor.java @@ -19,7 +19,6 @@ package org.apache.skywalking.apm.plugin.finagle; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; -import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor; @@ -66,7 +65,7 @@ public class AnnotationInterceptor { * which comes from client. */ span.setOperationName(rpc); - tryInjectContext((ExitSpan) span); + tryInjectContext(span); } } } diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java index 5f083de718464a9f86b649a85d2004adfc2b8e7e..291660fc52b1518b58e29450254326fa1e412465 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ClientDestTracingFilterInterceptor.java @@ -20,7 +20,6 @@ package org.apache.skywalking.apm.plugin.finagle; import com.twitter.finagle.Address; import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance; import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult; @@ -47,7 +46,7 @@ public class ClientDestTracingFilterInterceptor extends AbstractInterceptor { public void beforeMethodImpl(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class[] classes, MethodInterceptResult methodInterceptResult) throws Throwable { String peer = (String) enhancedInstance.getSkyWalkingDynamicField(); getLocalContextHolder().let(FinagleCtxs.PEER_HOST, peer); - tryInjectContext((ExitSpan) getSpan()); + tryInjectContext(getSpan()); } @Override diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java index 52426e557efce4ee38c19355f5855fc044bd4269..d9c5503b50a4b3c01ae019a5cfece61a0483d30c 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/CodecUtils.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.context.CarrierItem; import org.apache.skywalking.apm.agent.core.context.ContextCarrier; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.util.StringUtil; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -33,6 +34,8 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import static org.apache.skywalking.apm.plugin.finagle.Constants.EMPTY_SWCONTEXTCARRIER; + public class CodecUtils { static ILog LOGGER = LogManager.getLogger(CodecUtils.class); @@ -68,21 +71,24 @@ public class CodecUtils { * @return */ static Buf encode(SWContextCarrier swContextCarrier) { - ByteArrayOutputStream bos = getBos(); - try (DataOutputStream dos = new DataOutputStream(bos)) { - putString(dos, swContextCarrier.getOperationName()); - CarrierItem next = swContextCarrier.getCarrier().items(); - while (next.hasNext()) { - next = next.next(); - if (next.getHeadKey() != null && next.getHeadValue() != null) { - putString(dos, next.getHeadKey()); - putString(dos, next.getHeadValue()); + if (StringUtil.isNotEmpty(swContextCarrier.getOperationName()) + && swContextCarrier.getCarrier() != null) { + ByteArrayOutputStream bos = getBos(); + try (DataOutputStream dos = new DataOutputStream(bos)) { + putString(dos, swContextCarrier.getOperationName()); + CarrierItem next = swContextCarrier.getCarrier().items(); + while (next.hasNext()) { + next = next.next(); + if (next.getHeadKey() != null && next.getHeadValue() != null) { + putString(dos, next.getHeadKey()); + putString(dos, next.getHeadValue()); + } } + bos.flush(); + return Bufs.ownedBuf(bos.toByteArray()); + } catch (Exception e) { + LOGGER.error("encode swContextCarrier exception.", e); } - bos.flush(); - return Bufs.ownedBuf(bos.toByteArray()); - } catch (Exception e) { - LOGGER.error("encode swContextCarrier exception.", e); } return Bufs.EMPTY; } @@ -102,23 +108,32 @@ public class CodecUtils { * @return */ static SWContextCarrier decode(Buf buf) { - ContextCarrier contextCarrier = new ContextCarrier(); - SWContextCarrier swContextCarrier = new SWContextCarrier(); - swContextCarrier.setContextCarrier(contextCarrier); - - ByteBuffer byteBuffer = ByteBuffer.wrap(Bufs.ownedByteArray(buf)); - String operationName = getNextString(byteBuffer); - if (operationName != null) { - swContextCarrier.setOperationName(operationName); - } + try { + byte[] bytes = Bufs.ownedByteArray(buf); + if (bytes == null || bytes.length == 0) { + return EMPTY_SWCONTEXTCARRIER; + } + ContextCarrier contextCarrier = new ContextCarrier(); + SWContextCarrier swContextCarrier = new SWContextCarrier(); + swContextCarrier.setContextCarrier(contextCarrier); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + String operationName = getNextString(byteBuffer); + if (operationName != null) { + swContextCarrier.setOperationName(operationName); + } - Map data = readToMap(byteBuffer); - CarrierItem next = contextCarrier.items(); - while (next.hasNext()) { - next = next.next(); - next.setHeadValue(data.get(next.getHeadKey())); + Map data = readToMap(byteBuffer); + CarrierItem next = contextCarrier.items(); + while (next.hasNext()) { + next = next.next(); + next.setHeadValue(data.get(next.getHeadKey())); + } + return swContextCarrier; + } catch (Exception e) { + LOGGER.error("decode swContextCarrier exception.", e); } - return swContextCarrier; + return EMPTY_SWCONTEXTCARRIER; } private static void putString(DataOutputStream dos, String value) throws IOException { diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java index 791a62489f33952944e8152675f8a6115732aa78..e32045d5d9c544f32c773749daad720654cf5173 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/Constants.java @@ -21,4 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle; public class Constants { public static final String PENDING_OP_NAME = "pending"; + + public static final SWContextCarrier EMPTY_SWCONTEXTCARRIER = new SWContextCarrier(); } diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java index 1f094513ab925ff7c20d058d54bc5fae4e980dcd..a1ac9d1729f1b643208c951775cef5a7feb416e4 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ContextCarrierHelper.java @@ -19,7 +19,9 @@ package org.apache.skywalking.apm.plugin.finagle; import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan; +import org.apache.skywalking.apm.agent.core.context.trace.ExitTypeSpan; import static org.apache.skywalking.apm.plugin.finagle.Constants.PENDING_OP_NAME; import static org.apache.skywalking.apm.plugin.finagle.FinagleCtxs.getPeerHost; @@ -34,23 +36,28 @@ class ContextCarrierHelper { * interceptor, we check if the op name and peer information are exists in LocalContext, if it exists, we set it * to span and inject to contextCarrier. */ - static void tryInjectContext(ExitSpan span) { - String operationName = span.getOperationName(); - if (PENDING_OP_NAME.equals(operationName)) { - return; - } - String peer = getPeerHost(); - if (peer == null) { - return; - } - span.setPeer(peer); + static void tryInjectContext(AbstractSpan span) { + /* + * this may be a {@link NoopSpan}. + */ + if (span != null && span.isExit()) { + String operationName = span.getOperationName(); + if (PENDING_OP_NAME.equals(operationName)) { + return; + } + String peer = getPeerHost(); + if (peer == null) { + return; + } + span.setPeer(peer); - ContextCarrier contextCarrier = new ContextCarrier(); - span.inject(contextCarrier); + ContextCarrier contextCarrier = new ContextCarrier(); + ((ExitTypeSpan) span).inject(contextCarrier); - SWContextCarrier swContextCarrier = getSWContextCarrier(); - // we can ensure swContextCarrier is not null here - swContextCarrier.setContextCarrier(contextCarrier); - swContextCarrier.setOperationName(operationName); + SWContextCarrier swContextCarrier = getSWContextCarrier(); + // we can ensure swContextCarrier is not null here + swContextCarrier.setContextCarrier(contextCarrier); + swContextCarrier.setOperationName(operationName); + } } } diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java index 6cd22be317ae0aba6e2cbc39e2833950e1cc4b55..b473cac1949fc4d6f1b8edae45e9867e58833695 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/finagle/ServerTracingFilterInterceptor.java @@ -21,7 +21,6 @@ package org.apache.skywalking.apm.plugin.finagle; import com.twitter.finagle.context.Contexts; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; -import org.apache.skywalking.apm.agent.core.context.ContextCarrier; import org.apache.skywalking.apm.agent.core.context.ContextManager; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; @@ -49,7 +48,7 @@ public class ServerTracingFilterInterceptor extends AbstractInterceptor { SWContextCarrier swContextCarrier = Contexts.broadcast().apply(SWContextCarrier$.MODULE$); span = ContextManager.createEntrySpan(swContextCarrier.getOperationName(), swContextCarrier.getCarrier()); } else { - span = ContextManager.createEntrySpan("unknown", new ContextCarrier()); + span = ContextManager.createEntrySpan("unknown", null); } span.setComponent(FINAGLE); diff --git a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java index d7763d6ec72d362d98201fe50c154c5c92e4d19e..4d4f1abd633bf0d71ef6f35160fb82f1c829cef0 100644 --- a/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java +++ b/apm-sniffer/apm-sdk-plugin/finagle-6.25.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/finagle/CodecUtilsTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.UUID; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; public class CodecUtilsTest { @@ -43,9 +44,7 @@ public class CodecUtilsTest { swContextCarrier = makeSWContextCarrier(); assertSwContextCarrier(swContextCarrier, CodecUtils.decode(CodecUtils.encode(swContextCarrier))); - ContextCarrier contextCarrier = new ContextCarrier(); swContextCarrier = new SWContextCarrier(); - swContextCarrier.setContextCarrier(contextCarrier); assertSwContextCarrier(swContextCarrier, CodecUtils.decode(Bufs.EMPTY)); } @@ -65,15 +64,20 @@ public class CodecUtilsTest { private void assertSwContextCarrier(SWContextCarrier expected, SWContextCarrier actual) { assertThat(expected.getOperationName(), is(actual.getOperationName())); Map data = new HashMap<>(); - CarrierItem next = expected.getCarrier().items(); - while (next.hasNext()) { - next = next.next(); - data.put(next.getHeadKey(), next.getHeadValue()); - } - next = actual.getCarrier().items(); - while (next.hasNext()) { - next = next.next(); - assertThat(next.getHeadValue(), is(data.get(next.getHeadKey()))); + if (actual.getCarrier() == null) { + assertNull(expected.getCarrier()); + } else { + CarrierItem next = expected.getCarrier().items(); + while (next.hasNext()) { + next = next.next(); + data.put(next.getHeadKey(), next.getHeadValue()); + } + next = actual.getCarrier().items(); + while (next.hasNext()) { + next = next.next(); + assertThat(next.getHeadValue(), is(data.get(next.getHeadKey()))); + } } + } } diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java index d834c4950c08839422274c3752573f387e2f930c..96e763e19c7c012fd24b80771ddc772ec2e326b5 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java @@ -44,7 +44,10 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor { RecordMetadata metadata = (RecordMetadata) allArguments[0]; AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback"); activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER); - Tags.MQ_TOPIC.set(activeSpan, metadata.topic()); + if (metadata != null) { + // Null if an error occurred during processing of this record + Tags.MQ_TOPIC.set(activeSpan, metadata.topic()); + } ContextManager.continued(snapshot); } }