From aed11d4a593d470f04e2a17f4ec62b62a9700ce1 Mon Sep 17 00:00:00 2001 From: wusheng Date: Fri, 16 Jun 2017 22:04:25 +0800 Subject: [PATCH] Finish codes about sampling, ignore. And related test cases. --- .../apm/agent/core/context/TracerContext.java | 20 ++++- .../core/context/trace/TraceSegment.java | 16 +++- .../core/queue/TraceSegmentProcessQueue.java | 2 +- .../IllegalSamplingRateException.java | 12 --- .../agent/core/sampling/SamplingService.java | 16 +++- .../core/context/TracerContextTestCase.java | 1 - .../SamplingTracerContextTestCase.java | 78 +++++++++++++++++++ 7 files changed, 125 insertions(+), 20 deletions(-) delete mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/IllegalSamplingRateException.java create mode 100644 apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/sampling/SamplingTracerContextTestCase.java diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/TracerContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/TracerContext.java index b49319455..be3aebd4e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/TracerContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/TracerContext.java @@ -18,6 +18,8 @@ import org.skywalking.apm.agent.core.sampling.SamplingService; * Created by wusheng on 2017/2/17. */ public final class TracerContext implements AbstractTracerContext { + private SamplingService samplingService; + private TraceSegment segment; /** @@ -37,6 +39,9 @@ public final class TracerContext implements AbstractTracerContext { TracerContext() { this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE); this.spanIdGenerator = 0; + if (samplingService == null) { + samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); + } } /** @@ -83,7 +88,6 @@ public final class TracerContext implements AbstractTracerContext { * because the {@link #extract(ContextCarrier)} invoke before create the second span. */ if (spanIdGenerator == 1) { - SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); if (segment.hasRef()) { samplingService.forceSampled(); } else { @@ -172,7 +176,19 @@ public final class TracerContext implements AbstractTracerContext { * Finish this context, and notify all {@link TracerContextListener}s, managed by {@link ListenerManager} */ private void finish() { - ListenerManager.notifyFinish(segment.finish()); + TraceSegment finishedSegment = segment.finish(); + /** + * Recheck the segment if the segment contains only one span. + * Because in the runtime, can't sure this segment is part of distributed trace. + * + * @see {@link #createSpan(String, long, boolean)} + */ + if (!segment.hasRef() && segment.isSingleSpanSegment()) { + if (!samplingService.trySampling()) { + finishedSegment.setIgnore(true); + } + } + ListenerManager.notifyFinish(finishedSegment); } /** diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/TraceSegment.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/TraceSegment.java index eeb4c3a11..d54f76c91 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/TraceSegment.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/TraceSegment.java @@ -5,9 +5,9 @@ import com.google.gson.annotations.SerializedName; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator; import org.skywalking.apm.agent.core.context.ids.DistributedTraceId; import org.skywalking.apm.agent.core.context.ids.DistributedTraceIds; +import org.skywalking.apm.agent.core.context.ids.GlobalIdGenerator; import org.skywalking.apm.agent.core.context.ids.NewDistributedTraceId; /** @@ -90,6 +90,8 @@ public class TraceSegment { @SerializedName(value = "gt") private DistributedTraceIds relatedGlobalTraces; + private boolean ignore = false; + /** * Create a trace segment, by the given applicationCode. */ @@ -186,6 +188,10 @@ public class TraceSegment { return relatedGlobalTraces.getRelatedGlobalTraces(); } + public boolean isSingleSpanSegment() { + return this.spans != null && this.spans.size() == 1; + } + public List getSpans() { return Collections.unmodifiableList(spans); } @@ -194,6 +200,14 @@ public class TraceSegment { return applicationCode; } + public boolean isIgnore() { + return ignore; + } + + public void setIgnore(boolean ignore) { + this.ignore = ignore; + } + @Override public String toString() { return "TraceSegment{" + diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/queue/TraceSegmentProcessQueue.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/queue/TraceSegmentProcessQueue.java index 3fecf0162..f83f88ac7 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/queue/TraceSegmentProcessQueue.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/queue/TraceSegmentProcessQueue.java @@ -51,7 +51,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace */ @Override public void afterFinished(TraceSegment traceSegment) { - if (isStarted()) { + if (isStarted() && !traceSegment.isIgnore()) { long sequence = this.buffer.next(); // Grab the next sequence try { TraceSegmentHolder data = this.buffer.get(sequence); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/IllegalSamplingRateException.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/IllegalSamplingRateException.java deleted file mode 100644 index 8e50dd7b1..000000000 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/IllegalSamplingRateException.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.skywalking.apm.agent.core.sampling; - -/** - * Use IllegalSamplingRateException, only if the rate can not be supported. - * - * @author wusheng - */ -public class IllegalSamplingRateException extends Exception { - IllegalSamplingRateException(String message) { - super(message); - } -} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/SamplingService.java b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/SamplingService.java index 6dd43c52a..6d0b745c1 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/SamplingService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/sampling/SamplingService.java @@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.sampling; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.skywalking.apm.agent.core.boot.BootService; @@ -24,20 +25,28 @@ public class SamplingService implements BootService { private volatile boolean on = false; private volatile AtomicInteger samplingFactorHolder; + private volatile ScheduledFuture scheduledFuture; @Override public void bootUp() throws Throwable { + if (scheduledFuture != null) { + /** + * If {@link #bootUp()} invokes twice, mostly in test cases, + * cancel the old one. + */ + scheduledFuture.cancel(true); + } if (Config.Agent.SAMPLE_N_PER_10_SECS > 0) { on = true; this.resetSamplingFactor(); ScheduledExecutorService service = Executors .newSingleThreadScheduledExecutor(); - service.scheduleAtFixedRate(new Runnable() { + scheduledFuture = service.scheduleAtFixedRate(new Runnable() { @Override public void run() { resetSamplingFactor(); } - }, 1, 1, TimeUnit.SECONDS); + }, 0, 10, TimeUnit.SECONDS); logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_10_SECS); } } @@ -49,7 +58,8 @@ public class SamplingService implements BootService { if (on) { int factor = samplingFactorHolder.get(); if (factor < Config.Agent.SAMPLE_N_PER_10_SECS) { - return samplingFactorHolder.compareAndSet(factor, factor + 1); + boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1); + return success; } else { return false; } diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/context/TracerContextTestCase.java b/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/context/TracerContextTestCase.java index 2857a3a46..959bf5fc0 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/context/TracerContextTestCase.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/context/TracerContextTestCase.java @@ -8,7 +8,6 @@ import org.junit.Test; import org.skywalking.apm.agent.core.context.ids.DistributedTraceId; import org.skywalking.apm.agent.core.context.ids.PropagatedTraceId; import org.skywalking.apm.agent.core.context.trace.AbstractSpan; -import org.skywalking.apm.agent.core.context.trace.Span; import org.skywalking.apm.agent.core.context.trace.TraceSegment; /** diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/sampling/SamplingTracerContextTestCase.java b/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/sampling/SamplingTracerContextTestCase.java new file mode 100644 index 000000000..ba788f481 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/sampling/SamplingTracerContextTestCase.java @@ -0,0 +1,78 @@ +package org.skywalking.apm.agent.core.sampling; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.skywalking.apm.agent.core.boot.ServiceManager; +import org.skywalking.apm.agent.core.conf.Config; +import org.skywalking.apm.agent.core.context.ContextManager; +import org.skywalking.apm.agent.core.context.TracerContext; +import org.skywalking.apm.agent.core.context.TracerContextListener; +import org.skywalking.apm.agent.core.context.tag.Tags; +import org.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.skywalking.apm.agent.core.context.trace.TraceSegment; + +/** + * @author wusheng + */ +public class SamplingTracerContextTestCase { + private int finishedTracerCounter = 0; + + private TracerContextListener listener = new TracerContextListener() { + @Override + public void afterFinished(TraceSegment traceSegment) { + if (!traceSegment.isIgnore()) { + finishedTracerCounter++; + } + } + }; + + @Before + public void setUp() throws Exception { + Config.Agent.SAMPLE_N_PER_10_SECS = 5; + ServiceManager.INSTANCE.boot(); + TracerContext.ListenerManager.add(listener); + } + + @Test + public void testSample5InALoop() throws InterruptedException { + for (int i = 0; i < 11; i++) { + AbstractSpan span = ContextManager.createSpan("serviceA"); + Tags.COMPONENT.set(span, "test"); + ContextManager.stopSpan(); + } + + /** + * Considering the reset cycle, in ci-env, may sample 5-7 trace through 1 or 2 cycle. + */ + Assert.assertTrue(finishedTracerCounter >= 5); + Assert.assertTrue(finishedTracerCounter <= 7); + Thread.sleep(10 * 1000L); + } + + @Test + public void testSample5InLoopWithMultiSpans() { + finishedTracerCounter = 0; + for (int i = 0; i < 11; i++) { + AbstractSpan span = ContextManager.createSpan("serviceA"); + Tags.COMPONENT.set(span, "test"); + AbstractSpan span2 = ContextManager.createSpan("serviceB"); + Tags.COMPONENT.set(span2, "test2"); + ContextManager.stopSpan(); + ContextManager.stopSpan(); + } + + /** + * Considering the reset cycle, in ci-env, may sample 5-7 trace through 1 or 2 cycle. + */ + Assert.assertTrue(finishedTracerCounter >= 5); + Assert.assertTrue(finishedTracerCounter <= 7); + } + + @After + public void tearDown() throws Exception { + Config.Agent.SAMPLE_N_PER_10_SECS = -1; + TracerContext.ListenerManager.remove(listener); + } +} -- GitLab