提交 2940f762 编写于 作者: wu-sheng's avatar wu-sheng

Fix tag bugs. Adjust TraceSegment and ContextCarrier. Add sampling mechanism. #110

上级 f2434067
......@@ -17,9 +17,9 @@ public class LogData {
@Expose
@SerializedName(value="fi")
private Map<String, ?> fields;
private Map<String, String> fields;
LogData(long time, Map<String, ?> fields) {
LogData(long time, Map<String, String> fields) {
this.time = time;
if(fields == null){
throw new NullPointerException();
......
......@@ -57,8 +57,16 @@ public class Span{
* {@see https://github.com/opentracing/specification/blob/master/specification.md#set-a-span-tag}
*/
@Expose
@SerializedName(value="ta")
private final Map<String, Object> tags;
@SerializedName(value="ts")
private final Map<String, String> tagsWithStr;
@Expose
@SerializedName(value="tb")
private final Map<String, Boolean> tagsWithBool;
@Expose
@SerializedName(value="ti")
private final Map<String, Integer> tagsWithInt;
/**
* Log is a concept from OpenTracing spec.
......@@ -151,7 +159,9 @@ public class Span{
* Create a new/empty span.
*/
public Span() {
tags = new HashMap<String, Object>();
tagsWithStr = new HashMap<String, String>(5);
tagsWithBool = new HashMap<String, Boolean>(1);
tagsWithInt = new HashMap<String, Integer>(2);
logs = new LinkedList<LogData>();
}
......@@ -194,17 +204,17 @@ public class Span{
* @return this Span instance, for chaining
*/
public final Span setTag(String key, String value) {
tags.put(key, value);
tagsWithStr.put(key, value);
return this;
}
public final Span setTag(String key, boolean value) {
tags.put(key, value);
tagsWithBool.put(key, value);
return this;
}
public final Span setTag(String key, Number value) {
tags.put(key, value);
public final Span setTag(String key, Integer value) {
tagsWithInt.put(key, value);
return this;
}
......@@ -214,7 +224,11 @@ public class Span{
* @return
*/
public final Map<String, Object> getTags() {
return Collections.unmodifiableMap(tags);
Map<String, Object> tags = new HashMap<String, Object>();
tags.putAll(tagsWithStr);
tags.putAll(tagsWithBool);
tags.putAll(tagsWithInt);
return tags;
}
/**
......@@ -223,8 +237,16 @@ public class Span{
* @param key the given tag key.
* @return tag value.
*/
public Object getTag(String key) {
return tags.get(key);
public String getStrTag(String key) {
return tagsWithStr.get(key);
}
public Boolean getBoolTag(String key) {
return tagsWithBool.get(key);
}
public Integer getIntTag(String key) {
return tagsWithInt.get(key);
}
/**
......@@ -250,7 +272,7 @@ public class Span{
* @return the Span, for chaining
* @see Span#log(String)
*/
public Span log(Map<String, ?> fields) {
public Span log(Map<String, String> fields) {
logs.add(new LogData(System.currentTimeMillis(), fields));
return this;
}
......
......@@ -26,21 +26,21 @@ public class TraceSegment {
* Every segment has its unique-global-id.
*/
@Expose
@SerializedName(value="ts")
@SerializedName(value = "ts")
private String traceSegmentId;
/**
* The start time of this trace segment.
*/
@Expose
@SerializedName(value="st")
@SerializedName(value = "st")
private long startTime;
/**
* The end time of this trace segment.
*/
@Expose
@SerializedName(value="et")
@SerializedName(value = "et")
private long endTime;
/**
......@@ -50,7 +50,7 @@ public class TraceSegment {
* at this moment, we use this {@link #refs} to link them.
*/
@Expose
@SerializedName(value="rs")
@SerializedName(value = "rs")
private List<TraceSegmentRef> refs;
/**
......@@ -59,7 +59,7 @@ public class TraceSegment {
* All active spans are hold and controlled by "skywalking-api" module.
*/
@Expose
@SerializedName(value="ss")
@SerializedName(value = "ss")
private List<Span> spans;
/**
......@@ -69,7 +69,7 @@ public class TraceSegment {
* e.g. account_app, billing_app
*/
@Expose
@SerializedName(value="ac")
@SerializedName(value = "ac")
private String applicationCode;
/**
......@@ -86,9 +86,19 @@ public class TraceSegment {
* multi {@link TraceSegment}s, only using {@link #refs} is not enough for analysis and ui.
*/
@Expose
@SerializedName(value="gt")
@SerializedName(value = "gt")
private DistributedTraceIds relatedGlobalTraces;
/**
* The <code>sampled</code> is a flag, which represent, when this {@link TraceSegment} finished, it need to be send
* to Collector.
*
* Its value depends on SamplingService. True, by default.
*
* This value is not serialized.
*/
private boolean sampled;
/**
* Create a trace segment, by given segmentId.
* This segmentId is generated by TraceSegmentRef, AKA, from tracer/agent module.
......@@ -107,6 +117,7 @@ public class TraceSegment {
this.spans = new LinkedList<Span>();
this.relatedGlobalTraces = new DistributedTraceIds();
this.relatedGlobalTraces.append(new NewDistributedTraceId());
this.sampled = true;
}
/**
......@@ -118,7 +129,7 @@ public class TraceSegment {
if (refs == null) {
refs = new LinkedList<TraceSegmentRef>();
}
if(!refs.contains(refSegment)){
if (!refs.contains(refSegment)) {
refs.add(refSegment);
}
}
......@@ -183,6 +194,14 @@ public class TraceSegment {
return applicationCode;
}
public boolean isSampled() {
return sampled;
}
public void setSampled(boolean sampled) {
this.sampled = sampled;
}
@Override
public String toString() {
return "TraceSegment{" +
......
......@@ -7,7 +7,7 @@ import com.a.eye.skywalking.trace.Span;
* All span's tags inherit from {@link AbstractTag},
* which provide an easy way to
* {@link Span#setTag(String, String)} ,
* {@link Span#setTag(String, Number)} ,
* {@link Span#setTag(String, Integer)}
* {@link Span#setTag(String, boolean)} ,
*
* Created by wusheng on 2017/2/17.
......
......@@ -30,13 +30,11 @@ public class BooleanTag extends AbstractTag<Boolean> {
*/
@Override
public Boolean get(Span span) {
Object tagValue = span.getTag(super.key);
Boolean tagValue = span.getBoolTag(super.key);
if (tagValue == null) {
return defaultValue;
} else if (tagValue instanceof Boolean) {
return (Boolean) tagValue;
} else {
return Boolean.valueOf(tagValue.toString());
return tagValue;
}
}
}
......@@ -26,13 +26,11 @@ public class IntTag extends AbstractTag<Integer> {
*/
@Override
public Integer get(Span span) {
Object tagValue = span.getTag(super.key);
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else if(tagValue instanceof Integer){
return (Integer)tagValue;
}else {
return Integer.valueOf(tagValue.toString());
} else {
return tagValue;
}
}
}
......@@ -14,7 +14,7 @@ public class ShortTag extends AbstractTag<Short> {
@Override
public void set(Span span, Short tagValue) {
span.setTag(super.key, tagValue);
span.setTag(super.key, (int)tagValue.shortValue());
}
/**
......@@ -25,12 +25,10 @@ public class ShortTag extends AbstractTag<Short> {
* @return tag value
*/
@Override public Short get(Span span) {
Object tagValue = span.getTag(super.key);
Integer tagValue = span.getIntTag(super.key);
if (tagValue == null) {
return null;
} else if(tagValue instanceof Short){
return (Short)tagValue;
}else {
} else {
return Short.valueOf(tagValue.toString());
}
}
......
......@@ -19,6 +19,6 @@ public class StringTag extends AbstractTag<String> {
}
@Override public String get(Span span) {
return (String)span.getTag(super.key);
return span.getStrTag(super.key);
}
}
......@@ -11,7 +11,7 @@ import org.junit.Test;
public class LogDataTestCase {
@Test
public void testHoldValue(){
Map<String, ?> fields = new HashMap<String, String>();
Map<String, String> fields = new HashMap<String, String>();
LogData logData = new LogData(123L, fields);
Assert.assertEquals(123, logData.getTime());
......
......@@ -12,13 +12,13 @@ public class TagsTest {
public void testLayer(){
Span span = new Span(1, "/test");
Tags.SPAN_LAYER.asDB(span);
Assert.assertEquals("db", span.getTag("span.layer"));
Assert.assertEquals("db", span.getStrTag("span.layer"));
Tags.SPAN_LAYER.asRPCFramework(span);
Assert.assertEquals("rpc", span.getTag("span.layer"));
Assert.assertEquals("rpc", span.getStrTag("span.layer"));
Tags.SPAN_LAYER.asHttp(span);
Assert.assertEquals("http", span.getTag("span.layer"));
Assert.assertEquals("http", span.getStrTag("span.layer"));
}
@Test
......
......@@ -36,23 +36,20 @@
</manifestEntries>
</transformer>
</transformers>
<artifactSet>
<excludes>
<exclude>com.lmax:*</exclude>
<exclude>org.apache.httpcomponents:*</exclude>
<exclude>commons-logging:*</exclude>
<exclude>commons-codec:*</exclude>
<exclude>*:gson</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>${shade.net.bytebuddy.source}</pattern>
<shadedPattern>${shade.net.bytebuddy.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.lmax.disruptor.source}</pattern>
<shadedPattern>${shade.com.lmax.disruptor.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.com.google.source}</pattern>
<shadedPattern>${shade.com.google.target}</shadedPattern>
</relocation>
<relocation>
<pattern>${shade.org.apache.source}</pattern>
<shadedPattern>${shade.org.apache.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
......@@ -61,6 +58,12 @@
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......@@ -120,17 +123,11 @@
</repository>
</distributionManagement>
<properties>
<shade.org.apache.source>org.apache</shade.org.apache.source>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<shade.com.google.source>com.google</shade.com.google.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}</shade.com.lmax.disruptor.target>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.org.apache.target>${shade.package}.${shade.org.apache.source}</shade.org.apache.target>
<premain.class>com.a.eye.skywalking.agent.SkyWalkingAgent</premain.class>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
</properties>
</project>
......@@ -8,6 +8,8 @@ public class Config {
public static boolean IS_PREMAIN_MODE = false;
public static String PATH = "";
public static int SAMPLING_RATE = 10000;
}
public static class Collector{
......
......@@ -42,6 +42,11 @@ public class ContextCarrier implements Serializable {
*/
private List<DistributedTraceId> distributedTraceIds;
/**
* {@link TraceSegment#sampled}
*/
private boolean sampled;
/**
* Serialize this {@link ContextCarrier} to a {@link String},
* with '|' split.
......@@ -54,7 +59,8 @@ public class ContextCarrier implements Serializable {
this.getSpanId() + "",
this.getApplicationCode(),
this.getPeerHost(),
this.serializeDistributedTraceIds());
this.serializeDistributedTraceIds(),
this.isSampled() ? "1" : "0");
}
/**
......@@ -64,14 +70,15 @@ public class ContextCarrier implements Serializable {
*/
public ContextCarrier deserialize(String text) {
if (text != null) {
String[] parts = text.split("\\|", 5);
if (parts.length == 5) {
String[] parts = text.split("\\|", 6);
if (parts.length == 6) {
try {
setSpanId(Integer.parseInt(parts[1]));
setTraceSegmentId(parts[0]);
setApplicationCode(parts[2]);
setPeerHost(parts[3]);
setDistributedTraceIds(deserializeDistributedTraceIds(parts[4]));
setSampled("1".equals(parts[5]));
} catch (NumberFormatException e) {
}
......@@ -129,6 +136,14 @@ public class ContextCarrier implements Serializable {
return distributedTraceIds;
}
public boolean isSampled() {
return sampled;
}
public void setSampled(boolean sampled) {
this.sampled = sampled;
}
public void setDistributedTraceIds(List<DistributedTraceId> distributedTraceIds) {
this.distributedTraceIds = distributedTraceIds;
}
......
package com.a.eye.skywalking.api.context;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.sampling.SamplingService;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
......@@ -35,6 +37,7 @@ public final class TracerContext {
*/
TracerContext() {
this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE);
ServiceManager.INSTANCE.findService(SamplingService.class).trySampling(this.segment);
this.spanIdGenerator = 0;
}
......@@ -126,6 +129,7 @@ public final class TracerContext {
carrier.setApplicationCode(Config.Agent.APPLICATION_CODE);
carrier.setPeerHost(Tags.PEER_HOST.get(activeSpan()));
carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces());
carrier.setSampled(this.segment.isSampled());
}
/**
......@@ -137,6 +141,7 @@ public final class TracerContext {
public void extract(ContextCarrier carrier) {
if(carrier.isValid()) {
this.segment.ref(getRef(carrier));
ServiceManager.INSTANCE.findService(SamplingService.class).setSampleWhenExtract(this.segment, carrier);
this.segment.relatedGlobalTraces(carrier.getDistributedTraceIds());
}
}
......
......@@ -43,9 +43,14 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
disruptor.start();
}
/**
* Append the given traceSegment to the queue, wait for sending to Collector.
*
* @param traceSegment finished {@link TraceSegment}
*/
@Override
public void afterFinished(TraceSegment traceSegment) {
if (isStarted()) {
if (isStarted() && traceSegment.isSampled()) {
long sequence = this.buffer.next(); // Grab the next sequence
try {
TraceSegmentHolder data = this.buffer.get(sequence);
......
package com.a.eye.skywalking.api.sampling;
/**
* Use <code>IllegalSamplingRateException</code>, only if the rate can not be supported.
*
* @author wusheng
*/
public class IllegalSamplingRateException extends Exception {
IllegalSamplingRateException(String message) {
super(message);
}
}
package com.a.eye.skywalking.api.sampling;
import com.a.eye.skywalking.api.boot.BootService;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.context.ContextCarrier;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* The <code>SamplingService</code> take charge of how to sample the {@link TraceSegment}. Every {@link TraceSegment}s
* have been traced, but, considering CPU cost of serialization/deserialization, and network bandwidth, the agent do NOT
* send all of them to collector, if SAMPLING is on.
*
* By default, SAMPLING is off, and {@link Config.Agent#SAMPLING_RATE} == 1000.
*
* @author wusheng
*/
public class SamplingService implements BootService {
private static ILog logger = LogManager.getLogger(SamplingService.class);
private volatile boolean on = false;
private volatile int rate = 0;
private volatile int rollingSeed = 0;
@Override
public void bootUp() throws Throwable {
if (Config.Agent.SAMPLING_RATE == 10000) {
return;
}
if (Config.Agent.SAMPLING_RATE > 10000 || Config.Agent.SAMPLING_RATE < 1) {
throw new IllegalSamplingRateException("sampling rate should stay in (0, 10000].");
}
rate = 10000 / Config.Agent.SAMPLING_RATE;
on = true;
logger.debug("The trace sampling is on, and the sampling rate is: {}", rate);
}
public void trySampling(TraceSegment segment) {
if (on) {
if (rollingSeed++ != rate) {
segment.setSampled(false);
}
}
}
/**
* Set the {@link TraceSegment} to sampled, when {@link ContextCarrier} contains "isSampled" flag.
*
* A -> B, if TraceSegment is sampled in A, then the related TraceSegment in B must be sampled, no matter you
* sampling rate. And reset the {@link #rollingSeed}, in case of too many {@link TraceSegment}s, which started in
* this JVM, are sampled.
*
* @param segment the current TraceSegment.
* @param carrier
*/
public void setSampleWhenExtract(TraceSegment segment, ContextCarrier carrier) {
if(on) {
if (!segment.isSampled() && carrier.isSampled()) {
segment.setSampled(true);
this.rollingSeed = 0;
}
}
}
}
com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue
com.a.eye.skywalking.api.context.ContextManager
com.a.eye.skywalking.api.client.CollectorClientService
com.a.eye.skywalking.api.sampling.SamplingService
......@@ -18,17 +18,18 @@ public class ContextCarrierTestCase {
carrier.setSpanId(100);
carrier.setApplicationCode("REMOTE_APP");
carrier.setPeerHost("10.2.3.16:8080");
carrier.setSampled(true);
List<DistributedTraceId> ids = new LinkedList<DistributedTraceId>();
ids.add(new PropagatedTraceId("Trace.global.id.123"));
carrier.setDistributedTraceIds(ids);
Assert.assertEquals("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123", carrier.serialize());
Assert.assertEquals("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123|1", carrier.serialize());
}
@Test
public void testDeserialize(){
ContextCarrier carrier = new ContextCarrier();
carrier.deserialize("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222");
carrier.deserialize("trace_id_A|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222|1");
Assert.assertEquals("trace_id_A", carrier.getTraceSegmentId());
Assert.assertEquals(100, carrier.getSpanId());
......@@ -36,6 +37,7 @@ public class ContextCarrierTestCase {
Assert.assertEquals("10.2.3.16:8080", carrier.getPeerHost());
Assert.assertEquals("Trace.global.id.123", carrier.getDistributedTraceIds().get(0).get());
Assert.assertEquals("Trace.global.id.222", carrier.getDistributedTraceIds().get(1).get());
Assert.assertEquals(true, carrier.isSampled());
}
@Test
......@@ -62,6 +64,10 @@ public class ContextCarrierTestCase {
carrier = new ContextCarrier();
carrier.deserialize("trace_id|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222");
Assert.assertFalse(carrier.isValid());
carrier = new ContextCarrier();
carrier.deserialize("trace_id|100|REMOTE_APP|10.2.3.16:8080|Trace.global.id.123,Trace.global.id.222|0");
Assert.assertTrue(carrier.isValid());
}
}
......@@ -155,7 +155,7 @@ public class DubboInterceptorTest {
@Test
public void testProviderWithAttachment() {
when(rpcContext.isConsumerSide()).thenReturn(false);
when(rpcContext.getAttachment(DubboInterceptor.ATTACHMENT_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
when(rpcContext.getAttachment(DubboInterceptor.ATTACHMENT_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
dubboInterceptor.beforeMethod(classInstanceContext, methodInvokeContext, methodInterceptResult);
dubboInterceptor.afterMethod(classInstanceContext, methodInvokeContext, result);
......@@ -168,7 +168,7 @@ public class DubboInterceptorTest {
when(rpcContext.isConsumerSide()).thenReturn(false);
when(BugFixActive.isActive()).thenReturn(true);
testParam.setTraceContext("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
testParam.setTraceContext("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
dubboInterceptor.beforeMethod(classInstanceContext, methodInvokeContext, methodInterceptResult);
......
......@@ -94,7 +94,7 @@ public class MotanProviderInterceptorTest {
@Test
public void testInvokerWithRefSegment() {
HashMap attachments = new HashMap();
attachments.put("SWTraceContext", "302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
attachments.put("SWTraceContext", "302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
when(request.getAttachments()).thenReturn(attachments);
invokeInterceptor.beforeMethod(instanceContext, interceptorContext, null);
......
......@@ -81,7 +81,7 @@ public class TomcatInterceptorTest {
@Test
public void testWithSerializedContextData() {
when(request.getHeader(TomcatInterceptor.HEADER_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123");
when(request.getHeader(TomcatInterceptor.HEADER_NAME_OF_CONTEXT_DATA)).thenReturn("302017.1487666919810.624424584.17332.1.1|1|REMOTE_APP|127.0.0.1|Trace.globalId.123|1");
tomcatInterceptor.beforeMethod(classInstanceContext, methodInvokeContext, methodInterceptResult);
tomcatInterceptor.afterMethod(classInstanceContext, methodInvokeContext, null);
......
......@@ -24,8 +24,10 @@ public class SpanSetTagInterceptor implements InstanceMethodsAroundInterceptor {
ContextManager.activeSpan().setTag(key, (String)value);
else if (value instanceof Boolean)
ContextManager.activeSpan().setTag(key, (Boolean)value);
else if (value instanceof Number)
ContextManager.activeSpan().setTag(key, (Number)value);
else if (value instanceof Integer)
ContextManager.activeSpan().setTag(key, (Integer)value);
else
ContextManager.activeSpan().setTag(key, value.toString());
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册