diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java index b9b8a47c5d74f378c7681577c778ea87b804851d..db88ffd58600e7502ac2628bb67d78c0c83b6e12 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -33,6 +33,7 @@ import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -170,6 +171,10 @@ public class BackPressureStatsTracker { if (executionContext != null) { pendingStats.add(vertex); + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); + } + Future sample = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), numSamples, @@ -246,7 +251,7 @@ public class BackPressureStatsTracker { OperatorBackPressureStats stats = createStatsFromSample(success); operatorStatsCache.put(vertex, stats); } else { - LOG.warn("Failed to gather stack trace sample.", failure); + LOG.debug("Failed to gather stack trace sample.", failure); } } catch (Throwable t) { LOG.error("Error during stats completion.", t); @@ -278,7 +283,7 @@ public class BackPressureStatsTracker { if (sampledTasks.contains(taskId)) { subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex()); } else { - throw new RuntimeException("Outdated sample. A task, which is part of the " + + LOG.debug("Outdated sample. A task, which is part of the " + "sample has been reset."); } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java index e7b292f546b7c828002f0abb04bc1beda671ee33..bbfb530538af6fa6fe9b7741c41385327a1d6460 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java @@ -179,7 +179,9 @@ public class StackTraceSampleCoordinator { pending.getSampleId()); pending.discard(new RuntimeException("Time out")); - pendingSamples.remove(pending.getSampleId()); + if (pendingSamples.remove(pending.getSampleId()) != null) { + rememberRecentSampleId(pending.getSampleId()); + } } } } catch (Throwable t) { @@ -319,7 +321,9 @@ public class StackTraceSampleCoordinator { sampleId, executionId); } } else { - throw new IllegalStateException("Unknown sample ID " + sampleId); + if (LOG.isDebugEnabled()) { + LOG.debug("Unknown sample ID " + sampleId); + } } } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java index 406197c6b8646af78db001ceefc49a335b48170f..29345a63efcf9dc697054fc557ebce76859f28dc 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java @@ -226,17 +226,13 @@ public class StackTraceSampleCoordinatorTest { Throwable cause = sampleFuture.failed().value().get().get(); assertTrue(cause.getCause().getMessage().contains("Time out")); - // Collect after the timeout - try { - ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); - coord.collectStackTraces(0, executionId, new ArrayList()); - fail("Did not throw expected Exception"); - } catch (IllegalStateException ignored) { - } + // Collect after the timeout (should be ignored) + ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId(); + coord.collectStackTraces(0, executionId, new ArrayList()); } - /** Tests that collecting an unknown sample fails. */ - @Test(expected = IllegalStateException.class) + /** Tests that collecting an unknown sample is ignored. */ + @Test public void testCollectStackTraceForUnknownSample() throws Exception { coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList()); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala index 9f2e6e9dcb7f7a0c1113e823b9e986c07f96c8cf..c3c26dcef0ce8c5130494fdc7a14b51d9bde0dba 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala @@ -63,7 +63,11 @@ object StackTraceSampleMessages { sampleId: Int, executionId: ExecutionAttemptID, samples: java.util.List[Array[StackTraceElement]]) - extends StackTraceSampleMessages + extends StackTraceSampleMessages { + + override def toString: String = + s"ResponseStackTraceSampleSuccess($sampleId, $executionId, ${samples.size()} samples)" + } /** * Response after a failed stack trace sample (sent by the task managers to