From 5dd85e2867a639e10891209dd59be0136a19ecfa Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Wed, 20 Jul 2016 17:08:18 +0200
Subject: [PATCH] [FLINK-4202] Add restarting time JM metric

This PR adds a JM metric which shows the time it took to restart a job.
The time is measured between entering the JobStatus.RESTARTING state and
reaching the JobStatus.RUNNING state. While the job is restarting, the
restarting time is continuously updated.

The metric only shows the time of the last restart attempt. It is
published in the job metric group under the name "restartingTime".

This closes #2271.
---
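Note for reviewers (not part of the commit message): the RestartTimeGauge
introduced below distinguishes four cases: the job was never restarted,
the restart completed, the job reached a terminal state straight out of
RESTARTING, or the restart is still in flight. The following is a minimal
standalone sketch of that decision logic, with plain long timestamps as
hypothetical stand-ins for ExecutionGraph#stateTimestamps; the class and
method names are illustrative, not Flink API.

    public class RestartTimeSketch {

        // Mirrors RestartTimeGauge#getValue(), but with explicit inputs
        // instead of the ExecutionGraph's internal state.
        static long restartingTime(long restartingTs, long runningTs,
                boolean isTerminal, long terminalTs, long now) {
            if (restartingTs <= 0) {
                // never entered RESTARTING: initial execution, report 0
                return 0L;
            } else if (runningTs >= restartingTs) {
                // RUNNING reached after the last RESTARTING: value is fixed
                return runningTs - restartingTs;
            } else if (isTerminal) {
                // terminal state reached directly from RESTARTING: frozen
                return terminalTs - restartingTs;
            } else {
                // restart still in progress: grows with the wall clock
                return now - restartingTs;
            }
        }

        public static void main(String[] args) {
            long t0 = 1000L;
            // RUNNING was reached 250 ms after the restart began
            System.out.println(restartingTime(t0, t0 + 250, false, 0L, t0 + 999)); // 250
            // restart still in flight 400 ms after it began
            System.out.println(restartingTime(t0, 0L, false, 0L, t0 + 400)); // 400
        }
    }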
 .../apache/flink/metrics/MetricRegistry.java  |   4 +
 .../executiongraph/ExecutionGraph.java        |  47 ++-
 .../flink/runtime/jobgraph/JobStatus.java     |   4 +
 .../flink/runtime/jobmanager/JobManager.scala |  23 +-
 ...ecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../ExecutionGraphMetricsTest.java            | 321 ++++++++++++++++++
 .../partitioner/RescalePartitionerTest.java   |   4 +-
 7 files changed, 391 insertions(+), 16 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java

diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
index d9f9bdc0977..274821e8f74 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java
@@ -132,6 +132,10 @@ public class MetricRegistry {
 		return this.delimiter;
 	}
 
+	public MetricReporter getReporter() {
+		return reporter;
+	}
+
 	/**
 	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 93f6f32eaf6..887c49038d6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -24,6 +24,9 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -118,6 +121,8 @@ public class ExecutionGraph implements Serializable {
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 
+	static final String RESTARTING_TIME_METRIC_NAME = "restartingTime";
+
 	// --------------------------------------------------------------------------------------------
 
 	/** The lock used to secure all access to mutable fields, especially the tracking of progress
@@ -258,7 +263,8 @@ public class ExecutionGraph implements Serializable {
 			restartStrategy,
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
-			ExecutionGraph.class.getClassLoader()
+			ExecutionGraph.class.getClassLoader(),
+			new UnregisteredMetricsGroup()
 		);
 	}
 
@@ -272,7 +278,8 @@ public class ExecutionGraph implements Serializable {
 			RestartStrategy restartStrategy,
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
-			ClassLoader userClassLoader) {
+			ClassLoader userClassLoader,
+			MetricGroup metricGroup) {
 
 		checkNotNull(executionContext);
 		checkNotNull(jobId);
@@ -306,6 +313,8 @@ public class ExecutionGraph implements Serializable {
 		this.timeout = timeout;
 
 		this.restartStrategy = restartStrategy;
+
+		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -908,7 +917,11 @@ public class ExecutionGraph implements Serializable {
 			}
 
 			for (int i = 0; i < stateTimestamps.length; i++) {
-				stateTimestamps[i] = 0;
+				if (i != JobStatus.RESTARTING.ordinal()) {
+					// Only clear the non-restarting states in order to preserve when the
+					// job was restarted. This is needed for the restarting time gauge.
+					stateTimestamps[i] = 0;
+				}
 			}
 			numFinishedJobVertices = 0;
 			transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
@@ -1290,4 +1303,32 @@ public class ExecutionGraph implements Serializable {
 			fail(error);
 		}
 	}
+
+	/**
+	 * Gauge which returns the last restarting time. Restarting time is the time between
+	 * JobStatus.RESTARTING and JobStatus.RUNNING. If it is still the initial job execution,
+	 * then the gauge will return 0.
+	 */
+	private class RestartTimeGauge implements Gauge<Long> {
+
+		@Override
+		public Long getValue() {
+			long restartingTimestamp = stateTimestamps[JobStatus.RESTARTING.ordinal()];
+
+			if (restartingTimestamp <= 0) {
+				// we haven't yet restarted our job
+				return 0L;
+			} else if (stateTimestamps[JobStatus.RUNNING.ordinal()] >= restartingTimestamp) {
+				// we have transitioned to RUNNING since the last restart
+				return stateTimestamps[JobStatus.RUNNING.ordinal()] - restartingTimestamp;
+			} else if (state.isTerminalState()) {
+				// since the last restart we've switched to a terminal state without touching
+				// the RUNNING state (e.g. failing from RESTARTING)
+				return stateTimestamps[state.ordinal()] - restartingTimestamp;
+			} else {
+				// we're still somewhere between RESTARTING and RUNNING
+				return System.currentTimeMillis() - restartingTimestamp;
+			}
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 52a2abe1b32..4ae566dd478 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -70,6 +70,10 @@ public enum JobStatus {
 	}
 
 	public boolean isGloballyTerminalState() {
 		return terminalState == TerminalState.GLOBALLY;
 	}
+
+	public boolean isTerminalState() {
+		return terminalState != TerminalState.NON_TERMINAL;
+	}
 }
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3c8e7168683..ccbd263410b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1126,6 +1126,16 @@ class JobManager(
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
+        val jobMetrics = jobManagerMetricGroup match {
+          case Some(group) =>
+            group.addJob(jobGraph.getJobID, jobGraph.getName) match {
+              case (jobGroup: Any) => jobGroup
+              case null => new UnregisteredMetricsGroup()
+            }
+          case None =>
+            new UnregisteredMetricsGroup()
+        }
+
         // see if there already exists an ExecutionGraph for the corresponding job ID
         executionGraph = currentJobs.get(jobGraph.getJobID) match {
           case Some((graph, currentJobInfo)) =>
@@ -1142,7 +1152,8 @@ class JobManager(
             restartStrategy,
             jobGraph.getUserJarBlobKeys,
             jobGraph.getClasspaths,
-            userCodeLoader)
+            userCodeLoader,
+            jobMetrics)
 
           currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
           graph
@@ -1239,16 +1250,6 @@ class JobManager(
         if (isStatsDisabled) {
           new DisabledCheckpointStatsTracker()
         } else {
-
-          val jobMetrics = jobManagerMetricGroup match {
-            case Some(group) =>
-              group.addJob(jobGraph.getJobID, jobGraph.getName) match {
-                case (jobGroup: Any) => jobGroup
-                case null => new UnregisteredMetricsGroup()
-              }
-            case None =>
-              new UnregisteredMetricsGroup()
-          }
           val historySize: Int = flinkConfiguration.getInteger(
             ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
             ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index dcc98c925eb..50dbf4ff755 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
@@ -59,7 +60,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			new NoRestartStrategy(),
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
-			ClassLoader.getSystemClassLoader());
+			ClassLoader.getSystemClassLoader(),
+			new UnregisteredMetricsGroup());
 
 		ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
new file mode 100644
index 00000000000..219e4405509
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.Matchers;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ExecutionGraphMetricsTest extends TestLogger {
+
+	/**
+	 * Tests that the restarting time metric correctly reports restarting times.
+	 */
+	@Test
+	public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
+		// setup execution graph with mocked scheduling logic
+		int parallelism = 1;
+
+		JobVertex jobVertex = new JobVertex("TestVertex");
+		jobVertex.setParallelism(parallelism);
+		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
+
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestingReporter.class.getName());
+
+		Configuration jobConfig = new Configuration();
+
+		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+		MetricRegistry metricRegistry = new MetricRegistry(config);
+
+		MetricReporter reporter = metricRegistry.getReporter();
+
+		assertTrue(reporter instanceof TestingReporter);
+
+		TestingReporter testingReporter = (TestingReporter) reporter;
+
+		MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
+
+		Scheduler scheduler = mock(Scheduler.class);
+
+		SimpleSlot simpleSlot = mock(SimpleSlot.class);
+
+		Instance instance = mock(Instance.class);
+
+		InstanceConnectionInfo instanceConnectionInfo = mock(InstanceConnectionInfo.class);
+
+		Slot rootSlot = mock(Slot.class);
+
+		ActorGateway actorGateway = mock(ActorGateway.class);
+
+		when(simpleSlot.isAlive()).thenReturn(true);
+		when(simpleSlot.getInstance()).thenReturn(instance);
+		when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
+		when(simpleSlot.getRoot()).thenReturn(rootSlot);
+
+		when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+
+		when(instance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo);
+		when(instance.getActorGateway()).thenReturn(actorGateway);
+		when(instanceConnectionInfo.getHostname()).thenReturn("localhost");
+
+		when(rootSlot.getSlotNumber()).thenReturn(0);
+
+		when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
+
+		TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
+
+		ExecutionGraph executionGraph = new ExecutionGraph(
+			ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()),
+			jobGraph.getJobID(),
+			jobGraph.getName(),
+			jobConfig,
+			new SerializedValue<ExecutionConfig>(null),
+			timeout,
+			testingRestartStrategy,
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			getClass().getClassLoader(),
+			metricGroup);
+
+		// get restarting time metric
+		Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
+
+		assertNotNull(metric);
+		assertTrue(metric instanceof Gauge);
+
+		Gauge<Long> restartingTime = (Gauge<Long>) metric;
+
+		// check that the restarting time is 0 since it's the initial start
+		assertTrue(0L == restartingTime.getValue());
+
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		// start execution
+		executionGraph.scheduleForExecution(scheduler);
+
+		assertTrue(0L == restartingTime.getValue());
+
+		List<ExecutionAttemptID> executionIDs = new ArrayList<>();
+
+		for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+			executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+		}
+
+		// tell the execution graph that the tasks are running --> the job status switches to RUNNING
+		for (ExecutionAttemptID executionID : executionIDs) {
+			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+		}
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		assertTrue(0L == restartingTime.getValue());
+
+		// fail the job so that it goes into the RESTARTING state
+		for (ExecutionAttemptID executionID : executionIDs) {
+			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
+		}
+
+		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+		long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+
+		// wait some time so that the restarting time gauge shows a value different from 0
+		Thread.sleep(50);
+
+		long previousRestartingTime = restartingTime.getValue();
+
+		// check that the restarting time is monotonically increasing
+		for (int i = 0; i < 10; i++) {
+			long currentRestartingTime = restartingTime.getValue();
+
+			assertTrue(currentRestartingTime >= previousRestartingTime);
+			previousRestartingTime = currentRestartingTime;
+		}
+
+		// check that we have measured some restarting time
+		assertTrue(previousRestartingTime > 0);
+
+		// restart the job
+		testingRestartStrategy.restartExecutionGraph();
+
+		executionIDs.clear();
+
+		for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+			executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+		}
+
+		for (ExecutionAttemptID executionID : executionIDs) {
+			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+		}
+
+		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+		assertTrue(firstRestartingTimestamp != 0);
+
+		previousRestartingTime = restartingTime.getValue();
+
+		// check that the restarting time does not increase after we've reached the RUNNING state
+		for (int i = 0; i < 10; i++) {
+			long currentRestartingTime = restartingTime.getValue();
+
+			assertTrue(currentRestartingTime == previousRestartingTime);
+			previousRestartingTime = currentRestartingTime;
+		}
+
+		// fail the job again
+		for (ExecutionAttemptID executionID : executionIDs) {
+			executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
+		}
+
+		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+		long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+
+		assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
+
+		Thread.sleep(50);
+
+		previousRestartingTime = restartingTime.getValue();
+
+		// check that the restarting time is increasing again
+		for (int i = 0; i < 10; i++) {
+			long currentRestartingTime = restartingTime.getValue();
+
+			assertTrue(currentRestartingTime >= previousRestartingTime);
+			previousRestartingTime = currentRestartingTime;
+		}
+
+		assertTrue(previousRestartingTime > 0);
+
+		// now fail the job while it is restarting and check that the restarting time stops increasing
+		executionGraph.fail(new Exception());
+
+		assertEquals(JobStatus.FAILED, executionGraph.getState());
+
+		previousRestartingTime = restartingTime.getValue();
+
+		for (int i = 0; i < 10; i++) {
+			long currentRestartingTime = restartingTime.getValue();
+
+			assertTrue(currentRestartingTime == previousRestartingTime);
+			previousRestartingTime = currentRestartingTime;
+		}
+	}
+
+	public static class TestingReporter implements MetricReporter {
+
+		private final Map<String, Metric> metrics = new HashMap<>();
+
+		@Override
+		public void open(Configuration config) {}
+
+		@Override
+		public void close() {}
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+			metrics.put(metricName, metric);
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
+			metrics.remove(metricName);
+		}
+
+		Metric getMetric(String metricName) {
+			return metrics.get(metricName);
+		}
+	}
+
+	static class TestingRestartStrategy implements RestartStrategy {
+
+		private boolean restartable = true;
+		private ExecutionGraph executionGraph = null;
+
+		@Override
+		public boolean canRestart() {
+			return restartable;
+		}
+
+		@Override
+		public void restart(ExecutionGraph executionGraph) {
+			this.executionGraph = executionGraph;
+		}
+
+		public void setRestartable(boolean restartable) {
+			this.restartable = restartable;
+		}
+
+		public void restartExecutionGraph() {
+			executionGraph.restart();
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index df4efdb88a7..8c7360aa592 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -142,7 +143,8 @@ public class RescalePartitionerTest extends TestLogger {
 			new NoRestartStrategy(),
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
-			ExecutionGraph.class.getClassLoader());
+			ExecutionGraph.class.getClassLoader(),
+			new UnregisteredMetricsGroup());
 		try {
 			eg.attachJobGraph(jobVertices);
 		}
-- 
GitLab