From a2dd0e13d580425733c0464bf2f0dd22c6af130e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Sun, 1 Nov 2020 13:56:24 +0800 Subject: [PATCH] Polish the context related codes. (#5764) --- .../agent/core/context/ContextCarrier.java | 56 +++++++++++++++++-- .../core/context/CorrelationContext.java | 32 +++++++---- .../agent/core/context/ExtensionInjector.java | 9 --- .../agent/core/context/TracingContext.java | 2 +- .../activemq/ActiveMQProducerInterceptor.java | 2 +- .../kafka/KafkaProducerInterceptor.java | 2 +- .../pulsar/PulsarProducerInterceptor.java | 2 +- .../rabbitmq/RabbitMQProducerInterceptor.java | 2 +- .../rocketMQ/v3/MessageSendInterceptor.java | 2 +- .../rocketMQ/v4/MessageSendInterceptor.java | 2 +- 10 files changed, 79 insertions(+), 32 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java index 5c6419a303..0330c40568 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java @@ -19,10 +19,12 @@ package org.apache.skywalking.apm.agent.core.context; import java.io.Serializable; +import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import org.apache.skywalking.apm.agent.core.base64.Base64; import org.apache.skywalking.apm.agent.core.conf.Constants; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.util.StringUtil; /** @@ -30,28 +32,70 @@ import org.apache.skywalking.apm.util.StringUtil; * TracingContext}. *

*/ -@Setter -@Getter +@Setter(AccessLevel.PACKAGE) public class ContextCarrier implements Serializable { + @Getter private String traceId; + /** + * The segment id of the parent. + */ + @Getter private String traceSegmentId; + /** + * The span id in the parent segment. + */ + @Getter private int spanId = -1; + @Getter private String parentService = Constants.EMPTY_STRING; + @Getter private String parentServiceInstance = Constants.EMPTY_STRING; + /** + * The endpoint(entrance URI/method signature) of the parent service. + */ + @Getter private String parentEndpoint; + /** + * The network address(ip:port, hostname:port) used in the parent service to access the current service. + */ + @Getter private String addressUsedAtClient; - - private CorrelationContext correlationContext = new CorrelationContext(); + /** + * The extension context contains the optional context to enhance the analysis in some certain scenarios. + */ + @Getter(AccessLevel.PACKAGE) private ExtensionContext extensionContext = new ExtensionContext(); - private ExtensionInjector extensionInjector = new ExtensionInjector(extensionContext); + /** + * User's custom context container. The context propagates with the main tracing context. + */ + @Getter(AccessLevel.PACKAGE) + private CorrelationContext correlationContext = new CorrelationContext(); + /** + * @return the list of items, which could exist in the current tracing context. + */ public CarrierItem items() { SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null); - SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem); + SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem( + correlationContext, sw8ExtensionCarrierItem); SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem); return new CarrierItemHead(sw8CarrierItem); } + /** + * @return the injector for the extension context. + */ + public ExtensionInjector extensionInjector() { + return new ExtensionInjector(extensionContext); + } + + /** + * Extract the extension context to the given span + */ + void extractExtensionTo(AbstractSpan span) { + this.extensionContext.handle(span); + } + /** * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split. * diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java index 4f860b2b3d..8c51ce2872 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java @@ -17,19 +17,17 @@ package org.apache.skywalking.apm.agent.core.context; -import org.apache.skywalking.apm.agent.core.base64.Base64; -import org.apache.skywalking.apm.agent.core.conf.Config; -import org.apache.skywalking.apm.util.StringUtil; - import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.skywalking.apm.agent.core.base64.Base64; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.util.StringUtil; /** * Correlation context, use to propagation user custom data. - * Working on the protocol and delegate set/get method. */ public class CorrelationContext { @@ -39,6 +37,13 @@ public class CorrelationContext { this.data = new HashMap<>(Config.Correlation.ELEMENT_MAX_NUMBER); } + /** + * Add or override the context. + * + * @param key to add or locate the existing context + * @param value as new value + * @return old one if exist. + */ public Optional put(String key, String value) { // key must not null if (key == null) { @@ -71,6 +76,10 @@ public class CorrelationContext { return Optional.empty(); } + /** + * @param key to find the context + * @return value if exist. + */ public Optional get(String key) { if (key == null) { return Optional.empty(); @@ -90,8 +99,8 @@ public class CorrelationContext { } return data.entrySet().stream() - .map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue())) - .collect(Collectors.joining(",")); + .map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue())) + .collect(Collectors.joining(",")); } /** @@ -116,7 +125,8 @@ public class CorrelationContext { } /** - * Prepare for the cross-process propagation. Inject the {@link #data} into {@link ContextCarrier#getCorrelationContext()} + * Prepare for the cross-process propagation. Inject the {@link #data} into {@link + * ContextCarrier#getCorrelationContext()} */ void inject(ContextCarrier carrier) { carrier.getCorrelationContext().data.putAll(this.data); @@ -152,8 +162,10 @@ public class CorrelationContext { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; CorrelationContext that = (CorrelationContext) o; return Objects.equals(data, that.data); } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ExtensionInjector.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ExtensionInjector.java index 69ecbc0912..70c7a75742 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ExtensionInjector.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ExtensionInjector.java @@ -18,8 +18,6 @@ package org.apache.skywalking.apm.agent.core.context; -import java.util.Optional; - /** * Inject or read the extension protocol fields,such as {@link ExtensionContext#sendingTimestamp}. */ @@ -37,11 +35,4 @@ public class ExtensionInjector { public void injectSendingTimestamp() { extensionContext.setSendingTimestamp(System.currentTimeMillis()); } - - /** - * Read the exit span sending timestamp from the {@link ExtensionContext}. - */ - public Optional readSendingTimestamp() { - return Optional.ofNullable(extensionContext.getSendingTimestamp()); - } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java index 35342fdd2d..73ff7c6f0f 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java @@ -187,7 +187,7 @@ public class TracingContext implements AbstractTracerContext { span.ref(ref); } - carrier.getExtensionContext().handle(span); + carrier.extractExtensionTo(span); this.correlationContext.extract(carrier); this.extensionContext.extract(carrier); } diff --git a/apm-sniffer/apm-sdk-plugin/activemq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/activemq/ActiveMQProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/activemq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/activemq/ActiveMQProducerInterceptor.java index 0d77167f6b..bce3acfb53 100644 --- a/apm-sniffer/apm-sdk-plugin/activemq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/activemq/ActiveMQProducerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/activemq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/activemq/ActiveMQProducerInterceptor.java @@ -59,7 +59,7 @@ public class ActiveMQProducerInterceptor implements InstanceMethodsAroundInterce Tags.MQ_BROKER.set(activeSpan, url); Tags.MQ_TOPIC.set(activeSpan, activeMQDestination.getPhysicalName()); } - contextCarrier.getExtensionInjector().injectSendingTimestamp(); + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(activeSpan); activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_PRODUCER); CarrierItem next = contextCarrier.items(); diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java index d94aecff8b..6c598a8a93 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java @@ -52,7 +52,7 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField()); Tags.MQ_TOPIC.set(activeSpan, topicName); - contextCarrier.getExtensionInjector().injectSendingTimestamp(); + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(activeSpan); activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER); diff --git a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java index 9f6469f84c..5c61d85459 100644 --- a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarProducerInterceptor.java @@ -65,7 +65,7 @@ public class PulsarProducerInterceptor implements InstanceMethodsAroundIntercept .getServiceUrl()); Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl()); Tags.MQ_TOPIC.set(activeSpan, topicName); - contextCarrier.getExtensionInjector().injectSendingTimestamp(); + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(activeSpan); activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER); CarrierItem next = contextCarrier.items(); diff --git a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerInterceptor.java index 691b5e986a..b200f854cf 100644 --- a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerInterceptor.java @@ -77,7 +77,7 @@ public class RabbitMQProducerInterceptor implements InstanceMethodsAroundInterce Tags.MQ_BROKER.set(activeSpan, url); Tags.MQ_QUEUE.set(activeSpan, queueName); Tags.MQ_TOPIC.set(activeSpan, exChangeName); - contextCarrier.getExtensionInjector().injectSendingTimestamp(); + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(activeSpan); activeSpan.setComponent(ComponentsDefine.RABBITMQ_PRODUCER); CarrierItem next = contextCarrier.items(); diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java index 9daf7804b1..2313bcf49d 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java @@ -59,7 +59,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER); Tags.MQ_BROKER.set(span, (String) allArguments[0]); Tags.MQ_TOPIC.set(span, message.getTopic()); - contextCarrier.getExtensionInjector().injectSendingTimestamp(); + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(span); SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3]; diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java index 644117b736..50b8d19102 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java @@ -59,7 +59,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER); Tags.MQ_BROKER.set(span, (String) allArguments[0]); Tags.MQ_TOPIC.set(span, message.getTopic()); - contextCarrier.getExtensionInjector().injectSendingTimestamp(); + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(span); SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3]; -- GitLab