From 90f3c4de557b427426dc33a86f3223be6a831794 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Sat, 18 Jan 2020 18:38:03 +0800 Subject: [PATCH] Sniffer processing profile task and report status and snapshot (#4220) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * sniffer processing profile task and report status and snapshot * resolve testServiceDependencies test case error, use same register with `TraceSegmentServiceClient` * resolve names * change profile to single one thread run. * 1. change to the ArrayList, because known the max size 2. rename issue resolved * add profiling status enum * change sniffer use full name issue * 1. remove `prepareProfiling` method, build profiling status when construct `TracingContext` 2. add `TracingThreadListenerManager`, notify when tracing main thread finish 3. change ProfileThread start when process new profile task * remove unnecessary getter * add test assert error message * adding `AgentServiceRule` * revert original assert * remove unnecessary getter * resolve issues * reduce findService invoke * resolve style error * recheck profiling when change first span operatin name * resolve issues 1. remove `ContextManager#profilingRecheck`, only check on `TracingContext` 2. rename comments 3. resolve volatile array setting * remove article link * add `ProfileTask#maxSamplingCount` check * resolve conflict (Downstream -> Commands) * 1. change profilingSegmentSlots init on construct 2. if is profiling, recheck dont need to stop 3. total profiling count increment on first dump * remove unused return val * remove some `@param` and `@return` * add profile task check result data bean * change profiler slot to `AtomicReferenceArray` * resolved java doc error * fix doc error, remove meaningless descriptions * resolve missed profile receiver on oap starter * resolve method invoke error Co-authored-by: 吴晟 Wu Sheng Co-authored-by: kezhenxu94 --- .../component/command/ProfileTaskCommand.java | 18 +- .../executor/ProfileTaskCommandExecutor.java | 3 +- .../apm/agent/core/conf/Config.java | 20 ++ .../core/context/AbstractTracerContext.java | 1 + .../context/ContextManagerExtendService.java | 2 +- .../agent/core/context/TracingContext.java | 100 ++++++- .../core/context/TracingThreadListener.java | 27 ++ .../context/trace/AbstractTracingSpan.java | 19 +- .../agent/core/context/trace/EntrySpan.java | 9 +- .../agent/core/context/trace/ExitSpan.java | 17 +- .../agent/core/context/trace/LocalSpan.java | 10 +- .../context/trace/StackBasedTracingSpan.java | 25 +- .../apm/agent/core/profile/ProfileTask.java | 28 +- .../profile/ProfileTaskChannelService.java | 243 ++++++++++++++++++ .../profile/ProfileTaskExecutionContext.java | 119 ++++++++- .../profile/ProfileTaskExecutionService.java | 124 +++++++-- .../core/profile/ProfileTaskQueryService.java | 129 ---------- .../apm/agent/core/profile/ProfileThread.java | 115 +++++++++ .../agent/core/profile/ProfilingStatus.java | 31 +++ .../agent/core/profile/ThreadProfiler.java | 153 +++++++++++ .../core/profile/TracingThreadSnapshot.java | 73 ++++++ ...skywalking.apm.agent.core.boot.BootService | 2 +- .../agent/core/boot/ServiceManagerTest.java | 18 +- .../core/context/TracingContextTest.java | 2 +- .../core/test/tools/AgentServiceRule.java | 3 + apm-sniffer/config/agent.config | 12 + .../setup/service-agent/java-agent/README.md | 4 + .../src/main/resources/application.yml | 3 + .../server/core/cache/ProfileTaskCache.java | 45 +++- .../server/core/command/CommandService.java | 2 +- .../ProfileTaskSegmentSnapshotRecord.java | 101 ++++++++ .../core/source/DefaultScopeDefine.java | 1 + .../storage/profile/IProfileTaskQueryDAO.java | 7 + .../handler/ProfileTaskServiceHandler.java | 77 +++++- .../query/ProfileTaskQueryEsDAO.java | 19 ++ .../jdbc/h2/dao/H2ProfileTaskQueryDAO.java | 24 ++ .../e2e/ProfileVerificationITCase.java | 40 ++- .../e2e/ProfileVerificationITCase.java | 40 ++- .../e2e/ProfileVerificationITCase.java | 40 ++- .../skywalking/e2e/profile/CreateUser.java | 51 ++++ .../e2e/profile/TestController.java | 14 +- ...rificationITCase.profileTasks.finished.yml | 35 +++ ...ificationITCase.profileTasks.notified.yml} | 0 43 files changed, 1529 insertions(+), 277 deletions(-) create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java delete mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java create mode 100644 apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java create mode 100644 test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java create mode 100644 test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml rename test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/{org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml => org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml} (100%) diff --git a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java index fecb876906..f595fda65d 100644 --- a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java +++ b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java @@ -27,10 +27,11 @@ import java.util.List; * @author MrPro */ public class ProfileTaskCommand extends BaseCommand implements Serializable, Deserializable { - public static final Deserializable DESERIALIZER = new ProfileTaskCommand("", "", 0, 0, 0, 0, 0, 0); + public static final Deserializable DESERIALIZER = new ProfileTaskCommand("", "", "", 0, 0, 0, 0, 0, 0); public static final String NAME = "ProfileTaskQuery"; // profile task data + private String taskId; private String endpointName; private int duration; private int minDurationThreshold; @@ -39,8 +40,9 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des private long startTime; private long createTime; - public ProfileTaskCommand(String serialNumber, String endpointName, int duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long startTime, long createTime) { + public ProfileTaskCommand(String serialNumber, String taskId, String endpointName, int duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long startTime, long createTime) { super(NAME, serialNumber); + this.taskId = taskId; this.endpointName = endpointName; this.duration = duration; this.minDurationThreshold = minDurationThreshold; @@ -54,6 +56,7 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des public ProfileTaskCommand deserialize(Command command) { final List argsList = command.getArgsList(); String serialNumber = null; + String taskId = null; String endpointName = null; int duration = 0; int minDurationThreshold = 0; @@ -67,6 +70,8 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des serialNumber = pair.getValue(); } else if ("EndpointName".equals(pair.getKey())) { endpointName = pair.getValue(); + } else if ("TaskId".equals(pair.getKey())) { + taskId = pair.getValue(); } else if ("Duration".equals(pair.getKey())) { duration = Integer.parseInt(pair.getValue()); } else if ("MinDurationThreshold".equals(pair.getKey())) { @@ -82,13 +87,14 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des } } - return new ProfileTaskCommand(serialNumber, endpointName, duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime); + return new ProfileTaskCommand(serialNumber, taskId, endpointName, duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime); } @Override public Command.Builder serialize() { final Command.Builder builder = commandBuilder(); - builder.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName)) + builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId)) + .addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName)) .addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration))) .addArgs(KeyStringValuePair.newBuilder().setKey("MinDurationThreshold").setValue(String.valueOf(minDurationThreshold))) .addArgs(KeyStringValuePair.newBuilder().setKey("DumpPeriod").setValue(String.valueOf(dumpPeriod))) @@ -125,4 +131,8 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des public long getCreateTime() { return createTime; } + + public String getTaskId() { + return taskId; + } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java index c6c22d267c..eb6212ca3e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java @@ -39,7 +39,8 @@ public class ProfileTaskCommandExecutor implements CommandExecutor { // build profile task final ProfileTask profileTask = new ProfileTask(); - profileTask.setEndpointName(profileTaskCommand.getEndpointName()); + profileTask.setTaskId(profileTaskCommand.getTaskId()); + profileTask.setFistSpanOPName(profileTaskCommand.getEndpointName()); profileTask.setDuration(profileTaskCommand.getDuration()); profileTask.setMinDurationThreshold(profileTaskCommand.getMinDurationThreshold()); profileTask.setThreadDumpPeriod(profileTaskCommand.getDumpPeriod()); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index 084e819ecd..d356e2bf10 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -138,6 +138,26 @@ public class Config { * If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile. */ public static boolean ACTIVE = true; + + /** + * Parallel monitor segment count + */ + public static int MAX_PARALLEL = 5; + + /** + * Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it. + */ + public static int MAX_DURATION = 10; + + /** + * Max dump thread stack depth + */ + public static int DUMP_MAX_STACK_DEPTH = 500; + + /** + * Snapshot transport to backend buffer size + */ + public static int SNAPSHOT_TRANSPORT_BUFFER_SIZE = 500; } public static class Jvm { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java index 4b3cd2d68e..6c76873c10 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java @@ -115,4 +115,5 @@ public interface AbstractTracerContext { * @param span to be stopped. */ void asyncStop(AsyncSpan span); + } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java index d25e915f0b..1331f62a2b 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java @@ -51,7 +51,7 @@ public class ContextManagerExtendService implements BootService { } else { SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); if (forceSampling || samplingService.trySampling()) { - context = new TracingContext(); + context = new TracingContext(operationName); } else { context = new IgnoredTracerContext(); } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java index ccb9aae523..9a9ccd5eea 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java @@ -40,6 +40,7 @@ import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService; import org.apache.skywalking.apm.agent.core.sampling.SamplingService; import org.apache.skywalking.apm.util.StringUtil; @@ -61,10 +62,15 @@ public class TracingContext implements AbstractTracerContext { private static final ILog logger = LogManager.getLogger(TracingContext.class); private long lastWarningTimestamp = 0; + /** + * @see {@link ProfileTaskExecutionService} + */ + private static ProfileTaskExecutionService PROFILE_TASK_EXECUTION_SERVICE; + /** * @see {@link SamplingService} */ - private SamplingService samplingService; + private static SamplingService SAMPLING_SERVICE; /** * The final {@link TraceSegment}, which includes all finished spans. @@ -92,15 +98,32 @@ public class TracingContext implements AbstractTracerContext { private volatile boolean running; + private final long createTime; + + /** + * profiling status + */ + private volatile boolean profiling; + /** * Initialize all fields with default value. */ - TracingContext() { + TracingContext(String firstOPName) { this.segment = new TraceSegment(); this.spanIdGenerator = 0; - samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); isRunningInAsyncMode = false; + createTime = System.currentTimeMillis(); running = true; + + if (SAMPLING_SERVICE == null) { + SAMPLING_SERVICE = ServiceManager.INSTANCE.findService(SamplingService.class); + } + + // profiling status + if (PROFILE_TASK_EXECUTION_SERVICE == null) { + PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class); + } + this.profiling = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(this, segment.getTraceSegmentId(), firstOPName); } /** @@ -308,6 +331,7 @@ public class TracingContext implements AbstractTracerContext { return push(span); } AbstractSpan entrySpan; + TracingContext owner = this; final AbstractSpan parentSpan = peek(); final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId(); if (parentSpan != null && parentSpan.isEntry()) { @@ -328,11 +352,11 @@ public class TracingContext implements AbstractTracerContext { .findOnly(segment.getServiceId(), operationName) .doInCondition(new PossibleFound.FoundAndObtain() { @Override public Object doProcess(int operationId) { - return new EntrySpan(spanIdGenerator++, parentSpanId, operationId); + return new EntrySpan(spanIdGenerator++, parentSpanId, operationId, owner); } }, new PossibleFound.NotFoundAndObtain() { @Override public Object doProcess() { - return new EntrySpan(spanIdGenerator++, parentSpanId, operationName); + return new EntrySpan(spanIdGenerator++, parentSpanId, operationName, owner); } }); entrySpan.start(); @@ -358,7 +382,7 @@ public class TracingContext implements AbstractTracerContext { * From v6.0.0-beta, local span doesn't do op name register. * All op name register is related to entry and exit spans only. */ - AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName); + AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName, this); span.start(); return push(span); } @@ -380,6 +404,7 @@ public class TracingContext implements AbstractTracerContext { AbstractSpan exitSpan; AbstractSpan parentSpan = peek(); + TracingContext owner = this; if (parentSpan != null && parentSpan.isExit()) { exitSpan = parentSpan; } else { @@ -389,13 +414,13 @@ public class TracingContext implements AbstractTracerContext { new PossibleFound.FoundAndObtain() { @Override public Object doProcess(final int peerId) { - return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, peerId); + return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, peerId, owner); } }, new PossibleFound.NotFoundAndObtain() { @Override public Object doProcess() { - return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer); + return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer, owner); } }); push(exitSpan); @@ -462,16 +487,39 @@ public class TracingContext implements AbstractTracerContext { finish(); } + /** + * Re-check current trace need profiling, encase third part plugin change the operation name. + * + * @param span current modify span + * @param operationName change to operation name + */ + public void profilingRecheck(AbstractSpan span, String operationName) { + // only recheck first span + if (span.getSpanId() != 0) { + return; + } + + profiling = PROFILE_TASK_EXECUTION_SERVICE.profilingRecheck(this, segment.getTraceSegmentId(), operationName); + } + /** * Finish this context, and notify all {@link TracingContextListener}s, managed by {@link - * TracingContext.ListenerManager} + * TracingContext.ListenerManager} and {@link TracingContext.TracingThreadListenerManager} */ private void finish() { if (isRunningInAsyncMode) { asyncFinishLock.lock(); } try { - if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) { + boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running; + if (isFinishedInMainThread) { + /** + * Notify after tracing finished in the main thread. + */ + TracingThreadListenerManager.notifyFinish(this); + } + + if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) { TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking()); /* * Recheck the segment if the segment contains only one span. @@ -480,7 +528,7 @@ public class TracingContext implements AbstractTracerContext { * @see {@link #createSpan(String, long, boolean)} */ if (!segment.hasRef() && segment.isSingleSpanSegment()) { - if (!samplingService.trySampling()) { + if (!SAMPLING_SERVICE.trySampling()) { finishedSegment.setIgnore(true); } } @@ -543,6 +591,27 @@ public class TracingContext implements AbstractTracerContext { } + /** + * The ListenerManager represents an event notify for every registered listener, which are notified + */ + public static class TracingThreadListenerManager { + private static List LISTENERS = new LinkedList<>(); + + public static synchronized void add(TracingThreadListener listener) { + LISTENERS.add(listener); + } + + static void notifyFinish(TracingContext finishedContext) { + for (TracingThreadListener listener : LISTENERS) { + listener.afterMainThreadFinish(finishedContext); + } + } + + public static synchronized void remove(TracingThreadListener listener) { + LISTENERS.remove(listener); + } + } + /** * @return the top element of 'ActiveSpanStack', and remove it. */ @@ -587,4 +656,13 @@ public class TracingContext implements AbstractTracerContext { return false; } } + + public long createTime() { + return this.createTime; + } + + public boolean isProfiling() { + return this.profiling; + } + } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java new file mode 100644 index 0000000000..c175454d57 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.context; + +/** + * @author MrPro + */ +public interface TracingThreadListener { + + void afterMainThreadFinish(TracingContext tracingContext); +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java index b5ece98f02..4eec0676f3 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java @@ -48,7 +48,11 @@ public abstract class AbstractTracingSpan implements AbstractSpan { * The flag represents whether the span has been async stopped */ private volatile boolean isAsyncStopped = false; - protected volatile AbstractTracerContext context; + + /** + * The context to which the span belongs + */ + protected final TracingContext owner; /** * The start time of this Span. @@ -79,18 +83,20 @@ public abstract class AbstractTracingSpan implements AbstractSpan { */ protected List refs; - protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName) { + protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) { this.operationName = operationName; this.operationId = DictionaryUtil.nullValue(); this.spanId = spanId; this.parentSpanId = parentSpanId; + this.owner = owner; } - protected AbstractTracingSpan(int spanId, int parentSpanId, int operationId) { + protected AbstractTracingSpan(int spanId, int parentSpanId, int operationId, TracingContext owner) { this.operationName = null; this.operationId = operationId; this.spanId = spanId; this.parentSpanId = parentSpanId; + this.owner = owner; } /** @@ -203,6 +209,9 @@ public abstract class AbstractTracingSpan implements AbstractSpan { public AbstractTracingSpan setOperationName(String operationName) { this.operationName = operationName; this.operationId = DictionaryUtil.nullValue(); + + // recheck profiling status + owner.profilingRecheck(this, operationName); return this; } @@ -332,7 +341,7 @@ public abstract class AbstractTracingSpan implements AbstractSpan { if (isInAsyncMode) { throw new RuntimeException("Prepare for async repeatedly. Span is already in async mode."); } - context = ContextManager.awaitFinishAsync(this); + ContextManager.awaitFinishAsync(this); isInAsyncMode = true; return this; } @@ -345,7 +354,7 @@ public abstract class AbstractTracingSpan implements AbstractSpan { throw new RuntimeException("Can not do async finish for the span repeately."); } this.endTime = System.currentTimeMillis(); - context.asyncStop(this); + owner.asyncStop(this); isAsyncStopped = true; return this; } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java index bef43e44e4..7d83af9371 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java @@ -18,6 +18,7 @@ package org.apache.skywalking.apm.agent.core.context.trace; +import org.apache.skywalking.apm.agent.core.context.TracingContext; import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.apache.skywalking.apm.network.trace.component.Component; @@ -37,13 +38,13 @@ public class EntrySpan extends StackBasedTracingSpan { private int currentMaxDepth; - public EntrySpan(int spanId, int parentSpanId, String operationName) { - super(spanId, parentSpanId, operationName); + public EntrySpan(int spanId, int parentSpanId, String operationName, TracingContext owner) { + super(spanId, parentSpanId, operationName, owner); this.currentMaxDepth = 0; } - public EntrySpan(int spanId, int parentSpanId, int operationId) { - super(spanId, parentSpanId, operationId); + public EntrySpan(int spanId, int parentSpanId, int operationId, TracingContext owner) { + super(spanId, parentSpanId, operationId, owner); this.currentMaxDepth = 0; } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java index aa0089a061..d081a78e8c 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java @@ -19,6 +19,7 @@ package org.apache.skywalking.apm.agent.core.context.trace; +import org.apache.skywalking.apm.agent.core.context.TracingContext; import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag; import org.apache.skywalking.apm.network.trace.component.Component; @@ -37,20 +38,20 @@ import org.apache.skywalking.apm.network.trace.component.Component; */ public class ExitSpan extends StackBasedTracingSpan implements WithPeerInfo { - public ExitSpan(int spanId, int parentSpanId, String operationName, String peer) { - super(spanId, parentSpanId, operationName, peer); + public ExitSpan(int spanId, int parentSpanId, String operationName, String peer, TracingContext owner) { + super(spanId, parentSpanId, operationName, peer, owner); } - public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId) { - super(spanId, parentSpanId, operationId, peerId); + public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId, TracingContext owner) { + super(spanId, parentSpanId, operationId, peerId, owner); } - public ExitSpan(int spanId, int parentSpanId, int operationId, String peer) { - super(spanId, parentSpanId, operationId, peer); + public ExitSpan(int spanId, int parentSpanId, int operationId, String peer, TracingContext owner) { + super(spanId, parentSpanId, operationId, peer, owner); } - public ExitSpan(int spanId, int parentSpanId, String operationName, int peerId) { - super(spanId, parentSpanId, operationName, peerId); + public ExitSpan(int spanId, int parentSpanId, String operationName, int peerId, TracingContext owner) { + super(spanId, parentSpanId, operationName, peerId, owner); } /** diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java index 1bd4a77ea0..be4c4cc604 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java @@ -19,6 +19,8 @@ package org.apache.skywalking.apm.agent.core.context.trace; +import org.apache.skywalking.apm.agent.core.context.TracingContext; + /** * The LocalSpan represents a normal tracing point, such as a local method. * @@ -26,12 +28,12 @@ package org.apache.skywalking.apm.agent.core.context.trace; */ public class LocalSpan extends AbstractTracingSpan { - public LocalSpan(int spanId, int parentSpanId, int operationId) { - super(spanId, parentSpanId, operationId); + public LocalSpan(int spanId, int parentSpanId, int operationId, TracingContext owner) { + super(spanId, parentSpanId, operationId, owner); } - public LocalSpan(int spanId, int parentSpanId, String operationName) { - super(spanId, parentSpanId, operationName); + public LocalSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) { + super(spanId, parentSpanId, operationName, owner); } @Override diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java index cdf26a8d7e..19000fbec7 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java @@ -18,6 +18,7 @@ package org.apache.skywalking.apm.agent.core.context.trace; +import org.apache.skywalking.apm.agent.core.context.TracingContext; import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager; import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil; import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound; @@ -35,40 +36,40 @@ public abstract class StackBasedTracingSpan extends AbstractTracingSpan { protected String peer; protected int peerId; - protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName) { - super(spanId, parentSpanId, operationName); + protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) { + super(spanId, parentSpanId, operationName, owner); this.stackDepth = 0; this.peer = null; this.peerId = DictionaryUtil.nullValue(); } - protected StackBasedTracingSpan(int spanId, int parentSpanId, int operationId) { - super(spanId, parentSpanId, operationId); + protected StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, TracingContext owner) { + super(spanId, parentSpanId, operationId, owner); this.stackDepth = 0; this.peer = null; this.peerId = DictionaryUtil.nullValue(); } - public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, int peerId) { - super(spanId, parentSpanId, operationId); + public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, int peerId, TracingContext owner) { + super(spanId, parentSpanId, operationId, owner); this.peer = null; this.peerId = peerId; } - public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, String peer) { - super(spanId, parentSpanId, operationId); + public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, String peer, TracingContext owner) { + super(spanId, parentSpanId, operationId, owner); this.peer = peer; this.peerId = DictionaryUtil.nullValue(); } - protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, String peer) { - super(spanId, parentSpanId, operationName); + protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, String peer, TracingContext owner) { + super(spanId, parentSpanId, operationName, owner); this.peer = peer; this.peerId = DictionaryUtil.nullValue(); } - protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, int peerId) { - super(spanId, parentSpanId, operationName); + protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, int peerId, TracingContext owner) { + super(spanId, parentSpanId, operationName, owner); this.peer = null; this.peerId = peerId; } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java index 8669c7de1e..2c4c6c1e0b 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java @@ -27,8 +27,11 @@ import java.util.Objects; */ public class ProfileTask { - // monitor endpoint name - private String endpointName; + // task id + private String taskId; + + // monitor first span operation name + private String fistSpanOPName; // task duration (minute) private int duration; @@ -48,12 +51,12 @@ public class ProfileTask { // task create time private long createTime; - public String getEndpointName() { - return endpointName; + public String getFistSpanOPName() { + return fistSpanOPName; } - public void setEndpointName(String endpointName) { - this.endpointName = endpointName; + public void setFistSpanOPName(String fistSpanOPName) { + this.fistSpanOPName = fistSpanOPName; } public int getDuration() { @@ -104,6 +107,14 @@ public class ProfileTask { this.createTime = createTime; } + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -115,11 +126,12 @@ public class ProfileTask { maxSamplingCount == that.maxSamplingCount && startTime == that.startTime && createTime == that.createTime && - endpointName.equals(that.endpointName); + taskId.equals(that.taskId) && + fistSpanOPName.equals(that.fistSpanOPName); } @Override public int hashCode() { - return Objects.hash(endpointName, duration, minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, createTime); + return Objects.hash(taskId, fistSpanOPName, duration, minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, createTime); } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java new file mode 100644 index 0000000000..9a4b839931 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.profile; + +import io.grpc.Channel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.apache.skywalking.apm.agent.core.boot.BootService; +import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; +import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.commands.CommandService; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig; +import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +import org.apache.skywalking.apm.agent.core.remote.*; +import org.apache.skywalking.apm.network.common.Commands; +import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery; +import org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport; +import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc; +import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot; +import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; + +import java.util.ArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; + +/** + * Sniffer and backend, about the communication service of profile task protocol. + * 1. Sniffer will check has new profile task list every {@link Config.Collector#GET_PROFILE_TASK_INTERVAL} second. + * 2. When there is a new profile task snapshot, the data is transferred to the back end. use {@link LinkedBlockingQueue} + * 3. When profiling task finish, it will send task finish status to backend + * + * @author MrPro + */ +@DefaultImplementor +public class ProfileTaskChannelService implements BootService, Runnable, GRPCChannelListener { + private static final ILog logger = LogManager.getLogger(ProfileTaskChannelService.class); + + // channel status + private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; + + // gRPC stub + private volatile ProfileTaskGrpc.ProfileTaskBlockingStub profileTaskBlockingStub; + private volatile ProfileTaskGrpc.ProfileTaskStub profileTaskStub; + + // segment snapshot sender + private final LinkedBlockingQueue snapshotQueue = new LinkedBlockingQueue<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE); + private volatile ScheduledFuture sendSnapshotFuture; + + // query task list schedule + private volatile ScheduledFuture getTaskListFuture; + + @Override + public void run() { + if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue() + && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue() + ) { + if (status == GRPCChannelStatus.CONNECTED) { + try { + ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder(); + + // sniffer info + builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID); + + // last command create time + builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime()); + + Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).getProfileTaskCommands(builder.build()); + ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); + } catch (Throwable t) { + if (!(t instanceof StatusRuntimeException)) { + logger.error(t, "Query profile task from backend fail."); + return; + } + final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; + if (statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED) { + logger.warn("Backend doesn't support profiling, profiling will be disabled"); + if (getTaskListFuture != null) { + getTaskListFuture.cancel(true); + } + + // stop snapshot sender + if (sendSnapshotFuture != null) { + sendSnapshotFuture.cancel(true); + } + } + } + } + } + + } + + @Override + public void prepare() throws Throwable { + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); + } + + @Override + public void boot() throws Throwable { + if (Config.Profile.ACTIVE) { + // query task list + getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileGetTaskService")) + .scheduleWithFixedDelay(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() { + @Override + public void handle(Throwable t) { + logger.error("Query profile task list failure.", t); + } + }), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS); + + sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileSendSnapshotService")) + .scheduleWithFixedDelay(new RunnableWithExceptionProtection(new SnapshotSender(), new RunnableWithExceptionProtection.CallbackWhenException() { + @Override public void handle(Throwable t) { + logger.error("Profile segment snapshot upload failure.", t); + } + }), 0, 500, TimeUnit.MILLISECONDS); + } + } + + @Override + public void onComplete() throws Throwable { + } + + @Override + public void shutdown() throws Throwable { + if (getTaskListFuture != null) { + getTaskListFuture.cancel(true); + } + + if (sendSnapshotFuture != null) { + sendSnapshotFuture.cancel(true); + } + } + + @Override + public void statusChanged(GRPCChannelStatus status) { + if (GRPCChannelStatus.CONNECTED.equals(status)) { + Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); + profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel); + profileTaskStub = ProfileTaskGrpc.newStub(channel); + } else { + profileTaskBlockingStub = null; + profileTaskStub = null; + } + this.status = status; + } + + /** + * add a new profiling snapshot, send to {@link #snapshotQueue} + */ + public void addProfilingSnapshot(TracingThreadSnapshot snapshot) { + snapshotQueue.add(snapshot); + } + + /** + * notify backend profile task has finish + */ + public void notifyProfileTaskFinish(ProfileTask task) { + try { + final ProfileTaskFinishReport.Builder reportBuilder = ProfileTaskFinishReport.newBuilder(); + // sniffer info + reportBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID); + // task info + reportBuilder.setTaskId(task.getTaskId()); + + // send data + profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).reportTaskFinish(reportBuilder.build()); + } catch (Throwable e) { + logger.error(e, "Notify profile task finish to backend fail."); + } + } + + /** + * send segment snapshot + */ + private class SnapshotSender implements Runnable { + + @Override + public void run() { + if (status == GRPCChannelStatus.CONNECTED) { + try { + ArrayList buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE); + snapshotQueue.drainTo(buffer); + if (buffer.size() > 0) { + final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); + StreamObserver snapshotStreamObserver = profileTaskStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collectSnapshot(new StreamObserver() { + @Override + public void onNext(Commands commands) { + } + + @Override + public void onError(Throwable throwable) { + status.finished(); + if (logger.isErrorEnable()) { + logger.error(throwable, "Send profile segment snapshot to collector fail with a grpc internal exception."); + } + ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable); + } + + @Override + public void onCompleted() { + status.finished(); + } + }); + for (TracingThreadSnapshot snapshot : buffer) { + final ThreadSnapshot transformSnapshot = snapshot.transform(); + snapshotStreamObserver.onNext(transformSnapshot); + } + + snapshotStreamObserver.onCompleted(); + status.wait4Finish(); + } + } catch (Throwable t) { + logger.error(t, "Send profile segment snapshot to backend fail."); + } + } + } + + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java index 20b9eecf31..b17414c6b6 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java @@ -18,7 +18,15 @@ package org.apache.skywalking.apm.agent.core.profile; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.context.TracingContext; +import org.apache.skywalking.apm.agent.core.context.ids.ID; + import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; /** * profile task execution context, it will create on process this profile task @@ -30,20 +38,119 @@ public class ProfileTaskExecutionContext { // task data private final ProfileTask task; - // task real start time - private final long startTime; + // record current profiling count, use this to check has available profile slot + private final AtomicInteger currentProfilingCount = new AtomicInteger(0); + + // profiling segment slot + private volatile AtomicReferenceArray profilingSegmentSlots; + + // current profiling execution future + private volatile Future profilingFuture; - public ProfileTaskExecutionContext(ProfileTask task, long startTime) { + // total started profiling tracing context count + private final AtomicInteger totalStartedProfilingCount = new AtomicInteger(0); + + public ProfileTaskExecutionContext(ProfileTask task) { this.task = task; - this.startTime = startTime; + profilingSegmentSlots = new AtomicReferenceArray<>(Config.Profile.MAX_PARALLEL); + } + + /** + * start profiling this task + */ + public void startProfiling(ExecutorService executorService) { + profilingFuture = executorService.submit(new ProfileThread(this)); + } + + /** + * stop profiling + */ + public void stopProfiling() { + if (profilingFuture != null) { + profilingFuture.cancel(true); + } + } + + /** + * check have available slot to profile and add it + * + * @return is add profile success + */ + public boolean attemptProfiling(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) { + // check has available slot + final int usingSlotCount = currentProfilingCount.get(); + if (usingSlotCount >= Config.Profile.MAX_PARALLEL) { + return false; + } + + // check first operation name matches + if (!Objects.equals(task.getFistSpanOPName(), firstSpanOPName)) { + return false; + } + + // if out limit started profiling count then stop add profiling + if (totalStartedProfilingCount.get() > task.getMaxSamplingCount()) { + return false; + } + + // try to occupy slot + if (!currentProfilingCount.compareAndSet(usingSlotCount, usingSlotCount + 1)) { + return false; + } + + final ThreadProfiler threadProfiler = new ThreadProfiler(tracingContext, traceSegmentId, Thread.currentThread(), this); + int slotLength = profilingSegmentSlots.length(); + for (int slot = 0; slot < slotLength; slot++) { + if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) { + break; + } + } + return true; + } + + + /** + * profiling recheck + */ + public boolean profilingRecheck(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) { + // if started, keep profiling + if (tracingContext.isProfiling()) { + return true; + } + + return attemptProfiling(tracingContext, traceSegmentId, firstSpanOPName); + } + + /** + * find tracing context and clear on slot + */ + public void stopTracingProfile(TracingContext tracingContext) { + // find current tracingContext and clear it + int slotLength = profilingSegmentSlots.length(); + for (int slot = 0; slot < slotLength; slot++) { + ThreadProfiler currentProfiler = profilingSegmentSlots.get(slot); + if (currentProfiler != null && currentProfiler.matches(tracingContext)) { + profilingSegmentSlots.set(slot, null); + + // setting stop running + currentProfiler.stopProfiling(); + currentProfilingCount.addAndGet(-1); + break; + } + } } public ProfileTask getTask() { return task; } - public long getStartTime() { - return startTime; + public AtomicReferenceArray threadProfilerSlots() { + return profilingSegmentSlots; + } + + public boolean isStartProfileable() { + // check is out of max sampling count check + return totalStartedProfilingCount.incrementAndGet() > task.getMaxSamplingCount(); } @Override diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java index 0bf0f50fbd..9a3f2703f7 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java @@ -21,6 +21,10 @@ package org.apache.skywalking.apm.agent.core.profile; import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.context.TracingContext; +import org.apache.skywalking.apm.agent.core.context.TracingThreadListener; +import org.apache.skywalking.apm.agent.core.context.ids.ID; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.network.constants.ProfileConstants; @@ -38,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; * @author MrPro */ @DefaultImplementor -public class ProfileTaskExecutionService implements BootService { +public class ProfileTaskExecutionService implements BootService, TracingThreadListener { private static final ILog logger = LogManager.getLogger(ProfileTaskExecutionService.class); @@ -51,12 +55,14 @@ public class ProfileTaskExecutionService implements BootService { // current processing profile task context private final AtomicReference taskExecutionContext = new AtomicReference<>(); + // profile executor thread pool, only running one thread + private final static ExecutorService PROFILE_EXECUTOR = Executors.newSingleThreadExecutor(new DefaultNamedThreadFactory("PROFILING-TASK")); + // profile task list, include running and waiting running tasks private final List profileTaskList = Collections.synchronizedList(new LinkedList<>()); /** - * get profile task from OAP - * @param task + * add profile task from OAP */ public void addProfileTask(ProfileTask task) { // update last command create time @@ -65,9 +71,9 @@ public class ProfileTaskExecutionService implements BootService { } // check profile task limit - final String dataError = checkProfileTaskSuccess(task); - if (dataError != null) { - logger.warn("check command error, cannot process this profile task. reason: {}", dataError); + final CheckResult dataError = checkProfileTaskSuccess(task); + if (!dataError.isSuccess()) { + logger.warn("check command error, cannot process this profile task. reason: {}", dataError.getErrorReason()); return; } @@ -84,19 +90,46 @@ public class ProfileTaskExecutionService implements BootService { }, timeToProcessMills, TimeUnit.MILLISECONDS); } + /** + * check and add {@link TracingContext} profiling + */ + public boolean addProfiling(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) { + // get current profiling task, check need profiling + final ProfileTaskExecutionContext executionContext = taskExecutionContext.get(); + if (executionContext == null) { + return false; + } + + return executionContext.attemptProfiling(tracingContext, traceSegmentId, firstSpanOPName); + } + + /** + * Re-check current trace need profiling, in case that third-party plugins change the operation name. + */ + public boolean profilingRecheck(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) { + // get current profiling task, check need profiling + final ProfileTaskExecutionContext executionContext = taskExecutionContext.get(); + if (executionContext == null) { + return false; + } + + return executionContext.profilingRecheck(tracingContext, traceSegmentId, firstSpanOPName); + } + /** * active the selected profile task to execution task, and start a removal task for it. - * @param task */ private synchronized void processProfileTask(ProfileTask task) { // make sure prev profile task already stopped stopCurrentProfileTask(taskExecutionContext.get()); // make stop task schedule and task context - // TODO process task on next step - final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task, System.currentTimeMillis()); + final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task); taskExecutionContext.set(currentStartedTaskContext); + // start profiling this task + currentStartedTaskContext.startProfiling(PROFILE_EXECUTOR); + PROFILE_TASK_SCHEDULE.schedule(new Runnable() { @Override public void run() { @@ -108,36 +141,44 @@ public class ProfileTaskExecutionService implements BootService { /** * stop profile task, remove context data */ - private synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) { + synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) { // stop same context only if (needToStop == null || !taskExecutionContext.compareAndSet(needToStop, null)) { return; } + // current execution stop running + needToStop.stopProfiling(); + // remove task profileTaskList.remove(needToStop.getTask()); - // TODO notify OAP current profile task execute finish + // notify profiling task has finished + ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class).notifyProfileTaskFinish(needToStop.getTask()); } @Override public void prepare() throws Throwable { - } @Override public void boot() throws Throwable { - } @Override public void onComplete() throws Throwable { - + // add trace finish notification + TracingContext.TracingThreadListenerManager.add(this); } @Override public void shutdown() throws Throwable { + // remove trace listener + TracingContext.TracingThreadListenerManager.remove(this); + PROFILE_TASK_SCHEDULE.shutdown(); + + PROFILE_EXECUTOR.shutdown(); } public long getLastCommandCreateTime() { @@ -146,39 +187,37 @@ public class ProfileTaskExecutionService implements BootService { /** * check profile task data success, make the re-check, prevent receiving wrong data from database or OAP - * @param task - * @return */ - private String checkProfileTaskSuccess(ProfileTask task) { + private CheckResult checkProfileTaskSuccess(ProfileTask task) { // endpoint name - if (StringUtil.isEmpty(task.getEndpointName())) { - return "endpoint name cannot be empty"; + if (StringUtil.isEmpty(task.getFistSpanOPName())) { + return new CheckResult(false, "endpoint name cannot be empty"); } // duration if (task.getDuration() < ProfileConstants.TASK_DURATION_MIN_MINUTE) { - return "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes"; + return new CheckResult(false, "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes"); } if (task.getDuration() > ProfileConstants.TASK_DURATION_MAX_MINUTE) { - return "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes"; + return new CheckResult(false, "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes"); } // min duration threshold if (task.getMinDurationThreshold() < 0) { - return "min duration threshold must greater than or equals zero"; + return new CheckResult(false, "min duration threshold must greater than or equals zero"); } // dump period if (task.getThreadDumpPeriod() < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) { - return "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds"; + return new CheckResult(false, "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds"); } // max sampling count if (task.getMaxSamplingCount() <= 0) { - return "max sampling count must greater than zero"; + return new CheckResult(false, "max sampling count must greater than zero"); } if (task.getMaxSamplingCount() >= ProfileConstants.TASK_MAX_SAMPLING_COUNT) { - return "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT; + return new CheckResult(false, "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT); } // check task queue, check only one task in a certain time @@ -187,15 +226,46 @@ public class ProfileTaskExecutionService implements BootService { // if the end time of the task to be added is during the execution of any data, means is a error data if (taskProcessFinishTime >= profileTask.getStartTime() && taskProcessFinishTime <= calcProfileTaskFinishTime(profileTask)) { - return "there already have processing task in time range, could not add a new task again. processing task monitor endpoint name: " + profileTask.getEndpointName(); + return new CheckResult(false, "there already have processing task in time range, could not add a new task again. processing task monitor endpoint name: " + profileTask.getFistSpanOPName()); } } - return null; + return new CheckResult(true, null); } private long calcProfileTaskFinishTime(ProfileTask task) { return task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration()); } + @Override + public void afterMainThreadFinish(TracingContext tracingContext) { + if (tracingContext.isProfiling()) { + // stop profiling tracing context + ProfileTaskExecutionContext currentExecutionContext = taskExecutionContext.get(); + if (currentExecutionContext != null) { + currentExecutionContext.stopTracingProfile(tracingContext); + } + } + } + + /** + * check profile task is processable + */ + private static class CheckResult { + private boolean success; + private String errorReason; + + public CheckResult(boolean success, String errorReason) { + this.success = success; + this.errorReason = errorReason; + } + + public boolean isSuccess() { + return success; + } + + public String getErrorReason() { + return errorReason; + } + } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java deleted file mode 100644 index 5cca040bf2..0000000000 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.apm.agent.core.profile; - -import io.grpc.Channel; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import org.apache.skywalking.apm.agent.core.boot.BootService; -import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; -import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; -import org.apache.skywalking.apm.agent.core.boot.ServiceManager; -import org.apache.skywalking.apm.agent.core.commands.CommandService; -import org.apache.skywalking.apm.agent.core.conf.Config; -import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig; -import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil; -import org.apache.skywalking.apm.agent.core.logging.api.ILog; -import org.apache.skywalking.apm.agent.core.logging.api.LogManager; -import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; -import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; -import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus; -import org.apache.skywalking.apm.network.common.Commands; -import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery; -import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT; - -/** - * sniffer will check has new profile task list every {@link Config.Collector#GET_PROFILE_TASK_INTERVAL} second. - * - * @author MrPro - */ -@DefaultImplementor -public class ProfileTaskQueryService implements BootService, Runnable, GRPCChannelListener { - private static final ILog logger = LogManager.getLogger(ProfileTaskQueryService.class); - - private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; - private volatile ProfileTaskGrpc.ProfileTaskBlockingStub profileTaskBlockingStub; - private volatile ScheduledFuture getTaskListFuture; - - @Override - public void run() { - if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue() - && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue() - ) { - if (status == GRPCChannelStatus.CONNECTED) { - try { - ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder(); - - // sniffer info - builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID); - - // last command create time - builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime()); - - Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).getProfileTaskCommands(builder.build()); - ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); - } catch (Throwable t) { - if (!(t instanceof StatusRuntimeException)) { - logger.error(t, "query profile task from Collector fail.", t); - return; - } - final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; - if (statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED) { - logger.warn("Backend doesn't support profiling, profiling will be disabled"); - if (getTaskListFuture != null) { - getTaskListFuture.cancel(true); - } - } - } - } - } - - } - - @Override - public void prepare() throws Throwable { - ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this); - } - - @Override - public void boot() throws Throwable { - if (Config.Profile.ACTIVE) { - getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileGetTaskService")) - .scheduleWithFixedDelay(this, 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS); - } - } - - @Override - public void onComplete() throws Throwable { - } - - @Override - public void shutdown() throws Throwable { - if (getTaskListFuture != null) { - getTaskListFuture.cancel(true); - } - } - - @Override - public void statusChanged(GRPCChannelStatus status) { - if (GRPCChannelStatus.CONNECTED.equals(status)) { - Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); - profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel); - } else { - profileTaskBlockingStub = null; - } - this.status = status; - } -} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java new file mode 100644 index 0000000000..086126f756 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.profile; + +import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; + +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * Profile task process thread, dump the executing thread stack. + * + * @author MrPro + */ +public class ProfileThread implements Runnable { + + private static final ILog logger = LogManager.getLogger(ProfileThread.class); + + // profiling task context + private final ProfileTaskExecutionContext taskExecutionContext; + + private final ProfileTaskExecutionService profileTaskExecutionService; + private final ProfileTaskChannelService profileTaskChannelService; + + public ProfileThread(ProfileTaskExecutionContext taskExecutionContext) { + this.taskExecutionContext = taskExecutionContext; + profileTaskExecutionService = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class); + profileTaskChannelService = ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class); + } + + @Override + public void run() { + + try { + profiling(taskExecutionContext); + } catch (InterruptedException e) { + // ignore interrupted + // means current task has stopped + } catch (Exception e) { + logger.error(e, "Profiling task fail. taskId:{}", taskExecutionContext.getTask().getTaskId()); + } finally { + // finally stop current profiling task, tell execution service task has stop + profileTaskExecutionService.stopCurrentProfileTask(taskExecutionContext); + } + + } + + /** + * start profiling + */ + private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException { + + int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod(); + + // run loop when current thread still running + long currentLoopStartTime = -1; + while (!Thread.currentThread().isInterrupted()) { + currentLoopStartTime = System.currentTimeMillis(); + + // each all slot + AtomicReferenceArray profilers = executionContext.threadProfilerSlots(); + int profilerCount = profilers.length(); + for (int slot = 0; slot < profilerCount; slot++) { + ThreadProfiler currentProfiler = profilers.get(slot); + if (currentProfiler == null) { + continue; + } + + switch (currentProfiler.profilingStatus()) { + + case READY: + // check tracing context running time + currentProfiler.startProfilingIfNeed(); + break; + + case PROFILING: + // dump stack + TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot(); + if (snapshot != null) { + profileTaskChannelService.addProfilingSnapshot(snapshot); + } else { + // tell execution context current tracing thread dump failed, stop it + executionContext.stopTracingProfile(currentProfiler.tracingContext()); + } + break; + + } + } + + // sleep to next period + // if out of period, sleep one period + long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis(); + needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod; + Thread.sleep(needToSleep); + } + } + +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java new file mode 100644 index 0000000000..5159c0ce42 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.profile; + +/** + * @author MrPro + */ +public enum ProfilingStatus { + + READY, + + PROFILING, + + STOPPED +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java new file mode 100644 index 0000000000..59b5e998f0 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.profile; + +import com.google.common.base.Objects; +import org.apache.skywalking.apm.agent.core.conf.Config; +import org.apache.skywalking.apm.agent.core.context.TracingContext; +import org.apache.skywalking.apm.agent.core.context.ids.ID; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +/** + * @author MrPro + */ +public class ThreadProfiler { + + // current tracing context + private final TracingContext tracingContext; + // current tracing segment id + private final ID traceSegmentId; + // need to profiling thread + private final Thread profilingThread; + // profiling execution context + private final ProfileTaskExecutionContext executionContext; + + // profiling time + private long profilingStartTime; + private long profilingMaxTimeMills; + + // after min duration threshold check, it will start dump + private ProfilingStatus profilingStatus = ProfilingStatus.READY; + // thread dump sequence + private int dumpSequence = 0; + + public ThreadProfiler(TracingContext tracingContext, ID traceSegmentId, Thread profilingThread, ProfileTaskExecutionContext executionContext) { + this.tracingContext = tracingContext; + this.traceSegmentId = traceSegmentId; + this.profilingThread = profilingThread; + this.executionContext = executionContext; + this.profilingMaxTimeMills = TimeUnit.MINUTES.toMillis(Config.Profile.MAX_DURATION); + } + + /** + * If tracing start time greater than {@link ProfileTask#getMinDurationThreshold()}, then start to profiling trace + */ + public void startProfilingIfNeed() { + if (System.currentTimeMillis() - tracingContext.createTime() > executionContext.getTask().getMinDurationThreshold()) { + this.profilingStartTime = System.currentTimeMillis(); + this.profilingStatus = ProfilingStatus.PROFILING; + } + } + + /** + * Stop profiling status + */ + public void stopProfiling() { + this.profilingStatus = ProfilingStatus.STOPPED; + } + + /** + * dump tracing thread and build thread snapshot + * + * @return snapshot, if null means dump snapshot error, should stop it + */ + public TracingThreadSnapshot buildSnapshot() { + if (!isProfilingContinuable()) { + return null; + } + + long currentTime = System.currentTimeMillis(); + // dump thread + StackTraceElement[] stackTrace; + try { + stackTrace = profilingThread.getStackTrace(); + + // stack depth is zero, means thread is already run finished + if (stackTrace.length == 0) { + return null; + } + } catch (Exception e) { + // dump error ignore and make this profiler stop + return null; + } + + // if is first dump, check is can start profiling + if (dumpSequence == 0 && (!executionContext.isStartProfileable())) { + return null; + } + + int dumpElementCount = Math.min(stackTrace.length, Config.Profile.DUMP_MAX_STACK_DEPTH); + + // use inverted order, because thread dump is start with bottom + final ArrayList stackList = new ArrayList<>(dumpElementCount); + for (int i = dumpElementCount - 1; i >= 0; i--) { + stackList.add(buildStackElementCodeSignature(stackTrace[i])); + } + + String taskId = executionContext.getTask().getTaskId(); + return new TracingThreadSnapshot(taskId, traceSegmentId, dumpSequence++, currentTime, stackList); + } + + /** + * build thread stack element code signature + * + * @return code sign: className.methodName:lineNumber + */ + private String buildStackElementCodeSignature(StackTraceElement element) { + return element.getClassName() + "." + element.getMethodName() + ":" + element.getLineNumber(); + } + + /** + * matches profiling tracing context + */ + public boolean matches(TracingContext context) { + // match trace id + return Objects.equal(context.getReadableGlobalTraceId(), tracingContext.getReadableGlobalTraceId()); + } + + /** + * check whether profiling should continue + * + * @return if true means this thread profiling is continuable + */ + private boolean isProfilingContinuable() { + return System.currentTimeMillis() - profilingStartTime < profilingMaxTimeMills; + } + + public TracingContext tracingContext() { + return tracingContext; + } + + public ProfilingStatus profilingStatus() { + return profilingStatus; + } + +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java new file mode 100644 index 0000000000..dcfcf05971 --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.profile; + +import org.apache.skywalking.apm.agent.core.context.ids.ID; +import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot; +import org.apache.skywalking.apm.network.language.profile.ThreadStack; + +import java.util.List; + +/** + * @author MrPro + */ +public class TracingThreadSnapshot { + + // thread profiler + private final String taskId; + private final ID traceSegmentId; + + // dump info + private final int sequence; + private final long time; + private final List stackList; + + public TracingThreadSnapshot(String taskId, ID traceSegmentId, int sequence, long time, List stackList) { + this.taskId = taskId; + this.traceSegmentId = traceSegmentId; + this.sequence = sequence; + this.time = time; + this.stackList = stackList; + } + + /** + * transform to gRPC data + */ + public ThreadSnapshot transform() { + final ThreadSnapshot.Builder builder = ThreadSnapshot.newBuilder(); + // task id + builder.setTaskId(taskId); + // dumped segment id + builder.setTraceSegmentId(traceSegmentId.transform()); + // dump time + builder.setTime(time); + // snapshot dump sequence + builder.setSequence(sequence); + // snapshot stack + final ThreadStack.Builder stackBuilder = ThreadStack.newBuilder(); + for (String codeSign : stackList) { + stackBuilder.addCodeSignatures(codeSign); + } + builder.setStack(stackBuilder); + + return builder.build(); + } + + +} diff --git a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService index 826b03215f..392600c769 100644 --- a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService +++ b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService @@ -26,5 +26,5 @@ org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService org.apache.skywalking.apm.agent.core.commands.CommandService org.apache.skywalking.apm.agent.core.commands.CommandExecutorService org.apache.skywalking.apm.agent.core.context.OperationNameFormatService -org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService +org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java index 0390dcc01b..53b4d4d8c9 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java @@ -22,13 +22,11 @@ package org.apache.skywalking.apm.agent.core.boot; import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; -import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext; -import org.apache.skywalking.apm.agent.core.context.TracingContext; -import org.apache.skywalking.apm.agent.core.context.TracingContextListener; + +import org.apache.skywalking.apm.agent.core.context.*; import org.apache.skywalking.apm.agent.core.jvm.JVMService; import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService; -import org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService; +import org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener; import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient; @@ -64,11 +62,17 @@ public class ServiceManagerTest { assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class)); assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class)); assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class)); - assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskQueryService.class)); + assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class)); assertProfileTaskExecuteService(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)); assertTracingContextListener(); assertIgnoreTracingContextListener(); + assertTracingThreadContextListener(); + } + + private void assertTracingThreadContextListener() throws Exception { + List listeners = getFieldValue(TracingContext.TracingThreadListenerManager.class, "LISTENERS"); + assertThat(listeners.size(), is(1)); } private void assertIgnoreTracingContextListener() throws Exception { @@ -87,7 +91,7 @@ public class ServiceManagerTest { assertNotNull(service); } - private void assertProfileTaskQueryService(ProfileTaskQueryService service) { + private void assertProfileTaskQueryService(ProfileTaskChannelService service) { assertNotNull(service); } diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java index 0943107349..9686324ca1 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java @@ -46,7 +46,7 @@ public class TracingContextTest { }; TracingContext.ListenerManager.add(listener); try { - TracingContext tracingContext = new TracingContext(); + TracingContext tracingContext = new TracingContext("/url"); AbstractSpan span = tracingContext.createEntrySpan("/url"); for (int i = 0; i < 10; i++) { diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java index 1e4a987649..bd2b368a72 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java @@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.test.tools; import java.util.HashMap; import java.util.LinkedList; + +import org.apache.skywalking.apm.agent.core.context.TracingThreadListener; import org.junit.rules.ExternalResource; import org.powermock.reflect.Whitebox; import org.apache.skywalking.apm.agent.core.boot.BootService; @@ -37,6 +39,7 @@ public class AgentServiceRule extends ExternalResource { Whitebox.setInternalState(ServiceManager.INSTANCE, "bootedServices", new HashMap()); Whitebox.setInternalState(TracingContext.ListenerManager.class, "LISTENERS", new LinkedList()); Whitebox.setInternalState(IgnoredTracerContext.ListenerManager.class, "LISTENERS", new LinkedList()); + Whitebox.setInternalState(TracingContext.TracingThreadListenerManager.class, "LISTENERS", new LinkedList()); } @Override diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index acd369b669..ee6dc5a4d6 100644 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -44,6 +44,18 @@ agent.service_name=${SW_AGENT_NAME:Your_ApplicationName} # If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile. # profile.active=${SW_AGENT_PROFILE_ACTIVE:true} +# Parallel monitor segment count +# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5} + +# Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it. +# profile.duration=${SW_AGENT_PROFILE_DURATION:10} + +# Max dump thread stack depth +# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500} + +# Snapshot transport to backend buffer size +# profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50} + # Backend service addresses. collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800} diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md index f38d67fa70..fc9946b40a 100755 --- a/docs/en/setup/service-agent/java-agent/README.md +++ b/docs/en/setup/service-agent/java-agent/README.md @@ -102,6 +102,10 @@ property key | Description | Default | `dictionary.service_code_buffer_size`|The buffer size of application codes and peer|`10 * 10000`| `dictionary.endpoint_name_buffer_size`|The buffer size of endpoint names and peer|`1000 * 10000`| `profile.active`|If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.|`true`| +`profile.max_parallel`|Parallel monitor segment count|`5`| +`profile.duration`|Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it.|`10`| +`profile.dump_max_stack_depth`|Max dump thread stack depth|`500`| +`profile.snapshot_transport_buffer_size`|Snapshot transport to backend buffer size|`50`| `plugin.peer_max_length `|Peer maximum description limit.|`200`| `plugin.mongodb.trace_param`|If true, trace all the parameters in MongoDB access, default is false. Only trace the operation, not include parameters.|`false`| `plugin.mongodb.filter_length_limit`|If set to positive number, the `WriteRequest.params` would be truncated to this length, otherwise it would be completely saved, which may cause performance problem.|`256`| diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 3f79711266..faaa7e3e08 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -164,6 +164,8 @@ istio-telemetry: default: envoy-metric: default: +receiver-profile: + default: # alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh} #receiver_zipkin: # default: @@ -233,3 +235,4 @@ configuration: # grpc: # targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1} # targetPort: ${SW_EXPORTER_GRPC_PORT:9870} + diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java index b790fbcaf2..22f6172cb6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java @@ -23,14 +23,18 @@ import com.google.common.cache.CacheBuilder; import org.apache.skywalking.oap.server.core.CoreModuleConfig; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.query.entity.ProfileTask; +import org.apache.skywalking.oap.server.core.storage.StorageModule; +import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; /** @@ -42,9 +46,11 @@ public class ProfileTaskCache implements Service { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileTaskCache.class); - private final Cache> profileTaskCache; + private final Cache> profileTaskDownstreamCache; + private final Cache profileTaskIdCache; private final ModuleManager moduleManager; + private IProfileTaskQueryDAO profileTaskQueryDAO; public ProfileTaskCache(ModuleManager moduleManager, CoreModuleConfig moduleConfig) { this.moduleManager = moduleManager; @@ -52,9 +58,18 @@ public class ProfileTaskCache implements Service { long initialSize = moduleConfig.getMaxSizeOfProfileTask() / 10L; int initialCapacitySize = (int)(initialSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : initialSize); - profileTaskCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask()) + profileTaskDownstreamCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask()) // remove old profile task data .expireAfterWrite(Duration.ofMinutes(1)).build(); + + profileTaskIdCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask()).build(); + } + + private IProfileTaskQueryDAO getProfileTaskQueryDAO() { + if (Objects.isNull(profileTaskQueryDAO)) { + profileTaskQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class); + } + return profileTaskQueryDAO; } /** @@ -64,10 +79,32 @@ public class ProfileTaskCache implements Service { */ public List getProfileTaskList(int serviceId) { // read profile task list from cache only, use cache update timer mechanism - List profileTaskList = profileTaskCache.getIfPresent(serviceId); + List profileTaskList = profileTaskDownstreamCache.getIfPresent(serviceId); return profileTaskList; } + /** + * query profile task by id + * @param id + * @return + */ + public ProfileTask getProfileTaskById(String id) { + ProfileTask profile = profileTaskIdCache.getIfPresent(id); + + if (profile == null) { + try { + profile = getProfileTaskQueryDAO().getById(id); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + } + if (profile != null) { + profileTaskIdCache.put(id, profile); + } + } + + return profile; + } + /** * save service task list * @param serviceId @@ -78,7 +115,7 @@ public class ProfileTaskCache implements Service { taskList = Collections.emptyList(); } - profileTaskCache.put(serviceId, taskList); + profileTaskDownstreamCache.put(serviceId, taskList); } /** diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java index cb6fa4315f..c152d83d47 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java @@ -43,7 +43,7 @@ public class CommandService implements Service { public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) { final String serialNumber = UUID.randomUUID().toString(); - return new ProfileTaskCommand(serialNumber, task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime()); + return new ProfileTaskCommand(serialNumber, task.getId(), task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime()); } private String generateSerialNumber(final int serviceInstanceId, final long time, final String serviceInstanceUUID) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java new file mode 100644 index 0000000000..36a4cba098 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.skywalking.oap.server.core.profile; + +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; +import org.apache.skywalking.oap.server.core.source.ScopeDeclaration; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; + +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT; + +/** + * Profiling segment snapshot database bean, use record + * + * @author MrPro + */ +@Getter +@Setter +@ScopeDeclaration(id = PROFILE_TASK_SEGMENT_SNAPSHOT, name = "ProfileTaskSegmentSnapshot") +@Stream(name = ProfileTaskSegmentSnapshotRecord.INDEX_NAME, scopeId = PROFILE_TASK_SEGMENT_SNAPSHOT, builder = ProfileTaskSegmentSnapshotRecord.Builder.class, processor = RecordStreamProcessor.class) +public class ProfileTaskSegmentSnapshotRecord extends Record { + + public static final String INDEX_NAME = "profile_task_segment_snapshot"; + public static final String TASK_ID = "task_id"; + public static final String SEGMENT_ID = "segment_id"; + public static final String DUMP_TIME = "dump_time"; + public static final String SEQUENCE = "sequence"; + public static final String STACK_BINARY = "stack_binary"; + + @Column(columnName = TASK_ID) private String taskId; + @Column(columnName = SEGMENT_ID) private String segmentId; + @Column(columnName = DUMP_TIME) private long dumpTime; + @Column(columnName = SEQUENCE) private int sequence; + @Column(columnName = STACK_BINARY) private byte[] stackBinary; + + @Override + public String id() { + return getTaskId() + Const.ID_SPLIT + getSegmentId() + Const.ID_SPLIT + getSequence() + Const.ID_SPLIT; + } + + public static class Builder implements StorageBuilder { + + @Override + public ProfileTaskSegmentSnapshotRecord map2Data(Map dbMap) { + final ProfileTaskSegmentSnapshotRecord snapshot = new ProfileTaskSegmentSnapshotRecord(); + snapshot.setTaskId((String)dbMap.get(TASK_ID)); + snapshot.setSegmentId((String)dbMap.get(SEGMENT_ID)); + snapshot.setDumpTime(((Number)dbMap.get(DUMP_TIME)).longValue()); + snapshot.setSequence(((Number)dbMap.get(SEQUENCE)).intValue()); + snapshot.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).intValue()); + if (StringUtil.isEmpty((String)dbMap.get(STACK_BINARY))) { + snapshot.setStackBinary(new byte[] {}); + } else { + snapshot.setStackBinary(Base64.getDecoder().decode((String)dbMap.get(STACK_BINARY))); + } + return snapshot; + } + + @Override + public Map data2Map(ProfileTaskSegmentSnapshotRecord storageData) { + final HashMap map = new HashMap<>(); + map.put(TASK_ID, storageData.getTaskId()); + map.put(SEGMENT_ID, storageData.getSegmentId()); + map.put(DUMP_TIME, storageData.getDumpTime()); + map.put(SEQUENCE, storageData.getSequence()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + if (CollectionUtils.isEmpty(storageData.getStackBinary())) { + map.put(STACK_BINARY, Const.EMPTY_STRING); + } else { + map.put(STACK_BINARY, new String(Base64.getEncoder().encode(storageData.getStackBinary()))); + } + return map; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 54f9dfcc64..c6741b58a6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -69,6 +69,7 @@ public class DefaultScopeDefine { public static final int HTTP_ACCESS_LOG = 25; public static final int PROFILE_TASK = 26; public static final int PROFILE_TASK_LOG = 27; + public static final int PROFILE_TASK_SEGMENT_SNAPSHOT = 28; /** * Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java index 5e6fa098f8..7ca639d290 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java @@ -41,4 +41,11 @@ public interface IProfileTaskQueryDAO extends DAO { */ List getTaskList(final Integer serviceId, final String endpointName, final Long startTimeBucket, final Long endTimeBucket, final Integer limit) throws IOException; + /** + * query profile task by id + * @param id + * @return + */ + ProfileTask getById(final String id) throws IOException; + } diff --git a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java index 0511808cab..b181fd3869 100644 --- a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java @@ -20,19 +20,25 @@ package org.apache.skywalking.oap.server.receiver.profile.provider.handler; import io.grpc.stub.StreamObserver; import org.apache.skywalking.apm.network.common.Commands; +import org.apache.skywalking.apm.network.language.agent.UniqueId; import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery; +import org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport; import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc; +import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache; import org.apache.skywalking.oap.server.core.command.CommandService; import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskSegmentSnapshotRecord; import org.apache.skywalking.oap.server.core.query.entity.ProfileTask; import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.TimeUnit; @@ -42,6 +48,8 @@ import java.util.concurrent.TimeUnit; */ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBase implements GRPCHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(ProfileTaskServiceHandler.class); + private ProfileTaskCache profileTaskCache; private final CommandService commandService; @@ -71,7 +79,7 @@ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBa } // record profile task log - recordProfileTaskLog(profileTask, request); + recordProfileTaskLog(profileTask, request.getInstanceId(), ProfileTaskLogOperationType.NOTIFIED); // add command commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build()); @@ -81,11 +89,72 @@ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBa responseObserver.onCompleted(); } - private void recordProfileTaskLog(ProfileTask task, ProfileTaskCommandQuery query) { + @Override + public StreamObserver collectSnapshot(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(ThreadSnapshot snapshot) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("receive profile segment snapshot"); + } + + // parse segment id + UniqueId uniqueId = snapshot.getTraceSegmentId(); + StringBuilder segmentIdBuilder = new StringBuilder(); + for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) { + if (i == 0) { + segmentIdBuilder.append(uniqueId.getIdPartsList().get(i)); + } else { + segmentIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i)); + } + } + + // build database data + final ProfileTaskSegmentSnapshotRecord record = new ProfileTaskSegmentSnapshotRecord(); + record.setTaskId(snapshot.getTaskId()); + record.setSegmentId(segmentIdBuilder.toString()); + record.setDumpTime(snapshot.getTime()); + record.setSequence(snapshot.getSequence()); + record.setStackBinary(snapshot.getStack().toByteArray()); + record.setTimeBucket(TimeBucket.getRecordTimeBucket(snapshot.getTime())); + + // async storage + RecordStreamProcessor.getInstance().in(record); + } + + @Override + public void onError(Throwable throwable) { + LOGGER.error(throwable.getMessage(), throwable); + responseObserver.onCompleted(); + } + + @Override + public void onCompleted() { + responseObserver.onNext(Commands.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public void reportTaskFinish(ProfileTaskFinishReport request, StreamObserver responseObserver) { + // query task from cache, set log time bucket need it + final ProfileTask profileTask = profileTaskCache.getProfileTaskById(request.getTaskId()); + + // record finish log + if (profileTask != null) { + recordProfileTaskLog(profileTask, request.getInstanceId(), ProfileTaskLogOperationType.EXECUTION_FINISHED); + } + + responseObserver.onNext(Commands.newBuilder().build()); + responseObserver.onCompleted(); + } + + private void recordProfileTaskLog(ProfileTask task, int instanceId, ProfileTaskLogOperationType operationType) { final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord(); logRecord.setTaskId(task.getId()); - logRecord.setInstanceId(query.getInstanceId()); - logRecord.setOperationType(ProfileTaskLogOperationType.NOTIFIED.getCode()); + logRecord.setInstanceId(instanceId); + logRecord.setOperationType(operationType.getCode()); logRecord.setOperationTime(System.currentTimeMillis()); // same with task time bucket, ensure record will ttl same with profile task logRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration()))); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java index d7598cff72..dbd2dee7fd 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java @@ -88,6 +88,25 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO return tasks; } + @Override + public ProfileTask getById(String id) throws IOException { + if (StringUtil.isEmpty(id)) { + return null; + } + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + sourceBuilder.query(QueryBuilders.idsQuery().addIds(id)); + sourceBuilder.size(1); + + final SearchResponse response = getClient().search(ProfileTaskNoneStream.INDEX_NAME, sourceBuilder); + + if (response.getHits().getHits().length > 0) { + return parseTask(response.getHits().getHits()[0]); + } + + return null; + } + private ProfileTask parseTask(SearchHit data) { return ProfileTask.builder() .id(data.getId()) diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java index a526b49ea4..248ca70c4f 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java @@ -22,6 +22,7 @@ import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.profile.ProfileTaskNoneStream; import org.apache.skywalking.oap.server.core.query.entity.ProfileTask; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; import java.io.IOException; @@ -87,6 +88,29 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO { } } + @Override + public ProfileTask getById(String id) throws IOException { + if (StringUtil.isEmpty(id)) { + return null; + } + + final StringBuilder sql = new StringBuilder(); + final ArrayList condition = new ArrayList<>(1); + sql.append("select * from ").append(ProfileTaskNoneStream.INDEX_NAME).append(" where id=? LIMIT 1"); + condition.add(id); + + try (Connection connection = h2Client.getConnection()) { + try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) { + if (resultSet.next()) { + return parseTask(resultSet); + } + } + } catch (SQLException | JDBCClientException e) { + throw new IOException(e); + } + return null; + } + /** * parse profile task data * @param data diff --git a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java index 528c49090a..b998f4ca79 100644 --- a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java +++ b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java @@ -91,13 +91,7 @@ public class ProfileVerificationITCase { while (true) { try { - final Map user = new HashMap<>(); - user.put("name", "SkyWalking"); - final ResponseEntity responseEntity = restTemplate.postForEntity( - instrumentedServiceUrl + "/e2e/users", - user, - String.class - ); + final ResponseEntity responseEntity = sendRequest(false); LOGGER.info("responseEntity: {}", responseEntity); assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK); final List traces = profileClient.traces( @@ -123,6 +117,17 @@ public class ProfileVerificationITCase { } + private ResponseEntity sendRequest(boolean needProfiling) { + final Map user = new HashMap<>(); + user.put("name", "SkyWalking"); + user.put("enableProfiling", String.valueOf(needProfiling)); + return restTemplate.postForEntity( + instrumentedServiceUrl + "/e2e/users", + user, + String.class + ); + } + /** * verify create profile task * @param minutesAgo @@ -134,10 +139,10 @@ public class ProfileVerificationITCase { final ProfileTaskCreationRequest creationRequest = ProfileTaskCreationRequest.builder() .serviceId(2) .endpointName("/e2e/users") - .duration(5) + .duration(1) .startTime(-1) - .minDurationThreshold(10) - .dumpPeriod(10) + .minDurationThreshold(1000) + .dumpPeriod(50) .maxSamplingCount(5).build(); // verify create task @@ -147,18 +152,29 @@ public class ProfileVerificationITCase { ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher(); creationResultMatcher.verify(creationResult); + // verify get task list and sniffer get task logs + verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml"); + + // send a profile request + sendRequest(true); + + // verify task execution finish + verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml"); + } + + private void verifyProfileTask(int serviceId, String verifyResources) throws InterruptedException { // verify get task list and logs for (int i = 0; i < 10; i++) { try { final ProfileTasks tasks = profileClient.getProfileTaskList( new ProfileTaskQuery() - .serviceId(creationRequest.getServiceId()) + .serviceId(serviceId) .endpointName("") ); LOGGER.info("get profile task list: {}", tasks); InputStream expectedInputStream = - new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream(); + new ClassPathResource(verifyResources).getInputStream(); final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class); servicesMatcher.verify(tasks); diff --git a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java index cab6dfff97..2289ae80ac 100644 --- a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java +++ b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java @@ -91,13 +91,7 @@ public class ProfileVerificationITCase { while (true) { try { - final Map user = new HashMap<>(); - user.put("name", "SkyWalking"); - final ResponseEntity responseEntity = restTemplate.postForEntity( - instrumentedServiceUrl + "/e2e/users", - user, - String.class - ); + final ResponseEntity responseEntity = sendRequest(false); LOGGER.info("responseEntity: {}", responseEntity); assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK); final List traces = profileClient.traces( @@ -122,6 +116,17 @@ public class ProfileVerificationITCase { } + private ResponseEntity sendRequest(boolean needProfiling) { + final Map user = new HashMap<>(); + user.put("name", "SkyWalking"); + user.put("enableProfiling", String.valueOf(needProfiling)); + return restTemplate.postForEntity( + instrumentedServiceUrl + "/e2e/users", + user, + String.class + ); + } + /** * verify create profile task * @param minutesAgo @@ -133,10 +138,10 @@ public class ProfileVerificationITCase { final ProfileTaskCreationRequest creationRequest = ProfileTaskCreationRequest.builder() .serviceId(2) .endpointName("/e2e/users") - .duration(5) + .duration(1) .startTime(-1) - .minDurationThreshold(10) - .dumpPeriod(10) + .minDurationThreshold(1000) + .dumpPeriod(50) .maxSamplingCount(5).build(); // verify create task @@ -146,18 +151,29 @@ public class ProfileVerificationITCase { ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher(); creationResultMatcher.verify(creationResult); + // verify get task list and sniffer get task logs + verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml"); + + // send a profile request + sendRequest(true); + + // verify task execution finish + verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml"); + } + + private void verifyProfileTask(int serviceId, String verifyResources) throws InterruptedException { // verify get task list and logs for (int i = 0; i < 10; i++) { try { final ProfileTasks tasks = profileClient.getProfileTaskList( new ProfileTaskQuery() - .serviceId(creationRequest.getServiceId()) + .serviceId(serviceId) .endpointName("") ); LOGGER.info("get profile task list: {}", tasks); InputStream expectedInputStream = - new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream(); + new ClassPathResource(verifyResources).getInputStream(); final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class); servicesMatcher.verify(tasks); diff --git a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java index cab6dfff97..2289ae80ac 100644 --- a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java +++ b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java @@ -91,13 +91,7 @@ public class ProfileVerificationITCase { while (true) { try { - final Map user = new HashMap<>(); - user.put("name", "SkyWalking"); - final ResponseEntity responseEntity = restTemplate.postForEntity( - instrumentedServiceUrl + "/e2e/users", - user, - String.class - ); + final ResponseEntity responseEntity = sendRequest(false); LOGGER.info("responseEntity: {}", responseEntity); assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK); final List traces = profileClient.traces( @@ -122,6 +116,17 @@ public class ProfileVerificationITCase { } + private ResponseEntity sendRequest(boolean needProfiling) { + final Map user = new HashMap<>(); + user.put("name", "SkyWalking"); + user.put("enableProfiling", String.valueOf(needProfiling)); + return restTemplate.postForEntity( + instrumentedServiceUrl + "/e2e/users", + user, + String.class + ); + } + /** * verify create profile task * @param minutesAgo @@ -133,10 +138,10 @@ public class ProfileVerificationITCase { final ProfileTaskCreationRequest creationRequest = ProfileTaskCreationRequest.builder() .serviceId(2) .endpointName("/e2e/users") - .duration(5) + .duration(1) .startTime(-1) - .minDurationThreshold(10) - .dumpPeriod(10) + .minDurationThreshold(1000) + .dumpPeriod(50) .maxSamplingCount(5).build(); // verify create task @@ -146,18 +151,29 @@ public class ProfileVerificationITCase { ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher(); creationResultMatcher.verify(creationResult); + // verify get task list and sniffer get task logs + verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml"); + + // send a profile request + sendRequest(true); + + // verify task execution finish + verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml"); + } + + private void verifyProfileTask(int serviceId, String verifyResources) throws InterruptedException { // verify get task list and logs for (int i = 0; i < 10; i++) { try { final ProfileTasks tasks = profileClient.getProfileTaskList( new ProfileTaskQuery() - .serviceId(creationRequest.getServiceId()) + .serviceId(serviceId) .endpointName("") ); LOGGER.info("get profile task list: {}", tasks); InputStream expectedInputStream = - new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream(); + new ClassPathResource(verifyResources).getInputStream(); final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class); servicesMatcher.verify(tasks); diff --git a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java new file mode 100644 index 0000000000..158f0134a1 --- /dev/null +++ b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.e2e.profile; + +/** + * @author MrPro + */ +public class CreateUser { + + private String name; + + private boolean enableProfiling; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean getEnableProfiling() { + return enableProfiling; + } + + public void setEnableProfiling(boolean enableProfiling) { + this.enableProfiling = enableProfiling; + } + + public User toUser() { + final User user = new User(); + user.setName(name); + return user; + } +} diff --git a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java index c300fc35c9..7cb6fac460 100644 --- a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java +++ b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java @@ -20,6 +20,8 @@ package org.apache.skywalking.e2e.profile; import org.springframework.web.bind.annotation.*; +import java.util.concurrent.TimeUnit; + /** * @author MrPro */ @@ -38,8 +40,14 @@ public class TestController { } @PostMapping("/users") - public User createAuthor(@RequestBody final User user) throws InterruptedException { - Thread.sleep(1000L); - return userRepo.save(user); + public User createAuthor(@RequestBody final CreateUser createUser) throws InterruptedException { + final User user = userRepo.save(createUser.toUser()); + if (!createUser.getEnableProfiling()) { + return user; + } else { + // sleep 10 second + TimeUnit.SECONDS.sleep(10); + return user; + } } } diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml new file mode 100644 index 0000000000..03274e3ff2 --- /dev/null +++ b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +tasks: + - id: not null + serviceId: gt 0 + endpointName: not null + startTime: gt 0 + duration: gt 0 + minDurationThreshold: gt 0 + dumpPeriod: gt 0 + maxSamplingCount: gt 0 + logs: + - id: not null + instanceId: gt 0 + operationType: EXECUTION_FINISHED + operationTime: gt 0 + - id: not null + instanceId: gt 0 + operationType: NOTIFIED + operationTime: gt 0 + diff --git a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml similarity index 100% rename from test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml rename to test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml -- GitLab