未验证 提交 08781b41 编写于 作者: E Evan 提交者: GitHub

add transmission latency for MQ case (#5666)

Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
Co-authored-by: Nkezhenxu94 <kezhenxu94@apache.org>
上级 9b6386f3
......@@ -29,6 +29,14 @@ public final class StringUtil {
return !isEmpty(str);
}
public static boolean isBlank(String str) {
return str == null || isEmpty(str.trim());
}
public static boolean isNotBlank(String str) {
return !isBlank(str);
}
public static void setIfPresent(String value, Consumer<String> setter) {
if (isNotEmpty(value)) {
setter.accept(value);
......
......@@ -30,6 +30,14 @@ public class StringUtilTest {
Assert.assertFalse(StringUtil.isEmpty("A String"));
}
@Test
public void testIsBlank() {
Assert.assertTrue(StringUtil.isBlank(null));
Assert.assertTrue(StringUtil.isBlank(""));
Assert.assertTrue(StringUtil.isBlank(" "));
Assert.assertFalse(StringUtil.isBlank("A String"));
}
@Test
public void testJoin() {
Assert.assertNull(StringUtil.join('.'));
......
......@@ -43,6 +43,7 @@ public class ContextCarrier implements Serializable {
private CorrelationContext correlationContext = new CorrelationContext();
private ExtensionContext extensionContext = new ExtensionContext();
private ExtensionInjector extensionInjector = new ExtensionInjector(extensionContext);
public CarrierItem items() {
SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
......@@ -124,10 +125,6 @@ public class ContextCarrier implements Serializable {
return false;
}
public CorrelationContext getCorrelationContext() {
return correlationContext;
}
public enum HeaderVersion {
v3
}
......
......@@ -19,7 +19,12 @@
package org.apache.skywalking.apm.agent.core.context;
import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.StringUtil;
/**
......@@ -28,18 +33,37 @@ import org.apache.skywalking.apm.util.StringUtil;
*/
public class ExtensionContext {
private static final ILog LOGGER = LogManager.getLogger(ExtensionContext.class);
/**
* The separator of extendable fields.
*/
private static final String SEPARATOR = "-";
/**
* The default value of extendable fields.
*/
private static final String PLACEHOLDER = " ";
/**
* Tracing Mode. If true means represents all spans generated in this context should skip analysis.
*/
private boolean skipAnalysis;
/**
* The sending timestamp of the exit span.
*/
@Getter
@Setter
private Long sendingTimestamp;
/**
* Serialize this {@link ExtensionContext} to a {@link String}
*
* @return the serialization string.
*/
String serialize() {
return skipAnalysis ? "1" : "0";
String res = skipAnalysis ? "1" : "0";
res += SEPARATOR;
res += Objects.isNull(sendingTimestamp) ? PLACEHOLDER : sendingTimestamp;
return res;
}
/**
......@@ -49,11 +73,24 @@ public class ExtensionContext {
if (StringUtil.isEmpty(value)) {
return;
}
final String[] extensionParts = value.split("-");
String[] extensionParts = value.split(SEPARATOR);
String extensionPart;
// All parts of the extension header are optional.
// only try to read it when it exist.
if (extensionParts.length > 0) {
this.skipAnalysis = Objects.equals(extensionParts[0], "1");
extensionPart = extensionParts[0];
this.skipAnalysis = Objects.equals(extensionPart, "1");
}
if (extensionParts.length > 1) {
extensionPart = extensionParts[1];
if (StringUtil.isNotBlank(extensionPart)) {
try {
this.sendingTimestamp = Long.parseLong(extensionPart);
} catch (NumberFormatException e) {
LOGGER.error(e, "the downstream sending timestamp is illegal:[{}]", extensionPart);
}
}
}
}
......@@ -78,19 +115,25 @@ public class ExtensionContext {
if (this.skipAnalysis) {
span.skipAnalysis();
}
if (Objects.nonNull(sendingTimestamp)) {
Tags.TRANSMISSION_LATENCY.set(span, String.valueOf(System.currentTimeMillis() - sendingTimestamp));
}
}
/**
* Clone the context data, work for capture to cross-thread.
*/
@Override
public ExtensionContext clone() {
final ExtensionContext context = new ExtensionContext();
context.skipAnalysis = this.skipAnalysis;
context.sendingTimestamp = this.sendingTimestamp;
return context;
}
void continued(ContextSnapshot snapshot) {
this.skipAnalysis = snapshot.getExtensionContext().skipAnalysis;
this.sendingTimestamp = snapshot.getExtensionContext().sendingTimestamp;
}
@Override
......@@ -100,11 +143,11 @@ public class ExtensionContext {
if (o == null || getClass() != o.getClass())
return false;
ExtensionContext that = (ExtensionContext) o;
return skipAnalysis == that.skipAnalysis;
return skipAnalysis == that.skipAnalysis && Objects.equals(this.sendingTimestamp, that.sendingTimestamp);
}
@Override
public int hashCode() {
return Objects.hash(skipAnalysis);
return Objects.hash(skipAnalysis, sendingTimestamp);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.context;
import java.util.Optional;
/**
* Inject or read the extension protocol fields,such as {@link ExtensionContext#sendingTimestamp}.
*/
public class ExtensionInjector {
private final ExtensionContext extensionContext;
ExtensionInjector(final ExtensionContext extensionContext) {
this.extensionContext = extensionContext;
}
/**
* Inject the current time in milliseconds to the {@link ExtensionContext}.
*/
public void injectSendingTimestamp() {
extensionContext.setSendingTimestamp(System.currentTimeMillis());
}
/**
* Read the exit span sending timestamp from the {@link ExtensionContext}.
*/
public Optional<Long> readSendingTimestamp() {
return Optional.ofNullable(extensionContext.getSendingTimestamp());
}
}
......@@ -187,9 +187,9 @@ public class TracingContext implements AbstractTracerContext {
span.ref(ref);
}
carrier.getExtensionContext().handle(span);
this.correlationContext.extract(carrier);
this.extensionContext.extract(carrier);
this.extensionContext.handle(span);
}
/**
......
......@@ -77,6 +77,12 @@ public final class Tags {
*/
public static final StringTag MQ_TOPIC = new StringTag(9, "mq.topic");
/**
* The latency of transmission. When there are more than one downstream parent/segment-ref(s), multiple tags will be
* recorded, such as a batch consumption in MQ.
*/
public static final StringTag TRANSMISSION_LATENCY = new StringTag(15, "transmission.latency", false);
public static final class HTTP {
public static final StringTag METHOD = new StringTag(10, "http.method");
......
......@@ -124,4 +124,5 @@ public interface AbstractSpan extends AsyncSpan {
* Should skip analysis in the backend.
*/
void skipAnalysis();
}
......@@ -38,7 +38,7 @@ public class ContextCarrierV3HeaderTest {
} else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) {
next.setHeadValue("dGVzdA==:dHJ1ZQ==");
} else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) {
next.setHeadValue("1");
next.setHeadValue("1- ");
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
......@@ -63,7 +63,7 @@ public class ContextCarrierV3HeaderTest {
contextCarrier.getCorrelationContext().put("test", "true");
contextCarrier.getExtensionContext().deserialize("1");
contextCarrier.getExtensionContext().deserialize("1- ");
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
......@@ -78,7 +78,7 @@ public class ContextCarrierV3HeaderTest {
*/
Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
} else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) {
Assert.assertEquals("1", next.getHeadValue());
Assert.assertEquals("1- ", next.getHeadValue());
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
......@@ -92,7 +92,7 @@ public class ContextCarrierV3HeaderTest {
} else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) {
Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
} else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) {
Assert.assertEquals("1", next.getHeadValue());
Assert.assertEquals("1- ", next.getHeadValue());
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
......@@ -117,7 +117,7 @@ public class ContextCarrierV3HeaderTest {
contextCarrier.setParentEndpoint("/app");
contextCarrier.getCorrelationContext().put("test", "true");
contextCarrier.getExtensionContext().deserialize("1");
contextCarrier.getExtensionContext().deserialize("1- ");
CarrierItem next = contextCarrier.items();
String sw6HeaderValue = null;
......
......@@ -18,36 +18,51 @@
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
public class ExtensionContextTest {
@Test
public void testSerialize() {
final ExtensionContext context = new ExtensionContext();
Assert.assertEquals(context.serialize(), "0");
Assert.assertEquals(context.serialize(), "0- ");
context.deserialize("1- ");
Assert.assertEquals(context.serialize(), "1- ");
context.deserialize("1-1");
Assert.assertEquals(context.serialize(), "1-1");
context.deserialize("1");
Assert.assertEquals(context.serialize(), "1");
}
@Test
public void testDeSerialize() {
final ExtensionContext context = new ExtensionContext();
context.deserialize("");
Assert.assertEquals(context.serialize(), "0");
Assert.assertEquals(context.serialize(), "0- ");
context.deserialize("0- ");
Assert.assertEquals(context.serialize(), "0- ");
context.deserialize("0");
Assert.assertEquals(context.serialize(), "0");
context.deserialize("test- ");
Assert.assertEquals(context.serialize(), "0- ");
context.deserialize("test");
Assert.assertEquals(context.serialize(), "0");
context.deserialize("1-test");
Assert.assertEquals(context.serialize(), "1- ");
context.deserialize("0-1602743904804");
Assert.assertEquals(context.serialize(), "0-1602743904804");
}
@Test
......@@ -55,21 +70,41 @@ public class ExtensionContextTest {
final ExtensionContext context = new ExtensionContext();
Assert.assertEquals(context, context.clone());
context.deserialize("1");
context.deserialize("0-1602743904804");
Assert.assertEquals(context, context.clone());
}
@Test
public void testHandle() {
public void testHandle() throws Exception {
final ExtensionContext context = new ExtensionContext();
context.deserialize("1");
context.deserialize("1- ");
NoopSpan span = Mockito.mock(NoopSpan.class);
context.handle(span);
verify(span, times(1)).skipAnalysis();
context.deserialize("0");
context.deserialize("0- ");
span = Mockito.mock(NoopSpan.class);
context.handle(span);
verify(span, times(0)).skipAnalysis();
PowerMockito.mockStatic(System.class);
PowerMockito.when(System.currentTimeMillis()).thenReturn(1602743904804L + 500);
span = PowerMockito.mock(NoopSpan.class);
context.deserialize("0-1602743904804");
context.handle(span);
verify(span, times(0)).tag(Tags.TRANSMISSION_LATENCY, "500");
}
@Test
public void testEqual() {
Assert.assertEquals(new ExtensionContext(), new ExtensionContext());
ExtensionContext context = new ExtensionContext();
context.setSendingTimestamp(1L);
Assert.assertNotEquals(context, new ExtensionContext());
Assert.assertNotEquals(new ExtensionContext(), context);
ExtensionContext another = new ExtensionContext();
another.setSendingTimestamp(1L);
Assert.assertEquals(context, another);
Assert.assertEquals(another, context);
}
}
......@@ -59,6 +59,7 @@ public class ActiveMQProducerInterceptor implements InstanceMethodsAroundInterce
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_TOPIC.set(activeSpan, activeMQDestination.getPhysicalName());
}
contextCarrier.getExtensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_PRODUCER);
CarrierItem next = contextCarrier.items();
......
......@@ -52,6 +52,7 @@ public class KafkaProducerInterceptor implements InstanceMethodsAroundIntercepto
Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, topicName);
contextCarrier.getExtensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
......
......@@ -65,6 +65,7 @@ public class PulsarProducerInterceptor implements InstanceMethodsAroundIntercept
.getServiceUrl());
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getServiceUrl());
Tags.MQ_TOPIC.set(activeSpan, topicName);
contextCarrier.getExtensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.PULSAR_PRODUCER);
CarrierItem next = contextCarrier.items();
......
......@@ -77,6 +77,7 @@ public class RabbitMQProducerInterceptor implements InstanceMethodsAroundInterce
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_QUEUE.set(activeSpan, queueName);
Tags.MQ_TOPIC.set(activeSpan, exChangeName);
contextCarrier.getExtensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.RABBITMQ_PRODUCER);
CarrierItem next = contextCarrier.items();
......
......@@ -59,6 +59,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
Tags.MQ_TOPIC.set(span, message.getTopic());
contextCarrier.getExtensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3];
......
......@@ -59,6 +59,7 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
Tags.MQ_TOPIC.set(span, message.getTopic());
contextCarrier.getExtensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3];
......
......@@ -43,5 +43,5 @@ The current value includes fields.
1. Tracing Mode. empty, 0 or 1. empty or 0 is default. 1 represents all spans generated in this context should skip analysis,
`spanObject#skipAnalysis=true`. This context should be propagated to upstream in the default, unless it is changed in the
tracing process.
2. The timestamp of sending at the client-side. This is used in async RPC such as MQ. Once it is set, the consumer side would calculate the latency between sending and receiving, and tag the latency in the span by using key `transmission.latency` automatically.
......@@ -65,6 +65,7 @@ segmentItems:
tags:
- {key: mq.broker, value: not null}
- {key: mq.queue, value: test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /activemq-scenario/case/activemq, networkAddress: not null,
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,
......
......@@ -120,6 +120,7 @@ segmentItems:
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /case/kafka-case, networkAddress: 'kafka-server:9092', refType: CrossProcess,
parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
......@@ -156,6 +157,7 @@ segmentItems:
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: test.}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /case/kafka-case, networkAddress: 'kafka-server:9092', refType: CrossProcess,
parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
......
......@@ -83,8 +83,9 @@ segmentItems:
spanType: Entry
peer: ''
tags:
- {key: mq.broker, value: not null}
- {key: mq.topic, value: test}
- {key: transmission.latency, value: not null}
- {key: mq.broker, value: not null}
- {key: mq.topic, value: test}
refs:
- {parentEndpoint: /case/pulsar-case, networkAddress: not null, refType: CrossProcess,
parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
......
......@@ -34,6 +34,7 @@ segmentItems:
- {key: mq.broker, value: not null}
- {key: mq.topic, value: ''}
- {key: mq.queue, value: test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /rabbitmq-scenario/case/rabbitmq, networkAddress: not null,
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not
......
......@@ -103,6 +103,7 @@ segmentItems:
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
......
......@@ -140,6 +140,7 @@ segmentItems:
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
......@@ -177,6 +178,7 @@ segmentItems:
tags:
- {key: mq.broker, value: 'kafka-server:9092'}
- {key: mq.topic, value: spring_test}
- {key: transmission.latency, value: not null}
refs:
- {parentEndpoint: /case/spring-kafka-case, networkAddress: 'kafka-server:9092',
refType: CrossProcess, parentSpanId: not null, parentTraceSegmentId: not null,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册