提交 90f3c4de 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: kezhenxu94

Sniffer processing profile task and report status and snapshot (#4220)

* sniffer processing profile task and report status and snapshot

* resolve testServiceDependencies test case error, use same register with `TraceSegmentServiceClient`

* resolve names

* change profile to single one thread run.

* 1. change to the ArrayList, because known the max size
2. rename issue resolved

* add profiling status enum

* change sniffer use full name issue

* 1. remove `prepareProfiling` method, build profiling status when construct `TracingContext`
2. add `TracingThreadListenerManager`, notify when tracing main thread finish
3. change ProfileThread start when process new profile task

* remove unnecessary getter

* add test assert error message

* adding `AgentServiceRule`

* revert original assert

* remove unnecessary getter

* resolve issues

* reduce findService invoke

* resolve style error

* recheck profiling when change first span operatin name

* resolve issues
1. remove `ContextManager#profilingRecheck`, only check on `TracingContext`
2. rename comments
3. resolve volatile array setting

* remove article link

* add `ProfileTask#maxSamplingCount` check

* resolve conflict (Downstream -> Commands)

* 1. change profilingSegmentSlots init on construct
2. if is profiling, recheck dont need to stop
3. total profiling count increment on first dump

* remove unused return val

* remove some `@param` and `@return`

* add profile task check result data bean

* change profiler slot to `AtomicReferenceArray`

* resolved java doc error

* fix doc error, remove meaningless descriptions

* resolve missed profile receiver on oap starter

* resolve method invoke error
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
Co-authored-by: Nkezhenxu94 <kezhenxu94@163.com>
上级 019af3e3
......@@ -27,10 +27,11 @@ import java.util.List;
* @author MrPro
*/
public class ProfileTaskCommand extends BaseCommand implements Serializable, Deserializable<ProfileTaskCommand> {
public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new ProfileTaskCommand("", "", 0, 0, 0, 0, 0, 0);
public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new ProfileTaskCommand("", "", "", 0, 0, 0, 0, 0, 0);
public static final String NAME = "ProfileTaskQuery";
// profile task data
private String taskId;
private String endpointName;
private int duration;
private int minDurationThreshold;
......@@ -39,8 +40,9 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des
private long startTime;
private long createTime;
public ProfileTaskCommand(String serialNumber, String endpointName, int duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long startTime, long createTime) {
public ProfileTaskCommand(String serialNumber, String taskId, String endpointName, int duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long startTime, long createTime) {
super(NAME, serialNumber);
this.taskId = taskId;
this.endpointName = endpointName;
this.duration = duration;
this.minDurationThreshold = minDurationThreshold;
......@@ -54,6 +56,7 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des
public ProfileTaskCommand deserialize(Command command) {
final List<KeyStringValuePair> argsList = command.getArgsList();
String serialNumber = null;
String taskId = null;
String endpointName = null;
int duration = 0;
int minDurationThreshold = 0;
......@@ -67,6 +70,8 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des
serialNumber = pair.getValue();
} else if ("EndpointName".equals(pair.getKey())) {
endpointName = pair.getValue();
} else if ("TaskId".equals(pair.getKey())) {
taskId = pair.getValue();
} else if ("Duration".equals(pair.getKey())) {
duration = Integer.parseInt(pair.getValue());
} else if ("MinDurationThreshold".equals(pair.getKey())) {
......@@ -82,13 +87,14 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des
}
}
return new ProfileTaskCommand(serialNumber, endpointName, duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime);
return new ProfileTaskCommand(serialNumber, taskId, endpointName, duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime);
}
@Override
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
builder.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
.addArgs(KeyStringValuePair.newBuilder().setKey("MinDurationThreshold").setValue(String.valueOf(minDurationThreshold)))
.addArgs(KeyStringValuePair.newBuilder().setKey("DumpPeriod").setValue(String.valueOf(dumpPeriod)))
......@@ -125,4 +131,8 @@ public class ProfileTaskCommand extends BaseCommand implements Serializable, Des
public long getCreateTime() {
return createTime;
}
public String getTaskId() {
return taskId;
}
}
......@@ -39,7 +39,8 @@ public class ProfileTaskCommandExecutor implements CommandExecutor {
// build profile task
final ProfileTask profileTask = new ProfileTask();
profileTask.setEndpointName(profileTaskCommand.getEndpointName());
profileTask.setTaskId(profileTaskCommand.getTaskId());
profileTask.setFistSpanOPName(profileTaskCommand.getEndpointName());
profileTask.setDuration(profileTaskCommand.getDuration());
profileTask.setMinDurationThreshold(profileTaskCommand.getMinDurationThreshold());
profileTask.setThreadDumpPeriod(profileTaskCommand.getDumpPeriod());
......
......@@ -138,6 +138,26 @@ public class Config {
* If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
*/
public static boolean ACTIVE = true;
/**
* Parallel monitor segment count
*/
public static int MAX_PARALLEL = 5;
/**
* Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it.
*/
public static int MAX_DURATION = 10;
/**
* Max dump thread stack depth
*/
public static int DUMP_MAX_STACK_DEPTH = 500;
/**
* Snapshot transport to backend buffer size
*/
public static int SNAPSHOT_TRANSPORT_BUFFER_SIZE = 500;
}
public static class Jvm {
......
......@@ -115,4 +115,5 @@ public interface AbstractTracerContext {
* @param span to be stopped.
*/
void asyncStop(AsyncSpan span);
}
......@@ -51,7 +51,7 @@ public class ContextManagerExtendService implements BootService {
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
context = new TracingContext(operationName);
} else {
context = new IgnoredTracerContext();
}
......
......@@ -40,6 +40,7 @@ import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -61,10 +62,15 @@ public class TracingContext implements AbstractTracerContext {
private static final ILog logger = LogManager.getLogger(TracingContext.class);
private long lastWarningTimestamp = 0;
/**
* @see {@link ProfileTaskExecutionService}
*/
private static ProfileTaskExecutionService PROFILE_TASK_EXECUTION_SERVICE;
/**
* @see {@link SamplingService}
*/
private SamplingService samplingService;
private static SamplingService SAMPLING_SERVICE;
/**
* The final {@link TraceSegment}, which includes all finished spans.
......@@ -92,15 +98,32 @@ public class TracingContext implements AbstractTracerContext {
private volatile boolean running;
private final long createTime;
/**
* profiling status
*/
private volatile boolean profiling;
/**
* Initialize all fields with default value.
*/
TracingContext() {
TracingContext(String firstOPName) {
this.segment = new TraceSegment();
this.spanIdGenerator = 0;
samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
isRunningInAsyncMode = false;
createTime = System.currentTimeMillis();
running = true;
if (SAMPLING_SERVICE == null) {
SAMPLING_SERVICE = ServiceManager.INSTANCE.findService(SamplingService.class);
}
// profiling status
if (PROFILE_TASK_EXECUTION_SERVICE == null) {
PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
}
this.profiling = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(this, segment.getTraceSegmentId(), firstOPName);
}
/**
......@@ -308,6 +331,7 @@ public class TracingContext implements AbstractTracerContext {
return push(span);
}
AbstractSpan entrySpan;
TracingContext owner = this;
final AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
if (parentSpan != null && parentSpan.isEntry()) {
......@@ -328,11 +352,11 @@ public class TracingContext implements AbstractTracerContext {
.findOnly(segment.getServiceId(), operationName)
.doInCondition(new PossibleFound.FoundAndObtain() {
@Override public Object doProcess(int operationId) {
return new EntrySpan(spanIdGenerator++, parentSpanId, operationId);
return new EntrySpan(spanIdGenerator++, parentSpanId, operationId, owner);
}
}, new PossibleFound.NotFoundAndObtain() {
@Override public Object doProcess() {
return new EntrySpan(spanIdGenerator++, parentSpanId, operationName);
return new EntrySpan(spanIdGenerator++, parentSpanId, operationName, owner);
}
});
entrySpan.start();
......@@ -358,7 +382,7 @@ public class TracingContext implements AbstractTracerContext {
* From v6.0.0-beta, local span doesn't do op name register.
* All op name register is related to entry and exit spans only.
*/
AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName);
AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, parentSpanId, operationName, this);
span.start();
return push(span);
}
......@@ -380,6 +404,7 @@ public class TracingContext implements AbstractTracerContext {
AbstractSpan exitSpan;
AbstractSpan parentSpan = peek();
TracingContext owner = this;
if (parentSpan != null && parentSpan.isExit()) {
exitSpan = parentSpan;
} else {
......@@ -389,13 +414,13 @@ public class TracingContext implements AbstractTracerContext {
new PossibleFound.FoundAndObtain() {
@Override
public Object doProcess(final int peerId) {
return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, peerId);
return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, peerId, owner);
}
},
new PossibleFound.NotFoundAndObtain() {
@Override
public Object doProcess() {
return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer);
return new ExitSpan(spanIdGenerator++, parentSpanId, operationName, remotePeer, owner);
}
});
push(exitSpan);
......@@ -462,16 +487,39 @@ public class TracingContext implements AbstractTracerContext {
finish();
}
/**
* Re-check current trace need profiling, encase third part plugin change the operation name.
*
* @param span current modify span
* @param operationName change to operation name
*/
public void profilingRecheck(AbstractSpan span, String operationName) {
// only recheck first span
if (span.getSpanId() != 0) {
return;
}
profiling = PROFILE_TASK_EXECUTION_SERVICE.profilingRecheck(this, segment.getTraceSegmentId(), operationName);
}
/**
* Finish this context, and notify all {@link TracingContextListener}s, managed by {@link
* TracingContext.ListenerManager}
* TracingContext.ListenerManager} and {@link TracingContext.TracingThreadListenerManager}
*/
private void finish() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) {
boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
if (isFinishedInMainThread) {
/**
* Notify after tracing finished in the main thread.
*/
TracingThreadListenerManager.notifyFinish(this);
}
if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) {
TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
/*
* Recheck the segment if the segment contains only one span.
......@@ -480,7 +528,7 @@ public class TracingContext implements AbstractTracerContext {
* @see {@link #createSpan(String, long, boolean)}
*/
if (!segment.hasRef() && segment.isSingleSpanSegment()) {
if (!samplingService.trySampling()) {
if (!SAMPLING_SERVICE.trySampling()) {
finishedSegment.setIgnore(true);
}
}
......@@ -543,6 +591,27 @@ public class TracingContext implements AbstractTracerContext {
}
/**
* The <code>ListenerManager</code> represents an event notify for every registered listener, which are notified
*/
public static class TracingThreadListenerManager {
private static List<TracingThreadListener> LISTENERS = new LinkedList<>();
public static synchronized void add(TracingThreadListener listener) {
LISTENERS.add(listener);
}
static void notifyFinish(TracingContext finishedContext) {
for (TracingThreadListener listener : LISTENERS) {
listener.afterMainThreadFinish(finishedContext);
}
}
public static synchronized void remove(TracingThreadListener listener) {
LISTENERS.remove(listener);
}
}
/**
* @return the top element of 'ActiveSpanStack', and remove it.
*/
......@@ -587,4 +656,13 @@ public class TracingContext implements AbstractTracerContext {
return false;
}
}
public long createTime() {
return this.createTime;
}
public boolean isProfiling() {
return this.profiling;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.context;
/**
* @author MrPro
*/
public interface TracingThreadListener {
void afterMainThreadFinish(TracingContext tracingContext);
}
......@@ -48,7 +48,11 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
* The flag represents whether the span has been async stopped
*/
private volatile boolean isAsyncStopped = false;
protected volatile AbstractTracerContext context;
/**
* The context to which the span belongs
*/
protected final TracingContext owner;
/**
* The start time of this Span.
......@@ -79,18 +83,20 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
*/
protected List<TraceSegmentRef> refs;
protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName) {
protected AbstractTracingSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
this.operationName = operationName;
this.operationId = DictionaryUtil.nullValue();
this.spanId = spanId;
this.parentSpanId = parentSpanId;
this.owner = owner;
}
protected AbstractTracingSpan(int spanId, int parentSpanId, int operationId) {
protected AbstractTracingSpan(int spanId, int parentSpanId, int operationId, TracingContext owner) {
this.operationName = null;
this.operationId = operationId;
this.spanId = spanId;
this.parentSpanId = parentSpanId;
this.owner = owner;
}
/**
......@@ -203,6 +209,9 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
public AbstractTracingSpan setOperationName(String operationName) {
this.operationName = operationName;
this.operationId = DictionaryUtil.nullValue();
// recheck profiling status
owner.profilingRecheck(this, operationName);
return this;
}
......@@ -332,7 +341,7 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
if (isInAsyncMode) {
throw new RuntimeException("Prepare for async repeatedly. Span is already in async mode.");
}
context = ContextManager.awaitFinishAsync(this);
ContextManager.awaitFinishAsync(this);
isInAsyncMode = true;
return this;
}
......@@ -345,7 +354,7 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
throw new RuntimeException("Can not do async finish for the span repeately.");
}
this.endTime = System.currentTimeMillis();
context.asyncStop(this);
owner.asyncStop(this);
isAsyncStopped = true;
return this;
}
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.network.trace.component.Component;
......@@ -37,13 +38,13 @@ public class EntrySpan extends StackBasedTracingSpan {
private int currentMaxDepth;
public EntrySpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
public EntrySpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
this.currentMaxDepth = 0;
}
public EntrySpan(int spanId, int parentSpanId, int operationId) {
super(spanId, parentSpanId, operationId);
public EntrySpan(int spanId, int parentSpanId, int operationId, TracingContext owner) {
super(spanId, parentSpanId, operationId, owner);
this.currentMaxDepth = 0;
}
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.network.trace.component.Component;
......@@ -37,20 +38,20 @@ import org.apache.skywalking.apm.network.trace.component.Component;
*/
public class ExitSpan extends StackBasedTracingSpan implements WithPeerInfo {
public ExitSpan(int spanId, int parentSpanId, String operationName, String peer) {
super(spanId, parentSpanId, operationName, peer);
public ExitSpan(int spanId, int parentSpanId, String operationName, String peer, TracingContext owner) {
super(spanId, parentSpanId, operationName, peer, owner);
}
public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId) {
super(spanId, parentSpanId, operationId, peerId);
public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId, TracingContext owner) {
super(spanId, parentSpanId, operationId, peerId, owner);
}
public ExitSpan(int spanId, int parentSpanId, int operationId, String peer) {
super(spanId, parentSpanId, operationId, peer);
public ExitSpan(int spanId, int parentSpanId, int operationId, String peer, TracingContext owner) {
super(spanId, parentSpanId, operationId, peer, owner);
}
public ExitSpan(int spanId, int parentSpanId, String operationName, int peerId) {
super(spanId, parentSpanId, operationName, peerId);
public ExitSpan(int spanId, int parentSpanId, String operationName, int peerId, TracingContext owner) {
super(spanId, parentSpanId, operationName, peerId, owner);
}
/**
......
......@@ -19,6 +19,8 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
/**
* The <code>LocalSpan</code> represents a normal tracing point, such as a local method.
*
......@@ -26,12 +28,12 @@ package org.apache.skywalking.apm.agent.core.context.trace;
*/
public class LocalSpan extends AbstractTracingSpan {
public LocalSpan(int spanId, int parentSpanId, int operationId) {
super(spanId, parentSpanId, operationId);
public LocalSpan(int spanId, int parentSpanId, int operationId, TracingContext owner) {
super(spanId, parentSpanId, operationId, owner);
}
public LocalSpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
public LocalSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
}
@Override
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
......@@ -35,40 +36,40 @@ public abstract class StackBasedTracingSpan extends AbstractTracingSpan {
protected String peer;
protected int peerId;
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName) {
super(spanId, parentSpanId, operationName);
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
this.stackDepth = 0;
this.peer = null;
this.peerId = DictionaryUtil.nullValue();
}
protected StackBasedTracingSpan(int spanId, int parentSpanId, int operationId) {
super(spanId, parentSpanId, operationId);
protected StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, TracingContext owner) {
super(spanId, parentSpanId, operationId, owner);
this.stackDepth = 0;
this.peer = null;
this.peerId = DictionaryUtil.nullValue();
}
public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, int peerId) {
super(spanId, parentSpanId, operationId);
public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, int peerId, TracingContext owner) {
super(spanId, parentSpanId, operationId, owner);
this.peer = null;
this.peerId = peerId;
}
public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, String peer) {
super(spanId, parentSpanId, operationId);
public StackBasedTracingSpan(int spanId, int parentSpanId, int operationId, String peer, TracingContext owner) {
super(spanId, parentSpanId, operationId, owner);
this.peer = peer;
this.peerId = DictionaryUtil.nullValue();
}
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, String peer) {
super(spanId, parentSpanId, operationName);
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, String peer, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
this.peer = peer;
this.peerId = DictionaryUtil.nullValue();
}
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, int peerId) {
super(spanId, parentSpanId, operationName);
protected StackBasedTracingSpan(int spanId, int parentSpanId, String operationName, int peerId, TracingContext owner) {
super(spanId, parentSpanId, operationName, owner);
this.peer = null;
this.peerId = peerId;
}
......
......@@ -27,8 +27,11 @@ import java.util.Objects;
*/
public class ProfileTask {
// monitor endpoint name
private String endpointName;
// task id
private String taskId;
// monitor first span operation name
private String fistSpanOPName;
// task duration (minute)
private int duration;
......@@ -48,12 +51,12 @@ public class ProfileTask {
// task create time
private long createTime;
public String getEndpointName() {
return endpointName;
public String getFistSpanOPName() {
return fistSpanOPName;
}
public void setEndpointName(String endpointName) {
this.endpointName = endpointName;
public void setFistSpanOPName(String fistSpanOPName) {
this.fistSpanOPName = fistSpanOPName;
}
public int getDuration() {
......@@ -104,6 +107,14 @@ public class ProfileTask {
this.createTime = createTime;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
......@@ -115,11 +126,12 @@ public class ProfileTask {
maxSamplingCount == that.maxSamplingCount &&
startTime == that.startTime &&
createTime == that.createTime &&
endpointName.equals(that.endpointName);
taskId.equals(that.taskId) &&
fistSpanOPName.equals(that.fistSpanOPName);
}
@Override
public int hashCode() {
return Objects.hash(endpointName, duration, minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, createTime);
return Objects.hash(taskId, fistSpanOPName, duration, minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, createTime);
}
}
......@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.agent.core.profile;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
......@@ -31,30 +32,46 @@ import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.*;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/**
* sniffer will check has new profile task list every {@link Config.Collector#GET_PROFILE_TASK_INTERVAL} second.
* Sniffer and backend, about the communication service of profile task protocol.
* 1. Sniffer will check has new profile task list every {@link Config.Collector#GET_PROFILE_TASK_INTERVAL} second.
* 2. When there is a new profile task snapshot, the data is transferred to the back end. use {@link LinkedBlockingQueue}
* 3. When profiling task finish, it will send task finish status to backend
*
* @author MrPro
*/
@DefaultImplementor
public class ProfileTaskQueryService implements BootService, Runnable, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(ProfileTaskQueryService.class);
public class ProfileTaskChannelService implements BootService, Runnable, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(ProfileTaskChannelService.class);
// channel status
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
// gRPC stub
private volatile ProfileTaskGrpc.ProfileTaskBlockingStub profileTaskBlockingStub;
private volatile ProfileTaskGrpc.ProfileTaskStub profileTaskStub;
// segment snapshot sender
private final LinkedBlockingQueue<TracingThreadSnapshot> snapshotQueue = new LinkedBlockingQueue<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
private volatile ScheduledFuture<?> sendSnapshotFuture;
// query task list schedule
private volatile ScheduledFuture<?> getTaskListFuture;
@Override
......@@ -76,7 +93,7 @@ public class ProfileTaskQueryService implements BootService, Runnable, GRPCChann
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
} catch (Throwable t) {
if (!(t instanceof StatusRuntimeException)) {
logger.error(t, "query profile task from Collector fail.", t);
logger.error(t, "Query profile task from backend fail.");
return;
}
final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t;
......@@ -85,6 +102,11 @@ public class ProfileTaskQueryService implements BootService, Runnable, GRPCChann
if (getTaskListFuture != null) {
getTaskListFuture.cancel(true);
}
// stop snapshot sender
if (sendSnapshotFuture != null) {
sendSnapshotFuture.cancel(true);
}
}
}
}
......@@ -100,8 +122,21 @@ public class ProfileTaskQueryService implements BootService, Runnable, GRPCChann
@Override
public void boot() throws Throwable {
if (Config.Profile.ACTIVE) {
// query task list
getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileGetTaskService"))
.scheduleWithFixedDelay(this, 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS);
.scheduleWithFixedDelay(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("Query profile task list failure.", t);
}
}), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS);
sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileSendSnapshotService"))
.scheduleWithFixedDelay(new RunnableWithExceptionProtection(new SnapshotSender(), new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("Profile segment snapshot upload failure.", t);
}
}), 0, 500, TimeUnit.MILLISECONDS);
}
}
......@@ -114,6 +149,10 @@ public class ProfileTaskQueryService implements BootService, Runnable, GRPCChann
if (getTaskListFuture != null) {
getTaskListFuture.cancel(true);
}
if (sendSnapshotFuture != null) {
sendSnapshotFuture.cancel(true);
}
}
@Override
......@@ -121,9 +160,84 @@ public class ProfileTaskQueryService implements BootService, Runnable, GRPCChann
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
profileTaskStub = ProfileTaskGrpc.newStub(channel);
} else {
profileTaskBlockingStub = null;
profileTaskStub = null;
}
this.status = status;
}
/**
* add a new profiling snapshot, send to {@link #snapshotQueue}
*/
public void addProfilingSnapshot(TracingThreadSnapshot snapshot) {
snapshotQueue.add(snapshot);
}
/**
* notify backend profile task has finish
*/
public void notifyProfileTaskFinish(ProfileTask task) {
try {
final ProfileTaskFinishReport.Builder reportBuilder = ProfileTaskFinishReport.newBuilder();
// sniffer info
reportBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
// task info
reportBuilder.setTaskId(task.getTaskId());
// send data
profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).reportTaskFinish(reportBuilder.build());
} catch (Throwable e) {
logger.error(e, "Notify profile task finish to backend fail.");
}
}
/**
* send segment snapshot
*/
private class SnapshotSender implements Runnable {
@Override
public void run() {
if (status == GRPCChannelStatus.CONNECTED) {
try {
ArrayList<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
snapshotQueue.drainTo(buffer);
if (buffer.size() > 0) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<ThreadSnapshot> snapshotStreamObserver = profileTaskStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collectSnapshot(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
}
@Override
public void onError(Throwable throwable) {
status.finished();
if (logger.isErrorEnable()) {
logger.error(throwable, "Send profile segment snapshot to collector fail with a grpc internal exception.");
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
});
for (TracingThreadSnapshot snapshot : buffer) {
final ThreadSnapshot transformSnapshot = snapshot.transform();
snapshotStreamObserver.onNext(transformSnapshot);
}
snapshotStreamObserver.onCompleted();
status.wait4Finish();
}
} catch (Throwable t) {
logger.error(t, "Send profile segment snapshot to backend fail.");
}
}
}
}
}
......@@ -18,7 +18,15 @@
package org.apache.skywalking.apm.agent.core.profile;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.ids.ID;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* profile task execution context, it will create on process this profile task
......@@ -30,20 +38,119 @@ public class ProfileTaskExecutionContext {
// task data
private final ProfileTask task;
// task real start time
private final long startTime;
// record current profiling count, use this to check has available profile slot
private final AtomicInteger currentProfilingCount = new AtomicInteger(0);
// profiling segment slot
private volatile AtomicReferenceArray<ThreadProfiler> profilingSegmentSlots;
// current profiling execution future
private volatile Future profilingFuture;
public ProfileTaskExecutionContext(ProfileTask task, long startTime) {
// total started profiling tracing context count
private final AtomicInteger totalStartedProfilingCount = new AtomicInteger(0);
public ProfileTaskExecutionContext(ProfileTask task) {
this.task = task;
this.startTime = startTime;
profilingSegmentSlots = new AtomicReferenceArray<>(Config.Profile.MAX_PARALLEL);
}
/**
* start profiling this task
*/
public void startProfiling(ExecutorService executorService) {
profilingFuture = executorService.submit(new ProfileThread(this));
}
/**
* stop profiling
*/
public void stopProfiling() {
if (profilingFuture != null) {
profilingFuture.cancel(true);
}
}
/**
* check have available slot to profile and add it
*
* @return is add profile success
*/
public boolean attemptProfiling(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) {
// check has available slot
final int usingSlotCount = currentProfilingCount.get();
if (usingSlotCount >= Config.Profile.MAX_PARALLEL) {
return false;
}
// check first operation name matches
if (!Objects.equals(task.getFistSpanOPName(), firstSpanOPName)) {
return false;
}
// if out limit started profiling count then stop add profiling
if (totalStartedProfilingCount.get() > task.getMaxSamplingCount()) {
return false;
}
// try to occupy slot
if (!currentProfilingCount.compareAndSet(usingSlotCount, usingSlotCount + 1)) {
return false;
}
final ThreadProfiler threadProfiler = new ThreadProfiler(tracingContext, traceSegmentId, Thread.currentThread(), this);
int slotLength = profilingSegmentSlots.length();
for (int slot = 0; slot < slotLength; slot++) {
if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {
break;
}
}
return true;
}
/**
* profiling recheck
*/
public boolean profilingRecheck(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) {
// if started, keep profiling
if (tracingContext.isProfiling()) {
return true;
}
return attemptProfiling(tracingContext, traceSegmentId, firstSpanOPName);
}
/**
* find tracing context and clear on slot
*/
public void stopTracingProfile(TracingContext tracingContext) {
// find current tracingContext and clear it
int slotLength = profilingSegmentSlots.length();
for (int slot = 0; slot < slotLength; slot++) {
ThreadProfiler currentProfiler = profilingSegmentSlots.get(slot);
if (currentProfiler != null && currentProfiler.matches(tracingContext)) {
profilingSegmentSlots.set(slot, null);
// setting stop running
currentProfiler.stopProfiling();
currentProfilingCount.addAndGet(-1);
break;
}
}
}
public ProfileTask getTask() {
return task;
}
public long getStartTime() {
return startTime;
public AtomicReferenceArray<ThreadProfiler> threadProfilerSlots() {
return profilingSegmentSlots;
}
public boolean isStartProfileable() {
// check is out of max sampling count check
return totalStartedProfilingCount.incrementAndGet() > task.getMaxSamplingCount();
}
@Override
......
......@@ -21,6 +21,10 @@ package org.apache.skywalking.apm.agent.core.profile;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingThreadListener;
import org.apache.skywalking.apm.agent.core.context.ids.ID;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.constants.ProfileConstants;
......@@ -38,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
* @author MrPro
*/
@DefaultImplementor
public class ProfileTaskExecutionService implements BootService {
public class ProfileTaskExecutionService implements BootService, TracingThreadListener {
private static final ILog logger = LogManager.getLogger(ProfileTaskExecutionService.class);
......@@ -51,12 +55,14 @@ public class ProfileTaskExecutionService implements BootService {
// current processing profile task context
private final AtomicReference<ProfileTaskExecutionContext> taskExecutionContext = new AtomicReference<>();
// profile executor thread pool, only running one thread
private final static ExecutorService PROFILE_EXECUTOR = Executors.newSingleThreadExecutor(new DefaultNamedThreadFactory("PROFILING-TASK"));
// profile task list, include running and waiting running tasks
private final List<ProfileTask> profileTaskList = Collections.synchronizedList(new LinkedList<>());
/**
* get profile task from OAP
* @param task
* add profile task from OAP
*/
public void addProfileTask(ProfileTask task) {
// update last command create time
......@@ -65,9 +71,9 @@ public class ProfileTaskExecutionService implements BootService {
}
// check profile task limit
final String dataError = checkProfileTaskSuccess(task);
if (dataError != null) {
logger.warn("check command error, cannot process this profile task. reason: {}", dataError);
final CheckResult dataError = checkProfileTaskSuccess(task);
if (!dataError.isSuccess()) {
logger.warn("check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());
return;
}
......@@ -84,19 +90,46 @@ public class ProfileTaskExecutionService implements BootService {
}, timeToProcessMills, TimeUnit.MILLISECONDS);
}
/**
* check and add {@link TracingContext} profiling
*/
public boolean addProfiling(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) {
// get current profiling task, check need profiling
final ProfileTaskExecutionContext executionContext = taskExecutionContext.get();
if (executionContext == null) {
return false;
}
return executionContext.attemptProfiling(tracingContext, traceSegmentId, firstSpanOPName);
}
/**
* Re-check current trace need profiling, in case that third-party plugins change the operation name.
*/
public boolean profilingRecheck(TracingContext tracingContext, ID traceSegmentId, String firstSpanOPName) {
// get current profiling task, check need profiling
final ProfileTaskExecutionContext executionContext = taskExecutionContext.get();
if (executionContext == null) {
return false;
}
return executionContext.profilingRecheck(tracingContext, traceSegmentId, firstSpanOPName);
}
/**
* active the selected profile task to execution task, and start a removal task for it.
* @param task
*/
private synchronized void processProfileTask(ProfileTask task) {
// make sure prev profile task already stopped
stopCurrentProfileTask(taskExecutionContext.get());
// make stop task schedule and task context
// TODO process task on next step
final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task, System.currentTimeMillis());
final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task);
taskExecutionContext.set(currentStartedTaskContext);
// start profiling this task
currentStartedTaskContext.startProfiling(PROFILE_EXECUTOR);
PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
@Override
public void run() {
......@@ -108,36 +141,44 @@ public class ProfileTaskExecutionService implements BootService {
/**
* stop profile task, remove context data
*/
private synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) {
synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) {
// stop same context only
if (needToStop == null || !taskExecutionContext.compareAndSet(needToStop, null)) {
return;
}
// current execution stop running
needToStop.stopProfiling();
// remove task
profileTaskList.remove(needToStop.getTask());
// TODO notify OAP current profile task execute finish
// notify profiling task has finished
ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class).notifyProfileTaskFinish(needToStop.getTask());
}
@Override
public void prepare() throws Throwable {
}
@Override
public void boot() throws Throwable {
}
@Override
public void onComplete() throws Throwable {
// add trace finish notification
TracingContext.TracingThreadListenerManager.add(this);
}
@Override
public void shutdown() throws Throwable {
// remove trace listener
TracingContext.TracingThreadListenerManager.remove(this);
PROFILE_TASK_SCHEDULE.shutdown();
PROFILE_EXECUTOR.shutdown();
}
public long getLastCommandCreateTime() {
......@@ -146,39 +187,37 @@ public class ProfileTaskExecutionService implements BootService {
/**
* check profile task data success, make the re-check, prevent receiving wrong data from database or OAP
* @param task
* @return
*/
private String checkProfileTaskSuccess(ProfileTask task) {
private CheckResult checkProfileTaskSuccess(ProfileTask task) {
// endpoint name
if (StringUtil.isEmpty(task.getEndpointName())) {
return "endpoint name cannot be empty";
if (StringUtil.isEmpty(task.getFistSpanOPName())) {
return new CheckResult(false, "endpoint name cannot be empty");
}
// duration
if (task.getDuration() < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
return "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
return new CheckResult(false, "monitor duration must greater than " + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes");
}
if (task.getDuration() > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
return "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
return new CheckResult(false, "The duration of the monitoring task cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes");
}
// min duration threshold
if (task.getMinDurationThreshold() < 0) {
return "min duration threshold must greater than or equals zero";
return new CheckResult(false, "min duration threshold must greater than or equals zero");
}
// dump period
if (task.getThreadDumpPeriod() < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
return "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
return new CheckResult(false, "dump period must be greater than or equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds");
}
// max sampling count
if (task.getMaxSamplingCount() <= 0) {
return "max sampling count must greater than zero";
return new CheckResult(false, "max sampling count must greater than zero");
}
if (task.getMaxSamplingCount() >= ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
return "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT;
return new CheckResult(false, "max sampling count must less than " + ProfileConstants.TASK_MAX_SAMPLING_COUNT);
}
// check task queue, check only one task in a certain time
......@@ -187,15 +226,46 @@ public class ProfileTaskExecutionService implements BootService {
// if the end time of the task to be added is during the execution of any data, means is a error data
if (taskProcessFinishTime >= profileTask.getStartTime() && taskProcessFinishTime <= calcProfileTaskFinishTime(profileTask)) {
return "there already have processing task in time range, could not add a new task again. processing task monitor endpoint name: " + profileTask.getEndpointName();
return new CheckResult(false, "there already have processing task in time range, could not add a new task again. processing task monitor endpoint name: " + profileTask.getFistSpanOPName());
}
}
return null;
return new CheckResult(true, null);
}
private long calcProfileTaskFinishTime(ProfileTask task) {
return task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration());
}
@Override
public void afterMainThreadFinish(TracingContext tracingContext) {
if (tracingContext.isProfiling()) {
// stop profiling tracing context
ProfileTaskExecutionContext currentExecutionContext = taskExecutionContext.get();
if (currentExecutionContext != null) {
currentExecutionContext.stopTracingProfile(tracingContext);
}
}
}
/**
* check profile task is processable
*/
private static class CheckResult {
private boolean success;
private String errorReason;
public CheckResult(boolean success, String errorReason) {
this.success = success;
this.errorReason = errorReason;
}
public boolean isSuccess() {
return success;
}
public String getErrorReason() {
return errorReason;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* Profile task process thread, dump the executing thread stack.
*
* @author MrPro
*/
public class ProfileThread implements Runnable {
private static final ILog logger = LogManager.getLogger(ProfileThread.class);
// profiling task context
private final ProfileTaskExecutionContext taskExecutionContext;
private final ProfileTaskExecutionService profileTaskExecutionService;
private final ProfileTaskChannelService profileTaskChannelService;
public ProfileThread(ProfileTaskExecutionContext taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
profileTaskExecutionService = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
profileTaskChannelService = ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class);
}
@Override
public void run() {
try {
profiling(taskExecutionContext);
} catch (InterruptedException e) {
// ignore interrupted
// means current task has stopped
} catch (Exception e) {
logger.error(e, "Profiling task fail. taskId:{}", taskExecutionContext.getTask().getTaskId());
} finally {
// finally stop current profiling task, tell execution service task has stop
profileTaskExecutionService.stopCurrentProfileTask(taskExecutionContext);
}
}
/**
* start profiling
*/
private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {
int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();
// run loop when current thread still running
long currentLoopStartTime = -1;
while (!Thread.currentThread().isInterrupted()) {
currentLoopStartTime = System.currentTimeMillis();
// each all slot
AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();
int profilerCount = profilers.length();
for (int slot = 0; slot < profilerCount; slot++) {
ThreadProfiler currentProfiler = profilers.get(slot);
if (currentProfiler == null) {
continue;
}
switch (currentProfiler.profilingStatus()) {
case READY:
// check tracing context running time
currentProfiler.startProfilingIfNeed();
break;
case PROFILING:
// dump stack
TracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();
if (snapshot != null) {
profileTaskChannelService.addProfilingSnapshot(snapshot);
} else {
// tell execution context current tracing thread dump failed, stop it
executionContext.stopTracingProfile(currentProfiler.tracingContext());
}
break;
}
}
// sleep to next period
// if out of period, sleep one period
long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();
needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
Thread.sleep(needToSleep);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
/**
* @author MrPro
*/
public enum ProfilingStatus {
READY,
PROFILING,
STOPPED
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import com.google.common.base.Objects;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.ids.ID;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
/**
* @author MrPro
*/
public class ThreadProfiler {
// current tracing context
private final TracingContext tracingContext;
// current tracing segment id
private final ID traceSegmentId;
// need to profiling thread
private final Thread profilingThread;
// profiling execution context
private final ProfileTaskExecutionContext executionContext;
// profiling time
private long profilingStartTime;
private long profilingMaxTimeMills;
// after min duration threshold check, it will start dump
private ProfilingStatus profilingStatus = ProfilingStatus.READY;
// thread dump sequence
private int dumpSequence = 0;
public ThreadProfiler(TracingContext tracingContext, ID traceSegmentId, Thread profilingThread, ProfileTaskExecutionContext executionContext) {
this.tracingContext = tracingContext;
this.traceSegmentId = traceSegmentId;
this.profilingThread = profilingThread;
this.executionContext = executionContext;
this.profilingMaxTimeMills = TimeUnit.MINUTES.toMillis(Config.Profile.MAX_DURATION);
}
/**
* If tracing start time greater than {@link ProfileTask#getMinDurationThreshold()}, then start to profiling trace
*/
public void startProfilingIfNeed() {
if (System.currentTimeMillis() - tracingContext.createTime() > executionContext.getTask().getMinDurationThreshold()) {
this.profilingStartTime = System.currentTimeMillis();
this.profilingStatus = ProfilingStatus.PROFILING;
}
}
/**
* Stop profiling status
*/
public void stopProfiling() {
this.profilingStatus = ProfilingStatus.STOPPED;
}
/**
* dump tracing thread and build thread snapshot
*
* @return snapshot, if null means dump snapshot error, should stop it
*/
public TracingThreadSnapshot buildSnapshot() {
if (!isProfilingContinuable()) {
return null;
}
long currentTime = System.currentTimeMillis();
// dump thread
StackTraceElement[] stackTrace;
try {
stackTrace = profilingThread.getStackTrace();
// stack depth is zero, means thread is already run finished
if (stackTrace.length == 0) {
return null;
}
} catch (Exception e) {
// dump error ignore and make this profiler stop
return null;
}
// if is first dump, check is can start profiling
if (dumpSequence == 0 && (!executionContext.isStartProfileable())) {
return null;
}
int dumpElementCount = Math.min(stackTrace.length, Config.Profile.DUMP_MAX_STACK_DEPTH);
// use inverted order, because thread dump is start with bottom
final ArrayList<String> stackList = new ArrayList<>(dumpElementCount);
for (int i = dumpElementCount - 1; i >= 0; i--) {
stackList.add(buildStackElementCodeSignature(stackTrace[i]));
}
String taskId = executionContext.getTask().getTaskId();
return new TracingThreadSnapshot(taskId, traceSegmentId, dumpSequence++, currentTime, stackList);
}
/**
* build thread stack element code signature
*
* @return code sign: className.methodName:lineNumber
*/
private String buildStackElementCodeSignature(StackTraceElement element) {
return element.getClassName() + "." + element.getMethodName() + ":" + element.getLineNumber();
}
/**
* matches profiling tracing context
*/
public boolean matches(TracingContext context) {
// match trace id
return Objects.equal(context.getReadableGlobalTraceId(), tracingContext.getReadableGlobalTraceId());
}
/**
* check whether profiling should continue
*
* @return if true means this thread profiling is continuable
*/
private boolean isProfilingContinuable() {
return System.currentTimeMillis() - profilingStartTime < profilingMaxTimeMills;
}
public TracingContext tracingContext() {
return tracingContext;
}
public ProfilingStatus profilingStatus() {
return profilingStatus;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import org.apache.skywalking.apm.agent.core.context.ids.ID;
import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
import org.apache.skywalking.apm.network.language.profile.ThreadStack;
import java.util.List;
/**
* @author MrPro
*/
public class TracingThreadSnapshot {
// thread profiler
private final String taskId;
private final ID traceSegmentId;
// dump info
private final int sequence;
private final long time;
private final List<String> stackList;
public TracingThreadSnapshot(String taskId, ID traceSegmentId, int sequence, long time, List<String> stackList) {
this.taskId = taskId;
this.traceSegmentId = traceSegmentId;
this.sequence = sequence;
this.time = time;
this.stackList = stackList;
}
/**
* transform to gRPC data
*/
public ThreadSnapshot transform() {
final ThreadSnapshot.Builder builder = ThreadSnapshot.newBuilder();
// task id
builder.setTaskId(taskId);
// dumped segment id
builder.setTraceSegmentId(traceSegmentId.transform());
// dump time
builder.setTime(time);
// snapshot dump sequence
builder.setSequence(sequence);
// snapshot stack
final ThreadStack.Builder stackBuilder = ThreadStack.newBuilder();
for (String codeSign : stackList) {
stackBuilder.addCodeSignatures(codeSign);
}
builder.setStack(stackBuilder);
return builder.build();
}
}
......@@ -26,5 +26,5 @@ org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
......@@ -22,13 +22,11 @@ package org.apache.skywalking.apm.agent.core.boot;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.jvm.JVMService;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService;
import org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
......@@ -64,11 +62,17 @@ public class ServiceManagerTest {
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskQueryService.class));
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class));
assertProfileTaskExecuteService(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class));
assertTracingContextListener();
assertIgnoreTracingContextListener();
assertTracingThreadContextListener();
}
private void assertTracingThreadContextListener() throws Exception {
List<TracingThreadListener> listeners = getFieldValue(TracingContext.TracingThreadListenerManager.class, "LISTENERS");
assertThat(listeners.size(), is(1));
}
private void assertIgnoreTracingContextListener() throws Exception {
......@@ -87,7 +91,7 @@ public class ServiceManagerTest {
assertNotNull(service);
}
private void assertProfileTaskQueryService(ProfileTaskQueryService service) {
private void assertProfileTaskQueryService(ProfileTaskChannelService service) {
assertNotNull(service);
}
......
......@@ -46,7 +46,7 @@ public class TracingContextTest {
};
TracingContext.ListenerManager.add(listener);
try {
TracingContext tracingContext = new TracingContext();
TracingContext tracingContext = new TracingContext("/url");
AbstractSpan span = tracingContext.createEntrySpan("/url");
for (int i = 0; i < 10; i++) {
......
......@@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.test.tools;
import java.util.HashMap;
import java.util.LinkedList;
import org.apache.skywalking.apm.agent.core.context.TracingThreadListener;
import org.junit.rules.ExternalResource;
import org.powermock.reflect.Whitebox;
import org.apache.skywalking.apm.agent.core.boot.BootService;
......@@ -37,6 +39,7 @@ public class AgentServiceRule extends ExternalResource {
Whitebox.setInternalState(ServiceManager.INSTANCE, "bootedServices", new HashMap<Class, BootService>());
Whitebox.setInternalState(TracingContext.ListenerManager.class, "LISTENERS", new LinkedList<TracingContextListener>());
Whitebox.setInternalState(IgnoredTracerContext.ListenerManager.class, "LISTENERS", new LinkedList<TracingContextListener>());
Whitebox.setInternalState(TracingContext.TracingThreadListenerManager.class, "LISTENERS", new LinkedList<TracingThreadListener>());
}
@Override
......
......@@ -44,6 +44,18 @@ agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
# If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.
# profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
# Parallel monitor segment count
# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5}
# Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it.
# profile.duration=${SW_AGENT_PROFILE_DURATION:10}
# Max dump thread stack depth
# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500}
# Snapshot transport to backend buffer size
# profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50}
# Backend service addresses.
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
......
......@@ -102,6 +102,10 @@ property key | Description | Default |
`dictionary.service_code_buffer_size`|The buffer size of application codes and peer|`10 * 10000`|
`dictionary.endpoint_name_buffer_size`|The buffer size of endpoint names and peer|`1000 * 10000`|
`profile.active`|If true, skywalking agent will enable profile when user create a new profile task. Otherwise disable profile.|`true`|
`profile.max_parallel`|Parallel monitor segment count|`5`|
`profile.duration`|Max monitor segment time(minutes), if current segment monitor time out of limit, then stop it.|`10`|
`profile.dump_max_stack_depth`|Max dump thread stack depth|`500`|
`profile.snapshot_transport_buffer_size`|Snapshot transport to backend buffer size|`50`|
`plugin.peer_max_length `|Peer maximum description limit.|`200`|
`plugin.mongodb.trace_param`|If true, trace all the parameters in MongoDB access, default is false. Only trace the operation, not include parameters.|`false`|
`plugin.mongodb.filter_length_limit`|If set to positive number, the `WriteRequest.params` would be truncated to this length, otherwise it would be completely saved, which may cause performance problem.|`256`|
......
......@@ -164,6 +164,8 @@ istio-telemetry:
default:
envoy-metric:
default:
receiver-profile:
default:
# alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh}
#receiver_zipkin:
# default:
......@@ -233,3 +235,4 @@ configuration:
# grpc:
# targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
# targetPort: ${SW_EXPORTER_GRPC_PORT:9870}
......@@ -23,14 +23,18 @@ import com.google.common.cache.CacheBuilder;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
......@@ -42,9 +46,11 @@ public class ProfileTaskCache implements Service {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileTaskCache.class);
private final Cache<Integer, List<ProfileTask>> profileTaskCache;
private final Cache<Integer, List<ProfileTask>> profileTaskDownstreamCache;
private final Cache<String, ProfileTask> profileTaskIdCache;
private final ModuleManager moduleManager;
private IProfileTaskQueryDAO profileTaskQueryDAO;
public ProfileTaskCache(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
this.moduleManager = moduleManager;
......@@ -52,9 +58,18 @@ public class ProfileTaskCache implements Service {
long initialSize = moduleConfig.getMaxSizeOfProfileTask() / 10L;
int initialCapacitySize = (int)(initialSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : initialSize);
profileTaskCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
profileTaskDownstreamCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
// remove old profile task data
.expireAfterWrite(Duration.ofMinutes(1)).build();
profileTaskIdCache = CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask()).build();
}
private IProfileTaskQueryDAO getProfileTaskQueryDAO() {
if (Objects.isNull(profileTaskQueryDAO)) {
profileTaskQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
}
return profileTaskQueryDAO;
}
/**
......@@ -64,10 +79,32 @@ public class ProfileTaskCache implements Service {
*/
public List<ProfileTask> getProfileTaskList(int serviceId) {
// read profile task list from cache only, use cache update timer mechanism
List<ProfileTask> profileTaskList = profileTaskCache.getIfPresent(serviceId);
List<ProfileTask> profileTaskList = profileTaskDownstreamCache.getIfPresent(serviceId);
return profileTaskList;
}
/**
* query profile task by id
* @param id
* @return
*/
public ProfileTask getProfileTaskById(String id) {
ProfileTask profile = profileTaskIdCache.getIfPresent(id);
if (profile == null) {
try {
profile = getProfileTaskQueryDAO().getById(id);
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
if (profile != null) {
profileTaskIdCache.put(id, profile);
}
}
return profile;
}
/**
* save service task list
* @param serviceId
......@@ -78,7 +115,7 @@ public class ProfileTaskCache implements Service {
taskList = Collections.emptyList();
}
profileTaskCache.put(serviceId, taskList);
profileTaskDownstreamCache.put(serviceId, taskList);
}
/**
......
......@@ -43,7 +43,7 @@ public class CommandService implements Service {
public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
final String serialNumber = UUID.randomUUID().toString();
return new ProfileTaskCommand(serialNumber, task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
return new ProfileTaskCommand(serialNumber, task.getId(), task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
}
private String generateSerialNumber(final int serviceInstanceId, final long time, final String serviceInstanceUUID) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.profile;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
/**
* Profiling segment snapshot database bean, use record
*
* @author MrPro
*/
@Getter
@Setter
@ScopeDeclaration(id = PROFILE_TASK_SEGMENT_SNAPSHOT, name = "ProfileTaskSegmentSnapshot")
@Stream(name = ProfileTaskSegmentSnapshotRecord.INDEX_NAME, scopeId = PROFILE_TASK_SEGMENT_SNAPSHOT, builder = ProfileTaskSegmentSnapshotRecord.Builder.class, processor = RecordStreamProcessor.class)
public class ProfileTaskSegmentSnapshotRecord extends Record {
public static final String INDEX_NAME = "profile_task_segment_snapshot";
public static final String TASK_ID = "task_id";
public static final String SEGMENT_ID = "segment_id";
public static final String DUMP_TIME = "dump_time";
public static final String SEQUENCE = "sequence";
public static final String STACK_BINARY = "stack_binary";
@Column(columnName = TASK_ID) private String taskId;
@Column(columnName = SEGMENT_ID) private String segmentId;
@Column(columnName = DUMP_TIME) private long dumpTime;
@Column(columnName = SEQUENCE) private int sequence;
@Column(columnName = STACK_BINARY) private byte[] stackBinary;
@Override
public String id() {
return getTaskId() + Const.ID_SPLIT + getSegmentId() + Const.ID_SPLIT + getSequence() + Const.ID_SPLIT;
}
public static class Builder implements StorageBuilder<ProfileTaskSegmentSnapshotRecord> {
@Override
public ProfileTaskSegmentSnapshotRecord map2Data(Map<String, Object> dbMap) {
final ProfileTaskSegmentSnapshotRecord snapshot = new ProfileTaskSegmentSnapshotRecord();
snapshot.setTaskId((String)dbMap.get(TASK_ID));
snapshot.setSegmentId((String)dbMap.get(SEGMENT_ID));
snapshot.setDumpTime(((Number)dbMap.get(DUMP_TIME)).longValue());
snapshot.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
snapshot.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).intValue());
if (StringUtil.isEmpty((String)dbMap.get(STACK_BINARY))) {
snapshot.setStackBinary(new byte[] {});
} else {
snapshot.setStackBinary(Base64.getDecoder().decode((String)dbMap.get(STACK_BINARY)));
}
return snapshot;
}
@Override
public Map<String, Object> data2Map(ProfileTaskSegmentSnapshotRecord storageData) {
final HashMap<String, Object> map = new HashMap<>();
map.put(TASK_ID, storageData.getTaskId());
map.put(SEGMENT_ID, storageData.getSegmentId());
map.put(DUMP_TIME, storageData.getDumpTime());
map.put(SEQUENCE, storageData.getSequence());
map.put(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getStackBinary())) {
map.put(STACK_BINARY, Const.EMPTY_STRING);
} else {
map.put(STACK_BINARY, new String(Base64.getEncoder().encode(storageData.getStackBinary())));
}
return map;
}
}
}
......@@ -69,6 +69,7 @@ public class DefaultScopeDefine {
public static final int HTTP_ACCESS_LOG = 25;
public static final int PROFILE_TASK = 26;
public static final int PROFILE_TASK_LOG = 27;
public static final int PROFILE_TASK_SEGMENT_SNAPSHOT = 28;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
......
......@@ -41,4 +41,11 @@ public interface IProfileTaskQueryDAO extends DAO {
*/
List<ProfileTask> getTaskList(final Integer serviceId, final String endpointName, final Long startTimeBucket, final Long endTimeBucket, final Integer limit) throws IOException;
/**
* query profile task by id
* @param id
* @return
*/
ProfileTask getById(final String id) throws IOException;
}
......@@ -20,19 +20,25 @@ package org.apache.skywalking.oap.server.receiver.profile.provider.handler;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskSegmentSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
......@@ -42,6 +48,8 @@ import java.util.concurrent.TimeUnit;
*/
public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBase implements GRPCHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ProfileTaskServiceHandler.class);
private ProfileTaskCache profileTaskCache;
private final CommandService commandService;
......@@ -71,7 +79,7 @@ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBa
}
// record profile task log
recordProfileTaskLog(profileTask, request);
recordProfileTaskLog(profileTask, request.getInstanceId(), ProfileTaskLogOperationType.NOTIFIED);
// add command
commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
......@@ -81,11 +89,72 @@ public class ProfileTaskServiceHandler extends ProfileTaskGrpc.ProfileTaskImplBa
responseObserver.onCompleted();
}
private void recordProfileTaskLog(ProfileTask task, ProfileTaskCommandQuery query) {
@Override
public StreamObserver<ThreadSnapshot> collectSnapshot(StreamObserver<Commands> responseObserver) {
return new StreamObserver<ThreadSnapshot>() {
@Override
public void onNext(ThreadSnapshot snapshot) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("receive profile segment snapshot");
}
// parse segment id
UniqueId uniqueId = snapshot.getTraceSegmentId();
StringBuilder segmentIdBuilder = new StringBuilder();
for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
if (i == 0) {
segmentIdBuilder.append(uniqueId.getIdPartsList().get(i));
} else {
segmentIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i));
}
}
// build database data
final ProfileTaskSegmentSnapshotRecord record = new ProfileTaskSegmentSnapshotRecord();
record.setTaskId(snapshot.getTaskId());
record.setSegmentId(segmentIdBuilder.toString());
record.setDumpTime(snapshot.getTime());
record.setSequence(snapshot.getSequence());
record.setStackBinary(snapshot.getStack().toByteArray());
record.setTimeBucket(TimeBucket.getRecordTimeBucket(snapshot.getTime()));
// async storage
RecordStreamProcessor.getInstance().in(record);
}
@Override
public void onError(Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
};
}
@Override
public void reportTaskFinish(ProfileTaskFinishReport request, StreamObserver<Commands> responseObserver) {
// query task from cache, set log time bucket need it
final ProfileTask profileTask = profileTaskCache.getProfileTaskById(request.getTaskId());
// record finish log
if (profileTask != null) {
recordProfileTaskLog(profileTask, request.getInstanceId(), ProfileTaskLogOperationType.EXECUTION_FINISHED);
}
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
private void recordProfileTaskLog(ProfileTask task, int instanceId, ProfileTaskLogOperationType operationType) {
final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord();
logRecord.setTaskId(task.getId());
logRecord.setInstanceId(query.getInstanceId());
logRecord.setOperationType(ProfileTaskLogOperationType.NOTIFIED.getCode());
logRecord.setInstanceId(instanceId);
logRecord.setOperationType(operationType.getCode());
logRecord.setOperationTime(System.currentTimeMillis());
// same with task time bucket, ensure record will ttl same with profile task
logRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(task.getStartTime() + TimeUnit.MINUTES.toMillis(task.getDuration())));
......
......@@ -88,6 +88,25 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
return tasks;
}
@Override
public ProfileTask getById(String id) throws IOException {
if (StringUtil.isEmpty(id)) {
return null;
}
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.idsQuery().addIds(id));
sourceBuilder.size(1);
final SearchResponse response = getClient().search(ProfileTaskNoneStream.INDEX_NAME, sourceBuilder);
if (response.getHits().getHits().length > 0) {
return parseTask(response.getHits().getHits()[0]);
}
return null;
}
private ProfileTask parseTask(SearchHit data) {
return ProfileTask.builder()
.id(data.getId())
......
......@@ -22,6 +22,7 @@ import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskNoneStream;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import java.io.IOException;
......@@ -87,6 +88,29 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
}
}
@Override
public ProfileTask getById(String id) throws IOException {
if (StringUtil.isEmpty(id)) {
return null;
}
final StringBuilder sql = new StringBuilder();
final ArrayList<Object> condition = new ArrayList<>(1);
sql.append("select * from ").append(ProfileTaskNoneStream.INDEX_NAME).append(" where id=? LIMIT 1");
condition.add(id);
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
if (resultSet.next()) {
return parseTask(resultSet);
}
}
} catch (SQLException | JDBCClientException e) {
throw new IOException(e);
}
return null;
}
/**
* parse profile task data
* @param data
......
......@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
while (true) {
try {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
final ResponseEntity<String> responseEntity = sendRequest(false);
LOGGER.info("responseEntity: {}", responseEntity);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = profileClient.traces(
......@@ -123,6 +117,17 @@ public class ProfileVerificationITCase {
}
private ResponseEntity<String> sendRequest(boolean needProfiling) {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
user.put("enableProfiling", String.valueOf(needProfiling));
return restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
}
/**
* verify create profile task
* @param minutesAgo
......@@ -134,10 +139,10 @@ public class ProfileVerificationITCase {
final ProfileTaskCreationRequest creationRequest = ProfileTaskCreationRequest.builder()
.serviceId(2)
.endpointName("/e2e/users")
.duration(5)
.duration(1)
.startTime(-1)
.minDurationThreshold(10)
.dumpPeriod(10)
.minDurationThreshold(1000)
.dumpPeriod(50)
.maxSamplingCount(5).build();
// verify create task
......@@ -147,18 +152,29 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
// verify get task list and sniffer get task logs
verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
// send a profile request
sendRequest(true);
// verify task execution finish
verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
}
private void verifyProfileTask(int serviceId, String verifyResources) throws InterruptedException {
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.serviceId(serviceId)
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
new ClassPathResource(verifyResources).getInputStream();
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
......
......@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
while (true) {
try {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
final ResponseEntity<String> responseEntity = sendRequest(false);
LOGGER.info("responseEntity: {}", responseEntity);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = profileClient.traces(
......@@ -122,6 +116,17 @@ public class ProfileVerificationITCase {
}
private ResponseEntity<String> sendRequest(boolean needProfiling) {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
user.put("enableProfiling", String.valueOf(needProfiling));
return restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
}
/**
* verify create profile task
* @param minutesAgo
......@@ -133,10 +138,10 @@ public class ProfileVerificationITCase {
final ProfileTaskCreationRequest creationRequest = ProfileTaskCreationRequest.builder()
.serviceId(2)
.endpointName("/e2e/users")
.duration(5)
.duration(1)
.startTime(-1)
.minDurationThreshold(10)
.dumpPeriod(10)
.minDurationThreshold(1000)
.dumpPeriod(50)
.maxSamplingCount(5).build();
// verify create task
......@@ -146,18 +151,29 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
// verify get task list and sniffer get task logs
verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
// send a profile request
sendRequest(true);
// verify task execution finish
verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
}
private void verifyProfileTask(int serviceId, String verifyResources) throws InterruptedException {
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.serviceId(serviceId)
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
new ClassPathResource(verifyResources).getInputStream();
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
......
......@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
while (true) {
try {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
final ResponseEntity<String> responseEntity = sendRequest(false);
LOGGER.info("responseEntity: {}", responseEntity);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = profileClient.traces(
......@@ -122,6 +116,17 @@ public class ProfileVerificationITCase {
}
private ResponseEntity<String> sendRequest(boolean needProfiling) {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
user.put("enableProfiling", String.valueOf(needProfiling));
return restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
}
/**
* verify create profile task
* @param minutesAgo
......@@ -133,10 +138,10 @@ public class ProfileVerificationITCase {
final ProfileTaskCreationRequest creationRequest = ProfileTaskCreationRequest.builder()
.serviceId(2)
.endpointName("/e2e/users")
.duration(5)
.duration(1)
.startTime(-1)
.minDurationThreshold(10)
.dumpPeriod(10)
.minDurationThreshold(1000)
.dumpPeriod(50)
.maxSamplingCount(5).build();
// verify create task
......@@ -146,18 +151,29 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
// verify get task list and sniffer get task logs
verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
// send a profile request
sendRequest(true);
// verify task execution finish
verifyProfileTask(creationRequest.getServiceId(), "expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
}
private void verifyProfileTask(int serviceId, String verifyResources) throws InterruptedException {
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
.serviceId(creationRequest.getServiceId())
.serviceId(serviceId)
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
new ClassPathResource(verifyResources).getInputStream();
final ProfilesTasksMatcher servicesMatcher = new Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.profile;
/**
* @author MrPro
*/
public class CreateUser {
private String name;
private boolean enableProfiling;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean getEnableProfiling() {
return enableProfiling;
}
public void setEnableProfiling(boolean enableProfiling) {
this.enableProfiling = enableProfiling;
}
public User toUser() {
final User user = new User();
user.setName(name);
return user;
}
}
......@@ -20,6 +20,8 @@ package org.apache.skywalking.e2e.profile;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.TimeUnit;
/**
* @author MrPro
*/
......@@ -38,8 +40,14 @@ public class TestController {
}
@PostMapping("/users")
public User createAuthor(@RequestBody final User user) throws InterruptedException {
Thread.sleep(1000L);
return userRepo.save(user);
public User createAuthor(@RequestBody final CreateUser createUser) throws InterruptedException {
final User user = userRepo.save(createUser.toUser());
if (!createUser.getEnableProfiling()) {
return user;
} else {
// sleep 10 second
TimeUnit.SECONDS.sleep(10);
return user;
}
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
tasks:
- id: not null
serviceId: gt 0
endpointName: not null
startTime: gt 0
duration: gt 0
minDurationThreshold: gt 0
dumpPeriod: gt 0
maxSamplingCount: gt 0
logs:
- id: not null
instanceId: gt 0
operationType: EXECUTION_FINISHED
operationTime: gt 0
- id: not null
instanceId: gt 0
operationType: NOTIFIED
operationTime: gt 0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册