提交 9e540daf 编写于 作者: Z zentol

[FLINK-4057] Checkpoint Metrics

上级 8829f973
......@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint.stats;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
......@@ -109,7 +111,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
public SimpleCheckpointStatsTracker(
int historySize,
List<ExecutionJobVertex> tasksToWaitFor) {
List<ExecutionJobVertex> tasksToWaitFor,
MetricGroup metrics) {
checkArgument(historySize >= 0);
this.historySize = historySize;
......@@ -124,6 +127,9 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
} else {
taskParallelism = Collections.emptyMap();
}
metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge());
metrics.gauge("lastCheckpointDuration", new CheckpointDurationGauge());
}
@Override
......@@ -411,4 +417,18 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
return averageStateSize;
}
}
private class CheckpointSizeGauge implements Gauge<Long> {
@Override
public Long getValue() {
return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize();
}
}
private class CheckpointDurationGauge implements Gauge<Long> {
@Override
public Long getValue() {
return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getDuration();
}
}
}
......@@ -36,7 +36,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalCon
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.metrics.groups.JobManagerMetricGroup
import org.apache.flink.metrics.groups.{JobManagerMetricGroup, UnregisteredMetricsGroup}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.BlobServer
......@@ -1224,11 +1224,21 @@ class JobManager(
if (isStatsDisabled) {
new DisabledCheckpointStatsTracker()
} else {
val jobMetrics = jobManagerMetricGroup match {
case Some(group) =>
group.addJob(jobGraph.getJobID, jobGraph.getName) match {
case (jobGroup:Any) => jobGroup
case null => new UnregisteredMetricsGroup()
}
case None =>
new UnregisteredMetricsGroup()
}
val historySize: Int = flinkConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
new SimpleCheckpointStatsTracker(historySize, ackVertices)
new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics)
}
val jobParallelism = jobGraph.getSerializedExecutionConfig
......@@ -1655,6 +1665,7 @@ class JobManager(
case t: Throwable =>
log.error(s"Could not properly unregister job $jobID form the library cache.", t)
}
jobManagerMetricGroup.map(_.removeJob(jobID))
futureOption
}
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint.stats;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
......@@ -54,7 +55,7 @@ public class SimpleCheckpointStatsTrackerTest {
@Test
public void testNoCompletedCheckpointYet() throws Exception {
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
0, Collections.<ExecutionJobVertex>emptyList());
0, Collections.<ExecutionJobVertex>emptyList(), new UnregisteredMetricsGroup());
assertFalse(tracker.getJobStats().isDefined());
assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
......@@ -64,7 +65,7 @@ public class SimpleCheckpointStatsTrackerTest {
public void testRandomStats() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
for (int i = 0; i < checkpoints.length; i++) {
CompletedCheckpoint checkpoint = checkpoints[i];
......@@ -80,7 +81,7 @@ public class SimpleCheckpointStatsTrackerTest {
public void testIllegalOperatorId() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
for (CompletedCheckpoint checkpoint : checkpoints) {
tracker.onCompletedCheckpoint(checkpoint);
......@@ -95,7 +96,7 @@ public class SimpleCheckpointStatsTrackerTest {
public void testCompletedCheckpointReordering() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
// First the second checkpoint notifies
tracker.onCompletedCheckpoint(checkpoints[1]);
......@@ -115,7 +116,7 @@ public class SimpleCheckpointStatsTrackerTest {
public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup());
tracker.onCompletedCheckpoint(checkpoints[0]);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
import static org.junit.Assert.assertEquals;
public class JobManagerMetricTest {
/**
* Tests that metrics registered on the JobManager are actually accessible.
*
* @throws Exception
*/
@Test
public void testJobManagerMetricAccess() throws Exception {
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(KEY_METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
TestingCluster flink = new TestingCluster(flinkConfiguration);
try {
flink.start();
JobVertex sourceJobVertex = new JobVertex("Source");
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
500, 500, 50, 5));
flink.waitForActorsToBeAlive();
flink.submitJobDetached(jobGraph);
Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft())
.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
Await.ready(jobRunning, deadline.timeLeft());
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value"));
Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
BlockingInvokable.unblock();
// wait til the job has finished
Await.ready(jobFinished, deadline.timeLeft());
} finally {
flink.stop();
}
}
public static class BlockingInvokable extends AbstractInvokable {
private static boolean blocking = true;
private static final Object lock = new Object();
@Override
public void invoke() throws Exception {
while (blocking) {
synchronized (lock) {
lock.wait();
}
}
}
public static void unblock() {
blocking = false;
synchronized (lock) {
lock.notifyAll();
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册