Commit b7e70da3 authored by Ufuk Celebi

[FLINK-3310] [runtime-web] Add back pressure statistics to web monitor (backend)

Parent d69fe309
......@@ -165,6 +165,10 @@ The following parameters configure Flink's JobManager and TaskManagers.
- `jobmanager.web.history`: The number of latest jobs that the JobManager's web front-end keeps in its history (DEFAULT: 5).
- `jobmanager.web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`).
- `jobmanager.web.checkpoints.history`: Number of checkpoint statistics to remember (DEFAULT: `10`).
- `jobmanager.web.backpressure.cleanup-interval`: Time after which cached stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins).
- `jobmanager.web.backpressure.refresh-interval`: Time after which available stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min).
- `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to take to determine back pressure (DEFAULT: `100`).
- `jobmanager.web.backpressure.delay-between-samples`: Delay between stack trace samples to determine back pressure (DEFAULT: `50`, 50 ms).
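For example, a `flink-conf.yaml` excerpt (illustrative values, not a recommendation) that refreshes back pressure stats twice as often as the default while keeping the default sampling behavior:

```yaml
jobmanager.web.backpressure.refresh-interval: 30000
jobmanager.web.backpressure.num-samples: 100
jobmanager.web.backpressure.delay-between-samples: 50
```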
### Webclient
......
......@@ -357,7 +357,18 @@ public final class ConfigConstants {
/** Config parameter defining the number of checkpoints to remember for recent history. */
public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history";
/** Time after which cached stats are cleaned up if not accessed. */
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval";
/** Time after which available stats are deprecated and need to be refreshed (by resampling). */
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval";
/** Number of stack trace samples to take to determine back pressure. */
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples";
/** Delay between stack trace samples to determine back pressure. */
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples";
// ------------------------------ AKKA ------------------------------------
......@@ -693,6 +704,18 @@ public final class ConfigConstants {
/** Default number of checkpoints to remember for recent history. */
public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
/** Time after which cached stats are cleaned up. */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
/** Time after which available stats are deprecated and need to be refreshed (by resampling). */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
/** Number of samples to take to determine back pressure. */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
/** Delay between samples to determine back pressure. */
public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50;
// ------------------------------ Akka Values ------------------------------
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
......
......@@ -134,6 +134,12 @@ under the License.
<version>${curator.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import akka.dispatch.OnComplete;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Back pressure statistics tracker.
*
* <p>Back pressure is determined by sampling running tasks. If a task is
* slowed down by back pressure, it will be stuck requesting memory buffers
* from a {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
*
* <p>Stack traces of back pressured tasks look like this:
*
* <pre>
* java.lang.Object.wait(Native Method)
* o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
* o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
* request
* [...]
* </pre>
*/
public class BackPressureStatsTracker {
private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);
/** Maximum stack trace depth for samples. */
static final int MAX_STACK_TRACE_DEPTH = 3;
/** Expected class name for back pressure indicating stack trace element. */
static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
/** Expected method name for back pressure indicating stack trace element. */
static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
/** Lock guarding trigger operations. */
private final Object lock = new Object();
/** Stack trace sample coordinator. */
private final StackTraceSampleCoordinator coordinator;
/**
* Completed stats. Important: job vertex IDs need to be scoped by job ID,
* because they are potentially identical across job runs, which would mix
* up the cached data.
*/
private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
/** Pending in progress stats. Important: job vertex IDs need to be scoped
* by job ID, because they are potentially identical across job runs, which
* would mix up the cached data. */
private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
/** Cleanup interval for completed stats cache. */
private final int cleanUpInterval;
private final int numSamples;
private final FiniteDuration delayBetweenSamples;
/** Flag indicating whether the stats tracker has been shut down. */
private boolean shutDown;
/**
* Creates a back pressure statistics tracker.
*
* @param cleanUpInterval Clean up interval for completed stats.
* @param numSamples Number of stack trace samples when determining back pressure.
* @param delayBetweenSamples Delay between samples when determining back pressure.
*/
public BackPressureStatsTracker(
StackTraceSampleCoordinator coordinator,
int cleanUpInterval,
int numSamples,
FiniteDuration delayBetweenSamples) {
this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
checkArgument(cleanUpInterval >= 0, "Clean up interval");
this.cleanUpInterval = cleanUpInterval;
checkArgument(numSamples >= 1, "Number of samples");
this.numSamples = numSamples;
this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
this.operatorStatsCache = CacheBuilder.newBuilder()
.concurrencyLevel(1)
.expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
.build();
}
/** Returns the cleanup interval for the completed stats cache. */
public long getCleanUpInterval() {
return cleanUpInterval;
}
/**
* Returns back pressure statistics for an operator.
*
* @param vertex Operator to get the stats for.
*
* @return Back pressure statistics for an operator
*/
public Option<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
return Option.apply(operatorStatsCache.getIfPresent(vertex));
}
/**
* Triggers a stack trace sample for an operator to gather the back pressure
* statistics. If there is a sample in progress for the operator, the call
* is ignored.
*
* @param vertex Operator to get the stats for.
*/
@SuppressWarnings("unchecked")
public void triggerStackTraceSample(ExecutionJobVertex vertex) {
synchronized (lock) {
if (shutDown) {
return;
}
if (!pendingStats.contains(vertex)) {
pendingStats.add(vertex);
Future<StackTraceSample> sample = coordinator.triggerStackTraceSample(
vertex.getTaskVertices(),
numSamples,
delayBetweenSamples,
MAX_STACK_TRACE_DEPTH);
sample.onComplete(new StackTraceSampleCompletionCallback(
vertex), vertex.getGraph().getExecutionContext());
}
}
}
/**
* Cleans up the operator stats cache if it contains timed out entries.
*
* <p>The Guava cache only evicts entries as maintenance during normal cache
* operations. If the cache is not accessed, timed out entries are never
* removed, hence this explicit cleanup hook.
*/
public void cleanUpOperatorStatsCache() {
operatorStatsCache.cleanUp();
}
/**
* Shuts down the stats tracker.
*
* <p>Invalidates the cache and clears all pending stats.
*/
public void shutDown() {
synchronized (lock) {
if (!shutDown) {
operatorStatsCache.invalidateAll();
pendingStats.clear();
shutDown = true;
}
}
}
/**
* Invalidates the cache (irrespective of clean up interval).
*/
void invalidateOperatorStatsCache() {
operatorStatsCache.invalidateAll();
}
/**
* Callback on completed stack trace sample.
*/
class StackTraceSampleCompletionCallback extends OnComplete<StackTraceSample> {
private final ExecutionJobVertex vertex;
public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
this.vertex = vertex;
}
@Override
public void onComplete(Throwable failure, StackTraceSample success) throws Throwable {
synchronized (lock) {
try {
if (shutDown) {
return;
}
if (success != null) {
OperatorBackPressureStats stats = createStatsFromSample(success);
operatorStatsCache.put(vertex, stats);
} else {
LOG.error("Failed to gather stack trace sample.", failure);
}
} catch (Throwable t) {
LOG.error("Error during stats completion.", t);
} finally {
pendingStats.remove(vertex);
}
}
}
/**
* Creates the back pressure stats from a stack trace sample.
*
* @param sample Stack trace sample to base stats on.
*
* @return Back pressure stats
*/
private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) {
Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
// Map task ID to subtask index, because the web interface expects
// it like that.
Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
.newHashMapWithExpectedSize(traces.size());
Set<ExecutionAttemptID> sampledTasks = sample.getStackTraces().keySet();
for (ExecutionVertex task : vertex.getTaskVertices()) {
ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
if (sampledTasks.contains(taskId)) {
subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
} else {
throw new RuntimeException("Outdated sample. A task, which is part of the " +
"sample has been reset.");
}
}
// Ratio of blocked samples to total samples per sub task. Array
// position corresponds to sub task index.
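// Hypothetical example: if 75 of 100 sampled traces of a sub task are
// blocked in LocalBufferPool.requestBufferBlocking, its ratio is 0.75.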
double[] backPressureRatio = new double[traces.size()];
for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
int backPressureSamples = 0;
List<StackTraceElement[]> taskTraces = entry.getValue();
for (StackTraceElement[] trace : taskTraces) {
for (int i = trace.length - 1; i >= 0; i--) {
StackTraceElement elem = trace[i];
if (elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
backPressureSamples++;
break; // Continue with next stack trace
}
}
}
int subtaskIndex = subtaskIndexMap.get(entry.getKey());
int size = taskTraces.size();
double ratio = (size > 0)
? ((double) backPressureSamples) / size
: 0;
backPressureRatio[subtaskIndex] = ratio;
}
return new OperatorBackPressureStats(
sample.getSampleId(),
sample.getEndTime(),
backPressureRatio);
}
}
}
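A minimal sketch of how a caller drives the tracker, mirroring the web handler and the integration test later in this commit; `coordinator` (a StackTraceSampleCoordinator) and `vertex` (an ExecutionJobVertex of a running job) are assumed to exist, and the polling loop and concrete values are illustrative, not part of the API contract:

// Hypothetical usage sketch, values match the documented defaults.
BackPressureStatsTracker tracker = new BackPressureStatsTracker(
        coordinator,
        10 * 60 * 1000,                                 // clean up unaccessed stats after 10 min
        100,                                            // 100 stack trace samples per task
        new FiniteDuration(50, TimeUnit.MILLISECONDS)); // 50 ms between samples

tracker.triggerStackTraceSample(vertex);                // no-op while a sample is pending

scala.Option<OperatorBackPressureStats> stats;
while ((stats = tracker.getOperatorBackPressureStats(vertex)).isEmpty()) {
    Thread.sleep(50);                                   // stats appear once the sample future completes
}
double maxRatio = stats.get().getMaxBackPressureRatio(); // in [0, 1]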
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import java.util.Arrays;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Back pressure statistics of multiple tasks.
*
* <p>Statistics are gathered by sampling stack traces of running tasks. The
* back pressure ratio denotes the ratio of traces indicating back pressure
* to the total number of sampled traces.
*/
public class OperatorBackPressureStats {
/** ID of the corresponding sample. */
private final int sampleId;
/** End time stamp of the corresponding sample. */
private final long endTimestamp;
/** Back pressure ratio per subtask. */
private final double[] subTaskBackPressureRatio;
/** Maximum back pressure ratio. */
private final double maxSubTaskBackPressureRatio;
public OperatorBackPressureStats(
int sampleId,
long endTimestamp,
double[] subTaskBackPressureRatio) {
this.sampleId = sampleId;
this.endTimestamp = endTimestamp;
this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
checkArgument(subTaskBackPressureRatio.length >= 1, "No sub task back pressure ratio specified");
double max = 0;
for (double ratio : subTaskBackPressureRatio) {
if (ratio > max) {
max = ratio;
}
}
maxSubTaskBackPressureRatio = max;
}
/**
* Returns the ID of the sample.
*
* @return ID of the sample
*/
public int getSampleId() {
return sampleId;
}
/**
* Returns the time stamp when all stack traces were collected at the
* JobManager.
*
* @return Time stamp when all stack traces were collected at the
* JobManager
*/
public long getEndTimestamp() {
return endTimestamp;
}
/**
* Returns the number of sub tasks.
*
* @return Number of sub tasks.
*/
public int getNumberOfSubTasks() {
return subTaskBackPressureRatio.length;
}
/**
* Returns the ratio of stack traces indicating back pressure to total
* number of sampled stack traces.
*
* @param index Subtask index.
*
* @return Ratio of stack traces indicating back pressure to total number
* of sampled stack traces.
*/
public double getBackPressureRatio(int index) {
return subTaskBackPressureRatio[index];
}
/**
* Returns the maximum back pressure ratio of all sub tasks.
*
* @return Maximum back pressure ratio of all sub tasks.
*/
public double getMaxBackPressureRatio() {
return maxSubTaskBackPressureRatio;
}
@Override
public String toString() {
return "OperatorBackPressureStats{" +
"sampleId=" + sampleId +
", endTimestamp=" + endTimestamp +
", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
/**
* A sample of stack traces for one or more tasks.
*
* <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
*/
public class StackTraceSample {
/** ID of this sample (unique per job). */
private final int sampleId;
/** Time stamp when the sample was triggered. */
private final long startTime;
/** Time stamp when all stack traces were collected at the JobManager. */
private final long endTime;
/** Map of stack traces by execution ID. */
private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
/**
* Creates a stack trace sample.
*
* @param sampleId ID of the sample.
* @param startTime Time stamp when the sample was triggered.
* @param endTime Time stamp when all stack traces were
* collected at the JobManager.
* @param stackTracesByTask Map of stack traces by execution ID.
*/
public StackTraceSample(
int sampleId,
long startTime,
long endTime,
Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) {
checkArgument(sampleId >= 0, "Negative sample ID");
checkArgument(startTime >= 0, "Negative start time");
checkArgument(endTime >= startTime, "End time before start time");
this.sampleId = sampleId;
this.startTime = startTime;
this.endTime = endTime;
this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask);
}
/**
* Returns the ID of the sample.
*
* @return ID of the sample
*/
public int getSampleId() {
return sampleId;
}
/**
* Returns the time stamp when the sample was triggered.
*
* @return Time stamp when the sample was triggered
*/
public long getStartTime() {
return startTime;
}
/**
* Returns the time stamp when all stack traces were collected at the
* JobManager.
*
* @return Time stamp when all stack traces were collected at the
* JobManager
*/
public long getEndTime() {
return endTime;
}
/**
* Returns the map of stack traces by execution ID.
*
* @return Map of stack traces by execution ID
*/
public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
return stackTracesByTask;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.google.common.collect.Maps;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A coordinator for triggering and collecting stack traces of running tasks.
*/
public class StackTraceSampleCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
private static final int NUM_GHOST_SAMPLE_IDS = 10;
private final Object lock = new Object();
/** Actor for responses. */
private final ActorGateway responseActor;
/** Time out after the expected sampling duration. */
private final int sampleTimeout;
/** In progress samples (guarded by lock). */
private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>();
/** A list of recent sample IDs to identify late messages vs. invalid ones. */
private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
/** Sample ID counter (guarded by lock). */
private int sampleIdCounter;
/**
* Timer to discard expired in progress samples. Lazily initialized, as the
* sample coordinator will not be used very often (guarded by lock).
*/
private Timer timer;
/**
* Flag indicating whether the coordinator is still running (guarded by
* lock).
*/
private boolean isShutDown;
/**
* Creates a new coordinator for the job.
*
* @param sampleTimeout Time out after the expected sampling duration.
* This is added to the expected duration of a
* sample, which is determined by the number of
* samples and the delay between each sample.
*/
public StackTraceSampleCoordinator(ActorSystem actorSystem, int sampleTimeout) {
Props props = Props.create(StackTraceSampleCoordinatorActor.class, this);
this.responseActor = new AkkaActorGateway(actorSystem.actorOf(props), null);
checkArgument(sampleTimeout >= 0);
this.sampleTimeout = sampleTimeout;
}
/**
* Triggers a stack trace sample for all tasks.
*
* @param tasksToSample Tasks to sample.
* @param numSamples Number of stack trace samples to collect.
* @param delayBetweenSamples Delay between consecutive samples.
* @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
* no maximum and keeps the complete stack trace.
* @return A future of the completed stack trace sample
*/
@SuppressWarnings("unchecked")
public Future<StackTraceSample> triggerStackTraceSample(
ExecutionVertex[] tasksToSample,
int numSamples,
FiniteDuration delayBetweenSamples,
int maxStackTraceDepth) {
checkNotNull(tasksToSample, "Tasks to sample");
checkArgument(tasksToSample.length >= 1, "No tasks to sample");
checkArgument(numSamples >= 1, "No number of samples");
checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
// Execution IDs of running tasks
ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
// Check that all tasks are RUNNING before triggering anything. The
// triggering can still fail.
for (int i = 0; i < triggerIds.length; i++) {
Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
if (execution != null && execution.getState() == ExecutionState.RUNNING) {
triggerIds[i] = execution.getAttemptId();
} else {
Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>()
.failure(new IllegalStateException("Task " + tasksToSample[i]
.getTaskNameWithSubtaskIndex() + " is not running."));
return failedPromise.future();
}
}
synchronized (lock) {
if (isShutDown) {
Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>()
.failure(new IllegalStateException("Shut down"));
return failedPromise.future();
}
if (timer == null) {
timer = new Timer("Stack trace sample coordinator timer");
}
int sampleId = sampleIdCounter++;
LOG.debug("Triggering stack trace sample {}", sampleId);
final PendingStackTraceSample pending = new PendingStackTraceSample(
sampleId, triggerIds);
// Discard the sample if it takes too long. We don't send cancel
// messages to the task managers, but only wait for the responses
// and then ignore them.
long expectedDuration = numSamples * delayBetweenSamples.toMillis();
long discardDelay = expectedDuration + sampleTimeout;
TimerTask discardTask = new TimerTask() {
@Override
public void run() {
try {
synchronized (lock) {
if (!pending.isDiscarded()) {
LOG.info("Sample {} expired before completing",
pending.getSampleId());
pending.discard(new RuntimeException("Time out"));
pendingSamples.remove(pending.getSampleId());
}
}
} catch (Throwable t) {
LOG.error("Exception while handling sample timeout", t);
}
}
};
// Add the pending sample before scheduling the discard task to
// prevent races with removing it again.
pendingSamples.put(sampleId, pending);
timer.schedule(discardTask, discardDelay);
boolean success = true;
try {
// Trigger all samples
for (int i = 0; i < tasksToSample.length; i++) {
TriggerStackTraceSample msg = new TriggerStackTraceSample(
sampleId,
triggerIds[i],
numSamples,
delayBetweenSamples,
maxStackTraceDepth);
if (!tasksToSample[i].sendMessageToCurrentExecution(
msg,
triggerIds[i],
responseActor)) {
success = false;
break;
}
}
return pending.getStackTraceSampleFuture();
} finally {
if (!success) {
pending.discard(new RuntimeException("Failed to trigger sample, " +
"because task has been reset."));
pendingSamples.remove(sampleId);
rememberRecentSampleId(sampleId);
}
}
}
}
/**
* Cancels a pending sample.
*
* @param sampleId ID of the sample to cancel.
* @param cause Cause of the cancellation (can be <code>null</code>).
*/
public void cancelStackTraceSample(int sampleId, Exception cause) {
synchronized (lock) {
if (isShutDown) {
return;
}
PendingStackTraceSample sample = pendingSamples.remove(sampleId);
if (sample != null) {
if (cause != null) {
LOG.info("Cancelling sample " + sampleId, cause);
} else {
LOG.info("Cancelling sample {}", sampleId);
}
sample.discard(cause);
rememberRecentSampleId(sampleId);
}
}
}
/**
* Shuts down the coordinator.
*
* <p>After shut down, no further operations are executed.
*/
public void shutDown() {
synchronized (lock) {
if (!isShutDown) {
LOG.info("Shutting down stack trace sample coordinator.");
for (PendingStackTraceSample pending : pendingSamples.values()) {
pending.discard(new RuntimeException("Shut down"));
}
pendingSamples.clear();
if (timer != null) {
timer.cancel();
}
isShutDown = true;
}
}
}
/**
* Collects stack traces of a task.
*
* @param sampleId ID of the sample.
* @param executionId ID of the sampled task.
* @param stackTraces Stack traces of the sampled task.
*
* @throws IllegalStateException If the sample ID is unknown and does not
* belong to a recently finished or cancelled sample.
*/
public void collectStackTraces(
int sampleId,
ExecutionAttemptID executionId,
List<StackTraceElement[]> stackTraces) {
synchronized (lock) {
if (isShutDown) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId);
}
PendingStackTraceSample pending = pendingSamples.get(sampleId);
if (pending != null) {
pending.collectStackTraces(executionId, stackTraces);
// Publish the sample
if (pending.isComplete()) {
pendingSamples.remove(sampleId);
rememberRecentSampleId(sampleId);
pending.completePromiseAndDiscard();
}
} else if (recentPendingSamples.contains(sampleId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received late stack trace sample {} of task {}",
sampleId, executionId);
}
} else {
throw new IllegalStateException("Unknown sample ID " + sampleId);
}
}
}
private void rememberRecentSampleId(int sampleId) {
if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
recentPendingSamples.removeFirst();
}
recentPendingSamples.addLast(sampleId);
}
int getNumberOfPendingSamples() {
synchronized (lock) {
return pendingSamples.size();
}
}
// ------------------------------------------------------------------------
/**
* A pending stack trace sample, which collects stack traces and owns a
* {@link StackTraceSample} promise.
*
* <p>Pending samples must only be accessed while holding the lock.
*/
private static class PendingStackTraceSample {
private final int sampleId;
private final long startTime;
private final Set<ExecutionAttemptID> pendingTasks;
private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
private final Promise<StackTraceSample> stackTracePromise;
private boolean isDiscarded;
PendingStackTraceSample(
int sampleId,
ExecutionAttemptID[] tasksToCollect) {
this.sampleId = sampleId;
this.startTime = System.currentTimeMillis();
this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
this.stackTracePromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
}
int getSampleId() {
return sampleId;
}
long getStartTime() {
return startTime;
}
boolean isDiscarded() {
return isDiscarded;
}
boolean isComplete() {
if (isDiscarded) {
throw new IllegalStateException("Discarded");
}
return pendingTasks.isEmpty();
}
void discard(Throwable cause) {
if (!isDiscarded) {
pendingTasks.clear();
stackTracesByTask.clear();
stackTracePromise.failure(new RuntimeException("Discarded", cause));
isDiscarded = true;
}
}
void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
if (isDiscarded) {
throw new IllegalStateException("Discarded");
}
if (pendingTasks.remove(executionId)) {
stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
} else if (isComplete()) {
throw new IllegalStateException("Completed");
} else {
throw new IllegalArgumentException("Unknown task " + executionId);
}
}
void completePromiseAndDiscard() {
if (isComplete()) {
isDiscarded = true;
long endTime = System.currentTimeMillis();
StackTraceSample stackTraceSample = new StackTraceSample(
sampleId,
startTime,
endTime,
stackTracesByTask);
stackTracePromise.success(stackTraceSample);
} else {
throw new IllegalStateException("Not completed yet");
}
}
@SuppressWarnings("unchecked")
Future<StackTraceSample> getStackTraceSampleFuture() {
return stackTracePromise.future();
}
}
/**
* Actor for stack trace sample responses.
*/
private static class StackTraceSampleCoordinatorActor extends FlinkUntypedActor {
private final StackTraceSampleCoordinator coordinator;
public StackTraceSampleCoordinatorActor(StackTraceSampleCoordinator coordinator) {
this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
}
@Override
protected void handleMessage(Object msg) throws Exception {
try {
if (msg instanceof ResponseStackTraceSampleSuccess) {
ResponseStackTraceSampleSuccess success = (ResponseStackTraceSampleSuccess) msg;
coordinator.collectStackTraces(
success.sampleId(),
success.executionId(),
success.samples());
} else if (msg instanceof ResponseStackTraceSampleFailure) {
ResponseStackTraceSampleFailure failure = (ResponseStackTraceSampleFailure) msg;
coordinator.cancelStackTraceSample(failure.sampleId(), failure.cause());
} else {
throw new IllegalArgumentException("Unexpected task sample message");
}
} catch (Throwable t) {
LOG.error("Error responding to message '" + msg + "': " + t.getMessage() + ".", t);
}
}
@Override
protected UUID getLeaderSessionID() {
return null;
}
}
}
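For completeness, a sketch of driving the coordinator directly, as the BackPressureStatsTracker does internally; `actorSystem`, `runningTasks` (an ExecutionVertex[] whose current executions are RUNNING), and `executionContext` are assumptions for illustration:

// Hypothetical usage sketch.
StackTraceSampleCoordinator coordinator =
        new StackTraceSampleCoordinator(actorSystem, 60000); // 60 s on top of the expected duration

Future<StackTraceSample> future = coordinator.triggerStackTraceSample(
        runningTasks,
        100,                                           // samples per task
        new FiniteDuration(50, TimeUnit.MILLISECONDS), // delay between consecutive samples
        0);                                            // 0 = keep complete stack traces

future.onComplete(new OnComplete<StackTraceSample>() {
    @Override
    public void onComplete(Throwable failure, StackTraceSample sample) {
        if (sample != null) {
            // Stack traces by ExecutionAttemptID, numSamples traces per task
            int sampledTasks = sample.getStackTraces().size();
        } else {
            // Trigger failed, a task was reset, or the sample timed out
        }
    }
}, executionContext);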
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
......@@ -29,14 +28,17 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
......@@ -46,29 +48,24 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
......@@ -121,8 +118,11 @@ public class WebRuntimeMonitor implements WebMonitor {
private final File uploadDir;
private final StackTraceSampleCoordinator stackTraceSamples;
private final BackPressureStatsTracker backPressureStatsTracker;
private AtomicBoolean cleanedUp = new AtomicBoolean();
public WebRuntimeMonitor(
Configuration config,
......@@ -163,6 +163,34 @@ public class WebRuntimeMonitor implements WebMonitor {
ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
// - Back pressure stats ----------------------------------------------
stackTraceSamples = new StackTraceSampleCoordinator(actorSystem, 60000);
// Back pressure stats tracker config
int cleanUpInterval = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
int refreshInterval = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
int numSamples = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
int delay = config.getInteger(
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
FiniteDuration delayBetweenSamples = new FiniteDuration(delay, TimeUnit.MILLISECONDS);
backPressureStatsTracker = new BackPressureStatsTracker(
stackTraceSamples, cleanUpInterval, numSamples, delayBetweenSamples);
// --------------------------------------------------------------------
router = new Router()
// configure how to interact with this web server
.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
......@@ -187,7 +215,10 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
currentGraphs,
backPressureStatsTracker,
refreshInterval)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
......@@ -298,6 +329,23 @@ public class WebRuntimeMonitor implements WebMonitor {
synchronized (startupShutdownLock) {
jobManagerAddressPromise.success(jobManagerAkkaUrl);
leaderRetrievalService.start(retriever);
long delay = backPressureStatsTracker.getCleanUpInterval();
// Scheduled back pressure stats tracker cache cleanup. We schedule
// this repeatedly, because cache clean up only happens on
// interactions with the cache. This makes sure that we don't leak
// memory for completed jobs or stats that have not been accessed
// for a long time.
bootstrap.childGroup().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
backPressureStatsTracker.cleanUpOperatorStatsCache();
} catch (Throwable t) {
LOG.error("Error during back pressure stats cache cleanup.", t);
}
}
}, delay, delay, TimeUnit.MILLISECONDS);
}
}
......@@ -316,6 +364,10 @@ public class WebRuntimeMonitor implements WebMonitor {
}
}
stackTraceSamples.shutDown();
backPressureStatsTracker.shutDown();
cleanup();
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
import scala.Option;
import java.io.StringWriter;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Request handler that returns back pressure stats for a single job vertex and
* all its sub tasks.
*/
public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler {
/** Back pressure stats tracker. */
private final BackPressureStatsTracker backPressureStatsTracker;
/** Time after which stats are considered outdated. */
private final int refreshInterval;
public JobVertexBackPressureHandler(
ExecutionGraphHolder executionGraphHolder,
BackPressureStatsTracker backPressureStatsTracker,
int refreshInterval) {
super(executionGraphHolder);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
checkArgument(refreshInterval >= 0, "Negative refresh interval");
this.refreshInterval = refreshInterval;
}
@Override
public String handleRequest(
ExecutionJobVertex jobVertex,
Map<String, String> params) throws Exception {
try (StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
gen.writeStartObject();
Option<OperatorBackPressureStats> statsOption = backPressureStatsTracker
.getOperatorBackPressureStats(jobVertex);
if (statsOption.isDefined()) {
OperatorBackPressureStats stats = statsOption.get();
// Check whether we need to refresh
if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) {
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
gen.writeStringField("status", "deprecated");
} else {
gen.writeStringField("status", "ok");
}
gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio()));
gen.writeNumberField("end-timestamp", stats.getEndTimestamp());
// Sub tasks
gen.writeArrayFieldStart("subtasks");
int numSubTasks = stats.getNumberOfSubTasks();
for (int i = 0; i < numSubTasks; i++) {
double ratio = stats.getBackPressureRatio(i);
gen.writeStartObject();
gen.writeNumberField("subtask", i);
gen.writeStringField("backpressure-level", getBackPressureLevel(ratio));
gen.writeNumberField("ratio", ratio);
gen.writeEndObject();
}
gen.writeEndArray();
} else {
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
gen.writeStringField("status", "deprecated");
}
gen.writeEndObject();
gen.close();
return writer.toString();
}
}
/**
* Returns the back pressure level as a String.
*
* @param backPressureRatio Ratio of back pressure samples to total number of samples.
*
* @return Back pressure level ('ok', 'low', or 'high')
*/
static String getBackPressureLevel(double backPressureRatio) {
if (backPressureRatio <= 0.10) {
return "ok";
} else if (backPressureRatio <= 0.5) {
return "low";
} else {
return "high";
}
}
}
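For illustration, here is roughly what this handler returns for the route GET /jobs/:jobid/vertices/:vertexid/backpressure registered in WebRuntimeMonitor above; the timestamp and ratios below are made up:

{
  "status" : "ok",
  "backpressure-level" : "low",
  "end-timestamp" : 1454924562164,
  "subtasks" : [
    { "subtask" : 0, "backpressure-level" : "ok", "ratio" : 0.07 },
    { "subtask" : 1, "backpressure-level" : "low", "ratio" : 0.34 }
  ]
}

If no fresh enough sample is available, the handler answers with only {"status" : "deprecated"} and triggers a new sample in the background.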
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* Simple back pressured task test.
*/
public class BackPressureStatsTrackerITCase extends TestLogger {
private static NetworkBufferPool networkBufferPool;
private static ActorSystem testActorSystem;
/** Shared as static variable with the test task. */
private static BufferPool testBufferPool;
@BeforeClass
public static void setup() {
testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
}
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(testActorSystem);
networkBufferPool.destroy();
}
/**
* Tests a simple fake back pressured task. Back pressure is assumed when
* the sampled stack traces are stuck in blocking buffer requests.
*/
@Test
public void testBackPressuredProducer() throws Exception {
new JavaTestKit(testActorSystem) {{
final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);
// The JobGraph
final JobGraph jobGraph = new JobGraph();
final int parallelism = 4;
final JobVertex task = new JobVertex("Task");
task.setInvokableClass(BackPressuredTask.class);
task.setParallelism(parallelism);
jobGraph.addVertex(task);
ActorGateway jobManager = null;
ActorGateway taskManager = null;
//
// 1) Consume all buffers at first (no buffers for the test task)
//
testBufferPool = networkBufferPool.createBufferPool(1, false);
final List<Buffer> buffers = new ArrayList<>();
while (true) {
Buffer buffer = testBufferPool.requestBuffer();
if (buffer != null) {
buffers.add(buffer);
} else {
break;
}
}
try {
jobManager = TestingUtils.createJobManager(testActorSystem, new Configuration());
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
testActorSystem, jobManger, config, true, true);
final ActorGateway jm = jobManager;
new Within(deadline) {
@Override
protected void run() {
try {
ActorGateway testActor = new AkkaActorGateway(getTestActor(), null);
// Submit the job and wait until it is running
JobClient.submitJobDetached(
jm,
jobGraph,
deadline,
ClassLoader.getSystemClassLoader());
jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
// Get the ExecutionGraph
jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
ExecutionGraphFound executionGraphResponse =
expectMsgClass(ExecutionGraphFound.class);
ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
testActorSystem, 60000);
// Verify back pressure (clean up interval can be ignored)
BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
coordinator,
100 * 1000,
20,
new FiniteDuration(10, TimeUnit.MILLISECONDS));
int numAttempts = 10;
int nextSampleId = 0;
// Verify that all tasks are back pressured. This
// can fail if the task takes longer to request
// the buffer.
for (int attempt = 0; attempt < numAttempts; attempt++) {
try {
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
assertEquals(nextSampleId + attempt, stats.getSampleId());
assertEquals(parallelism, stats.getNumberOfSubTasks());
assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
for (int i = 0; i < parallelism; i++) {
assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
}
nextSampleId = stats.getSampleId() + 1;
break;
} catch (Throwable t) {
if (attempt == numAttempts - 1) {
throw t;
} else {
Thread.sleep(500);
}
}
}
//
// 2) Release all buffers and let the tasks grab one
//
for (Buffer buf : buffers) {
buf.recycle();
}
// Wait for all buffers to be available. The tasks
// grab them and then immediately release them.
while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
Thread.sleep(100);
}
// Verify that no task is back pressured any more.
for (int attempt = 0; attempt < numAttempts; attempt++) {
try {
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
assertEquals(nextSampleId + attempt, stats.getSampleId());
assertEquals(parallelism, stats.getNumberOfSubTasks());
// Verify that no task is back pressured
for (int i = 0; i < parallelism; i++) {
assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
}
break;
} catch (Throwable t) {
if (attempt == numAttempts - 1) {
throw t;
} else {
Thread.sleep(500);
}
}
}
// Shut down
jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
// Cancel job
jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
// Response to removal notification
expectMsgEquals(true);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
} finally {
TestingUtils.stopActor(jobManager);
TestingUtils.stopActor(taskManager);
for (Buffer buf : buffers) {
buf.recycle();
}
testBufferPool.lazyDestroy();
}
}};
}
/**
* Triggers a new stats sample.
*/
private OperatorBackPressureStats triggerStatsSample(
BackPressureStatsTracker statsTracker,
ExecutionJobVertex vertex) throws InterruptedException {
statsTracker.invalidateOperatorStatsCache();
statsTracker.triggerStackTraceSample(vertex);
// Sleep for the minimum sampling duration (20 samples * 10 ms delay)
Thread.sleep(20 * 10);
Option<OperatorBackPressureStats> stats;
// Get the stats
while ((stats = statsTracker.getOperatorBackPressureStats(vertex)).isEmpty()) {
Thread.sleep(10);
}
return stats.get();
}
/**
* A back pressured producer sharing a {@link BufferPool} with the
* test driver.
*/
public static class BackPressuredTask extends AbstractInvokable {
@Override
public void invoke() throws Exception {
while (true) {
Buffer buffer = testBufferPool.requestBufferBlocking();
// Got a buffer, yay!
buffer.recycle();
new CountDownLatch(1).await();
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.Test;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class BackPressureStatsTrackerTest {
/** Tests simple statistics with fake stack traces. */
@Test
@SuppressWarnings("unchecked")
public void testTriggerStackTraceSample() throws Exception {
Promise<StackTraceSample> samplePromise = new Promise.DefaultPromise<>();
StackTraceSampleCoordinator sampleCoordinator = mock(StackTraceSampleCoordinator.class);
when(sampleCoordinator.triggerStackTraceSample(
any(ExecutionVertex[].class),
anyInt(),
any(FiniteDuration.class),
anyInt())).thenReturn(samplePromise.future());
ExecutionGraph graph = mock(ExecutionGraph.class);
// Same Thread execution context
when(graph.getExecutionContext()).thenReturn(new ExecutionContext() {
@Override
public void execute(Runnable runnable) {
runnable.run();
}
@Override
public void reportFailure(Throwable t) {
fail();
}
@Override
public ExecutionContext prepare() {
return this;
}
});
ExecutionVertex[] taskVertices = new ExecutionVertex[4];
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
when(jobVertex.getJobId()).thenReturn(new JobID());
when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
when(jobVertex.getGraph()).thenReturn(graph);
when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
taskVertices[0] = mockExecutionVertex(jobVertex, 0);
taskVertices[1] = mockExecutionVertex(jobVertex, 1);
taskVertices[2] = mockExecutionVertex(jobVertex, 2);
taskVertices[3] = mockExecutionVertex(jobVertex, 3);
int numSamples = 100;
FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
BackPressureStatsTracker tracker = new BackPressureStatsTracker(
sampleCoordinator, 9999, numSamples, delayBetweenSamples);
// Trigger
tracker.triggerStackTraceSample(jobVertex);
verify(sampleCoordinator).triggerStackTraceSample(
eq(taskVertices),
eq(numSamples),
eq(delayBetweenSamples),
eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
// Trigger again for pending request, should not fire
tracker.triggerStackTraceSample(jobVertex);
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
verify(sampleCoordinator).triggerStackTraceSample(
eq(taskVertices),
eq(numSamples),
eq(delayBetweenSamples),
eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
// Complete the future
Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>();
for (ExecutionVertex vertex : taskVertices) {
List<StackTraceElement[]> taskTraces = new ArrayList<>();
for (int i = 0; i < taskVertices.length; i++) {
// Traces up to and including the sub task index are back pressured
taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex()));
}
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
}
int sampleId = 1231;
int endTime = 841;
StackTraceSample sample = new StackTraceSample(
sampleId,
0,
endTime,
traces);
// Succeed the promise
samplePromise.success(sample);
assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isDefined());
OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get();
// Verify the stats
assertEquals(sampleId, stats.getSampleId());
assertEquals(endTime, stats.getEndTimestamp());
assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
for (int i = 0; i < taskVertices.length; i++) {
double ratio = stats.getBackPressureRatio(i);
// Traces up to the subtask index are back pressured
assertEquals((i + 1) / (double) taskVertices.length, ratio, 0.0);
}
}
private StackTraceElement[] createStackTrace(boolean isBackPressure) {
if (isBackPressure) {
return new StackTraceElement[] { new StackTraceElement(
BackPressureStatsTracker.EXPECTED_CLASS_NAME,
BackPressureStatsTracker.EXPECTED_METHOD_NAME,
"LocalBufferPool.java",
133) };
} else {
return Thread.currentThread().getStackTrace();
}
}
private ExecutionVertex mockExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex) {
Execution exec = mock(Execution.class);
when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
JobVertexID id = jobVertex.getJobVertexId();
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getJobvertexId()).thenReturn(id);
when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
return vertex;
}
}
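For orientation, the arithmetic that the assertions above rely on can be sketched in isolation: a sampled trace counts as back pressured when one of its elements matches the expected buffer-request class and method, and a subtask's ratio is the back pressured fraction of its traces. A minimal sketch with assumed names, not the committed implementation:

import java.util.List;

class BackPressureRatioSketch {

    // Sketch: derive one subtask's back pressure ratio from its sampled traces.
    // The expected class/method stand in for the buffer-request call site.
    static double ratio(
            List<StackTraceElement[]> traces,
            String expectedClassName,
            String expectedMethodName) {
        int backPressured = 0;
        for (StackTraceElement[] trace : traces) {
            for (StackTraceElement elem : trace) {
                if (elem.getClassName().equals(expectedClassName)
                        && elem.getMethodName().equals(expectedMethodName)) {
                    backPressured++;
                    break; // count each sampled trace at most once
                }
            }
        }
        return traces.isEmpty() ? 0.0 : backPressured / (double) traces.size();
    }
}

With the fake traces from the test above, subtask i yields (i + 1) / 4, matching the asserted ratios.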
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for the {@link StackTraceSampleCoordinator}.
*/
public class StackTraceSampleCoordinatorTest {
private static ActorSystem system;
private StackTraceSampleCoordinator coord;
@BeforeClass
public static void setUp() throws Exception {
system = AkkaUtils.createLocalActorSystem(new Configuration());
}
@AfterClass
public static void tearDown() throws Exception {
if (system != null) {
system.shutdown();
}
}
@Before
public void init() throws Exception {
this.coord = new StackTraceSampleCoordinator(system, 60000);
}
/** Tests simple trigger and collect of stack trace samples. */
@Test
public void testTriggerStackTraceSample() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
};
int numSamples = 1;
FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
int maxStackTraceDepth = 0;
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);
// Verify messages have been sent
for (ExecutionVertex vertex : vertices) {
ExecutionAttemptID expectedExecutionId = vertex
.getCurrentExecutionAttempt().getAttemptId();
TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
0,
expectedExecutionId,
numSamples,
delayBetweenSamples,
maxStackTraceDepth);
verify(vertex).sendMessageToCurrentExecution(
eq(expectedMsg), eq(expectedExecutionId), any(AkkaActorGateway.class));
}
assertFalse(sampleFuture.isCompleted());
StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
List<StackTraceElement[]> traces = new ArrayList<>();
traces.add(stackTraceSample);
traces.add(stackTraceSample);
traces.add(stackTraceSample);
// Collect stack traces
for (int i = 0; i < vertices.length; i++) {
ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
coord.collectStackTraces(0, executionId, traces);
if (i == vertices.length - 1) {
assertTrue(sampleFuture.isCompleted());
} else {
assertFalse(sampleFuture.isCompleted());
}
}
// Verify completed stack trace sample
StackTraceSample sample = sampleFuture.value().get().get();
assertEquals(0, sample.getSampleId());
assertTrue(sample.getEndTime() >= sample.getStartTime());
Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
for (ExecutionVertex vertex : vertices) {
ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);
assertNotNull("Task not found", sampleTraces);
assertEquals(traces, sampleTraces);
}
// Verify no more pending sample
assertEquals(0, coord.getNumberOfPendingSamples());
// Verify no error on late collect
coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
}
/** Tests that triggering a sample for non-running tasks fails the future. */
@Test
public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
};
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices,
1,
new FiniteDuration(100, TimeUnit.MILLISECONDS),
0);
assertTrue(sampleFuture.isCompleted());
assertTrue(sampleFuture.failed().isCompleted());
assertTrue(sampleFuture.failed().value().get().get() instanceof IllegalStateException);
}
/** Tests that triggering a sample for reset tasks fails the future. */
@Test
public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
// Fails to send the message to the execution (happens when execution is reset)
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
};
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices,
1,
new FiniteDuration(100, TimeUnit.MILLISECONDS),
0);
assertTrue(sampleFuture.isCompleted());
assertTrue(sampleFuture.failed().isCompleted());
assertTrue(sampleFuture.failed().value().get().get().getCause() instanceof RuntimeException);
}
/** Tests that samples time out if they don't finish in time. */
@Test
public void testTriggerStackTraceSampleTimeout() throws Exception {
int timeout = 100;
coord = new StackTraceSampleCoordinator(system, timeout);
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
};
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
// Wait for the timeout
Thread.sleep(timeout * 2);
boolean success = false;
for (int i = 0; i < 10; i++) {
if (sampleFuture.isCompleted()) {
success = true;
break;
}
Thread.sleep(timeout);
}
assertTrue("Sample did not time out", success);
Throwable cause = sampleFuture.failed().value().get().get();
assertTrue(cause.getCause().getMessage().contains("Time out"));
// Collect after the timeout
try {
ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
}
}
/** Tests that collecting an unknown sample fails. */
@Test(expected = IllegalStateException.class)
public void testCollectStackTraceForUnknownSample() throws Exception {
coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
}
/** Tests cancelling of a pending sample. */
@Test
public void testCancelStackTraceSample() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
};
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
assertFalse(sampleFuture.isCompleted());
// Cancel
coord.cancelStackTraceSample(0, null);
// Verify completed
assertTrue(sampleFuture.isCompleted());
// Verify no more pending samples
assertEquals(0, coord.getNumberOfPendingSamples());
}
/** Tests that collecting for a cancelled sample throws no Exception. */
@Test
public void testCollectStackTraceForCanceledSample() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
};
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
assertFalse(sampleFuture.isCompleted());
coord.cancelStackTraceSample(0, null);
assertTrue(sampleFuture.isCompleted());
// Verify no error on late collect
ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
}
/** Tests that collecting for a discarded pending sample throws no Exception. */
@Test
public void testCollectForDiscardedPendingSample() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
};
Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
assertFalse(sampleFuture.isCompleted());
coord.cancelStackTraceSample(0, null);
assertTrue(sampleFuture.isCompleted());
// Verify no error on late collect
ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
}
/** Tests that collecting for an unknown task fails. */
@Test(expected = IllegalArgumentException.class)
public void testCollectStackTraceForUnknownTask() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
};
coord.triggerStackTraceSample(vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
}
/** Tests that shut down fails all pending samples and future sample triggers. */
@Test
public void testShutDown() throws Exception {
ExecutionVertex[] vertices = new ExecutionVertex[] {
mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
};
List<Future<StackTraceSample>> sampleFutures = new ArrayList<>();
// Trigger
sampleFutures.add(coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
sampleFutures.add(coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
for (Future<StackTraceSample> future : sampleFutures) {
assertFalse(future.isCompleted());
}
// Shut down
coord.shutDown();
// Verify all completed
for (Future<StackTraceSample> future : sampleFutures) {
assertTrue(future.isCompleted());
}
// Verify new trigger returns failed future
Future<StackTraceSample> future = coord.triggerStackTraceSample(
vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
assertTrue(future.isCompleted());
assertTrue(future.failed().isCompleted());
}
// ------------------------------------------------------------------------
private ExecutionVertex mockExecutionVertex(
ExecutionAttemptID executionId,
ExecutionState state,
boolean sendSuccess) {
Execution exec = mock(Execution.class);
when(exec.getAttemptId()).thenReturn(executionId);
when(exec.getState()).thenReturn(state);
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
when(vertex.sendMessageToCurrentExecution(
any(Serializable.class), any(ExecutionAttemptID.class), any(AkkaActorGateway.class)))
.thenReturn(sendSuccess);
return vertex;
}
}
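As a condensed view of the protocol these tests exercise: trigger returns a future, and the future completes once every execution attempt has reported its traces. In production the collect calls arrive via actor messages from the task managers; the direct calls, the sample id 0, and the import paths below are assumptions for illustration:

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.webmonitor.StackTraceSample;
import org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator;

import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

class CoordinatorRoundTripSketch {

    // Sketch: one sample per task, 100 ms apart, unbounded stack depth (0).
    static Future<StackTraceSample> sampleOnce(
            StackTraceSampleCoordinator coord, ExecutionVertex[] vertices) {
        Future<StackTraceSample> future = coord.triggerStackTraceSample(
                vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
        // Report an (empty) trace list for every attempt; the last report
        // completes the future.
        for (ExecutionVertex vertex : vertices) {
            coord.collectStackTraces(
                    0, // assumed: first sample id handed out by this coordinator
                    vertex.getCurrentExecutionAttempt().getAttemptId(),
                    new ArrayList<StackTraceElement[]>());
        }
        return future;
    }
}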
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
import org.junit.Test;
import scala.Option;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for back pressure handler responses.
*/
public class JobVertexBackPressureHandlerTest {
/** Tests the response when no stats are available. */
@Test
public void testResponseNoStatsAvailable() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
.thenReturn(Option.<OperatorBackPressureStats>empty());
JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
mock(ExecutionGraphHolder.class),
statsTracker,
9999);
String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
// Single element
assertEquals(1, rootNode.size());
// Status
JsonNode status = rootNode.get("status");
assertNotNull(status);
assertEquals("deprecated", status.textValue());
verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
}
/** Tests the response when stats are available. */
@Test
public void testResponseStatsAvailable() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
OperatorBackPressureStats stats = new OperatorBackPressureStats(
0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
.thenReturn(Option.apply(stats));
JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
mock(ExecutionGraphHolder.class),
statsTracker,
9999);
String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
// Four top-level fields
assertEquals(4, rootNode.size());
// Status
JsonNode status = rootNode.get("status");
assertNotNull(status);
assertEquals("ok", status.textValue());
// Back pressure level
JsonNode backPressureLevel = rootNode.get("backpressure-level");
assertNotNull(backPressureLevel);
assertEquals("high", backPressureLevel.textValue());
// End time stamp
JsonNode endTimeStamp = rootNode.get("end-timestamp");
assertNotNull(endTimeStamp);
assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
// Subtasks
JsonNode subTasks = rootNode.get("subtasks");
assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
for (int i = 0; i < subTasks.size(); i++) {
JsonNode subTask = subTasks.get(i);
JsonNode index = subTask.get("subtask");
assertEquals(i, index.intValue());
JsonNode level = subTask.get("backpressure-level");
assertEquals(JobVertexBackPressureHandler
.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
JsonNode ratio = subTask.get("ratio");
assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
}
// Verify not triggered
verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
}
/** Tests that after the refresh interval another sample is triggered. */
@Test
public void testResponsePassedRefreshInterval() throws Exception {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
OperatorBackPressureStats stats = new OperatorBackPressureStats(
0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
.thenReturn(Option.apply(stats));
JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
mock(ExecutionGraphHolder.class),
statsTracker,
0); // <----- refresh interval should fire immediately
String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
// Four top-level fields
assertEquals(4, rootNode.size());
// Status
JsonNode status = rootNode.get("status");
assertNotNull(status);
// Interval passed, hence deprecated
assertEquals("deprecated", status.textValue());
// Back pressure level
JsonNode backPressureLevel = rootNode.get("backpressure-level");
assertNotNull(backPressureLevel);
assertEquals("high", backPressureLevel.textValue());
// End time stamp
JsonNode endTimeStamp = rootNode.get("end-timestamp");
assertNotNull(endTimeStamp);
assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
// Subtasks
JsonNode subTasks = rootNode.get("subtasks");
assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
for (int i = 0; i < subTasks.size(); i++) {
JsonNode subTask = subTasks.get(i);
JsonNode index = subTask.get("subtask");
assertEquals(i, index.intValue());
JsonNode level = subTask.get("backpressure-level");
assertEquals(JobVertexBackPressureHandler
.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
JsonNode ratio = subTask.get("ratio");
assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
}
// Verify triggered
verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
}
}
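The level strings asserted above come from mapping a ratio to a coarse label via JobVertexBackPressureHandler.getBackPressureLevel. A sketch of such a mapping; the 0.10 and 0.50 cut-offs are illustrative assumptions, not necessarily the committed values:

class BackPressureLevelSketch {

    // Sketch: classify a back pressure ratio into the handler's textual levels.
    static String getBackPressureLevel(double ratio) {
        if (ratio <= 0.10) {
            return "ok";
        } else if (ratio <= 0.50) {
            return "low";
        } else {
            return "high";
        }
    }
}

This is consistent with the test data: a maximum subtask ratio of 1.0 maps to "high".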
......@@ -1089,8 +1089,7 @@ public class ExecutionGraph implements Serializable {
// We don't clean the checkpoint stats tracker, because we want
// it to be available after the job has terminated.
}
catch (Exception e) {
} catch (Exception e) {
LOG.error("Error while cleaning up after execution", e);
}
......@@ -1100,8 +1099,7 @@ public class ExecutionGraph implements Serializable {
if (coord != null) {
coord.shutdown();
}
}
catch (Exception e) {
} catch (Exception e) {
LOG.error("Error while cleaning up after execution", e);
}
}
......@@ -1231,7 +1229,6 @@ public class ExecutionGraph implements Serializable {
}
}
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListenerActors.size() > 0) {
ExecutionGraphMessages.JobStatusChanged message =
......
......@@ -471,7 +471,17 @@ public class ExecutionVertex implements Serializable {
this.currentExecution.fail(t);
}
public void sendMessageToCurrentExecution(Serializable message, ExecutionAttemptID attemptID) {
public boolean sendMessageToCurrentExecution(
Serializable message,
ExecutionAttemptID attemptID) {
return sendMessageToCurrentExecution(message, attemptID, null);
}
public boolean sendMessageToCurrentExecution(
Serializable message,
ExecutionAttemptID attemptID,
ActorGateway sender) {
Execution exec = getCurrentExecutionAttempt();
// check that this is for the correct execution attempt
......@@ -482,16 +492,26 @@ public class ExecutionVertex implements Serializable {
if (slot != null) {
ActorGateway gateway = slot.getInstance().getActorGateway();
if (gateway != null) {
gateway.tell(message);
if (sender == null) {
gateway.tell(message);
} else {
gateway.tell(message, sender);
}
return true;
} else {
return false;
}
}
else {
LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
return false;
}
}
else {
LOG.debug("Skipping message to {}/{} because it does not match the current execution",
getSimpleName(), attemptID);
return false;
}
}
......
......@@ -110,7 +110,7 @@ class JobManager(
protected val leaderElectionService: LeaderElectionService,
protected val submittedJobGraphs : SubmittedJobGraphStore,
protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
extends FlinkActor
extends FlinkActor
with LeaderSessionMessageFilter // mixin order is important, we want filtering after logging
with LogMessages // mixin order is important, we want first logging
with LeaderContender
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages
import akka.actor.ActorRef
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import scala.concurrent.duration.FiniteDuration
/**
* A set of messages exchanged with task manager instances in order to sample
* the stack traces of running tasks.
*/
object StackTraceSampleMessages {
trait StackTraceSampleMessages
/**
* Triggers the sampling of a running task (sent by the job manager to the
* task managers).
*
* @param sampleId ID of this sample.
* @param executionId ID of the task to sample.
* @param numSamples Number of stack trace samples to collect.
* @param delayBetweenSamples Delay between consecutive samples.
* @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
* no maximum and collects the complete stack
* trace.
*/
case class TriggerStackTraceSample(
sampleId: Int,
executionId: ExecutionAttemptID,
numSamples: Int,
delayBetweenSamples: FiniteDuration,
maxStackTraceDepth: Int = 0)
extends StackTraceSampleMessages with java.io.Serializable
/**
* Response after a successful stack trace sample (sent by the task managers
* to the job manager).
*
* @param sampleId ID of this sample.
* @param executionId ID of the sampled task.
* @param samples Stack trace samples (head is most recent sample).
*/
case class ResponseStackTraceSampleSuccess(
sampleId: Int,
executionId: ExecutionAttemptID,
samples: java.util.List[Array[StackTraceElement]])
extends StackTraceSampleMessages
/**
* Response after a failed stack trace sample (sent by the task managers to
* the job manager).
*
* @param sampleId ID of this sample.
* @param executionId ID of the sampled task.
* @param cause Failure cause.
*/
case class ResponseStackTraceSampleFailure(
sampleId: Int,
executionId: ExecutionAttemptID,
cause: Exception)
extends StackTraceSampleMessages
/**
* Task manager internal sample message.
*
* @param sampleId ID of this sample.
* @param executionId ID of the task to sample.
* @param delayBetweenSamples Delay between consecutive samples.
* @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
* no maximum and collects the complete stack
* trace.
* @param numRemainingSamples Number of remaining samples before this
* sample is finished.
* @param currentTraces The current list of gathered stack traces.
* @param sender Actor triggering this sample (receiver of result).
*/
case class SampleTaskStackTrace(
sampleId: Int,
executionId: ExecutionAttemptID,
delayBetweenSamples: FiniteDuration,
maxStackTraceDepth: Int,
numRemainingSamples: Int,
currentTraces: java.util.List[Array[StackTraceElement]],
sender: ActorRef)
extends StackTraceSampleMessages
}
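These case classes are also constructed from Java, as the task manager test below shows. A sketch of the job-manager-side send, with the gateway parameters assumed:

import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;

import scala.concurrent.duration.FiniteDuration;

class TriggerSendSketch {

    // Sketch: ask a task manager for 100 samples, 50 ms apart, full depth (0).
    // The responses go back to the given coordinator gateway.
    static void trigger(
            ActorGateway taskManager,
            ActorGateway coordinator,
            ExecutionAttemptID executionId,
            int sampleId) {
        taskManager.tell(
                new TriggerStackTraceSample(
                        sampleId, executionId, 100,
                        new FiniteDuration(50, TimeUnit.MILLISECONDS), 0),
                coordinator);
    }
}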
......@@ -19,34 +19,29 @@
package org.apache.flink.runtime.taskmanager
import java.io.{File, IOException}
import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
import java.lang.reflect.Method
import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.lang.reflect.Method
import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
import _root_.akka.actor._
import _root_.akka.pattern.ask
import _root_.akka.util.Timeout
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.codahale.metrics.json.MetricsModule
import com.codahale.metrics.jvm.{BufferPoolMetricSet, MemoryUsageGaugeSet, GarbageCollectorMetricSet}
import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet}
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
import org.apache.flink.configuration._
import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.blob.{BlobCache, BlobService}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.filecache.FileCache
......@@ -56,24 +51,28 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.StackTraceSampleMessages.{SampleTaskStackTrace, ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, StackTraceSampleMessages, TriggerStackTraceSample}
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.messages.TaskMessages._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.util.{SignalHandler, LeaderRetrievalUtils, MathUtils, EnvironmentInformation}
import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils, MathUtils, SignalHandler}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
import org.apache.flink.util.NetUtils
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success}
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.language.postfixOps
import scala.util.{Failure, Success}
/**
* The TaskManager is responsible for executing the individual tasks of a Flink job. It is
......@@ -273,6 +272,9 @@ class TaskManager(
// registration messages for connecting and disconnecting from / to the JobManager
case message: RegistrationMessage => handleRegistrationMessage(message)
// task sampling messages
case message: StackTraceSampleMessages => handleStackTraceSampleMessage(message)
// ----- miscellaneous messages ----
// periodic heart beats that transport metrics
......@@ -640,6 +642,120 @@ class TaskManager(
}
}
private def handleStackTraceSampleMessage(message: StackTraceSampleMessages): Unit = {
message match {
// Triggers the sampling of a task
case TriggerStackTraceSample(
sampleId,
executionId,
numSamples,
delayBetweenSamples,
maxStackTraceDepth) =>
log.debug(s"Triggering stack trace sample $sampleId.")
val senderRef = sender()
self ! SampleTaskStackTrace(
sampleId,
executionId,
delayBetweenSamples,
maxStackTraceDepth,
numSamples,
new java.util.ArrayList(),
senderRef)
// Repeatedly sent to self to sample a task
case SampleTaskStackTrace(
sampleId,
executionId,
delayBetweenSamples,
maxStackTraceDepth,
remainingNumSamples,
currentTraces,
sender) =>
try {
if (remainingNumSamples >= 1) {
getStackTrace(executionId, maxStackTraceDepth) match {
case Some(stackTrace) =>
currentTraces.add(stackTrace)
if (remainingNumSamples > 1) {
// ---- Continue ----
val msg = SampleTaskStackTrace(
sampleId,
executionId,
delayBetweenSamples,
maxStackTraceDepth,
remainingNumSamples - 1,
currentTraces,
sender)
context.system.scheduler.scheduleOnce(
delayBetweenSamples,
self,
msg)(context.dispatcher)
} else {
// ---- Done ----
log.debug(s"Done with stack trace sample $sampleId.")
sender ! ResponseStackTraceSampleSuccess(
sampleId,
executionId,
currentTraces)
}
case None =>
if (currentTraces.size() == 0) {
throw new IllegalStateException(s"Cannot sample task $executionId. " +
s"Either the task is not known to the task manager or it is not running.")
} else {
throw new IllegalStateException(s"Cannot sample task $executionId. " +
s"Task was removed after ${currentTraces.size()} sample(s).")
}
}
} else {
throw new IllegalStateException("Non-positive number of remaining samples")
}
} catch {
case e: Exception =>
sender ! ResponseStackTraceSampleFailure(sampleId, executionId, e)
}
case _ => unhandled(message)
}
/**
* Returns a stack trace of a running task.
*
* @param executionId ID of the running task.
* @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
* no maximum and collects the complete stack
* trace.
* @return Stack trace of the running task.
*/
def getStackTrace(
executionId: ExecutionAttemptID,
maxStackTraceDepth: Int): Option[Array[StackTraceElement]] = {
val task = runningTasks.get(executionId)
if (task != null && task.getExecutionState == ExecutionState.RUNNING) {
val stackTrace : Array[StackTraceElement] = task.getExecutingThread.getStackTrace
if (maxStackTraceDepth > 0) {
Option(util.Arrays.copyOfRange(stackTrace, 0, maxStackTraceDepth.min(stackTrace.length)))
} else {
Option(stackTrace)
}
} else {
Option.empty
}
}
}
// --------------------------------------------------------------------------
// Task Manager / JobManager association and initialization
......
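The depth handling in getStackTrace above treats a maxStackTraceDepth of 0 as unbounded and otherwise copies a prefix of the trace. The same logic as a standalone Java sketch:

import java.util.Arrays;

class StackTraceTruncationSketch {

    // Sketch: mirror getStackTrace's depth limit; 0 (or less) means no limit.
    static StackTraceElement[] truncate(StackTraceElement[] trace, int maxDepth) {
        if (maxDepth <= 0) {
            return trace;
        }
        return Arrays.copyOfRange(trace, 0, Math.min(maxDepth, trace.length));
    }
}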
......@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
......@@ -52,6 +51,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
......@@ -61,14 +63,12 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
......@@ -77,6 +77,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
......@@ -89,10 +90,11 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartit
import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class TaskManagerTest {
public class TaskManagerTest extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
......@@ -118,12 +120,6 @@ public class TaskManagerTest {
@Test
public void testSubmitAndExecuteTask() {
LOG.info( "--------------------------------------------------------------------\n" +
" Starting testSubmitAndExecuteTask() \n" +
"--------------------------------------------------------------------");
new JavaTestKit(system){{
ActorGateway taskManager = null;
......@@ -233,11 +229,6 @@ public class TaskManagerTest {
@Test
public void testJobSubmissionAndCanceling() {
LOG.info( "--------------------------------------------------------------------\n" +
" Starting testJobSubmissionAndCanceling() \n" +
"--------------------------------------------------------------------");
new JavaTestKit(system){{
ActorGateway jobManager = null;
......@@ -370,11 +361,6 @@ public class TaskManagerTest {
@Test
public void testGateChannelEdgeMismatch() {
LOG.info( "--------------------------------------------------------------------\n" +
" Starting testGateChannelEdgeMismatch() \n" +
"--------------------------------------------------------------------");
new JavaTestKit(system){{
ActorGateway jobManager = null;
......@@ -462,11 +448,6 @@ public class TaskManagerTest {
@Test
public void testRunJobWithForwardChannel() {
LOG.info( "--------------------------------------------------------------------\n" +
" Starting testRunJobWithForwardChannel() \n" +
"--------------------------------------------------------------------");
new JavaTestKit(system){{
ActorGateway jobManager = null;
......@@ -596,11 +577,6 @@ public class TaskManagerTest {
@Test
public void testCancellingDependentAndStateUpdateFails() {
LOG.info( "--------------------------------------------------------------------\n" +
" Starting testCancellingDependentAndStateUpdateFails() \n" +
"--------------------------------------------------------------------");
// this tests creates two tasks. the sender sends data, and fails to send the
// state update back to the job manager
// the second one blocks to be canceled
......@@ -929,6 +905,283 @@ public class TaskManagerTest {
}};
}
// ------------------------------------------------------------------------
// Stack trace sample
// ------------------------------------------------------------------------
/**
* Tests sampling of task stack traces.
*/
@Test
@SuppressWarnings("unchecked")
public void testTriggerStackTraceSampleMessage() throws Exception {
new JavaTestKit(system) {{
ActorGateway taskManagerActorGateway = null;
ActorGateway jobManagerActorGateway = TestingUtils.createForwardingJobManager(
system,
getTestActor(),
Option.<String>empty());
final ActorGateway testActorGateway = new AkkaActorGateway(
getTestActor(),
leaderSessionID);
try {
final ActorGateway jobManager = jobManagerActorGateway;
final ActorGateway taskManager = TestingUtils.createTaskManager(
system,
jobManager,
new Configuration(),
true,
false);
// Registration
new Within(d) {
@Override
protected void run() {
expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
assertEquals(taskManager.actor(), getLastSender());
taskManager.tell(new RegistrationMessages.AcknowledgeRegistration(
new InstanceID(), 12345), jobManager);
}
};
// Single blocking task
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
new ApplicationID(),
new JobID(),
new JobVertexID(),
new ExecutionAttemptID(),
"Task",
0,
1,
0,
new Configuration(),
new Configuration(),
Tasks.BlockingNoOpInvokable.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
0);
// Submit the task
new Within(d) {
@Override
protected void run() {
try {
Future<Object> taskRunningFuture = taskManager.ask(
new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
tdd.getExecutionId()), timeout);
taskManager.tell(new SubmitTask(tdd));
Await.ready(taskRunningFuture, d);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
//
// 1) Trigger sample for non-existing task
//
new Within(d) {
@Override
protected void run() {
try {
ExecutionAttemptID taskId = new ExecutionAttemptID();
taskManager.tell(new TriggerStackTraceSample(
112223,
taskId,
100,
d,
0),
testActorGateway);
// Receive the expected message (heartbeat races possible)
Object[] msg = receiveN(1);
while (!(msg[0] instanceof ResponseStackTraceSampleFailure)) {
msg = receiveN(1);
}
ResponseStackTraceSampleFailure response = (ResponseStackTraceSampleFailure) msg[0];
assertEquals(112223, response.sampleId());
assertEquals(taskId, response.executionId());
assertEquals(IllegalStateException.class, response.cause().getClass());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
//
// 2) Trigger sample for the blocking task
//
new Within(d) {
@Override
protected void run() {
boolean success = false;
Throwable lastError = null;
for (int i = 0; i < 100 && !success; i++) {
try {
int numSamples = 5;
taskManager.tell(new TriggerStackTraceSample(
19230,
tdd.getExecutionId(),
numSamples,
new FiniteDuration(100, TimeUnit.MILLISECONDS),
0),
testActorGateway);
// Receive the expected message (heartbeat races possible)
Object[] msg = receiveN(1);
while (!(msg[0] instanceof ResponseStackTraceSampleSuccess)) {
msg = receiveN(1);
}
ResponseStackTraceSampleSuccess response = (ResponseStackTraceSampleSuccess) msg[0];
// ---- Verify response ----
assertEquals(19230, response.sampleId());
assertEquals(tdd.getExecutionId(), response.executionId());
List<StackTraceElement[]> traces = response.samples();
assertEquals("Number of samples", numSamples, traces.size());
for (StackTraceElement[] trace : traces) {
// Look for BlockingNoOpInvokable#invoke
for (StackTraceElement elem : trace) {
if (elem.getClassName().equals(
Tasks.BlockingNoOpInvokable.class.getName())) {
assertEquals("invoke", elem.getMethodName());
success = true;
break;
}
}
assertTrue("Unexpected stack trace: " +
Arrays.toString(trace), success);
}
} catch (Throwable t) {
lastError = t;
LOG.warn("Failed to find invokable.", t);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Interrupted while sleeping before retry.", e);
break;
}
}
if (!success) {
if (lastError == null) {
fail("Failed to find invokable");
} else {
fail(lastError.getMessage());
}
}
}
};
//
// 3) Trigger sample for the blocking task with max depth
//
new Within(d) {
@Override
protected void run() {
try {
int numSamples = 5;
int maxDepth = 2;
taskManager.tell(new TriggerStackTraceSample(
1337,
tdd.getExecutionId(),
numSamples,
new FiniteDuration(100, TimeUnit.MILLISECONDS),
maxDepth),
testActorGateway);
// Receive the expected message (heartbeat races possible)
Object[] msg = receiveN(1);
while (!(msg[0] instanceof ResponseStackTraceSampleSuccess)) {
msg = receiveN(1);
}
ResponseStackTraceSampleSuccess response = (ResponseStackTraceSampleSuccess) msg[0];
// ---- Verify response ----
assertEquals(1337, response.sampleId());
assertEquals(tdd.getExecutionId(), response.executionId());
List<StackTraceElement[]> traces = response.samples();
assertEquals("Number of samples", numSamples, traces.size());
for (StackTraceElement[] trace : traces) {
assertEquals("Max depth", maxDepth, trace.length);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
//
// 4) Trigger sample for the blocking task, but cancel it during sampling
//
new Within(d) {
@Override
protected void run() {
try {
// Trigger many samples in order to cancel the task
// during a sample
taskManager.tell(new TriggerStackTraceSample(
0,
tdd.getExecutionId(),
10000,
new FiniteDuration(100, TimeUnit.MILLISECONDS),
0),
testActorGateway);
// Cancel the task
taskManager.tell(new CancelTask(tdd.getExecutionId()));
// Receive the expected message (heartbeat races possible)
Object[] msg = receiveN(1);
while (!(msg[0] instanceof ResponseStackTraceSampleFailure)) {
msg = receiveN(1);
}
ResponseStackTraceSampleFailure response = (ResponseStackTraceSampleFailure) msg[0];
assertEquals(tdd.getExecutionId(), response.executionId());
assertEquals(IllegalStateException.class, response.cause().getClass());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
} finally {
TestingUtils.stopActor(taskManagerActorGateway);
TestingUtils.stopActor(jobManagerActorGateway);
}
}};
}
// --------------------------------------------------------------------------------------------
public static class SimpleJobManager extends FlinkUntypedActor {
......
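A recurring pattern in the test above is draining unrelated messages (heartbeats can race with the response) until the expected type arrives. A small helper could factor this out; a sketch, assuming JavaTestKit's receiveN:

import akka.testkit.JavaTestKit;

class ReceiveExpectedSketch {

    // Sketch: skip unrelated actor messages until one of the expected type
    // shows up, then return it.
    static <T> T receiveExpected(JavaTestKit kit, Class<T> expectedType) {
        Object msg = kit.receiveN(1)[0];
        while (!expectedType.isInstance(msg)) {
            msg = kit.receiveN(1)[0];
        }
        return expectedType.cast(msg);
    }
}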