diff --git a/docs/setup/config.md b/docs/setup/config.md index 91297a4f172fd8dc9321e4ea487e5d0affc3d218..0eb3acb4dde6fd588b6d89125bf6be9c69fa6349 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -165,6 +165,10 @@ The following parameters configure Flink's JobManager and TaskManagers. - `jobmanager.web.history`: The number of latest jobs that the JobManager's web front-end in its history (DEFAULT: 5). - `jobmanager.web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`). - `jobmanager.web.checkpoints.history`: Number of checkpoint statistics to remember (DEFAULT: `10`). +- `jobmanager.web.backpressure.cleanup-interval`: Time after which cached stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins). +- `jobmanager.web.backpressure.refresh-interval`: Time after which available stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min). +- `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to take to determine back pressure (DEFAULT: `100`). +- `jobmanager.web.backpressure.delay-between-samples`: Delay between stack trace samples to determine back pressure (DEFAULT: `50`, 50 ms). ### Webclient diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index aba3540eaee9be02e7d03c3b7011daf584749385..2b756449d53d4510df8401c7b2d947050fafb56a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -357,7 +357,18 @@ public final class ConfigConstants { /** Config parameter defining the number of checkpoints to remember for recent history. */ public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history"; - + + /** Time after which cached stats are cleaned up if not accessed. */ + public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval"; + + /** Time after which available stats are deprecated and need to be refreshed (by resampling). */ + public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval"; + + /** Number of stack trace samples to take to determine back pressure. */ + public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples"; + + /** Delay between stack trace samples to determine back pressure. */ + public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples"; // ------------------------------ AKKA ------------------------------------ @@ -693,6 +704,18 @@ public final class ConfigConstants { /** Default number of checkpoints to remember for recent history. */ public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10; + /** Time after which cached stats are cleaned up. */ + public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000; + + /** Time after which available stats are deprecated and need to be refreshed (by resampling). */ + public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000; + + /** Number of samples to take to determine back pressure. */ + public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100; + + /** Delay between samples to determine back pressure. 
*/ + public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50; + // ------------------------------ Akka Values ------------------------------ public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s"; diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index 7a6147beb6e44ecefdff8ec280a3a93922e5dbad..1a19fb185f800fb74099f4280984d1cf0b5d6862 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -134,6 +134,12 @@ under the License. ${curator.version} test + + + com.typesafe.akka + akka-testkit_${scala.binary.version} + test + diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java new file mode 100644 index 0000000000000000000000000000000000000000..ff0573a83d562980445e01c8a5034d5763da91cc --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import akka.dispatch.OnComplete; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Maps; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Back pressure statistics tracker. + * + *
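
For reference, the new settings can be overridden like any other JobManager option. A minimal sketch using the `Configuration` API and the constants added above (the values are arbitrary examples, not recommendations):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

class BackPressureConfigSketch {
	static Configuration tightenedSampling() {
		Configuration config = new Configuration();
		// Take fewer samples, closer together, for a quicker (but noisier) check.
		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES, 50);
		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY, 20); // 20 ms between samples
		// Consider cached stats stale after 30 seconds.
		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL, 30 * 1000);
		// Evict cache entries not accessed for 5 minutes.
		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL, 5 * 60 * 1000);
		return config;
	}
}
```
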
<p>Back pressure is determined by sampling running tasks. If a task is + * slowed down by back pressure, it will be stuck in memory requests to a + * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}. + * + *
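
The TaskManager-side sampler that answers the trigger messages is not part of this class; conceptually it boils down to repeatedly grabbing a task thread's stack trace. A rough sketch under that assumption (`sampleTaskThread` and its parameters are illustrative, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SamplerSketch {
	// Rough idea of sampling one task: grab the thread's stack trace
	// numSamples times with a fixed delay, truncated to maxDepth elements.
	static List<StackTraceElement[]> sampleTaskThread(
			Thread thread, int numSamples, long delayMillis, int maxDepth) throws InterruptedException {
		List<StackTraceElement[]> traces = new ArrayList<>(numSamples);
		for (int i = 0; i < numSamples; i++) {
			StackTraceElement[] trace = thread.getStackTrace();
			if (maxDepth > 0 && trace.length > maxDepth) {
				trace = Arrays.copyOf(trace, maxDepth);
			}
			traces.add(trace);
			Thread.sleep(delayMillis);
		}
		return traces;
	}
}
```
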
<p>The back pressured stack traces look like this: + * + * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
+ * request
+ * [...]
+ * </pre>
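
A trace in such a sample counts as back pressured when any of its elements is the blocking buffer request. A minimal sketch of that per-trace check, mirroring the `EXPECTED_CLASS_NAME` and `EXPECTED_METHOD_NAME` constants defined below:

```java
class TraceCheckSketch {
	// A single stack trace indicates back pressure if any element is the
	// blocking buffer request on the LocalBufferPool.
	static boolean isBackPressured(StackTraceElement[] trace) {
		for (int i = trace.length - 1; i >= 0; i--) {
			StackTraceElement elem = trace[i];
			if ("org.apache.flink.runtime.io.network.buffer.LocalBufferPool".equals(elem.getClassName())
					&& "requestBufferBlocking".equals(elem.getMethodName())) {
				return true;
			}
		}
		return false;
	}
}
```
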
+ */ +public class BackPressureStatsTracker { + + private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class); + + /** Maximum stack trace depth for samples. */ + static final int MAX_STACK_TRACE_DEPTH = 3; + + /** Expected class name for back pressure indicating stack trace element. */ + static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool"; + + /** Expected method name for back pressure indicating stack trace element. */ + static final String EXPECTED_METHOD_NAME = "requestBufferBlocking"; + + /** Lock guarding trigger operations. */ + private final Object lock = new Object(); + + /* Stack trace sample coordinator. */ + private final StackTraceSampleCoordinator coordinator; + + /** + * Completed stats. Important: Job vertex IDs need to be scoped by job ID, + * because they are potentially constant across runs messing up the cached + * data. + */ + private final Cache operatorStatsCache; + + /** Pending in progress stats. Important: Job vertex IDs need to be scoped + * by job ID, because they are potentially constant across runs messing up + * the cached data.*/ + private final Set pendingStats = new HashSet<>(); + + /** Cleanup interval for completed stats cache. */ + private final int cleanUpInterval; + + private final int numSamples; + + private final FiniteDuration delayBetweenSamples; + + /** Flag indicating whether the stats tracker has been shut down. */ + private boolean shutDown; + + /** + * Creates a back pressure statistics tracker. + * + * @param cleanUpInterval Clean up interval for completed stats. + * @param numSamples Number of stack trace samples when determining back pressure. + * @param delayBetweenSamples Delay between samples when determining back pressure. + */ + public BackPressureStatsTracker( + StackTraceSampleCoordinator coordinator, + int cleanUpInterval, + int numSamples, + FiniteDuration delayBetweenSamples) { + + this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator"); + + checkArgument(cleanUpInterval >= 0, "Clean up interval"); + this.cleanUpInterval = cleanUpInterval; + + checkArgument(numSamples >= 1, "Number of samples"); + this.numSamples = numSamples; + + this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples"); + + this.operatorStatsCache = CacheBuilder.newBuilder() + .concurrencyLevel(1) + .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS) + .build(); + } + + /** Cleanup interval for completed stats cache. */ + public long getCleanUpInterval() { + return cleanUpInterval; + } + + /** + * Returns back pressure statistics for a operator. + * + * @param vertex Operator to get the stats for. + * + * @return Back pressure statistics for an operator + */ + public Option getOperatorBackPressureStats(ExecutionJobVertex vertex) { + return Option.apply(operatorStatsCache.getIfPresent(vertex)); + } + + /** + * Triggers a stack trace sample for a operator to gather the back pressure + * statistics. If there is a sample in progress for the operator, the call + * is ignored. + * + * @param vertex Operator to get the stats for. 
+ */ + @SuppressWarnings("unchecked") + public void triggerStackTraceSample(ExecutionJobVertex vertex) { + synchronized (lock) { + if (shutDown) { + return; + } + + if (!pendingStats.contains(vertex)) { + pendingStats.add(vertex); + + Future sample = coordinator.triggerStackTraceSample( + vertex.getTaskVertices(), + numSamples, + delayBetweenSamples, + MAX_STACK_TRACE_DEPTH); + + sample.onComplete(new StackTraceSampleCompletionCallback( + vertex), vertex.getGraph().getExecutionContext()); + } + } + } + + /** + * Cleans up the operator stats cache if it contains timed out entries. + * + *
<p>The Guava cache only evicts entries as part of regular maintenance during normal cache operations. + * If this handler is inactive, the cache will never be cleaned up. + */ + public void cleanUpOperatorStatsCache() { + operatorStatsCache.cleanUp(); + } + + /** + * Shuts down the stats tracker. + * + *
Invalidates the cache and clears all pending stats. + */ + public void shutDown() { + synchronized (lock) { + if (!shutDown) { + operatorStatsCache.invalidateAll(); + pendingStats.clear(); + + shutDown = true; + } + } + } + + /** + * Invalidates the cache (irrespective of clean up interval). + */ + void invalidateOperatorStatsCache() { + operatorStatsCache.invalidateAll(); + } + + /** + * Callback on completed stack trace sample. + */ + class StackTraceSampleCompletionCallback extends OnComplete { + + private final ExecutionJobVertex vertex; + + public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) { + this.vertex = vertex; + } + + @Override + public void onComplete(Throwable failure, StackTraceSample success) throws Throwable { + synchronized (lock) { + try { + if (shutDown) { + return; + } + + if (success != null) { + OperatorBackPressureStats stats = createStatsFromSample(success); + operatorStatsCache.put(vertex, stats); + } else { + LOG.error("Failed to gather stack trace sample.", failure); + } + } catch (Throwable t) { + LOG.error("Error during stats completion.", t); + } finally { + pendingStats.remove(vertex); + } + } + } + + /** + * Creates the back pressure stats from a stack trace sample. + * + * @param sample Stack trace sample to base stats on. + * + * @return Back pressure stats + */ + private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) { + Map> traces = sample.getStackTraces(); + + // Map task ID to subtask index, because the web interface expects + // it like that. + Map subtaskIndexMap = Maps + .newHashMapWithExpectedSize(traces.size()); + + Set sampledTasks = sample.getStackTraces().keySet(); + + for (ExecutionVertex task : vertex.getTaskVertices()) { + ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId(); + if (sampledTasks.contains(taskId)) { + subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex()); + } else { + throw new RuntimeException("Outdated sample. A task, which is part of the " + + "sample has been reset."); + } + } + + // Ratio of blocked samples to total samples per sub task. Array + // position corresponds to sub task index. + double[] backPressureRatio = new double[traces.size()]; + + for (Entry> entry : traces.entrySet()) { + int backPressureSamples = 0; + + List taskTraces = entry.getValue(); + + for (StackTraceElement[] trace : taskTraces) { + for (int i = trace.length - 1; i >= 0; i--) { + StackTraceElement elem = trace[i]; + + if (elem.getClassName().equals(EXPECTED_CLASS_NAME) && + elem.getMethodName().equals(EXPECTED_METHOD_NAME)) { + + backPressureSamples++; + break; // Continue with next stack trace + } + } + } + + int subtaskIndex = subtaskIndexMap.get(entry.getKey()); + + int size = taskTraces.size(); + double ratio = (size > 0) + ? 
((double) backPressureSamples) / size + : 0; + + backPressureRatio[subtaskIndex] = ratio; + } + + return new OperatorBackPressureStats( + sample.getSampleId(), + sample.getEndTime(), + backPressureRatio); + } + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java new file mode 100644 index 0000000000000000000000000000000000000000..cb262e32ea637b00a399fc46386c3dc285cb029e --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import java.util.Arrays; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Back pressure statistics of multiple tasks. + * + *
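
Concretely, the per-subtask ratio computed in `createStatsFromSample` above is blocked samples over total samples; for example, 63 blocked traces out of 100 sampled give a ratio of 0.63. A toy version of that computation (reusing the `isBackPressured` sketch from earlier; `traces` is an assumed input):

```java
import java.util.List;

class RatioSketch {
	// Ratio of back pressured traces to all sampled traces for one subtask,
	// e.g. 63 blocked out of 100 traces yields 0.63.
	static double backPressureRatio(List<StackTraceElement[]> traces) {
		int blocked = 0;
		for (StackTraceElement[] trace : traces) {
			if (TraceCheckSketch.isBackPressured(trace)) {
				blocked++;
			}
		}
		return traces.isEmpty() ? 0.0 : (double) blocked / traces.size();
	}
}
```
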
Statistics are gathered by sampling stack traces of running tasks. The + * back pressure ratio denotes the ratio of traces indicating back pressure + * to the total number of sampled traces. + */ +public class OperatorBackPressureStats { + + /** ID of the corresponding sample. */ + private final int sampleId; + + /** End time stamp of the corresponding sample. */ + private final long endTimestamp; + + /** Back pressure ratio per subtask. */ + private final double[] subTaskBackPressureRatio; + + /** Maximum back pressure ratio. */ + private final double maxSubTaskBackPressureRatio; + + public OperatorBackPressureStats( + int sampleId, + long endTimestamp, + double[] subTaskBackPressureRatio) { + + this.sampleId = sampleId; + this.endTimestamp = endTimestamp; + this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio"); + checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified"); + + double max = 0; + for (double ratio : subTaskBackPressureRatio) { + if (ratio > max) { + max = ratio; + } + } + + maxSubTaskBackPressureRatio = max; + } + + /** + * Returns the ID of the sample. + * + * @return ID of the sample + */ + public int getSampleId() { + return sampleId; + } + + /** + * Returns the time stamp, when all stack traces were collected at the + * JobManager. + * + * @return Time stamp, when all stack traces were collected at the + * JobManager + */ + public long getEndTimestamp() { + return endTimestamp; + } + + /** + * Returns the number of sub tasks. + * + * @return Number of sub tasks. + */ + public int getNumberOfSubTasks() { + return subTaskBackPressureRatio.length; + } + + /** + * Returns the ratio of stack traces indicating back pressure to total + * number of sampled stack traces. + * + * @param index Subtask index. + * + * @return Ratio of stack traces indicating back pressure to total number + * of sampled stack traces. + */ + public double getBackPressureRatio(int index) { + return subTaskBackPressureRatio[index]; + } + + /** + * Returns the maximum back pressure ratio of all sub tasks. + * + * @return Maximum back pressure ratio of all sub tasks. + */ + public double getMaxBackPressureRatio() { + return maxSubTaskBackPressureRatio; + } + + @Override + public String toString() { + return "OperatorBackPressureStats{" + + "sampleId=" + sampleId + + ", endTimestamp=" + endTimestamp + + ", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) + + '}'; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java new file mode 100644 index 0000000000000000000000000000000000000000..e86126523db12a61e72ee9c1b12b0bb3246f94e4 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * A sample of stack traces for one or more tasks. + * + *
The sampling is triggered in {@link StackTraceSampleCoordinator}. + */ +public class StackTraceSample { + + /** ID of this sample (unique per job) */ + private final int sampleId; + + /** Time stamp, when the sample was triggered. */ + private final long startTime; + + /** Time stamp, when all stack traces were collected at the JobManager. */ + private final long endTime; + + /** Map of stack traces by execution ID. */ + private final Map> stackTracesByTask; + + /** + * Creates a stack trace sample + * + * @param sampleId ID of the sample. + * @param startTime Time stamp, when the sample was triggered. + * @param endTime Time stamp, when all stack traces were + * collected at the JobManager. + * @param stackTracesByTask Map of stack traces by execution ID. + */ + public StackTraceSample( + int sampleId, + long startTime, + long endTime, + Map> stackTracesByTask) { + + checkArgument(sampleId >= 0, "Negative sample ID"); + checkArgument(startTime >= 0, "Negative start time"); + checkArgument(endTime >= startTime, "End time before start time"); + + this.sampleId = sampleId; + this.startTime = startTime; + this.endTime = endTime; + this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask); + } + + /** + * Returns the ID of the sample. + * + * @return ID of the sample + */ + public int getSampleId() { + return sampleId; + } + + /** + * Returns the time stamp, when the sample was triggered. + * + * @return Time stamp, when the sample was triggered + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns the time stamp, when all stack traces were collected at the + * JobManager. + * + * @return Time stamp, when all stack traces were collected at the + * JobManager + */ + public long getEndTime() { + return endTime; + } + + /** + * Returns the a map of stack traces by execution ID. + * + * @return Map of stack traces by execution ID + */ + public Map> getStackTraces() { + return stackTracesByTask; + } + +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java new file mode 100644 index 0000000000000000000000000000000000000000..e7b292f546b7c828002f0abb04bc1beda671ee33 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.google.common.collect.Maps; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure; +import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess; +import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A coordinator for triggering and collecting stack traces of running tasks. + */ +public class StackTraceSampleCoordinator { + + private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class); + + private static final int NUM_GHOST_SAMPLE_IDS = 10; + + private final Object lock = new Object(); + + /** Actor for responses. */ + private final ActorGateway responseActor; + + /** Time out after the expected sampling duration. */ + private final int sampleTimeout; + + /** In progress samples (guarded by lock). */ + private final Map pendingSamples = new HashMap<>(); + + /** A list of recent sample IDs to identify late messages vs. invalid ones. */ + private final ArrayDeque recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS); + + /** Sample ID counter (guarded by lock). */ + private int sampleIdCounter; + + /** + * Timer to discard expired in progress samples. Lazily initiated as the + * sample coordinator will not be used very often (guarded by lock). + */ + private Timer timer; + + /** + * Flag indicating whether the coordinator is still running (guarded by + * lock). + */ + private boolean isShutDown; + + /** + * Creates a new coordinator for the job. + * + * @param sampleTimeout Time out after the expected sampling duration. + * This is added to the expected duration of a + * sample, which is determined by the number of + * samples and the delay between each sample. + */ + public StackTraceSampleCoordinator(ActorSystem actorSystem, int sampleTimeout) { + Props props = Props.create(StackTraceSampleCoordinatorActor.class, this); + this.responseActor = new AkkaActorGateway(actorSystem.actorOf(props), null); + + checkArgument(sampleTimeout >= 0); + this.sampleTimeout = sampleTimeout; + } + + /** + * Triggers a stack trace sample to all tasks. + * + * @param tasksToSample Tasks to sample. + * @param numSamples Number of stack trace samples to collect. + * @param delayBetweenSamples Delay between consecutive samples. + * @param maxStackTraceDepth Maximum depth of the stack trace. 
0 indicates + * no maximum and keeps the complete stack trace. + * @return A future of the completed stack trace sample + */ + @SuppressWarnings("unchecked") + public Future triggerStackTraceSample( + ExecutionVertex[] tasksToSample, + int numSamples, + FiniteDuration delayBetweenSamples, + int maxStackTraceDepth) { + + checkNotNull(tasksToSample, "Tasks to sample"); + checkArgument(tasksToSample.length >= 1, "No tasks to sample"); + checkArgument(numSamples >= 1, "No number of samples"); + checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth"); + + // Execution IDs of running tasks + ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length]; + + // Check that all tasks are RUNNING before triggering anything. The + // triggering can still fail. + for (int i = 0; i < triggerIds.length; i++) { + Execution execution = tasksToSample[i].getCurrentExecutionAttempt(); + if (execution != null && execution.getState() == ExecutionState.RUNNING) { + triggerIds[i] = execution.getAttemptId(); + } else { + Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>() + .failure(new IllegalStateException("Task " + tasksToSample[i] + .getTaskNameWithSubtaskIndex() + " is not running.")); + return failedPromise.future(); + } + } + + synchronized (lock) { + if (isShutDown) { + Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>() + .failure(new IllegalStateException("Shut down")); + return failedPromise.future(); + } + + if (timer == null) { + timer = new Timer("Stack trace sample coordinator timer"); + } + + int sampleId = sampleIdCounter++; + + LOG.debug("Triggering stack trace sample {}", sampleId); + + final PendingStackTraceSample pending = new PendingStackTraceSample( + sampleId, triggerIds); + + // Discard the sample if it takes too long. We don't send cancel + // messages to the task managers, but only wait for the responses + // and then ignore them. + long expectedDuration = numSamples * delayBetweenSamples.toMillis(); + long discardDelay = expectedDuration + sampleTimeout; + + TimerTask discardTask = new TimerTask() { + @Override + public void run() { + try { + synchronized (lock) { + if (!pending.isDiscarded()) { + LOG.info("Sample {} expired before completing", + pending.getSampleId()); + + pending.discard(new RuntimeException("Time out")); + pendingSamples.remove(pending.getSampleId()); + } + } + } catch (Throwable t) { + LOG.error("Exception while handling sample timeout", t); + } + } + }; + + // Add the pending sample before scheduling the discard task to + // prevent races with removing it again. + pendingSamples.put(sampleId, pending); + + timer.schedule(discardTask, discardDelay); + + boolean success = true; + try { + // Trigger all samples + for (int i = 0; i < tasksToSample.length; i++) { + TriggerStackTraceSample msg = new TriggerStackTraceSample( + sampleId, + triggerIds[i], + numSamples, + delayBetweenSamples, + maxStackTraceDepth); + + if (!tasksToSample[i].sendMessageToCurrentExecution( + msg, + triggerIds[i], + responseActor)) { + success = false; + break; + } + } + + return pending.getStackTraceSampleFuture(); + } finally { + if (!success) { + pending.discard(new RuntimeException("Failed to trigger sample, " + + "because task has been reset.")); + pendingSamples.remove(sampleId); + rememberRecentSampleId(sampleId); + } + } + } + } + + /** + * Cancels a pending sample. + * + * @param sampleId ID of the sample to cancel. + * @param cause Cause of the cancelling (can be null). 
+ */ + public void cancelStackTraceSample(int sampleId, Exception cause) { + synchronized (lock) { + if (isShutDown) { + return; + } + + PendingStackTraceSample sample = pendingSamples.remove(sampleId); + if (sample != null) { + if (cause != null) { + LOG.info("Cancelling sample " + sampleId, cause); + } else { + LOG.info("Cancelling sample {}", sampleId); + } + + sample.discard(cause); + rememberRecentSampleId(sampleId); + } + } + } + + /** + * Shuts down the coordinator. + * + *
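
For orientation, a hypothetical caller of `triggerStackTraceSample` (this mirrors how `BackPressureStatsTracker` drives the coordinator; the method and class names here are illustrative):

```java
import akka.dispatch.OnComplete;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

class CoordinatorUsageSketch {
	// Trigger 100 samples, 50 ms apart, with unlimited stack trace depth (0),
	// and react to the resulting future on the given execution context.
	static void sampleOnce(
			StackTraceSampleCoordinator coordinator,
			ExecutionJobVertex jobVertex,
			ExecutionContext executionContext) {

		Future<StackTraceSample> future = coordinator.triggerStackTraceSample(
				jobVertex.getTaskVertices(), 100, new FiniteDuration(50, TimeUnit.MILLISECONDS), 0);

		future.onComplete(new OnComplete<StackTraceSample>() {
			@Override
			public void onComplete(Throwable failure, StackTraceSample sample) {
				if (sample != null) {
					// Inspect sample.getStackTraces() here.
				} else {
					// The sample timed out or was cancelled; 'failure' carries the cause.
				}
			}
		}, executionContext);
	}
}
```
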
After shut down, no further operations are executed. + */ + public void shutDown() { + synchronized (lock) { + if (!isShutDown) { + LOG.info("Shutting down stack trace sample coordinator."); + + for (PendingStackTraceSample pending : pendingSamples.values()) { + pending.discard(new RuntimeException("Shut down")); + } + + pendingSamples.clear(); + + if (timer != null) { + timer.cancel(); + } + + isShutDown = true; + } + } + } + + /** + * Collects stack traces of a task. + * + * @param sampleId ID of the sample. + * @param executionId ID of the sampled task. + * @param stackTraces Stack traces of the sampled task. + * + * @throws IllegalStateException If unknown sample ID and not recently + * finished or cancelled sample. + */ + public void collectStackTraces( + int sampleId, + ExecutionAttemptID executionId, + List stackTraces) { + + synchronized (lock) { + if (isShutDown) { + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId); + } + + PendingStackTraceSample pending = pendingSamples.get(sampleId); + + if (pending != null) { + pending.collectStackTraces(executionId, stackTraces); + + // Publish the sample + if (pending.isComplete()) { + pendingSamples.remove(sampleId); + rememberRecentSampleId(sampleId); + + pending.completePromiseAndDiscard(); + } + } else if (recentPendingSamples.contains(sampleId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received late stack trace sample {} of task {}", + sampleId, executionId); + } + } else { + throw new IllegalStateException("Unknown sample ID " + sampleId); + } + } + } + + private void rememberRecentSampleId(int sampleId) { + if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) { + recentPendingSamples.removeFirst(); + } + recentPendingSamples.addLast(sampleId); + } + + int getNumberOfPendingSamples() { + synchronized (lock) { + return pendingSamples.size(); + } + } + + // ------------------------------------------------------------------------ + + /** + * A pending stack trace sample, which collects stack traces and owns a + * {@link StackTraceSample} promise. + * + *
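
The promise/future pair referenced here is the Scala concurrency API driven from Java; stripped to its core, the pattern looks like this (a sketch only — the real class additionally tracks pending tasks and collected traces):

```java
import scala.concurrent.Future;
import scala.concurrent.Promise;

class PromiseSketch {
	// The pending sample owns the promise; callers only ever see the future.
	static Future<StackTraceSample> futureForCaller() {
		Promise<StackTraceSample> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
		// Completed later via promise.success(sample) or promise.failure(cause).
		return promise.future();
	}
}
```
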
Access pending sample in lock scope. + */ + private static class PendingStackTraceSample { + + private final int sampleId; + private final long startTime; + private final Set pendingTasks; + private final Map> stackTracesByTask; + private final Promise stackTracePromise; + + private boolean isDiscarded; + + PendingStackTraceSample( + int sampleId, + ExecutionAttemptID[] tasksToCollect) { + + this.sampleId = sampleId; + this.startTime = System.currentTimeMillis(); + this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect)); + this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length); + this.stackTracePromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); + } + + int getSampleId() { + return sampleId; + } + + long getStartTime() { + return startTime; + } + + boolean isDiscarded() { + return isDiscarded; + } + + boolean isComplete() { + if (isDiscarded) { + throw new IllegalStateException("Discarded"); + } + + return pendingTasks.isEmpty(); + } + + void discard(Throwable cause) { + if (!isDiscarded) { + pendingTasks.clear(); + stackTracesByTask.clear(); + + stackTracePromise.failure(new RuntimeException("Discarded", cause)); + + isDiscarded = true; + } + } + + void collectStackTraces(ExecutionAttemptID executionId, List stackTraces) { + if (isDiscarded) { + throw new IllegalStateException("Discarded"); + } + + if (pendingTasks.remove(executionId)) { + stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces)); + } else if (isComplete()) { + throw new IllegalStateException("Completed"); + } else { + throw new IllegalArgumentException("Unknown task " + executionId); + } + } + + void completePromiseAndDiscard() { + if (isComplete()) { + isDiscarded = true; + + long endTime = System.currentTimeMillis(); + + StackTraceSample stackTraceSample = new StackTraceSample( + sampleId, + startTime, + endTime, + stackTracesByTask); + + stackTracePromise.success(stackTraceSample); + } else { + throw new IllegalStateException("Not completed yet"); + } + } + + @SuppressWarnings("unchecked") + Future getStackTraceSampleFuture() { + return stackTracePromise.future(); + } + } + + /** + * Actor for stack trace sample responses. 
+ */ + private static class StackTraceSampleCoordinatorActor extends FlinkUntypedActor { + + StackTraceSampleCoordinator coordinator; + + public StackTraceSampleCoordinatorActor(StackTraceSampleCoordinator coordinator) { + this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator"); + } + + @Override + protected void handleMessage(Object msg) throws Exception { + try { + if (msg instanceof ResponseStackTraceSampleSuccess) { + ResponseStackTraceSampleSuccess success = (ResponseStackTraceSampleSuccess) msg; + + coordinator.collectStackTraces( + success.sampleId(), + success.executionId(), + success.samples()); + } else if (msg instanceof ResponseStackTraceSampleFailure) { + ResponseStackTraceSampleFailure failure = (ResponseStackTraceSampleFailure) msg; + + coordinator.cancelStackTraceSample(failure.sampleId(), failure.cause()); + } else { + throw new IllegalArgumentException("Unexpected task sample message"); + } + } catch (Throwable t) { + LOG.error("Error responding to message '" + msg + "': " + t.getMessage() + ".", t); + } + } + + @Override + protected UUID getLeaderSessionID() { + return null; + } + } + +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 352309126c842fbaa48be69f01330494b579fe90..08ed2f9f4a10d9f63ccdc66e73f4c3b34ae58faa 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -29,14 +28,17 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.router.Handler; import io.netty.handler.codec.http.router.Router; - import org.apache.commons.io.FileUtils; - +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; +import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler; import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler; import org.apache.flink.runtime.webmonitor.handlers.JarListHandler; @@ -46,29 +48,24 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler; import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler; import 
org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler; -import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; -import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler; import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler; import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler; -import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler; import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; @@ -121,8 +118,11 @@ public class WebRuntimeMonitor implements WebMonitor { private final File uploadDir; - private AtomicBoolean cleanedUp = new AtomicBoolean(); + private final StackTraceSampleCoordinator stackTraceSamples; + private final BackPressureStatsTracker backPressureStatsTracker; + + private AtomicBoolean cleanedUp = new AtomicBoolean(); public WebRuntimeMonitor( Configuration config, @@ -163,6 +163,34 @@ public class WebRuntimeMonitor implements WebMonitor { ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(); + // - Back pressure stats ---------------------------------------------- + + stackTraceSamples = new StackTraceSampleCoordinator(actorSystem, 60000); + + // Back pressure stats tracker config + int cleanUpInterval = config.getInteger( + ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL); + + int refreshInterval = config.getInteger( + ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL); + + int numSamples = config.getInteger( + ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES); + + int delay = config.getInteger( + ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY); + + FiniteDuration delayBetweenSamples = new FiniteDuration(delay, TimeUnit.MILLISECONDS); + + backPressureStatsTracker = new BackPressureStatsTracker( + stackTraceSamples, cleanUpInterval, numSamples, delayBetweenSamples); + + // -------------------------------------------------------------------- + router = new Router() // config how to 
interact with this web server .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) @@ -187,7 +215,10 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs))) - + .GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler( + currentGraphs, + backPressureStatsTracker, + refreshInterval))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs))) @@ -298,6 +329,23 @@ public class WebRuntimeMonitor implements WebMonitor { synchronized (startupShutdownLock) { jobManagerAddressPromise.success(jobManagerAkkaUrl); leaderRetrievalService.start(retriever); + + long delay = backPressureStatsTracker.getCleanUpInterval(); + + // Scheduled back pressure stats tracker cache cleanup. We schedule + // this here repeatedly, because cache clean up only happens on + // interactions with the cache. We need it to make sure that we + // don't leak memory after completed jobs or long ago accessed stats. + bootstrap.childGroup().scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + backPressureStatsTracker.cleanUpOperatorStatsCache(); + } catch (Throwable t) { + LOG.error("Error during back pressure stats cache cleanup.", t); + } + } + }, delay, delay, TimeUnit.MILLISECONDS); } } @@ -316,6 +364,10 @@ public class WebRuntimeMonitor implements WebMonitor { } } + stackTraceSamples.shutDown(); + + backPressureStatsTracker.shutDown(); + cleanup(); } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..3ce6f025ed52ec090e09df1c01630f872737a4e1 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats; +import scala.Option; + +import java.io.StringWriter; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Request handler that returns back pressure stats for a single job vertex and + * all its sub tasks. + */ +public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler { + + /** Back pressure stats tracker. */ + private final BackPressureStatsTracker backPressureStatsTracker; + + /** Time after which stats are considered outdated. */ + private final int refreshInterval; + + public JobVertexBackPressureHandler( + ExecutionGraphHolder executionGraphHolder, + BackPressureStatsTracker backPressureStatsTracker, + int refreshInterval) { + + super(executionGraphHolder); + this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker"); + checkArgument(refreshInterval >= 0, "Negative timeout"); + this.refreshInterval = refreshInterval; + } + + @Override + public String handleRequest( + ExecutionJobVertex jobVertex, + Map params) throws Exception { + + try (StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) { + + gen.writeStartObject(); + + Option statsOption = backPressureStatsTracker + .getOperatorBackPressureStats(jobVertex); + + if (statsOption.isDefined()) { + OperatorBackPressureStats stats = statsOption.get(); + + // Check whether we need to refresh + if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) { + backPressureStatsTracker.triggerStackTraceSample(jobVertex); + gen.writeStringField("status", "deprecated"); + } else { + gen.writeStringField("status", "ok"); + } + + gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio())); + gen.writeNumberField("end-timestamp", stats.getEndTimestamp()); + + // Sub tasks + gen.writeArrayFieldStart("subtasks"); + int numSubTasks = stats.getNumberOfSubTasks(); + for (int i = 0; i < numSubTasks; i++) { + double ratio = stats.getBackPressureRatio(i); + + gen.writeStartObject(); + gen.writeNumberField("subtask", i); + gen.writeStringField("backpressure-level", getBackPressureLevel(ratio)); + gen.writeNumberField("ratio", ratio); + gen.writeEndObject(); + } + gen.writeEndArray(); + } else { + backPressureStatsTracker.triggerStackTraceSample(jobVertex); + gen.writeStringField("status", "deprecated"); + } + + gen.writeEndObject(); + gen.close(); + + return writer.toString(); + } + } + + /** + * Returns the back pressure level as a String. + * + * @param backPressureRatio Ratio of back pressures samples to total number of samples. 
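
Clients reach this handler via `GET /jobs/:jobid/vertices/:vertexid/backpressure`; the response carries a `status` field ("ok" or "deprecated"), an overall `backpressure-level`, an `end-timestamp`, and a ratio plus level per subtask. The level thresholds defined below can be sanity-checked with a tiny sketch (a hypothetical class, placed in the same package because the method is package-private):

```java
package org.apache.flink.runtime.webmonitor.handlers;

class BackPressureLevelSketch {
	public static void main(String[] args) {
		// Threshold mapping as defined in getBackPressureLevel below:
		System.out.println(JobVertexBackPressureHandler.getBackPressureLevel(0.05)); // "ok"   (ratio <= 0.10)
		System.out.println(JobVertexBackPressureHandler.getBackPressureLevel(0.30)); // "low"  (0.10 < ratio <= 0.5)
		System.out.println(JobVertexBackPressureHandler.getBackPressureLevel(0.90)); // "high" (ratio > 0.5)
	}
}
```
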
+ * + * @return Back pressure level ('no', 'low', or 'high') + */ + static String getBackPressureLevel(double backPressureRatio) { + if (backPressureRatio <= 0.10) { + return "ok"; + } else if (backPressureRatio <= 0.5) { + return "low"; + } else { + return "high"; + } + } + +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..52b0794d5ce23b62dedfd1c8da5b218f40be26a4 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.Option; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning; +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; +import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; +import static org.junit.Assert.assertEquals; 
+import static org.junit.Assert.fail; + +/** + * Simple back pressured task test. + */ +public class BackPressureStatsTrackerITCase extends TestLogger { + + private static NetworkBufferPool networkBufferPool; + private static ActorSystem testActorSystem; + + /** Shared as static variable with the test task. */ + private static BufferPool testBufferPool; + + @BeforeClass + public static void setup() { + testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(testActorSystem); + networkBufferPool.destroy(); + } + + /** + * Tests a simple fake-back pressured task. Back pressure is assumed when + * sampled stack traces are in blocking buffer requests. + */ + @Test + public void testBackPressuredProducer() throws Exception { + new JavaTestKit(testActorSystem) {{ + final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); + + // The JobGraph + final JobGraph jobGraph = new JobGraph(); + final int parallelism = 4; + + final JobVertex task = new JobVertex("Task"); + task.setInvokableClass(BackPressuredTask.class); + task.setParallelism(parallelism); + + jobGraph.addVertex(task); + + ActorGateway jobManger = null; + ActorGateway taskManager = null; + + // + // 1) Consume all buffers at first (no buffers for the test task) + // + testBufferPool = networkBufferPool.createBufferPool(1, false); + final List buffers = new ArrayList<>(); + while (true) { + Buffer buffer = testBufferPool.requestBuffer(); + if (buffer != null) { + buffers.add(buffer); + } else { + break; + } + } + + try { + jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration()); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); + + taskManager = TestingUtils.createTaskManager( + testActorSystem, jobManger, config, true, true); + + final ActorGateway jm = jobManger; + + new Within(deadline) { + @Override + protected void run() { + try { + ActorGateway testActor = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait until it is running + JobClient.submitJobDetached( + jm, + jobGraph, + deadline, + ClassLoader.getSystemClassLoader()); + + jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor); + + expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID())); + + // Get the ExecutionGraph + jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor); + + ExecutionGraphFound executionGraphResponse = + expectMsgClass(ExecutionGraphFound.class); + + ExecutionGraph executionGraph = executionGraphResponse.executionGraph(); + ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID()); + + StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( + testActorSystem, 60000); + + // Verify back pressure (clean up interval can be ignored) + BackPressureStatsTracker statsTracker = new BackPressureStatsTracker( + coordinator, + 100 * 1000, + 20, + new FiniteDuration(10, TimeUnit.MILLISECONDS)); + + int numAttempts = 10; + + int nextSampleId = 0; + + // Verify that all tasks are back pressured. This + // can fail if the task takes longer to request + // the buffer. 
+ for (int attempt = 0; attempt < numAttempts; attempt++) { + try { + OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex); + + assertEquals(nextSampleId + attempt, stats.getSampleId()); + assertEquals(parallelism, stats.getNumberOfSubTasks()); + assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0); + + for (int i = 0; i < parallelism; i++) { + assertEquals(1.0, stats.getBackPressureRatio(i), 0.0); + } + + nextSampleId = stats.getSampleId() + 1; + + break; + } catch (Throwable t) { + if (attempt == numAttempts - 1) { + throw t; + } else { + Thread.sleep(500); + } + } + } + + // + // 2) Release all buffers and let the tasks grab one + // + for (Buffer buf : buffers) { + buf.recycle(); + } + + // Wait for all buffers to be available. The tasks + // grab them and then immediately release them. + while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) { + Thread.sleep(100); + } + + // Verify that no task is back pressured any more. + for (int attempt = 0; attempt < numAttempts; attempt++) { + try { + OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex); + + assertEquals(nextSampleId + attempt, stats.getSampleId()); + assertEquals(parallelism, stats.getNumberOfSubTasks()); + + // Verify that no task is back pressured + for (int i = 0; i < parallelism; i++) { + assertEquals(0.0, stats.getBackPressureRatio(i), 0.0); + } + + break; + } catch (Throwable t) { + if (attempt == numAttempts - 1) { + throw t; + } else { + Thread.sleep(500); + } + } + } + + // Shut down + jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor); + + // Cancel job + jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); + + // Response to removal notification + expectMsgEquals(true); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + }; + } finally { + TestingUtils.stopActor(jobManger); + TestingUtils.stopActor(taskManager); + + for (Buffer buf : buffers) { + buf.recycle(); + } + + testBufferPool.lazyDestroy(); + } + }}; + } + + /** + * Triggers a new stats sample. + */ + private OperatorBackPressureStats triggerStatsSample( + BackPressureStatsTracker statsTracker, + ExecutionJobVertex vertex) throws InterruptedException { + + statsTracker.invalidateOperatorStatsCache(); + statsTracker.triggerStackTraceSample(vertex); + + // Sleep minimum duration + Thread.sleep(20 * 10); + + Option stats; + + // Get the stats + while ((stats = statsTracker.getOperatorBackPressureStats(vertex)).isEmpty()) { + Thread.sleep(10); + } + + return stats.get(); + } + + /** + * A back pressured producer sharing a {@link BufferPool} with the + * test driver. + */ + public static class BackPressuredTask extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + while (true) { + Buffer buffer = testBufferPool.requestBufferBlocking(); + // Got a buffer, yay! 
+				buffer.recycle();
+
+				new CountDownLatch(1).await();
+			}
+		}
+	}
+}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..b0955e1a7662f4c4dfbb8d57798f47ad03e01525
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class BackPressureStatsTrackerTest {
+
+	/** Tests simple statistics with fake stack traces.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTriggerStackTraceSample() throws Exception {
+		Promise<StackTraceSample> samplePromise = new Promise.DefaultPromise<>();
+
+		StackTraceSampleCoordinator sampleCoordinator = mock(StackTraceSampleCoordinator.class);
+		when(sampleCoordinator.triggerStackTraceSample(
+				any(ExecutionVertex[].class),
+				anyInt(),
+				any(FiniteDuration.class),
+				anyInt())).thenReturn(samplePromise.future());
+
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+
+		// Same thread execution context
+		when(graph.getExecutionContext()).thenReturn(new ExecutionContext() {
+
+			@Override
+			public void execute(Runnable runnable) {
+				runnable.run();
+			}
+
+			@Override
+			public void reportFailure(Throwable t) {
+				fail();
+			}
+
+			@Override
+			public ExecutionContext prepare() {
+				return this;
+			}
+		});
+
+		ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		when(jobVertex.getJobId()).thenReturn(new JobID());
+		when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+		when(jobVertex.getGraph()).thenReturn(graph);
+		when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
+
+		taskVertices[0] = mockExecutionVertex(jobVertex, 0);
+		taskVertices[1] = mockExecutionVertex(jobVertex, 1);
+		taskVertices[2] = mockExecutionVertex(jobVertex, 2);
+		taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+
+		int numSamples = 100;
+		FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+		BackPressureStatsTracker tracker = new BackPressureStatsTracker(
+				sampleCoordinator, 9999, numSamples, delayBetweenSamples);
+
+		// Trigger
+		tracker.triggerStackTraceSample(jobVertex);
+
+		verify(sampleCoordinator).triggerStackTraceSample(
+				eq(taskVertices),
+				eq(numSamples),
+				eq(delayBetweenSamples),
+				eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+		// Triggering again while the request is pending should not fire another sample
+		tracker.triggerStackTraceSample(jobVertex);
+
+		assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
+
+		verify(sampleCoordinator).triggerStackTraceSample(
+				eq(taskVertices),
+				eq(numSamples),
+				eq(delayBetweenSamples),
+				eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+		assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
+
+		// Complete the future
+		Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>();
+		for (ExecutionVertex vertex : taskVertices) {
+			List<StackTraceElement[]> taskTraces = new ArrayList<>();
+
+			for (int i = 0; i < taskVertices.length; i++) {
+				// Traces up to and including the subtask index are back pressured
+				taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex()));
+			}
+
+			traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
+		}
+
+		int sampleId = 1231;
+		int endTime = 841;
+
+		StackTraceSample sample = new StackTraceSample(
+				sampleId,
+				0,
+				endTime,
+				traces);
+
+		// Succeed the promise
+		samplePromise.success(sample);
+
+		assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isDefined());
+
+		OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get();
+
+		// Verify the stats
+		assertEquals(sampleId, stats.getSampleId());
+		assertEquals(endTime, stats.getEndTimestamp());
+		assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
+
+		for (int i = 0; i < taskVertices.length; i++) {
+			double ratio = stats.getBackPressureRatio(i);
+			// Traces up to and including the subtask index are back pressured
+			assertEquals((i + 1) / (double) taskVertices.length, ratio, 0.0);
+		}
+	}
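Editor's note, not part of the patch: the ratios asserted above fall out of counting how many of a subtask's sampled traces sit in the expected blocking method. A self-contained approximation of that idea under the same matching rule the test fakes; the helper class and method names are hypothetical, the real logic lives in BackPressureStatsTracker:

import java.util.List;

// Fraction of sampled stack traces that sit in a blocking buffer request.
final class BackPressureRatioSketch {
	static double ratio(List<StackTraceElement[]> traces,
			String expectedClassName, String expectedMethodName) {
		int backPressured = 0;
		for (StackTraceElement[] trace : traces) {
			for (StackTraceElement elem : trace) {
				if (elem.getClassName().equals(expectedClassName)
						&& elem.getMethodName().equals(expectedMethodName)) {
					backPressured++;
					break; // count each trace at most once
				}
			}
		}
		return traces.isEmpty() ? 0.0 : backPressured / (double) traces.size();
	}
}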
+	private StackTraceElement[] createStackTrace(boolean isBackPressure) {
+		if (isBackPressure) {
+			return new StackTraceElement[] { new StackTraceElement(
+					BackPressureStatsTracker.EXPECTED_CLASS_NAME,
+					BackPressureStatsTracker.EXPECTED_METHOD_NAME,
+					"LocalBufferPool.java",
+					133) };
+		} else {
+			return Thread.currentThread().getStackTrace();
+		}
+	}
+
+	private ExecutionVertex mockExecutionVertex(
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex) {
+
+		Execution exec = mock(Execution.class);
+		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+		JobVertexID id = jobVertex.getJobVertexId();
+
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobvertexId()).thenReturn(id);
+		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+		when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+
+		return vertex;
+	}
+
+}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..406197c6b8646af78db001ceefc49a335b48170f
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSampleCoordinatorTest {
+
+	private static ActorSystem system;
+
+	private StackTraceSampleCoordinator coord;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (system != null) {
+			system.shutdown();
+		}
+	}
+
+	@Before
+	public void init() throws Exception {
+		this.coord = new StackTraceSampleCoordinator(system, 60000);
+	}
+
+	/** Tests simple trigger and collect of stack trace samples.
+	 */
+	@Test
+	public void testTriggerStackTraceSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
+		};
+
+		int numSamples = 1;
+		FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+		int maxStackTraceDepth = 0;
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+		// Verify messages have been sent
+		for (ExecutionVertex vertex : vertices) {
+			ExecutionAttemptID expectedExecutionId = vertex
+					.getCurrentExecutionAttempt().getAttemptId();
+
+			TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
+					0,
+					expectedExecutionId,
+					numSamples,
+					delayBetweenSamples,
+					maxStackTraceDepth);
+
+			verify(vertex).sendMessageToCurrentExecution(
+					eq(expectedMsg), eq(expectedExecutionId), any(AkkaActorGateway.class));
+		}
+
+		assertFalse(sampleFuture.isCompleted());
+
+		StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
+		List<StackTraceElement[]> traces = new ArrayList<>();
+		traces.add(stackTraceSample);
+		traces.add(stackTraceSample);
+		traces.add(stackTraceSample);
+
+		// Collect stack traces
+		for (int i = 0; i < vertices.length; i++) {
+			ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
+			coord.collectStackTraces(0, executionId, traces);
+
+			if (i == vertices.length - 1) {
+				assertTrue(sampleFuture.isCompleted());
+			} else {
+				assertFalse(sampleFuture.isCompleted());
+			}
+		}
+
+		// Verify completed stack trace sample
+		StackTraceSample sample = sampleFuture.value().get().get();
+
+		assertEquals(0, sample.getSampleId());
+		assertTrue(sample.getEndTime() >= sample.getStartTime());
+
+		Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
+
+		for (ExecutionVertex vertex : vertices) {
+			ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);
+
+			assertNotNull("Task not found", sampleTraces);
+			assertEquals(traces, sampleTraces);
+		}
+
+		// Verify no more pending sample
+		assertEquals(0, coord.getNumberOfPendingSamples());
+
+		// Verify no error on late collect
+		coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
+	}
+
+	/** Tests that triggering a sample for non-running tasks fails the future. */
+	@Test
+	public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices,
+				1,
+				new FiniteDuration(100, TimeUnit.MILLISECONDS),
+				0);
+
+		assertTrue(sampleFuture.isCompleted());
+		assertTrue(sampleFuture.failed().isCompleted());
+
+		assertTrue(sampleFuture.failed().value().get().get() instanceof IllegalStateException);
+	}
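Editor's sketch, not part of the patch: outside of a test, a caller of this coordinator would trigger a sample and block on the returned Scala future, which completes once every sampled task has reported its traces. The generic parameter on the future follows the usage above; the wrapper name and timeout value are assumptions:

import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.executiongraph.ExecutionVertex;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

final class TriggerAndAwaitSketch {

	// Triggers one sample over all given vertices and blocks until every
	// sampled task has reported its stack traces (or the wait times out).
	static StackTraceSample sampleOnce(
			StackTraceSampleCoordinator coord,
			ExecutionVertex[] vertices) throws Exception {

		Future<StackTraceSample> future = coord.triggerStackTraceSample(
				vertices,
				1,                                              // numSamples
				new FiniteDuration(100, TimeUnit.MILLISECONDS), // delay between samples
				0);                                             // 0 = full stack trace depth
		return Await.result(future, new FiniteDuration(60, TimeUnit.SECONDS));
	}
}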
+	/** Tests that triggering for reset tasks fails the future. */
+	@Test
+	public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				// Fails to send the message to the execution (happens when the execution is reset)
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices,
+				1,
+				new FiniteDuration(100, TimeUnit.MILLISECONDS),
+				0);
+
+		assertTrue(sampleFuture.isCompleted());
+		assertTrue(sampleFuture.failed().isCompleted());
+		assertTrue(sampleFuture.failed().value().get().get().getCause() instanceof RuntimeException);
+	}
+
+	/** Tests that samples time out if they don't finish in time. */
+	@Test
+	public void testTriggerStackTraceSampleTimeout() throws Exception {
+		int timeout = 100;
+
+		coord = new StackTraceSampleCoordinator(system, timeout);
+
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		// Wait for the timeout
+		Thread.sleep(timeout * 2);
+
+		boolean success = false;
+		for (int i = 0; i < 10; i++) {
+			if (sampleFuture.isCompleted()) {
+				success = true;
+				break;
+			}
+
+			Thread.sleep(timeout);
+		}
+
+		assertTrue("Sample did not time out", success);
+
+		Throwable cause = sampleFuture.failed().value().get().get();
+		assertTrue(cause.getCause().getMessage().contains("Time out"));
+
+		// Collect after the timeout
+		try {
+			ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+			coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+		}
+	}
+
+	/** Tests that collecting an unknown sample fails. */
+	@Test(expected = IllegalStateException.class)
+	public void testCollectStackTraceForUnknownSample() throws Exception {
+		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests cancelling of a pending sample. */
+	@Test
+	public void testCancelStackTraceSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertFalse(sampleFuture.isCompleted());
+
+		// Cancel
+		coord.cancelStackTraceSample(0, null);
+
+		// Verify completed
+		assertTrue(sampleFuture.isCompleted());
+
+		// Verify no more pending samples
+		assertEquals(0, coord.getNumberOfPendingSamples());
+	}
+	/** Tests that collecting for a cancelled sample throws no Exception. */
+	@Test
+	public void testCollectStackTraceForCanceledSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertFalse(sampleFuture.isCompleted());
+
+		coord.cancelStackTraceSample(0, null);
+
+		assertTrue(sampleFuture.isCompleted());
+
+		// Verify no error on late collect
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that collecting for a discarded pending sample throws no Exception. */
+	@Test
+	public void testCollectForDiscardedPendingSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertFalse(sampleFuture.isCompleted());
+
+		coord.cancelStackTraceSample(0, null);
+
+		assertTrue(sampleFuture.isCompleted());
+
+		// Verify no error on late collect
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that collecting for an unknown task fails. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testCollectStackTraceForUnknownTask() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		coord.triggerStackTraceSample(vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+	}
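Editor's note, not part of the patch: the two cancellation tests above pin down the contract that cancelling completes the pending future and that a late collect for the cancelled sample ID is silently ignored. A hedged usage sketch under that assumption, with a hypothetical wrapper name:

final class CancelSampleSketch {

	// Cancelling completes the sample's pending future; stack traces that
	// task managers report afterwards for this sample ID are ignored rather
	// than raising an error.
	static void cancelQuietly(StackTraceSampleCoordinator coord, int sampleId) {
		coord.cancelStackTraceSample(sampleId, null /* no failure cause */);
	}
}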
+	/** Tests that shut down fails all pending samples and future sample triggers. */
+	@Test
+	public void testShutDown() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		List<Future<StackTraceSample>> sampleFutures = new ArrayList<>();
+
+		// Trigger
+		sampleFutures.add(coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
+
+		sampleFutures.add(coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
+
+		for (Future<StackTraceSample> future : sampleFutures) {
+			assertFalse(future.isCompleted());
+		}
+
+		// Shut down
+		coord.shutDown();
+
+		// Verify all completed
+		for (Future<StackTraceSample> future : sampleFutures) {
+			assertTrue(future.isCompleted());
+		}
+
+		// Verify new trigger returns failed future
+		Future<StackTraceSample> future = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertTrue(future.isCompleted());
+		assertTrue(future.failed().isCompleted());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private ExecutionVertex mockExecutionVertex(
+			ExecutionAttemptID executionId,
+			ExecutionState state,
+			boolean sendSuccess) {
+
+		Execution exec = mock(Execution.class);
+		when(exec.getAttemptId()).thenReturn(executionId);
+		when(exec.getState()).thenReturn(state);
+
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
+		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+		when(vertex.sendMessageToCurrentExecution(
+				any(Serializable.class), any(ExecutionAttemptID.class), any(AkkaActorGateway.class)))
+				.thenReturn(sendSuccess);
+
+		return vertex;
+	}
+}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..2b8f804d23c35ad33913a818ee87f0684859d3c2
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
+import org.junit.Test;
+import scala.Option;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for back pressure handler responses.
+ */
+public class JobVertexBackPressureHandlerTest {
+
+	/** Tests the response when no stats are available. */
+	@Test
+	public void testResponseNoStatsAvailable() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Option.empty());
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				statsTracker,
+				9999);
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Single element
+		assertEquals(1, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		assertEquals("deprecated", status.textValue());
+
+		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+
+	/** Tests the response when stats are available. */
+	@Test
+	public void testResponseStatsAvailable() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		OperatorBackPressureStats stats = new OperatorBackPressureStats(
+				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Option.apply(stats));
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				statsTracker,
+				9999);
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Four top-level elements
+		assertEquals(4, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		assertEquals("ok", status.textValue());
+
+		// Back pressure level
+		JsonNode backPressureLevel = rootNode.get("backpressure-level");
+		assertNotNull(backPressureLevel);
+		assertEquals("high", backPressureLevel.textValue());
+
+		// End time stamp
+		JsonNode endTimeStamp = rootNode.get("end-timestamp");
+		assertNotNull(endTimeStamp);
+		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+		// Subtasks
+		JsonNode subTasks = rootNode.get("subtasks");
+		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+		for (int i = 0; i < subTasks.size(); i++) {
+			JsonNode subTask = subTasks.get(i);
+
+			JsonNode index = subTask.get("subtask");
+			assertEquals(i, index.intValue());
+
+			JsonNode level = subTask.get("backpressure-level");
+			assertEquals(JobVertexBackPressureHandler
+					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+			JsonNode ratio = subTask.get("ratio");
+			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+		}
+
+		// Verify that no new sample was triggered
+		verify(statsTracker, never()).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
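For reference, a response of the shape these assertions describe could look roughly like the following. This is an editor's illustration built from the fixture values above, not output captured from the handler: the timestamp is a placeholder, and the per-subtask level strings depend on the thresholds inside getBackPressureLevel.

{
  "status" : "ok",
  "backpressure-level" : "high",
  "end-timestamp" : 1449000000000,
  "subtasks" : [
    { "subtask" : 0, "backpressure-level" : "low",  "ratio" : 0.31 },
    { "subtask" : 1, "backpressure-level" : "low",  "ratio" : 0.48 },
    { "subtask" : 2, "backpressure-level" : "high", "ratio" : 1.0 },
    { "subtask" : 3, "backpressure-level" : "ok",   "ratio" : 0.0 }
  ]
}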
+	/** Tests that after the refresh interval another sample is triggered. */
+	@Test
+	public void testResponsePassedRefreshInterval() throws Exception {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		BackPressureStatsTracker statsTracker = mock(BackPressureStatsTracker.class);
+
+		OperatorBackPressureStats stats = new OperatorBackPressureStats(
+				0, System.currentTimeMillis(), new double[] { 0.31, 0.48, 1.0, 0.0 });
+
+		when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
+				.thenReturn(Option.apply(stats));
+
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
+				mock(ExecutionGraphHolder.class),
+				statsTracker,
+				0); // <----- refresh interval should fire immediately
+
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+
+		ObjectMapper mapper = new ObjectMapper();
+		JsonNode rootNode = mapper.readTree(response);
+
+		// Four top-level elements
+		assertEquals(4, rootNode.size());
+
+		// Status
+		JsonNode status = rootNode.get("status");
+		assertNotNull(status);
+		// Interval passed, hence deprecated
+		assertEquals("deprecated", status.textValue());
+
+		// Back pressure level
+		JsonNode backPressureLevel = rootNode.get("backpressure-level");
+		assertNotNull(backPressureLevel);
+		assertEquals("high", backPressureLevel.textValue());
+
+		// End time stamp
+		JsonNode endTimeStamp = rootNode.get("end-timestamp");
+		assertNotNull(endTimeStamp);
+		assertEquals(stats.getEndTimestamp(), endTimeStamp.longValue());
+
+		// Subtasks
+		JsonNode subTasks = rootNode.get("subtasks");
+		assertEquals(stats.getNumberOfSubTasks(), subTasks.size());
+		for (int i = 0; i < subTasks.size(); i++) {
+			JsonNode subTask = subTasks.get(i);
+
+			JsonNode index = subTask.get("subtask");
+			assertEquals(i, index.intValue());
+
+			JsonNode level = subTask.get("backpressure-level");
+			assertEquals(JobVertexBackPressureHandler
+					.getBackPressureLevel(stats.getBackPressureRatio(i)), level.textValue());
+
+			JsonNode ratio = subTask.get("ratio");
+			assertEquals(stats.getBackPressureRatio(i), ratio.doubleValue(), 0.0);
+		}
+
+		// Verify that a new sample was triggered
+		verify(statsTracker).triggerStackTraceSample(any(ExecutionJobVertex.class));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a03f0bf7a1018ff44bfe2d23df3a4f1923c5f655..c3bfbb045e1afb83498dde28c081d9e8ab5dd829 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1089,8 +1089,7 @@ public class ExecutionGraph implements Serializable {
 
 				// We don't clean the checkpoint stats tracker, because we want
 				// it to be available after the job has terminated.
-			}
-			catch (Exception e) {
+			} catch (Exception e) {
 				LOG.error("Error while cleaning up after execution", e);
 			}
 
@@ -1100,8 +1099,7 @@ public class ExecutionGraph implements Serializable {
 				if (coord != null) {
 					coord.shutdown();
 				}
-			}
-			catch (Exception e) {
+			} catch (Exception e) {
 				LOG.error("Error while cleaning up after execution", e);
 			}
 		}
@@ -1231,7 +1229,6 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
 		if (jobStatusListenerActors.size() > 0) {
 			ExecutionGraphMessages.JobStatusChanged message =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c89a01e623edb15577949cbf0f24bbd094fe21bf..165dce4641fd4e49c76db414309431674cae8546 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -471,7 +471,17 @@ public class ExecutionVertex implements Serializable {
 		this.currentExecution.fail(t);
 	}
 
-	public void sendMessageToCurrentExecution(Serializable message, ExecutionAttemptID attemptID) {
+	public boolean sendMessageToCurrentExecution(
+			Serializable message,
+			ExecutionAttemptID attemptID) {
+
+		return sendMessageToCurrentExecution(message, attemptID, null);
+	}
+
+	public boolean sendMessageToCurrentExecution(
+			Serializable message,
+			ExecutionAttemptID attemptID,
+			ActorGateway sender) {
 		Execution exec = getCurrentExecutionAttempt();
 
 		// check that this is for the correct execution attempt
@@ -482,16 +492,26 @@ public class ExecutionVertex implements Serializable {
 			if (slot != null) {
 				ActorGateway gateway = slot.getInstance().getActorGateway();
 				if (gateway != null) {
-					gateway.tell(message);
+					if (sender == null) {
+						gateway.tell(message);
+					} else {
+						gateway.tell(message, sender);
+					}
+
+					return true;
+				} else {
+					return false;
 				}
 			} else {
 				LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
+				return false;
 			}
 		} else {
 			LOG.debug("Skipping message to {}/{} because it does not match the current execution",
 					getSimpleName(), attemptID);
+			return false;
 		}
 	}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index df7e6c6b7296be3acd29ae81f0d32466cc3bfa28..4410ec3d45570f4aca6cdd036f82599f3a123479 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -110,7 +110,7 @@ class JobManager(
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
-  extends FlinkActor
+  extends FlinkActor
   with LeaderSessionMessageFilter // mixin order is important, we want filtering after logging
   with LogMessages // mixin order is important, we want first logging
   with LeaderContender
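Editor's note, not part of the patch: the reworked ExecutionVertex#sendMessageToCurrentExecution above now reports delivery through its boolean return value, which the sample coordinator uses to fail fast for reset or undeployed executions instead of losing messages silently. A minimal caller sketch under that assumption; the wrapper class and method names are hypothetical:

import java.io.Serializable;

import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;

final class SendToExecutionSketch {

	// Returns false when the execution was reset or never deployed, so the
	// caller can fail its pending sample future right away.
	static boolean trySend(ExecutionVertex vertex, Serializable message, ActorGateway sender) {
		return vertex.sendMessageToCurrentExecution(
				message,
				vertex.getCurrentExecutionAttempt().getAttemptId(),
				sender);
	}
}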
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9f2e6e9dcb7f7a0c1113e823b9e986c07f96c8cf
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import akka.actor.ActorRef
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * A set of messages exchanged with task manager instances in order to sample
+ * the stack traces of running tasks.
+ */
+object StackTraceSampleMessages {
+
+  trait StackTraceSampleMessages
+
+  /**
+   * Triggers the sampling of a running task (sent by the job manager to the
+   * task managers).
+   *
+   * @param sampleId ID of this sample.
+   * @param executionId ID of the task to sample.
+   * @param numSamples Number of stack trace samples to collect.
+   * @param delayBetweenSamples Delay between consecutive samples.
+   * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+   *                           no maximum and collects the complete stack
+   *                           trace.
+   */
+  case class TriggerStackTraceSample(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      numSamples: Int,
+      delayBetweenSamples: FiniteDuration,
+      maxStackTraceDepth: Int = 0)
+    extends StackTraceSampleMessages with java.io.Serializable
+
+  /**
+   * Response after a successful stack trace sample (sent by the task managers
+   * to the job manager).
+   *
+   * @param sampleId ID of this sample.
+   * @param executionId ID of the sampled task.
+   * @param samples Stack trace samples (head is most recent sample).
+   */
+  case class ResponseStackTraceSampleSuccess(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      samples: java.util.List[Array[StackTraceElement]])
+    extends StackTraceSampleMessages
+
+  /**
+   * Response after a failed stack trace sample (sent by the task managers to
+   * the job manager).
+   *
+   * @param sampleId ID of this sample.
+   * @param executionId ID of the sampled task.
+   * @param cause Failure cause.
+   */
+  case class ResponseStackTraceSampleFailure(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      cause: Exception)
+    extends StackTraceSampleMessages
+
+  /**
+   * Task manager internal sample message.
+   *
+   * @param sampleId ID of this sample.
+   * @param executionId ID of the task to sample.
+   * @param delayBetweenSamples Delay between consecutive samples.
+   * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+   *                           no maximum and collects the complete stack
+   *                           trace.
+   * @param numRemainingSamples Number of remaining samples before this
+   *                            sample is finished.
+   * @param currentTraces The current list of gathered stack traces.
+   * @param sender Actor triggering this sample (receiver of result).
+   */
+  case class SampleTaskStackTrace(
+      sampleId: Int,
+      executionId: ExecutionAttemptID,
+      delayBetweenSamples: FiniteDuration,
+      maxStackTraceDepth: Int,
+      numRemainingSamples: Int,
+      currentTraces: java.util.List[Array[StackTraceElement]],
+      sender: ActorRef)
+    extends StackTraceSampleMessages
+
+}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5ad9dad69069cedfbdc1a681cec4a7cd5c7e4994..3b688788b3298ade08baa18ceff16151b0bb3876 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -19,34 +19,29 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, IOException}
+import java.lang.management.{ManagementFactory, OperatingSystemMXBean}
+import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
+import java.util
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-import java.lang.reflect.Method
-import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
 import _root_.akka.util.Timeout
-
-import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.codahale.metrics.json.MetricsModule
-import com.codahale.metrics.jvm.{BufferPoolMetricSet, MemoryUsageGaugeSet, GarbageCollectorMetricSet}
-
+import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
-
 import org.apache.flink.configuration._
-import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType}
+import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
-import org.apache.flink.runtime.messages.TaskMessages._
-import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.blob.{BlobService, BlobCache}
+import org.apache.flink.runtime.blob.{BlobCache, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.filecache.FileCache
@@ -56,24 +51,28 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.{SampleTaskStackTrace, ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, StackTraceSampleMessages, TriggerStackTraceSample}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.util.NetUtils
+import org.apache.flink.runtime.messages.TaskMessages._
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{SignalHandler, LeaderRetrievalUtils, MathUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{EnvironmentInformation, LeaderRetrievalUtils, MathUtils, SignalHandler}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.util.NetUtils
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
-import scala.util.{Failure, Success}
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-
 import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -273,6 +272,9 @@ class TaskManager(
     // registration messages for connecting and disconnecting from / to the JobManager
     case message: RegistrationMessage => handleRegistrationMessage(message)
 
+    // task sampling messages
+    case message: StackTraceSampleMessages => handleStackTraceSampleMessage(message)
+
     // ----- miscellaneous messages ----
 
     // periodic heart beats that transport metrics
@@ -640,6 +642,120 @@ class TaskManager(
     }
   }
 
+  private def handleStackTraceSampleMessage(message: StackTraceSampleMessages): Unit = {
+    message match {
+
+      // Triggers the sampling of a task
+      case TriggerStackTraceSample(
+        sampleId,
+        executionId,
+        numSamples,
+        delayBetweenSamples,
+        maxStackTraceDepth) =>
+
+        log.debug(s"Triggering stack trace sample $sampleId.")
+
+        val senderRef = sender()
+
+        self ! SampleTaskStackTrace(
+          sampleId,
+          executionId,
+          delayBetweenSamples,
+          maxStackTraceDepth,
+          numSamples,
+          new java.util.ArrayList(),
+          senderRef)
+
+      // Repeatedly sent to self to sample a task
+      case SampleTaskStackTrace(
+        sampleId,
+        executionId,
+        delayBetweenSamples,
+        maxStackTraceDepth,
+        remainingNumSamples,
+        currentTraces,
+        sender) =>
+
+        try {
+          if (remainingNumSamples >= 1) {
+            getStackTrace(executionId, maxStackTraceDepth) match {
+              case Some(stackTrace) =>
+
+                currentTraces.add(stackTrace)
+
+                if (remainingNumSamples > 1) {
+                  // ---- Continue ----
+                  val msg = SampleTaskStackTrace(
+                    sampleId,
+                    executionId,
+                    delayBetweenSamples,
+                    maxStackTraceDepth,
+                    remainingNumSamples - 1,
+                    currentTraces,
+                    sender)
+
+                  context.system.scheduler.scheduleOnce(
+                    delayBetweenSamples,
+                    self,
+                    msg)(context.dispatcher)
+                } else {
+                  // ---- Done ----
+                  log.debug(s"Done with stack trace sample $sampleId.")
+
+                  sender !
+                    ResponseStackTraceSampleSuccess(
+                      sampleId,
+                      executionId,
+                      currentTraces)
+                }
+
+              case None =>
+                if (currentTraces.size() == 0) {
+                  throw new IllegalStateException(s"Cannot sample task $executionId. " +
+                    s"Either the task is not known to the task manager or it is not running.")
+                } else {
+                  throw new IllegalStateException(s"Cannot sample task $executionId. " +
+                    s"Task was removed after ${currentTraces.size()} sample(s).")
+                }
+            }
+          } else {
+            throw new IllegalStateException("Non-positive number of remaining samples")
+          }
+        } catch {
+          case e: Exception =>
+            sender ! ResponseStackTraceSampleFailure(sampleId, executionId, e)
+        }
+
+      case _ => unhandled(message)
+    }
+
+    /**
+     * Returns a stack trace of a running task.
+     *
+     * @param executionId ID of the running task.
+     * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+     *                           no maximum and collects the complete stack
+     *                           trace.
+     * @return Stack trace of the running task.
+     */
+    def getStackTrace(
+        executionId: ExecutionAttemptID,
+        maxStackTraceDepth: Int): Option[Array[StackTraceElement]] = {
+
+      val task = runningTasks.get(executionId)
+
+      if (task != null && task.getExecutionState == ExecutionState.RUNNING) {
+        val stackTrace : Array[StackTraceElement] = task.getExecutingThread.getStackTrace
+
+        if (maxStackTraceDepth > 0) {
+          Option(util.Arrays.copyOfRange(stackTrace, 0, maxStackTraceDepth.min(stackTrace.length)))
+        } else {
+          Option(stackTrace)
+        }
+      } else {
+        Option.empty
+      }
+    }
+  }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 17e8e115dbffba6292aa6f01ebbc0f8bf64112e2..f5271dfa6d3a909a1d67b72279a8b7d86c471d58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
-
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
@@ -52,6 +51,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -61,14 +63,12 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
-
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -77,6 +77,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -89,10 +90,11 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
-public class TaskManagerTest {
+public class TaskManagerTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
 
@@ -118,12 +120,6 @@ public class TaskManagerTest {
 
 	@Test
 	public void testSubmitAndExecuteTask() {
-
-		LOG.info( "--------------------------------------------------------------------\n" +
-				"  Starting testSubmitAndExecuteTask() \n" +
-				"--------------------------------------------------------------------");
-
-
 		new JavaTestKit(system){{
 
 			ActorGateway taskManager = null;
@@ -233,11 +229,6 @@ public class TaskManagerTest {
 
 	@Test
 	public void testJobSubmissionAndCanceling() {
-
-		LOG.info( "--------------------------------------------------------------------\n" +
-				"  Starting testJobSubmissionAndCanceling() \n" +
-				"--------------------------------------------------------------------");
-
 		new JavaTestKit(system){{
 
 			ActorGateway jobManager = null;
@@ -370,11 +361,6 @@ public class TaskManagerTest {
 
 	@Test
 	public void testGateChannelEdgeMismatch() {
-
-		LOG.info( "--------------------------------------------------------------------\n" +
-				"  Starting testGateChannelEdgeMismatch() \n" +
-				"--------------------------------------------------------------------");
-
 		new JavaTestKit(system){{
 
 			ActorGateway jobManager = null;
@@ -462,11 +448,6 @@ public class TaskManagerTest {
 
 	@Test
 	public void testRunJobWithForwardChannel() {
-
-		LOG.info( "--------------------------------------------------------------------\n" +
-				"  Starting testRunJobWithForwardChannel() \n" +
-				"--------------------------------------------------------------------");
-
 		new JavaTestKit(system){{
 
 			ActorGateway jobManager = null;
@@ -596,11 +577,6 @@ public class TaskManagerTest {
 
 	@Test
 	public void testCancellingDependentAndStateUpdateFails() {
-
-		LOG.info( "--------------------------------------------------------------------\n" +
-				"  Starting testCancellingDependentAndStateUpdateFails() \n" +
-				"--------------------------------------------------------------------");
-
 		// this test creates two tasks. the sender sends data, and fails to send the
 		// state update back to the job manager
 		// the second one blocks to be canceled
@@ -929,6 +905,283 @@ public class TaskManagerTest {
 		}};
 	}
 
+	// ------------------------------------------------------------------------
+	//  Stack trace sample
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Tests sampling of task stack traces.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTriggerStackTraceSampleMessage() throws Exception {
+		new JavaTestKit(system) {{
+			ActorGateway taskManagerActorGateway = null;
+			ActorGateway jobManagerActorGateway = TestingUtils.createForwardingJobManager(
+					system,
+					getTestActor(),
+					Option.empty());
+
+			final ActorGateway testActorGateway = new AkkaActorGateway(
+					getTestActor(),
+					leaderSessionID);
+
+			try {
+				final ActorGateway jobManager = jobManagerActorGateway;
+				taskManagerActorGateway = TestingUtils.createTaskManager(
+						system,
+						jobManager,
+						new Configuration(),
+						true,
+						false);
+				final ActorGateway taskManager = taskManagerActorGateway;
+
+				// Registration
+				new Within(d) {
+					@Override
+					protected void run() {
+						expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
+						assertEquals(taskManager.actor(), getLastSender());
+
+						taskManager.tell(new RegistrationMessages.AcknowledgeRegistration(
+								new InstanceID(), 12345), jobManager);
+					}
+				};
+
+				// Single blocking task
+				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+						new ApplicationID(),
+						new JobID(),
+						new JobVertexID(),
+						new ExecutionAttemptID(),
+						"Task",
+						0,
+						1,
+						0,
+						new Configuration(),
+						new Configuration(),
+						Tasks.BlockingNoOpInvokable.class.getName(),
+						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+						Collections.<InputGateDeploymentDescriptor>emptyList(),
+						Collections.<BlobKey>emptyList(),
+						Collections.<URL>emptyList(),
+						0);
+
+				// Submit the task
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							Future<Object> taskRunningFuture = taskManager.ask(
+									new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(
+											tdd.getExecutionId()), timeout);
+
+							taskManager.tell(new SubmitTask(tdd));
+
+							Await.ready(taskRunningFuture, d);
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+
+				//
+				// 1) Trigger sample for non-existing task
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							ExecutionAttemptID taskId = new ExecutionAttemptID();
+
+							taskManager.tell(new TriggerStackTraceSample(
+											112223,
+											taskId,
+											100,
+											d,
+											0),
+									testActorGateway);
+
+							// Receive the expected message (heartbeat races possible)
+							Object[] msg = receiveN(1);
+							while (!(msg[0] instanceof ResponseStackTraceSampleFailure)) {
+								msg = receiveN(1);
+							}
+
+							ResponseStackTraceSampleFailure response = (ResponseStackTraceSampleFailure) msg[0];
+
+							assertEquals(112223, response.sampleId());
+							assertEquals(taskId, response.executionId());
+							assertEquals(IllegalStateException.class, response.cause().getClass());
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+
+				//
+				// 2) Trigger sample for the blocking task
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						boolean success = false;
+						Throwable lastError = null;
+
+						for (int i = 0; i < 100 && !success; i++) {
+							try {
+								int numSamples = 5;
+
+								taskManager.tell(new TriggerStackTraceSample(
+												19230,
+												tdd.getExecutionId(),
+												numSamples,
+												new FiniteDuration(100, TimeUnit.MILLISECONDS),
+												0),
+										testActorGateway);
+
+								// Receive the expected message (heartbeat races possible)
+								Object[] msg = receiveN(1);
+								while (!(msg[0] instanceof ResponseStackTraceSampleSuccess)) {
+									msg = receiveN(1);
+								}
+
+								ResponseStackTraceSampleSuccess response = (ResponseStackTraceSampleSuccess) msg[0];
+
+								// ---- Verify response ----
+								assertEquals(19230, response.sampleId());
+								assertEquals(tdd.getExecutionId(), response.executionId());
+
+								List<StackTraceElement[]> traces = response.samples();
+
+								assertEquals("Number of samples", numSamples, traces.size());
+
+								for (StackTraceElement[] trace : traces) {
+									boolean foundInvokable = false;
+
+									// Look for BlockingNoOpInvokable#invoke
+									for (StackTraceElement elem : trace) {
+										if (elem.getClassName().equals(
+												Tasks.BlockingNoOpInvokable.class.getName())) {
+
+											assertEquals("invoke", elem.getMethodName());
+											foundInvokable = true;
+											break;
+										}
+									}
+
+									assertTrue("Unexpected stack trace: "
+											+ Arrays.toString(trace), foundInvokable);
+								}
+
+								success = true;
+							} catch (Throwable t) {
+								lastError = t;
+								LOG.warn("Failed to find invokable.", t);
+							}
+
+							try {
+								Thread.sleep(100);
+							} catch (InterruptedException e) {
+								LOG.error("Interrupted while sleeping before retry.", e);
+								break;
+							}
+						}
+
+						if (!success) {
+							if (lastError == null) {
+								fail("Failed to find invokable");
+							} else {
+								fail(lastError.getMessage());
+							}
+						}
+					}
+				};
+
+				//
+				// 3) Trigger sample for the blocking task with max depth
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							int numSamples = 5;
+							int maxDepth = 2;
+
+							taskManager.tell(new TriggerStackTraceSample(
+											1337,
+											tdd.getExecutionId(),
+											numSamples,
+											new FiniteDuration(100, TimeUnit.MILLISECONDS),
+											maxDepth),
+									testActorGateway);
+
+							// Receive the expected message (heartbeat races possible)
+							Object[] msg = receiveN(1);
+							while (!(msg[0] instanceof ResponseStackTraceSampleSuccess)) {
+								msg = receiveN(1);
+							}
+
+							ResponseStackTraceSampleSuccess response = (ResponseStackTraceSampleSuccess) msg[0];
+
+							// ---- Verify response ----
+							assertEquals(1337, response.sampleId());
+							assertEquals(tdd.getExecutionId(), response.executionId());
+
+							List<StackTraceElement[]> traces = response.samples();
+
+							assertEquals("Number of samples", numSamples, traces.size());
+
+							for (StackTraceElement[] trace : traces) {
+								assertEquals("Max depth", maxDepth, trace.length);
+							}
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+
+				//
+				// 4) Trigger sample for the blocking task, but cancel it during sampling
+				//
+				new Within(d) {
+					@Override
+					protected void run() {
+						try {
+							// Trigger many samples in order to cancel the task
+							// during a sample
+							taskManager.tell(new TriggerStackTraceSample(
+											0,
+											tdd.getExecutionId(),
+											10000,
+											new FiniteDuration(100, TimeUnit.MILLISECONDS),
+											0),
+									testActorGateway);
+
+							// Cancel the task
+							taskManager.tell(new CancelTask(tdd.getExecutionId()));
+
+							// Receive the expected message (heartbeat races possible)
+							Object[] msg = receiveN(1);
+							while (!(msg[0] instanceof ResponseStackTraceSampleFailure)) {
+								msg = receiveN(1);
+							}
+
+							ResponseStackTraceSampleFailure response = (ResponseStackTraceSampleFailure) msg[0];
+
+							assertEquals(tdd.getExecutionId(), response.executionId());
+							assertEquals(IllegalStateException.class, response.cause().getClass());
+						} catch (Exception e) {
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(taskManagerActorGateway);
+				TestingUtils.stopActor(jobManagerActorGateway);
+			}
+		}};
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static class SimpleJobManager extends FlinkUntypedActor {