未验证 提交 569baf32 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

Correlation protocol implement (#4555)

Co-authored-by: NMrproliu <mrproliu@lagou.com>
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
Co-authored-by: Nkezhenxu94 <kezhenxu94@apache.org>
上级 d53b27de
......@@ -18,6 +18,8 @@
package org.apache.skywalking.apm.toolkit.trace;
import java.util.Optional;
/**
* Try to access the sky-walking tracer context. The context is not existed, always. only the middleware, component, or
* rpc-framework are supported in the current invoke stack, in the same thread, the context will be available.
......@@ -33,4 +35,23 @@ public class TraceContext {
public static String traceId() {
return "";
}
/**
* Try to get the custom value from trace context.
*
* @return custom data value.
*/
public static Optional<String> getCorrelation(String key) {
return Optional.empty();
}
/**
* Put the custom key/value into trace context.
*
* @return previous value if it exists.
*/
public static Optional<String> putCorrelation(String key, String value) {
return Optional.empty();
}
}
......@@ -394,4 +394,16 @@ public class Config {
public static int HTTP_PARAMS_LENGTH_THRESHOLD = 1024;
}
}
public static class Correlation {
/**
* Max element count in the correlation context.
*/
public static int ELEMENT_MAX_NUMBER = 3;
/**
* Max value length of each element.
*/
public static int VALUE_MAX_LENGTH = 128;
}
}
......@@ -115,4 +115,8 @@ public interface AbstractTracerContext {
*/
void asyncStop(AsyncSpan span);
/**
* Get current correlation context
*/
CorrelationContext getCorrelationContext();
}
......@@ -70,8 +70,11 @@ public class ContextCarrier implements Serializable {
*/
private DistributedTraceId primaryDistributedTraceId;
private CorrelationContext correlationContext = new CorrelationContext();
public CarrierItem items() {
SW6CarrierItem sw6CarrierItem = new SW6CarrierItem(this, null);
SW7CorrelationCarrierItem sw7CorrelationCarrierItem = new SW7CorrelationCarrierItem(correlationContext, null);
SW6CarrierItem sw6CarrierItem = new SW6CarrierItem(this, sw7CorrelationCarrierItem);
return new CarrierItemHead(sw6CarrierItem);
}
......@@ -234,6 +237,10 @@ public class ContextCarrier implements Serializable {
this.entryServiceInstanceId = entryServiceInstanceId;
}
public CorrelationContext getCorrelationContext() {
return correlationContext;
}
public enum HeaderVersion {
v2
}
......
......@@ -225,4 +225,13 @@ public class ContextManager implements BootService {
return runtimeContext;
}
public static CorrelationContext getCorrelationContext() {
final AbstractTracerContext tracerContext = get();
if (tracerContext == null) {
return null;
}
return tracerContext.getCorrelationContext();
}
}
......@@ -49,12 +49,15 @@ public class ContextSnapshot {
private int entryApplicationInstanceId = DictionaryUtil.nullValue();
ContextSnapshot(ID traceSegmentId, int spanId, List<DistributedTraceId> distributedTraceIds) {
private CorrelationContext correlationContext;
ContextSnapshot(ID traceSegmentId, int spanId, List<DistributedTraceId> distributedTraceIds, CorrelationContext correlationContext) {
this.traceSegmentId = traceSegmentId;
this.spanId = spanId;
if (distributedTraceIds != null) {
this.primaryDistributedTraceId = distributedTraceIds.get(0);
}
this.correlationContext = correlationContext.clone();
}
public void setEntryOperationName(String entryOperationName) {
......@@ -108,4 +111,8 @@ public class ContextSnapshot {
public boolean isFromCurrent() {
return traceSegmentId.equals(ContextManager.capture().getTraceSegmentId());
}
public CorrelationContext getCorrelationContext() {
return correlationContext;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.base64.Base64;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.StringUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Correlation context, use to propagation user custom data.
* Working on the protocol and delegate set/get method.
*/
public class CorrelationContext {
private final Map<String, String> data;
public CorrelationContext() {
this.data = new HashMap<>(Config.Correlation.ELEMENT_MAX_NUMBER);
}
public Optional<String> put(String key, String value) {
// key must not null
if (key == null) {
return Optional.empty();
}
// remove and return previous value when value is empty
if (StringUtil.isEmpty(value)) {
return Optional.ofNullable(data.remove(key));
}
// check value length
if (value.length() > Config.Correlation.VALUE_MAX_LENGTH) {
return Optional.empty();
}
// already contain key
if (data.containsKey(key)) {
final String previousValue = data.put(key, value);
return Optional.of(previousValue);
}
// check keys count
if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
return Optional.empty();
}
// setting
data.put(key, value);
return Optional.empty();
}
public Optional<String> get(String key) {
if (key == null) {
return Optional.empty();
}
return Optional.ofNullable(data.get(key));
}
/**
* Serialize this {@link CorrelationContext} to a {@link String}
*
* @return the serialization string.
*/
String serialize() {
if (data.isEmpty()) {
return "";
}
return data.entrySet().stream()
.map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue()))
.collect(Collectors.joining(","));
}
/**
* Deserialize data from {@link String}
*/
void deserialize(String value) {
if (StringUtil.isEmpty(value)) {
return;
}
for (String perData : value.split(",")) {
// Only data with limited count of elements can be added
if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
break;
}
final String[] parts = perData.split(":");
String perDataKey = parts[0];
String perDataValue = parts.length > 1 ? parts[1] : "";
data.put(Base64.decode2UTFString(perDataKey), Base64.decode2UTFString(perDataValue));
}
}
/**
* Prepare for the cross-process propagation. Inject the {@link #data} into {@link ContextCarrier#getCorrelationContext()}
*/
void inject(ContextCarrier carrier) {
carrier.getCorrelationContext().data.putAll(this.data);
}
/**
* Extra the {@link ContextCarrier#getCorrelationContext()} into this context.
*/
void extract(ContextCarrier carrier) {
final Map<String, String> carrierCorrelationContext = carrier.getCorrelationContext().data;
for (Map.Entry<String, String> entry : carrierCorrelationContext.entrySet()) {
// Only data with limited count of elements can be added
if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
break;
}
this.data.put(entry.getKey(), entry.getValue());
}
}
/**
* Clone the context data, work for capture to cross-thread.
*/
public CorrelationContext clone() {
final CorrelationContext context = new CorrelationContext();
context.data.putAll(this.data);
return context;
}
void continued(ContextSnapshot snapshot) {
this.data.putAll(snapshot.getCorrelationContext().data);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CorrelationContext that = (CorrelationContext) o;
return Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(data);
}
}
......@@ -32,30 +32,33 @@ import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
public class IgnoredTracerContext implements AbstractTracerContext {
private static final NoopSpan NOOP_SPAN = new NoopSpan();
private final CorrelationContext correlationContext;
private int stackDepth;
public IgnoredTracerContext() {
this.stackDepth = 0;
this.correlationContext = new CorrelationContext();
}
@Override
public void inject(ContextCarrier carrier) {
this.correlationContext.inject(carrier);
}
@Override
public void extract(ContextCarrier carrier) {
this.correlationContext.extract(carrier);
}
@Override
public ContextSnapshot capture() {
return new ContextSnapshot(null, -1, null);
return new ContextSnapshot(null, -1, null, correlationContext);
}
@Override
public void continued(ContextSnapshot snapshot) {
this.correlationContext.continued(snapshot);
}
@Override
......@@ -105,6 +108,11 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
@Override
public CorrelationContext getCorrelationContext() {
return this.correlationContext;
}
public static class ListenerManager {
private static List<IgnoreTracerContextListener> LISTENERS = new LinkedList<>();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.context;
public class SW7CorrelationCarrierItem extends CarrierItem {
public static final String HEADER_NAME = "sw7-correlation";
private final CorrelationContext correlationContext;
public SW7CorrelationCarrierItem(CorrelationContext correlationContext, CarrierItem next) {
super(HEADER_NAME, correlationContext.serialize(), next);
this.correlationContext = correlationContext;
}
@Override
public void setHeadValue(String headValue) {
this.correlationContext.deserialize(headValue);
}
}
......@@ -111,6 +111,8 @@ public class TracingContext implements AbstractTracerContext {
*/
private final ProfileStatusReference profileStatus;
private final CorrelationContext correlationContext;
/**
* Initialize all fields with default value.
*/
......@@ -130,6 +132,8 @@ public class TracingContext implements AbstractTracerContext {
PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
}
this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(this, segment.getTraceSegmentId(), firstOPName);
this.correlationContext = new CorrelationContext();
}
/**
......@@ -232,6 +236,8 @@ public class TracingContext implements AbstractTracerContext {
}
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
this.correlationContext.inject(carrier);
}
/**
......@@ -248,6 +254,8 @@ public class TracingContext implements AbstractTracerContext {
if (span instanceof EntrySpan) {
span.ref(ref);
}
this.correlationContext.extract(carrier);
}
/**
......@@ -259,7 +267,7 @@ public class TracingContext implements AbstractTracerContext {
public ContextSnapshot capture() {
List<TraceSegmentRef> refs = this.segment.getRefs();
ContextSnapshot snapshot = new ContextSnapshot(
segment.getTraceSegmentId(), activeSpan().getSpanId(), segment.getRelatedGlobalTraces());
segment.getTraceSegmentId(), activeSpan().getSpanId(), segment.getRelatedGlobalTraces(), this.correlationContext);
int entryOperationId;
String entryOperationName = "";
int entryApplicationInstanceId;
......@@ -327,6 +335,7 @@ public class TracingContext implements AbstractTracerContext {
this.segment.ref(segmentRef);
this.activeSpan().ref(segmentRef);
this.segment.relatedGlobalTraces(snapshot.getDistributedTraceId());
this.correlationContext.continued(snapshot);
}
/**
......@@ -510,6 +519,11 @@ public class TracingContext implements AbstractTracerContext {
finish();
}
@Override
public CorrelationContext getCorrelationContext() {
return this.correlationContext;
}
/**
* Re-check current trace need profiling, encase third part plugin change the operation name.
*
......
......@@ -34,7 +34,13 @@ public class ContextCarrierV2HeaderTest {
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw--");
if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
next.setHeadValue("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw--");
} else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
next.setHeadValue("dGVzdA==:dHJ1ZQ==");
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
}
Assert.assertTrue(contextCarrier.isValid());
......@@ -55,6 +61,8 @@ public class ContextCarrierV2HeaderTest {
contextCarrier.setEntryEndpointName("/portal");
contextCarrier.setParentEndpointId(123);
contextCarrier.getCorrelationContext().put("test", "true");
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
......@@ -63,13 +71,30 @@ public class ContextCarrierV2HeaderTest {
*
* "1-3.4.5-1.2.3-4-1-1-#127.0.0.1:8080-#/portal-123"
*/
Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
} else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
/**
* customKey:customValue
*
* "test:true"
*/
Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
}
next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-1-1-IzEyNy4wLjAuMTo4MDgw-Iy9wb3J0YWw=-MTIz", next.getHeadValue());
} else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue());
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
}
Assert.assertTrue(contextCarrier.isValid());
......@@ -90,18 +115,33 @@ public class ContextCarrierV2HeaderTest {
contextCarrier.setEntryEndpointName("/portal");
contextCarrier.setParentEndpointId(123);
contextCarrier.getCorrelationContext().put("test", "true");
CarrierItem next = contextCarrier.items();
String headerValue = null;
String sw6HeaderValue = null;
String correlationHeaderValue = null;
while (next.hasNext()) {
next = next.next();
headerValue = next.getHeadValue();
if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
sw6HeaderValue = next.getHeadValue();
} else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
correlationHeaderValue = next.getHeadValue();
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
}
ContextCarrier contextCarrier2 = new ContextCarrier();
next = contextCarrier2.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(headerValue);
if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
next.setHeadValue(sw6HeaderValue);
} else if (next.getHeadKey().equals(SW7CorrelationCarrierItem.HEADER_NAME)) {
next.setHeadValue(correlationHeaderValue);
} else {
throw new IllegalArgumentException("Unknown Header: " + next.getHeadKey());
}
}
Assert.assertTrue(contextCarrier2.isValid());
......@@ -112,5 +152,6 @@ public class ContextCarrierV2HeaderTest {
Assert.assertEquals(contextCarrier.getEntryEndpointName(), contextCarrier2.getEntryEndpointName());
Assert.assertEquals(contextCarrier.getEntryServiceInstanceId(), contextCarrier2.getEntryServiceInstanceId());
Assert.assertEquals(contextCarrier.getParentServiceInstanceId(), contextCarrier2.getParentServiceInstanceId());
Assert.assertEquals(contextCarrier.getCorrelationContext(), contextCarrier2.getCorrelationContext());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Optional;
public class CorrelationContextTest {
@Before
public void setupConfig() {
Config.Correlation.ELEMENT_MAX_NUMBER = 2;
Config.Correlation.VALUE_MAX_LENGTH = 8;
}
@Test
public void testSet() {
final CorrelationContext context = new CorrelationContext();
// manual set
Optional<String> previous = context.put("test1", "t1");
Assert.assertNotNull(previous);
Assert.assertFalse(previous.isPresent());
// set with replace old value
previous = context.put("test1", "t1New");
Assert.assertNotNull(previous);
Assert.assertEquals("t1", previous.get());
// manual set
previous = context.put("test2", "t2");
Assert.assertNotNull(previous);
Assert.assertFalse(previous.isPresent());
// out of key count
previous = context.put("test3", "t3");
Assert.assertNotNull(previous);
Assert.assertFalse(previous.isPresent());
// key not null
previous = context.put(null, "t3");
Assert.assertNotNull(previous);
Assert.assertFalse(previous.isPresent());
// out of value length
previous = context.put(null, "123456789");
Assert.assertNotNull(previous);
Assert.assertFalse(previous.isPresent());
}
@Test
public void testGet() {
final CorrelationContext context = new CorrelationContext();
context.put("test1", "t1");
// manual get
Assert.assertEquals("t1", context.get("test1").get());
// ket if null
Assert.assertNull(context.get(null).orElse(null));
// value if null
context.put("test2", null);
Assert.assertNull(context.get("test2").orElse(null));
}
@Test
public void testSerialize() {
// manual
CorrelationContext context = new CorrelationContext();
context.put("test1", "t1");
context.put("test2", "t2");
Assert.assertEquals("dGVzdDE=:dDE=,dGVzdDI=:dDI=", context.serialize());
// empty value
context = new CorrelationContext();
context.put("test1", null);
Assert.assertEquals("", context.serialize());
// empty
context = new CorrelationContext();
Assert.assertEquals("", context.serialize());
}
@Test
public void testDeserialize() {
// manual
CorrelationContext context = new CorrelationContext();
context.deserialize("dGVzdDE=:dDE=,dGVzdDI=:dDI=");
Assert.assertEquals("t1", context.get("test1").get());
Assert.assertEquals("t2", context.get("test2").get());
// empty value
context = new CorrelationContext();
context.deserialize("dGVzdDE=:");
Assert.assertEquals("", context.get("test1").get());
// empty string
context = new CorrelationContext();
context.deserialize("");
Assert.assertNull(context.get("test1").orElse(null));
context.deserialize(null);
Assert.assertNull(context.get("test1").orElse(null));
}
}
......@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.plugin.finagle;
import com.twitter.io.Bufs;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.SW6CarrierItem;
import org.junit.Test;
import java.util.HashMap;
......@@ -53,7 +54,9 @@ public class CodecUtilsTest {
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(UUID.randomUUID().toString());
if (next.getHeadKey().equals(SW6CarrierItem.HEADER_NAME)) {
next.setHeadValue(UUID.randomUUID().toString());
}
}
SWContextCarrier swContextCarrier = new SWContextCarrier();
swContextCarrier.setContextCarrier(contextCarrier);
......
......@@ -53,6 +53,7 @@ import static junit.framework.TestCase.assertNotNull;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -132,7 +133,7 @@ public class HttpClientExecuteInterceptorTest {
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertHttpSpan(spans.get(0));
verify(request).setHeader(anyString(), anyString());
verify(request, times(2)).setHeader(anyString(), anyString());
}
@Test
......@@ -153,7 +154,7 @@ public class HttpClientExecuteInterceptorTest {
assertHttpSpan(spans.get(0));
assertThat(SpanHelper.getErrorOccurred(spans.get(0)), is(true));
verify(request).setHeader(anyString(), anyString());
verify(request, times(2)).setHeader(anyString(), anyString());
}
@Test
......@@ -171,7 +172,7 @@ public class HttpClientExecuteInterceptorTest {
assertHttpSpan(span);
assertThat(SpanHelper.getErrorOccurred(span), is(true));
assertHttpSpanErrorLog(SpanHelper.getLogs(span));
verify(request).setHeader(anyString(), anyString());
verify(request, times(2)).setHeader(anyString(), anyString());
}
......@@ -201,7 +202,7 @@ public class HttpClientExecuteInterceptorTest {
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertHttpSpan(spans.get(0));
verify(request).setHeader(anyString(), anyString());
verify(request, times(2)).setHeader(anyString(), anyString());
}
private void assertHttpSpanErrorLog(List<LogDataEntity> logs) {
......
......@@ -191,7 +191,7 @@ public class HttpAsyncClientInterceptorTest {
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(findNeedSegemnt());
assertHttpSpan(spans.get(0));
verify(requestWrapper).setHeader(anyString(), anyString());
verify(requestWrapper, times(2)).setHeader(anyString(), anyString());
}
......
......@@ -47,6 +47,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.apache.skywalking.apm.agent.test.tools.SpanAssert.assertComponent;
......@@ -92,7 +93,7 @@ public class MotanConsumerInterceptorTest {
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertMotanConsumerSpan(spans.get(0));
verify(request).setAttachment(anyString(), anyString());
verify(request, times(2)).setAttachment(anyString(), anyString());
}
@Test
......@@ -110,7 +111,7 @@ public class MotanConsumerInterceptorTest {
private void assertTraceSegmentWhenOccurException(AbstractTracingSpan tracingSpan) {
assertMotanConsumerSpan(tracingSpan);
verify(request).setAttachment(anyString(), anyString());
verify(request, times(2)).setAttachment(anyString(), anyString());
List<LogDataEntity> logDataEntities = SpanHelper.getLogs(tracingSpan);
assertThat(logDataEntities.size(), is(1));
SpanAssert.assertException(logDataEntities.get(0), RuntimeException.class);
......
......@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -118,7 +119,7 @@ public class ProducerOperationHandlerInterceptorTest {
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertCombSpan(spans.get(0));
verify(invocation).getContext();
verify(invocation, times(2)).getContext();
}
private void assertCombSpan(AbstractTracingSpan span) {
......
......@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -113,7 +114,7 @@ public class TransportClientHandlerInterceptorTest {
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertCombSpan(spans.get(0));
verify(invocation).getContext();
verify(invocation, times(2)).getContext();
}
private void assertCombSpan(AbstractTracingSpan span) {
......
......@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -118,7 +119,7 @@ public class ProducerOperationHandlerInterceptorTest {
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertCombSpan(spans.get(0));
verify(invocation).getContext();
verify(invocation, times(2)).getContext();
}
private void assertCombSpan(AbstractTracingSpan span) {
......
......@@ -51,6 +51,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -113,7 +114,7 @@ public class TransportClientHandlerInterceptorTest {
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertCombSpan(spans.get(0));
verify(invocation).getContext();
verify(invocation, times(2)).getContext();
}
private void assertCombSpan(AbstractTracingSpan span) {
......
......@@ -33,7 +33,7 @@ public enum MockContextSnapshot {
List<DistributedTraceId> distributedTraceIds = new ArrayList<DistributedTraceId>();
distributedTraceIds.add(new NewDistributedTraceId());
contextSnapshot = new ContextSnapshot(new ID(1, 2, 3), 1, distributedTraceIds);
contextSnapshot = new ContextSnapshot(new ID(1, 2, 3), 1, distributedTraceIds, new CorrelationContext());
contextSnapshot.setEntryApplicationInstanceId(1);
contextSnapshot.setEntryOperationId(0);
contextSnapshot.setEntryOperationName("/for-test-entryOperationName");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.toolkit.activation.trace;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
import java.lang.reflect.Method;
import java.util.Optional;
public class CorrelationContextGetInterceptor implements StaticMethodsAroundInterceptor {
@Override
public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, MethodInterceptResult result) {
final String key = (String) allArguments[0];
final Optional<String> data = ContextManager.getCorrelationContext().get(key);
result.defineReturnValue(data);
}
@Override
public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Object ret) {
return ret;
}
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Throwable t) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.toolkit.activation.trace;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
import java.lang.reflect.Method;
import java.util.Optional;
public class CorrelationContextPutInterceptor implements StaticMethodsAroundInterceptor {
@Override
public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, MethodInterceptResult result) {
final String key = (String) allArguments[0];
final String value = (String) allArguments[1];
final Optional<String> previous = ContextManager.getCorrelationContext().put(key, value);
result.defineReturnValue(previous);
}
@Override
public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Object ret) {
return ret;
}
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes, Throwable t) {
}
}
......@@ -38,6 +38,10 @@ public class TraceContextActivation extends ClassStaticMethodsEnhancePluginDefin
public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.TraceContextInterceptor";
public static final String ENHANCE_CLASS = "org.apache.skywalking.apm.toolkit.trace.TraceContext";
public static final String ENHANCE_METHOD = "traceId";
public static final String ENHANCE_GET_CORRELATION_METHOD = "getCorrelation";
public static final String INTERCEPT_GET_CORRELATION_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.CorrelationContextGetInterceptor";
public static final String ENHANCE_PUT_CORRELATION_METHOD = "putCorrelation";
public static final String INTERCEPT_PUT_CORRELATION_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.CorrelationContextPutInterceptor";
/**
* @return the target class, which needs active.
......@@ -65,6 +69,38 @@ public class TraceContextActivation extends ClassStaticMethodsEnhancePluginDefin
return INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_GET_CORRELATION_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_GET_CORRELATION_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_PUT_CORRELATION_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_PUT_CORRELATION_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
......
......@@ -9,7 +9,7 @@ Cross Process Correlation Header key is `sw7-correlation`. The value is the `enc
## Recommendations of language APIs
Recommended implementation in different language API.
1. `CorrelationContext#set` and `CorrelationContext#get` are recommended to write and read the correlation context, with key/value string.
1. `TraceContext#putCorrelation` and `TraceContext#getCorrelation` are recommended to write and read the correlation context, with key/value string.
1. The key should be added if it is absent.
1. The later writes should override the previous value.
1. The total number of all keys should be less than 3, and the length of each value should be less than 128 bytes.
......
......@@ -53,3 +53,14 @@ public User methodYouWantToTrace(String param1, String param2) {
}
```
* Use `TraceContext.putCorrelation()` API to put custom data in tracing context.
```java
Optional<String> previous = TraceContext.putCorrelation("customKey", "customValue");
```
CorrelationContext will remove the item when the value is `null` or empty.
* Use `TraceContext.getCorrelation()` API to get custom data.
```java
Optional<String> value = TraceContext.getCorrelation("customKey");
```
CorrelationContext configuration descriptions could be found in [the agent configuration](README.md#table-of-agent-configuration-properties) documentation, with `correlation.` as the prefix.
......@@ -125,6 +125,8 @@ property key | Description | Default |
`plugin.tomcat.collect_http_params`| This config item controls that whether the Tomcat plugin should collect the parameters of the request. Also, activate implicitly in the profiled trace. | `false` |
`plugin.springmvc.collect_http_params`| This config item controls that whether the SpringMVC plugin should collect the parameters of the request, when your Spring application is based on Tomcat, consider only setting either `plugin.tomcat.collect_http_params` or `plugin.springmvc.collect_http_params`. Also, activate implicitly in the profiled trace. | `false` |
`plugin.http.http_params_length_threshold`| When `COLLECT_HTTP_PARAMS` is enabled, how many characters to keep and send to the OAP backend, use negative values to keep and send the complete parameters, NB. this config item is added for the sake of performance. | `1024` |
`correlation.element_max_number`|Max element count of the correlation context.|`3`|
`correlation.value_max_length`|Max value length of correlation context element.|`128`|
## Optional Plugins
Java agent plugins are all pluggable. Optional plugins could be provided in `optional-plugins` folder under agent or 3rd party repositories.
......@@ -158,7 +160,7 @@ Now, we have the following known bootstrap plugins.
* If you want to use OpenTracing Java APIs, try [SkyWalking OpenTracing compatible tracer](Opentracing.md). More details you could find at http://opentracing.io
* If you want to print trace context(e.g. traceId) in your logs, choose the log frameworks, [log4j](Application-toolkit-log4j-1.x.md),
[log4j2](Application-toolkit-log4j-2.x.md), [logback](Application-toolkit-logback-1.x.md)
* If you want to use annotations or SkyWalking native APIs to read context, try [SkyWalking manual APIs](Application-toolkit-trace.md)
* If you want your codes to interact with SkyWalking agent, including `getting trace id`, `setting tags`, `propagating custom data` etc.. Try [SkyWalking manual APIs](Application-toolkit-trace.md).
* If you want to continue traces across thread manually, use [across thread solution APIs](Application-toolkit-trace-cross-thread.md).
* If you want to specify the path of your agent.config file. Read [set config file through system properties](Specified-agent-config.md)
......
......@@ -194,6 +194,7 @@ segmentItems:
tags:
- {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/callable'}
- {key: http.method, value: GET}
- {key: correlation, value: correlationValueTest}
refs:
- {parentEndpointId: -1, parentEndpoint: '',
networkAddressId: 0, entryEndpointId: 0, refType: CrossProcess, parentSpanId: 1,
......@@ -289,6 +290,7 @@ segmentItems:
tags:
- {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/runnable'}
- {key: http.method, value: GET}
- {key: correlation, value: correlationValueTest}
refs:
- {parentEndpointId: -1, parentEndpoint: '',
networkAddressId: 0, entryEndpointId: 0, refType: CrossProcess, parentSpanId: 1,
......@@ -348,6 +350,7 @@ segmentItems:
tags:
- {key: url, value: 'http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/supplier'}
- {key: http.method, value: GET}
- {key: correlation, value: correlationValueTest}
refs:
- {parentEndpointId: -1, parentEndpoint: '',
networkAddressId: 0, entryEndpointId: 0, refType: CrossProcess, parentSpanId: 1,
......
......@@ -18,6 +18,8 @@
package org.apache.skywalking.apm.toolkit.trace;
import java.util.Optional;
/**
* Try to access the sky-walking tracer context. The context is not existed, always. only the middleware, component, or
* rpc-framework are supported in the current invoke stack, in the same thread, the context will be available.
......@@ -33,4 +35,23 @@ public class TraceContext {
public static String traceId() {
return "";
}
/**
* Try to get the custom value from trace context.
*
* @return custom data value.
*/
public static Optional<String> getCorrelation(String key) {
return Optional.empty();
}
/**
* Put the custom key/value into trace context.
*
* @return previous value if it exists.
*/
public static Optional<String> putCorrelation(String key, String value) {
return Optional.empty();
}
}
......@@ -25,6 +25,8 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.apm.toolkit.trace.ActiveSpan;
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
......@@ -35,6 +37,10 @@ public class TestController {
private static final String SUCCESS = "Success";
private static final String CORRELATION_CONTEXT_KEY = "toolkit-test";
private static final String CORRELATION_CONTEXT_VALUE = "correlationValueTest";
private static final String CORRELATION_CONTEXT_TAG_KEY = "correlation";
@Autowired
private TestService testService;
......@@ -48,6 +54,7 @@ public class TestController {
testService.testErrorThrowable();
testService.testTagAnnotation("testTagAnnotationParam1", "testTagAnnotationParam2");
testService.testTagAnnotationReturnInfo("zhangsan", 15);
TraceContext.putCorrelation(CORRELATION_CONTEXT_KEY, CORRELATION_CONTEXT_VALUE);
testService.asyncCallable(() -> {
visit("http://localhost:8080/apm-toolkit-trace-scenario/case/asyncVisit/callable");
return true;
......@@ -77,16 +84,19 @@ public class TestController {
@RequestMapping("/asyncVisit/runnable")
public String asyncVisitRunnable() {
ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
return SUCCESS;
}
@RequestMapping("/asyncVisit/callable")
public String asyncVisitCallable() {
ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
return SUCCESS;
}
@RequestMapping("/asyncVisit/supplier")
public String asyncVisitSupplier() {
ActiveSpan.tag(CORRELATION_CONTEXT_TAG_KEY, TraceContext.getCorrelation(CORRELATION_CONTEXT_KEY).orElse(""));
return SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册