Commit 5dd85e28 authored by Till Rohrmann

[FLINK-4202] Add restarting time JM metric

This PR adds a JM metric which shows the time it took to restart a job. The time is
measured between entering the JobStatus.RESTARTING state and reaching the
JobStatus.RUNNING state. While the job is restarting, the gauge is continuously updated.
The metric only shows the time of the last restart attempt and is published in the job
metric group under the name "restartingTime".

This closes #2271.
Parent d3bc5563
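
For context on how the new gauge is consumed: any MetricReporter registered with the
MetricRegistry is notified when the gauge is added (the TestingReporter in the test below
relies on the same mechanism). The following minimal sketch is illustrative only; the
RestartingTimeWatcher class and its lastRestartDuration() helper are assumptions for this
example, not part of this commit:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

// Illustrative reporter (not part of this commit) that remembers the
// "restartingTime" gauge so the last restart duration can be queried later.
public class RestartingTimeWatcher implements MetricReporter {

	private volatile Gauge<Long> restartingTime;

	@Override
	public void open(Configuration config) {}

	@Override
	public void close() {}

	@Override
	@SuppressWarnings("unchecked")
	public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
		// "restartingTime" matches ExecutionGraph.RESTARTING_TIME_METRIC_NAME below
		if ("restartingTime".equals(metricName) && metric instanceof Gauge) {
			restartingTime = (Gauge<Long>) metric;
		}
	}

	@Override
	public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
		if (metric == restartingTime) {
			restartingTime = null;
		}
	}

	/** Returns the duration of the last restart in milliseconds, or 0 if the job never restarted. */
	public long lastRestartDuration() {
		Gauge<Long> gauge = restartingTime;
		return gauge == null ? 0L : gauge.getValue();
	}
}
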
@@ -132,6 +132,10 @@ public class MetricRegistry {
return this.delimiter;
}
public MetricReporter getReporter() {
return reporter;
}
/**
* Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
*/
......
@@ -24,6 +24,9 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -118,6 +121,8 @@ public class ExecutionGraph implements Serializable {
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
static final String RESTARTING_TIME_METRIC_NAME = "restartingTime";
// --------------------------------------------------------------------------------------------
/** The lock used to secure all access to mutable fields, especially the tracking of progress
@@ -258,7 +263,8 @@
restartStrategy,
new ArrayList<BlobKey>(),
new ArrayList<URL>(),
ExecutionGraph.class.getClassLoader()
ExecutionGraph.class.getClassLoader(),
new UnregisteredMetricsGroup()
);
}
@@ -272,7 +278,8 @@
RestartStrategy restartStrategy,
List<BlobKey> requiredJarFiles,
List<URL> requiredClasspaths,
ClassLoader userClassLoader) {
ClassLoader userClassLoader,
MetricGroup metricGroup) {
checkNotNull(executionContext);
checkNotNull(jobId);
@@ -306,6 +313,8 @@
this.timeout = timeout;
this.restartStrategy = restartStrategy;
metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
}
// --------------------------------------------------------------------------------------------
@@ -908,7 +917,11 @@
}
for (int i = 0; i < stateTimestamps.length; i++) {
stateTimestamps[i] = 0;
if (i != JobStatus.RESTARTING.ordinal()) {
// Only clear the timestamps of non-RESTARTING states in order to preserve the time
// at which the job was restarted. This is needed for the restarting time gauge.
stateTimestamps[i] = 0;
}
}
numFinishedJobVertices = 0;
transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
@@ -1290,4 +1303,32 @@
fail(error);
}
}
/**
* Gauge which returns the last restarting time. The restarting time is the time between
* entering the JobStatus.RESTARTING state and reaching the JobStatus.RUNNING state. If the
* job is still in its initial execution (i.e. it has never been restarted), the gauge returns 0.
*/
private class RestartTimeGauge implements Gauge<Long> {
@Override
public Long getValue() {
long restartingTimestamp = stateTimestamps[JobStatus.RESTARTING.ordinal()];
if (restartingTimestamp <= 0) {
// we haven't yet restarted our job
return 0L;
} else if (stateTimestamps[JobStatus.RUNNING.ordinal()] >= restartingTimestamp) {
// we have transitioned to RUNNING since the last restart
return stateTimestamps[JobStatus.RUNNING.ordinal()] - restartingTimestamp;
} else if (state.isTerminalState()) {
// since the last restart we've switched to a terminal state without touching
// the RUNNING state (e.g. failing from RESTARTING)
return stateTimestamps[state.ordinal()] - restartingTimestamp;
} else {
// we're still somewhere between RESTARTING and RUNNING
return System.currentTimeMillis() - restartingTimestamp;
}
}
}
}
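
As an aside, the four branches of the gauge can be replayed with fabricated timestamps;
the snippet below is purely illustrative (all names and values are made up for this
example) and is not part of the change:

// Fabricated timestamps replaying RestartTimeGauge's branch logic.
long restartingTs = 1000L;  // RESTARTING entered at t = 1000 ms
long runningTs    = 1250L;  // RUNNING reached at t = 1250 ms (0 if never reached)
long terminalTs   = 0L;     // timestamp of a terminal state (0 if none)

long value;
if (restartingTs <= 0) {
	value = 0L;                             // never restarted: gauge stays at 0
} else if (runningTs >= restartingTs) {
	value = runningTs - restartingTs;       // completed restart: 250 ms in this example
} else if (terminalTs > 0) {
	value = terminalTs - restartingTs;      // job died while restarting: value freezes
} else {
	value = System.currentTimeMillis() - restartingTs; // restart still in progress
}
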
@@ -70,6 +70,10 @@ public enum JobStatus {
public boolean isGloballyTerminalState() {
return terminalState == TerminalState.GLOBALLY;
}
public boolean isTerminalState() {
return terminalState != TerminalState.NON_TERMINAL;
}
}
@@ -1126,6 +1126,16 @@ class JobManager(
log.info(s"Using restart strategy $restartStrategy for $jobId.")
val jobMetrics = jobManagerMetricGroup match {
case Some(group) =>
group.addJob(jobGraph.getJobID, jobGraph.getName) match {
case (jobGroup:Any) => jobGroup
case null => new UnregisteredMetricsGroup()
}
case None =>
new UnregisteredMetricsGroup()
}
// see if there already exists an ExecutionGraph for the corresponding job ID
executionGraph = currentJobs.get(jobGraph.getJobID) match {
case Some((graph, currentJobInfo)) =>
@@ -1142,7 +1152,8 @@
restartStrategy,
jobGraph.getUserJarBlobKeys,
jobGraph.getClasspaths,
userCodeLoader)
userCodeLoader,
jobMetrics)
currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
graph
@@ -1239,16 +1250,6 @@
if (isStatsDisabled) {
new DisabledCheckpointStatsTracker()
} else {
val jobMetrics = jobManagerMetricGroup match {
case Some(group) =>
group.addJob(jobGraph.getJobID, jobGraph.getName) match {
case (jobGroup:Any) => jobGroup
case null => new UnregisteredMetricsGroup()
}
case None =>
new UnregisteredMetricsGroup()
}
val historySize: Int = flinkConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
......
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
@@ -59,7 +60,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
new NoRestartStrategy(),
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
ClassLoader.getSystemClassLoader());
ClassLoader.getSystemClassLoader(),
new UnregisteredMetricsGroup());
ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.MetricRegistry;
import org.apache.flink.metrics.groups.AbstractMetricGroup;
import org.apache.flink.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Matchers;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExecutionGraphMetricsTest extends TestLogger {
/**
* Tests that the restarting time metric correctly reports the restarting time.
*/
@Test
public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
// setup execution graph with mocked scheduling logic
int parallelism = 1;
JobVertex jobVertex = new JobVertex("TestVertex");
jobVertex.setParallelism(parallelism);
jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_CLASS, TestingReporter.class.getName());
Configuration jobConfig = new Configuration();
FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
MetricRegistry metricRegistry = new MetricRegistry(config);
MetricReporter reporter = metricRegistry.getReporter();
assertTrue(reporter instanceof TestingReporter);
TestingReporter testingReporter = (TestingReporter) reporter;
MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
Scheduler scheduler = mock(Scheduler.class);
SimpleSlot simpleSlot = mock(SimpleSlot.class);
Instance instance = mock(Instance.class);
InstanceConnectionInfo instanceConnectionInfo = mock(InstanceConnectionInfo.class);
Slot rootSlot = mock(Slot.class);
ActorGateway actorGateway = mock(ActorGateway.class);
when(simpleSlot.isAlive()).thenReturn(true);
when(simpleSlot.getInstance()).thenReturn(instance);
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
when(simpleSlot.getRoot()).thenReturn(rootSlot);
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
when(instance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo);
when(instance.getActorGateway()).thenReturn(actorGateway);
when(instanceConnectionInfo.getHostname()).thenReturn("localhost");
when(rootSlot.getSlotNumber()).thenReturn(0);
when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
ExecutionGraph executionGraph = new ExecutionGraph(
ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()),
jobGraph.getJobID(),
jobGraph.getName(),
jobConfig,
new SerializedValue<ExecutionConfig>(null),
timeout,
testingRestartStrategy,
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
getClass().getClassLoader(),
metricGroup);
// get restarting time metric
Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
assertNotNull(metric);
assertTrue(metric instanceof Gauge);
Gauge<Long> restartingTime = (Gauge<Long>) metric;
// check that the restarting time is 0 since it's the initial start
assertTrue(0L == restartingTime.getValue());
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
// start execution
executionGraph.scheduleForExecution(scheduler);
assertTrue(0L == restartingTime.getValue());
List<ExecutionAttemptID> executionIDs = new ArrayList<>();
for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
}
// tell the execution graph that the tasks are running --> the job status switches to RUNNING
for (ExecutionAttemptID executionID : executionIDs) {
executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
}
assertEquals(JobStatus.RUNNING, executionGraph.getState());
assertTrue(0L == restartingTime.getValue());
// fail the job so that it goes into state restarting
for (ExecutionAttemptID executionID : executionIDs) {
executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
}
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
// wait some time so that the restarting time gauge shows a value different from 0
Thread.sleep(50);
long previousRestartingTime = restartingTime.getValue();
// check that the restarting time is monotonically increasing
for (int i = 0; i < 10; i++) {
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime >= previousRestartingTime);
previousRestartingTime = currentRestartingTime;
}
// check that we have measured some restarting time
assertTrue(previousRestartingTime > 0);
// restart job
testingRestartStrategy.restartExecutionGraph();
executionIDs.clear();
for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
}
for (ExecutionAttemptID executionID : executionIDs) {
executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
}
assertEquals(JobStatus.RUNNING, executionGraph.getState());
assertTrue(firstRestartingTimestamp != 0);
previousRestartingTime = restartingTime.getValue();
// check that the restarting time does not increase after we've reached the running state
for (int i = 0; i < 10; i++) {
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime == previousRestartingTime);
previousRestartingTime = currentRestartingTime;
}
// fail job again
for (ExecutionAttemptID executionID : executionIDs) {
executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
}
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
Thread.sleep(50);
previousRestartingTime = restartingTime.getValue();
// check that the restarting time is increasing again
for (int i = 0; i < 10; i++) {
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime >= previousRestartingTime);
previousRestartingTime = currentRestartingTime;
}
assertTrue(previousRestartingTime > 0);
// now let's fail the job while it is restarting and check that the restarting time stops increasing
executionGraph.fail(new Exception());
assertEquals(JobStatus.FAILED, executionGraph.getState());
previousRestartingTime = restartingTime.getValue();
for (int i = 0; i < 10; i++) {
long currentRestartingTime = restartingTime.getValue();
assertTrue(currentRestartingTime == previousRestartingTime);
previousRestartingTime = currentRestartingTime;
}
}
public static class TestingReporter implements MetricReporter {
private final Map<String, Metric> metrics = new HashMap<>();
@Override
public void open(Configuration config) {}
@Override
public void close() {}
@Override
public void notifyOfAddedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
metrics.put(metricName, metric);
}
@Override
public void notifyOfRemovedMetric(Metric metric, String metricName, AbstractMetricGroup group) {
metrics.remove(metricName);
}
Metric getMetric(String metricName) {
return metrics.get(metricName);
}
}
static class TestingRestartStrategy implements RestartStrategy {
private boolean restartable = true;
private ExecutionGraph executionGraph = null;
@Override
public boolean canRestart() {
return restartable;
}
@Override
public void restart(ExecutionGraph executionGraph) {
this.executionGraph = executionGraph;
}
public void setRestartable(boolean restartable) {
this.restartable = restartable;
}
public void restartExecutionGraph() {
executionGraph.restart();
}
}
}
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
@@ -142,7 +143,8 @@ public class RescalePartitionerTest extends TestLogger {
new NoRestartStrategy(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(),
ExecutionGraph.class.getClassLoader());
ExecutionGraph.class.getClassLoader(),
new UnregisteredMetricsGroup());
try {
eg.attachJobGraph(jobVertices);
}
......