未验证 提交 a2dd0e13 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Polish the context related codes. (#5764)

上级 08781b41
......@@ -19,10 +19,12 @@
package org.apache.skywalking.apm.agent.core.context;
import java.io.Serializable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.agent.core.base64.Base64;
import org.apache.skywalking.apm.agent.core.conf.Constants;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.util.StringUtil;
/**
......@@ -30,28 +32,70 @@ import org.apache.skywalking.apm.util.StringUtil;
* TracingContext}.
* <p>
*/
@Setter
@Getter
@Setter(AccessLevel.PACKAGE)
public class ContextCarrier implements Serializable {
@Getter
private String traceId;
/**
* The segment id of the parent.
*/
@Getter
private String traceSegmentId;
/**
* The span id in the parent segment.
*/
@Getter
private int spanId = -1;
@Getter
private String parentService = Constants.EMPTY_STRING;
@Getter
private String parentServiceInstance = Constants.EMPTY_STRING;
/**
* The endpoint(entrance URI/method signature) of the parent service.
*/
@Getter
private String parentEndpoint;
/**
* The network address(ip:port, hostname:port) used in the parent service to access the current service.
*/
@Getter
private String addressUsedAtClient;
private CorrelationContext correlationContext = new CorrelationContext();
/**
* The extension context contains the optional context to enhance the analysis in some certain scenarios.
*/
@Getter(AccessLevel.PACKAGE)
private ExtensionContext extensionContext = new ExtensionContext();
private ExtensionInjector extensionInjector = new ExtensionInjector(extensionContext);
/**
* User's custom context container. The context propagates with the main tracing context.
*/
@Getter(AccessLevel.PACKAGE)
private CorrelationContext correlationContext = new CorrelationContext();
/**
* @return the list of items, which could exist in the current tracing context.
*/
public CarrierItem items() {
SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem);
SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(
correlationContext, sw8ExtensionCarrierItem);
SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
return new CarrierItemHead(sw8CarrierItem);
}
/**
* @return the injector for the extension context.
*/
public ExtensionInjector extensionInjector() {
return new ExtensionInjector(extensionContext);
}
/**
* Extract the extension context to the given span
*/
void extractExtensionTo(AbstractSpan span) {
this.extensionContext.handle(span);
}
/**
* Serialize this {@link ContextCarrier} to a {@link String}, with '|' split.
*
......
......@@ -17,19 +17,17 @@
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.base64.Base64;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.StringUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.agent.core.base64.Base64;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.StringUtil;
/**
* Correlation context, use to propagation user custom data.
* Working on the protocol and delegate set/get method.
*/
public class CorrelationContext {
......@@ -39,6 +37,13 @@ public class CorrelationContext {
this.data = new HashMap<>(Config.Correlation.ELEMENT_MAX_NUMBER);
}
/**
* Add or override the context.
*
* @param key to add or locate the existing context
* @param value as new value
* @return old one if exist.
*/
public Optional<String> put(String key, String value) {
// key must not null
if (key == null) {
......@@ -71,6 +76,10 @@ public class CorrelationContext {
return Optional.empty();
}
/**
* @param key to find the context
* @return value if exist.
*/
public Optional<String> get(String key) {
if (key == null) {
return Optional.empty();
......@@ -90,8 +99,8 @@ public class CorrelationContext {
}
return data.entrySet().stream()
.map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue()))
.collect(Collectors.joining(","));
.map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue()))
.collect(Collectors.joining(","));
}
/**
......@@ -116,7 +125,8 @@ public class CorrelationContext {
}
/**
* Prepare for the cross-process propagation. Inject the {@link #data} into {@link ContextCarrier#getCorrelationContext()}
* Prepare for the cross-process propagation. Inject the {@link #data} into {@link
* ContextCarrier#getCorrelationContext()}
*/
void inject(ContextCarrier carrier) {
carrier.getCorrelationContext().data.putAll(this.data);
......@@ -152,8 +162,10 @@ public class CorrelationContext {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
CorrelationContext that = (CorrelationContext) o;
return Objects.equals(data, that.data);
}
......
......@@ -18,8 +18,6 @@
package org.apache.skywalking.apm.agent.core.context;
import java.util.Optional;
/**
* Inject or read the extension protocol fields,such as {@link ExtensionContext#sendingTimestamp}.
*/
......@@ -37,11 +35,4 @@ public class ExtensionInjector {
public void injectSendingTimestamp() {
extensionContext.setSendingTimestamp(System.currentTimeMillis());
}
/**
* Read the exit span sending timestamp from the {@link ExtensionContext}.
*/
public Optional<Long> readSendingTimestamp() {
return Optional.ofNullable(extensionContext.getSendingTimestamp());
}
}
......@@ -187,7 +187,7 @@ public class TracingContext implements AbstractTracerContext {
span.ref(ref);
}
carrier.getExtensionContext().handle(span);
carrier.extractExtensionTo(span);
this.correlationContext.extract(carrier);
this.extensionContext.extract(carrier);
}
......
......@@ -59,7 +59,7 @@ public class ActiveMQProducerInterceptor implements InstanceMethodsAroundInterce
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_TOPIC.set(activeSpan, activeMQDestination.getPhysicalName());
}
contextCarrier.getExtensionInjector().injectSendingTimestamp();
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_PRODUCER);
CarrierItem next = contextCarrier.items();
......
......@@ -52,7 +52,7 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, topicName);
contextCarrier.getExtensionInjector().injectSendingTimestamp();
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
......
......@@ -65,7 +65,7 @@ public class PulsarProducerInterceptor implements InstanceMethodsAroundIntercept
.getServiceUrl());
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
Tags.MQ_TOPIC.set(activeSpan, topicName);
contextCarrier.getExtensionInjector().injectSendingTimestamp();
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
CarrierItem next = contextCarrier.items();
......
......@@ -77,7 +77,7 @@ public class RabbitMQProducerInterceptor implements InstanceMethodsAroundInterce
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_QUEUE.set(activeSpan, queueName);
Tags.MQ_TOPIC.set(activeSpan, exChangeName);
contextCarrier.getExtensionInjector().injectSendingTimestamp();
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.RABBITMQ_PRODUCER);
CarrierItem next = contextCarrier.items();
......
......@@ -59,7 +59,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
Tags.MQ_TOPIC.set(span, message.getTopic());
contextCarrier.getExtensionInjector().injectSendingTimestamp();
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3];
......
......@@ -59,7 +59,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
Tags.MQ_TOPIC.set(span, message.getTopic());
contextCarrier.getExtensionInjector().injectSendingTimestamp();
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册