提交 aed11d4a 编写于 作者: wu-sheng's avatar wu-sheng

Finish codes about sampling, ignore. And related test cases.

上级 112547b5
......@@ -18,6 +18,8 @@ import org.skywalking.apm.agent.core.sampling.SamplingService;
* Created by wusheng on 2017/2/17.
*/
public final class TracerContext implements AbstractTracerContext {
private SamplingService samplingService;
private TraceSegment segment;
/**
......@@ -37,6 +39,9 @@ public final class TracerContext implements AbstractTracerContext {
TracerContext() {
this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE);
this.spanIdGenerator = 0;
if (samplingService == null) {
samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
}
}
/**
......@@ -83,7 +88,6 @@ public final class TracerContext implements AbstractTracerContext {
* because the {@link #extract(ContextCarrier)} invoke before create the second span.
*/
if (spanIdGenerator == 1) {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (segment.hasRef()) {
samplingService.forceSampled();
} else {
......@@ -172,7 +176,19 @@ public final class TracerContext implements AbstractTracerContext {
* Finish this context, and notify all {@link TracerContextListener}s, managed by {@link ListenerManager}
*/
private void finish() {
ListenerManager.notifyFinish(segment.finish());
TraceSegment finishedSegment = segment.finish();
/**
* Recheck the segment if the segment contains only one span.
* Because in the runtime, can't sure this segment is part of distributed trace.
*
* @see {@link #createSpan(String, long, boolean)}
*/
if (!segment.hasRef() && segment.isSingleSpanSegment()) {
if (!samplingService.trySampling()) {
finishedSegment.setIgnore(true);
}
}
ListenerManager.notifyFinish(finishedSegment);
}
/**
......
......@@ -5,9 +5,9 @@ import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceIds;
import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator;
import org.skywalking.apm.agent.core.context.ids.NewDistributedTraceId;
/**
......@@ -90,6 +90,8 @@ public class TraceSegment {
@SerializedName(value = "gt")
private DistributedTraceIds relatedGlobalTraces;
private boolean ignore = false;
/**
* Create a trace segment, by the given applicationCode.
*/
......@@ -186,6 +188,10 @@ public class TraceSegment {
return relatedGlobalTraces.getRelatedGlobalTraces();
}
public boolean isSingleSpanSegment() {
return this.spans != null && this.spans.size() == 1;
}
public List<Span> getSpans() {
return Collections.unmodifiableList(spans);
}
......@@ -194,6 +200,14 @@ public class TraceSegment {
return applicationCode;
}
public boolean isIgnore() {
return ignore;
}
public void setIgnore(boolean ignore) {
this.ignore = ignore;
}
@Override
public String toString() {
return "TraceSegment{" +
......
......@@ -51,7 +51,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
*/
@Override
public void afterFinished(TraceSegment traceSegment) {
if (isStarted()) {
if (isStarted() && !traceSegment.isIgnore()) {
long sequence = this.buffer.next(); // Grab the next sequence
try {
TraceSegmentHolder data = this.buffer.get(sequence);
......
package org.skywalking.apm.agent.core.sampling;
/**
* Use <code>IllegalSamplingRateException</code>, only if the rate can not be supported.
*
* @author wusheng
*/
public class IllegalSamplingRateException extends Exception {
IllegalSamplingRateException(String message) {
super(message);
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.sampling;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.skywalking.apm.agent.core.boot.BootService;
......@@ -24,20 +25,28 @@ public class SamplingService implements BootService {
private volatile boolean on = false;
private volatile AtomicInteger samplingFactorHolder;
private volatile ScheduledFuture<?> scheduledFuture;
@Override
public void bootUp() throws Throwable {
if (scheduledFuture != null) {
/**
* If {@link #bootUp()} invokes twice, mostly in test cases,
* cancel the old one.
*/
scheduledFuture.cancel(true);
}
if (Config.Agent.SAMPLE_N_PER_10_SECS > 0) {
on = true;
this.resetSamplingFactor();
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(new Runnable() {
scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
resetSamplingFactor();
}
}, 1, 1, TimeUnit.SECONDS);
}, 0, 10, TimeUnit.SECONDS);
logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_10_SECS);
}
}
......@@ -49,7 +58,8 @@ public class SamplingService implements BootService {
if (on) {
int factor = samplingFactorHolder.get();
if (factor < Config.Agent.SAMPLE_N_PER_10_SECS) {
return samplingFactorHolder.compareAndSet(factor, factor + 1);
boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
return success;
} else {
return false;
}
......
......@@ -8,7 +8,6 @@ import org.junit.Test;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.context.ids.PropagatedTraceId;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.Span;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
/**
......
package org.skywalking.apm.agent.core.sampling;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.TracerContext;
import org.skywalking.apm.agent.core.context.TracerContextListener;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
/**
* @author wusheng
*/
public class SamplingTracerContextTestCase {
private int finishedTracerCounter = 0;
private TracerContextListener listener = new TracerContextListener() {
@Override
public void afterFinished(TraceSegment traceSegment) {
if (!traceSegment.isIgnore()) {
finishedTracerCounter++;
}
}
};
@Before
public void setUp() throws Exception {
Config.Agent.SAMPLE_N_PER_10_SECS = 5;
ServiceManager.INSTANCE.boot();
TracerContext.ListenerManager.add(listener);
}
@Test
public void testSample5InALoop() throws InterruptedException {
for (int i = 0; i < 11; i++) {
AbstractSpan span = ContextManager.createSpan("serviceA");
Tags.COMPONENT.set(span, "test");
ContextManager.stopSpan();
}
/**
* Considering the reset cycle, in ci-env, may sample 5-7 trace through 1 or 2 cycle.
*/
Assert.assertTrue(finishedTracerCounter >= 5);
Assert.assertTrue(finishedTracerCounter <= 7);
Thread.sleep(10 * 1000L);
}
@Test
public void testSample5InLoopWithMultiSpans() {
finishedTracerCounter = 0;
for (int i = 0; i < 11; i++) {
AbstractSpan span = ContextManager.createSpan("serviceA");
Tags.COMPONENT.set(span, "test");
AbstractSpan span2 = ContextManager.createSpan("serviceB");
Tags.COMPONENT.set(span2, "test2");
ContextManager.stopSpan();
ContextManager.stopSpan();
}
/**
* Considering the reset cycle, in ci-env, may sample 5-7 trace through 1 or 2 cycle.
*/
Assert.assertTrue(finishedTracerCounter >= 5);
Assert.assertTrue(finishedTracerCounter <= 7);
}
@After
public void tearDown() throws Exception {
Config.Agent.SAMPLE_N_PER_10_SECS = -1;
TracerContext.ListenerManager.remove(listener);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册