[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

Disable failing when not all creator properties are known

Move CheckpointStatsCache out of legacy package; Remove unused CheckpointingStatistics#generateCheckpointStatistics method

Remove JsonInclude.Include.NON_NULL from CheckpointStatistics; Pull null check out of CheckpointStatistics#generateCheckpointStatistics; Make CheckpointStatistics#checkpointStatisticcsPerTask non nullable; Add fail on missing creator property

This closes #4763.
上级 6b3fdc28
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
......@@ -58,7 +59,6 @@ import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;
......
......@@ -32,7 +32,9 @@ import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
......@@ -43,8 +45,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
......@@ -52,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FileUtils;
......@@ -78,6 +81,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
private final Executor executor;
private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;
public DispatcherRestEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
......@@ -94,6 +98,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
this.executionGraphCache = new ExecutionGraphCache(
restConfiguration.getTimeout(),
Time.milliseconds(restConfiguration.getRefreshInterval()));
this.checkpointStatsCache = new CheckpointStatsCache(
restConfiguration.getMaxCheckpointStatisticCacheEntries());
}
@Override
......@@ -162,14 +169,23 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
executionGraphCache,
executor);
CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler(
CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler(
restAddressFuture,
leaderRetriever,
timeout,
CheckpointStatisticsHeaders.getInstance(),
CheckpointingStatisticsHeaders.getInstance(),
executionGraphCache,
executor);
CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler(
restAddressFuture,
leaderRetriever,
timeout,
CheckpointStatisticDetailsHeaders.getInstance(),
executionGraphCache,
executor,
checkpointStatsCache);
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
......@@ -192,7 +208,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
......
......@@ -32,14 +32,22 @@ public class RestHandlerConfiguration {
private final long refreshInterval;
private final int maxCheckpointStatisticCacheEntries;
private final Time timeout;
private final File tmpDir;
public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) {
public RestHandlerConfiguration(
long refreshInterval,
int maxCheckpointStatisticCacheEntries,
Time timeout,
File tmpDir) {
Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;
this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries;
this.timeout = Preconditions.checkNotNull(timeout);
this.tmpDir = Preconditions.checkNotNull(tmpDir);
}
......@@ -48,6 +56,10 @@ public class RestHandlerConfiguration {
return refreshInterval;
}
public int getMaxCheckpointStatisticCacheEntries() {
return maxCheckpointStatisticCacheEntries;
}
public Time getTimeout() {
return timeout;
}
......@@ -59,10 +71,12 @@ public class RestHandlerConfiguration {
public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
final int maxCheckpointStatisticCacheEntries = configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir);
return new RestHandlerConfiguration(refreshInterval, maxCheckpointStatisticCacheEntries, timeout, tmpDir);
}
}
......@@ -45,7 +45,7 @@ import java.util.concurrent.Executor;
*
* @param <R> response type
*/
public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, JobMessageParameters> {
public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M extends JobMessageParameters> extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, M> {
private final ExecutionGraphCache executionGraphCache;
......@@ -55,7 +55,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, R, JobMessageParameters> messageHeaders,
MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders);
......@@ -65,7 +65,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
}
@Override
protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
protected CompletableFuture<R> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
JobID jobId = request.getPathParameter(JobIDPathParameter.class);
CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway);
......@@ -73,7 +73,7 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
return executionGraphFuture.thenApplyAsync(
executionGraph -> {
try {
return handleRequest(executionGraph);
return handleRequest(request, executionGraph);
} catch (RestHandlerException rhe) {
throw new CompletionException(rhe);
}
......@@ -81,5 +81,5 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> exte
executor);
}
protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException;
protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionGraph executionGraph) throws RestHandlerException;
}
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobConfigInfo;
......@@ -35,7 +36,7 @@ import java.util.concurrent.Executor;
/**
* Handler serving the job configuration.
*/
public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo> {
public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
public JobConfigHandler(
CompletableFuture<String> localRestAddress,
......@@ -55,7 +56,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf
}
@Override
protected JobConfigInfo handleRequest(AccessExecutionGraph executionGraph) {
protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.checkpoints;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
* Base class for checkpoint related REST handler.
*
* @param <R> type of the response
*/
public abstract class AbstractCheckpointHandler<R extends ResponseBody> extends AbstractExecutionGraphHandler<R, CheckpointMessageParameters> {
private final CheckpointStatsCache checkpointStatsCache;
protected AbstractCheckpointHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, R, CheckpointMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
CheckpointStatsCache checkpointStatsCache) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache);
}
@Override
protected R handleRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class);
final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
if (checkpointStatsSnapshot != null) {
AbstractCheckpointStats checkpointStats = checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId);
if (checkpointStats != null) {
checkpointStatsCache.tryAdd(checkpointStats);
} else {
checkpointStats = checkpointStatsCache.tryGet(checkpointId);
}
if (checkpointStats != null) {
return handleCheckpointRequest(checkpointStats);
} else {
throw new RestHandlerException("Could not find checkpointing statistics for checkpoint " + checkpointId + '.', HttpResponseStatus.NOT_FOUND);
}
} else {
throw new RestHandlerException("Checkpointing was not enabled for job " + executionGraph.getJobID() + '.', HttpResponseStatus.NOT_FOUND);
}
}
protected abstract R handleCheckpointRequest(AbstractCheckpointStats checkpointStats);
}
......@@ -22,13 +22,14 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
......@@ -40,7 +41,7 @@ import java.util.concurrent.Executor;
/**
* Handler which serves the checkpoint configuration.
*/
public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo> {
public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
public CheckpointConfigHandler(
CompletableFuture<String> localRestAddress,
......@@ -59,7 +60,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
}
@Override
protected CheckpointConfigInfo handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
if (checkpointCoordinatorConfiguration == null) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.job.checkpoints;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
* REST handler which returns the details for a checkpoint.
*/
public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics> {
public CheckpointStatisticDetailsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor,
CheckpointStatsCache checkpointStatsCache) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache);
}
@Override
protected CheckpointStatistics handleCheckpointRequest(AbstractCheckpointStats checkpointStats) {
return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
}
}
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
package org.apache.flink.runtime.rest.handler.job.checkpoints;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
......@@ -56,7 +56,7 @@ public class CheckpointStatsCache {
*
* @param checkpoint Checkpoint to be added.
*/
void tryAdd(AbstractCheckpointStats checkpoint) {
public void tryAdd(AbstractCheckpointStats checkpoint) {
// Don't add in progress checkpoints as they will be replaced by their
// completed/failed version eventually.
if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) {
......@@ -70,7 +70,7 @@ public class CheckpointStatsCache {
* @param checkpointId ID of the checkpoint to look up.
* @return The checkpoint or <code>null</code> if checkpoint not found.
*/
AbstractCheckpointStats tryGet(long checkpointId) {
public AbstractCheckpointStats tryGet(long checkpointId) {
if (cache != null) {
return cache.getIfPresent(checkpointId);
} else {
......
......@@ -23,19 +23,19 @@ import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
......@@ -49,20 +49,20 @@ import java.util.concurrent.Executor;
/**
* Handler which serves the checkpoint statistics.
*/
public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointStatistics> {
public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
public CheckpointStatisticsHandler(
public CheckpointingStatisticsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> messageHeaders,
MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
}
@Override
protected CheckpointStatistics handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
......@@ -71,7 +71,7 @@ public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<C
} else {
final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts();
final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(
final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(
checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
checkpointStatsCounts.getTotalNumberOfCheckpoints(),
checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
......@@ -83,99 +83,71 @@ public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<C
final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats();
final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats();
final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
new CheckpointStatistics.MinMaxAvgStatistics(
final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
new CheckpointingStatistics.MinMaxAvgStatistics(
stateSize.getMinimum(),
stateSize.getMaximum(),
stateSize.getAverage()),
new CheckpointStatistics.MinMaxAvgStatistics(
new CheckpointingStatistics.MinMaxAvgStatistics(
duration.getMinimum(),
duration.getMaximum(),
duration.getAverage()),
new CheckpointStatistics.MinMaxAvgStatistics(
new CheckpointingStatistics.MinMaxAvgStatistics(
alignment.getMinimum(),
alignment.getMaximum(),
alignment.getAverage()));
final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory();
final CheckpointStatistics.CompletedCheckpointStatistics completed = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestCompletedCheckpoint());
final CheckpointStatistics.CompletedCheckpointStatistics savepoint = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestSavepoint());
final CheckpointStatistics.FailedCheckpointStatistics failed = (CheckpointStatistics.FailedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestFailedCheckpoint());
final CheckpointStatistics.CompletedCheckpointStatistics completed = checkpointStatsHistory.getLatestCompletedCheckpoint() != null ?
(CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
checkpointStatsHistory.getLatestCompletedCheckpoint(),
false) :
null;
final CheckpointStatistics.CompletedCheckpointStatistics savepoint = checkpointStatsHistory.getLatestSavepoint() != null ?
(CheckpointStatistics.CompletedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
checkpointStatsHistory.getLatestSavepoint(),
false) :
null;
final CheckpointStatistics.FailedCheckpointStatistics failed = checkpointStatsHistory.getLatestFailedCheckpoint() != null ?
(CheckpointStatistics.FailedCheckpointStatistics) CheckpointStatistics.generateCheckpointStatistics(
checkpointStatsHistory.getLatestFailedCheckpoint(),
false) :
null;
final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint();
final CheckpointStatistics.RestoredCheckpointStatistics restored;
final CheckpointingStatistics.RestoredCheckpointStatistics restored;
if (restoredCheckpointStats == null) {
restored = null;
} else {
restored = new CheckpointStatistics.RestoredCheckpointStatistics(
restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
restoredCheckpointStats.getCheckpointId(),
restoredCheckpointStats.getRestoreTimestamp(),
restoredCheckpointStats.getProperties().isSavepoint(),
restoredCheckpointStats.getExternalPath());
}
final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
completed,
savepoint,
failed,
restored);
final List<CheckpointStatistics.BaseCheckpointStatistics> history = new ArrayList<>(16);
final List<CheckpointStatistics> history = new ArrayList<>(16);
for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) {
history.add(generateCheckpointStatistics(abstractCheckpointStats));
history.add(CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats, false));
}
return new CheckpointStatistics(
return new CheckpointingStatistics(
counts,
summary,
latestCheckpoints,
history);
}
}
private static CheckpointStatistics.BaseCheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
if (checkpointStats != null) {
if (checkpointStats instanceof CompletedCheckpointStats) {
final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
return new CheckpointStatistics.CompletedCheckpointStatistics(
completedCheckpointStats.getCheckpointId(),
completedCheckpointStats.getStatus(),
completedCheckpointStats.getProperties().isSavepoint(),
completedCheckpointStats.getTriggerTimestamp(),
completedCheckpointStats.getLatestAckTimestamp(),
completedCheckpointStats.getStateSize(),
completedCheckpointStats.getEndToEndDuration(),
completedCheckpointStats.getAlignmentBuffered(),
completedCheckpointStats.getNumberOfSubtasks(),
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
completedCheckpointStats.getExternalPath(),
completedCheckpointStats.isDiscarded());
} else if (checkpointStats instanceof FailedCheckpointStats) {
final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
return new CheckpointStatistics.FailedCheckpointStatistics(
failedCheckpointStats.getCheckpointId(),
failedCheckpointStats.getStatus(),
failedCheckpointStats.getProperties().isSavepoint(),
failedCheckpointStats.getTriggerTimestamp(),
failedCheckpointStats.getLatestAckTimestamp(),
failedCheckpointStats.getStateSize(),
failedCheckpointStats.getEndToEndDuration(),
failedCheckpointStats.getAlignmentBuffered(),
failedCheckpointStats.getNumberOfSubtasks(),
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
failedCheckpointStats.getFailureTimestamp(),
failedCheckpointStats.getFailureMessage());
} else {
throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
}
} else {
return null;
}
}
}
......@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
......
......@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
......
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
......
......@@ -31,7 +31,8 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.FlinkException;
......@@ -129,37 +130,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
}
private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS);
gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_COUNTS);
gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
gen.writeEndObject();
}
private static void writeSummary(
JsonGenerator gen,
CompletedCheckpointStatsSummary summary) throws IOException {
gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY);
gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE);
gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_SUMMARY);
gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_STATE_SIZE);
writeMinMaxAvg(gen, summary.getStateSizeStats());
gen.writeEndObject();
gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION);
gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_DURATION);
writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
gen.writeEndObject();
gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
gen.writeEndObject();
gen.writeEndObject();
}
static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
}
private static void writeLatestCheckpoints(
......@@ -169,10 +170,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
@Nullable FailedCheckpointStats failed,
@Nullable RestoredCheckpointStats restored) throws IOException {
gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
// Completed checkpoint
if (completed != null) {
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
writeCheckpoint(gen, completed);
String externalPath = completed.getExternalPath();
......@@ -185,7 +186,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
// Completed savepoint
if (savepoint != null) {
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
writeCheckpoint(gen, savepoint);
String externalPath = savepoint.getExternalPath();
......@@ -197,7 +198,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
// Failed checkpoint
if (failed != null) {
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
writeCheckpoint(gen, failed);
gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp());
......@@ -210,14 +211,14 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
// Restored checkpoint
if (restored != null) {
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
gen.writeBooleanField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
String externalPath = restored.getExternalPath();
if (externalPath != null) {
gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
gen.writeStringField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
}
gen.writeEndObject();
}
......@@ -225,29 +226,29 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
}
private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
}
private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY);
gen.writeArrayFieldStart(CheckpointingStatistics.FIELD_NAME_HISTORY);
for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
gen.writeStartObject();
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
gen.writeStringField(CheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
gen.writeBooleanField(CheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
if (checkpoint.getStatus().isCompleted()) {
// --- Completed ---
......
......@@ -26,7 +26,7 @@ import java.util.Collections;
*/
public class JobMessageParameters extends MessageParameters {
private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
protected final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
......
......@@ -16,10 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
......
......@@ -16,9 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.annotation.JsonCreator;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
/**
* Path parameter for the checkpoint id of type {@link Long}.
*/
public class CheckpointIdPathParameter extends MessagePathParameter<Long> {
public static final String KEY = "checkpointid";
protected CheckpointIdPathParameter() {
super(KEY);
}
@Override
protected Long convertFromString(String value) throws ConversionException {
try {
return Long.parseLong(value);
} catch (NumberFormatException nfe) {
throw new ConversionException("Could not parse long from " + value + '.', nfe);
}
}
@Override
protected String convertToString(Long value) {
return value.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import java.util.Arrays;
import java.util.Collection;
/**
* Message parameters for checkpoint related messages.
*/
public class CheckpointMessageParameters extends JobMessageParameters {
protected final CheckpointIdPathParameter checkpointIdPathParameter = new CheckpointIdPathParameter();
@Override
public Collection<MessagePathParameter<?>> getPathParameters() {
return Arrays.asList(jobPathParameter, checkpointIdPathParameter);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* Headers for the {@link CheckpointStatisticDetailsHandler}.
*/
public class CheckpointStatisticDetailsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> {
private static final CheckpointStatisticDetailsHeaders INSTANCE = new CheckpointStatisticDetailsHeaders();
public static final String URL = "/jobs/:jobid/checkpoints/:checkpointid";
private CheckpointStatisticDetailsHeaders() {}
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}
@Override
public Class<CheckpointStatistics> getResponseClass() {
return CheckpointStatistics.class;
}
@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}
@Override
public CheckpointMessageParameters getUnresolvedMessageParameters() {
return new CheckpointMessageParameters();
}
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
@Override
public String getTargetRestEndpointURL() {
return URL;
}
public static CheckpointStatisticDetailsHeaders getInstance() {
return INSTANCE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Statistics for a checkpoint.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
@JsonSubTypes({
@JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
@JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
public class CheckpointStatistics implements ResponseBody {
public static final String FIELD_NAME_ID = "id";
public static final String FIELD_NAME_STATUS = "status";
public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
public static final String FIELD_NAME_STATE_SIZE = "state_size";
public static final String FIELD_NAME_DURATION = "end_to_end_duration";
public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
public static final String FIELD_NAME_TASKS = "tasks";
@JsonProperty(FIELD_NAME_ID)
private final long id;
@JsonProperty(FIELD_NAME_STATUS)
private final CheckpointStatsStatus status;
@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
private final boolean savepoint;
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
private final long triggerTimestamp;
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
private final long latestAckTimestamp;
@JsonProperty(FIELD_NAME_STATE_SIZE)
private final long stateSize;
@JsonProperty(FIELD_NAME_DURATION)
private final long duration;
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
private final long alignmentBuffered;
@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
private final int numSubtasks;
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
@JsonCreator
private CheckpointStatistics(
@JsonProperty(FIELD_NAME_ID) long id,
@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
this.id = id;
this.status = Preconditions.checkNotNull(status);
this.savepoint = savepoint;
this.triggerTimestamp = triggerTimestamp;
this.latestAckTimestamp = latestAckTimestamp;
this.stateSize = stateSize;
this.duration = duration;
this.alignmentBuffered = alignmentBuffered;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask);
}
public long getId() {
return id;
}
public CheckpointStatsStatus getStatus() {
return status;
}
public boolean isSavepoint() {
return savepoint;
}
public long getTriggerTimestamp() {
return triggerTimestamp;
}
public long getLatestAckTimestamp() {
return latestAckTimestamp;
}
public long getStateSize() {
return stateSize;
}
public long getDuration() {
return duration;
}
public long getAlignmentBuffered() {
return alignmentBuffered;
}
public int getNumSubtasks() {
return numSubtasks;
}
public int getNumAckSubtasks() {
return numAckSubtasks;
}
@Nullable
public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() {
return checkpointStatisticsPerTask;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CheckpointStatistics that = (CheckpointStatistics) o;
return id == that.id &&
savepoint == that.savepoint &&
triggerTimestamp == that.triggerTimestamp &&
latestAckTimestamp == that.latestAckTimestamp &&
stateSize == that.stateSize &&
duration == that.duration &&
alignmentBuffered == that.alignmentBuffered &&
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask);
}
@Override
public int hashCode() {
return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask);
}
// -------------------------------------------------------------------------
// Static factory methods
// -------------------------------------------------------------------------
public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) {
Preconditions.checkNotNull(checkpointStats);
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
if (includeTaskCheckpointStatistics) {
Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats();
checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size());
for (TaskStateStats taskStateStat : taskStateStats) {
checkpointStatisticsPerTask.put(
taskStateStat.getJobVertexId(),
new TaskCheckpointStatistics(
taskStateStat.getLatestAckTimestamp(),
taskStateStat.getStateSize(),
taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
taskStateStat.getAlignmentBuffered(),
taskStateStat.getNumberOfSubtasks(),
taskStateStat.getNumberOfAcknowledgedSubtasks()));
}
} else {
checkpointStatisticsPerTask = Collections.emptyMap();
}
if (checkpointStats instanceof CompletedCheckpointStats) {
final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
return new CheckpointStatistics.CompletedCheckpointStatistics(
completedCheckpointStats.getCheckpointId(),
completedCheckpointStats.getStatus(),
completedCheckpointStats.getProperties().isSavepoint(),
completedCheckpointStats.getTriggerTimestamp(),
completedCheckpointStats.getLatestAckTimestamp(),
completedCheckpointStats.getStateSize(),
completedCheckpointStats.getEndToEndDuration(),
completedCheckpointStats.getAlignmentBuffered(),
completedCheckpointStats.getNumberOfSubtasks(),
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
checkpointStatisticsPerTask,
completedCheckpointStats.getExternalPath(),
completedCheckpointStats.isDiscarded());
} else if (checkpointStats instanceof FailedCheckpointStats) {
final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
return new CheckpointStatistics.FailedCheckpointStatistics(
failedCheckpointStats.getCheckpointId(),
failedCheckpointStats.getStatus(),
failedCheckpointStats.getProperties().isSavepoint(),
failedCheckpointStats.getTriggerTimestamp(),
failedCheckpointStats.getLatestAckTimestamp(),
failedCheckpointStats.getStateSize(),
failedCheckpointStats.getEndToEndDuration(),
failedCheckpointStats.getAlignmentBuffered(),
failedCheckpointStats.getNumberOfSubtasks(),
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
checkpointStatisticsPerTask,
failedCheckpointStats.getFailureTimestamp(),
failedCheckpointStats.getFailureMessage());
} else {
throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
}
}
// ---------------------------------------------------------------------
// Static inner classes
// ---------------------------------------------------------------------
/**
* Checkpoint statistics for a single task.
*/
public static final class TaskCheckpointStatistics {
public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
public static final String FIELD_NAME_STATE_SIZE = "state_size";
public static final String FIELD_NAME_DURATION = "end_to_end_duration";
public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
private final long latestAckTimestamp;
@JsonProperty(FIELD_NAME_STATE_SIZE)
private final long stateSize;
@JsonProperty(FIELD_NAME_DURATION)
private final long duration;
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
private final long alignmentBuffered;
@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
private final int numSubtasks;
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
@JsonCreator
public TaskCheckpointStatistics(
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
this.latestAckTimestamp = latestAckTimestamp;
this.stateSize = stateSize;
this.duration = duration;
this.alignmentBuffered = alignmentBuffered;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
}
public long getLatestAckTimestamp() {
return latestAckTimestamp;
}
public long getStateSize() {
return stateSize;
}
public long getDuration() {
return duration;
}
public long getAlignmentBuffered() {
return alignmentBuffered;
}
public int getNumSubtasks() {
return numSubtasks;
}
public int getNumAckSubtasks() {
return numAckSubtasks;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskCheckpointStatistics that = (TaskCheckpointStatistics) o;
return latestAckTimestamp == that.latestAckTimestamp &&
stateSize == that.stateSize &&
duration == that.duration &&
alignmentBuffered == that.alignmentBuffered &&
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks;
}
@Override
public int hashCode() {
return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
}
}
/**
* Statistics for a completed checkpoint.
*/
public static final class CompletedCheckpointStatistics extends CheckpointStatistics {
public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
public static final String FIELD_NAME_DISCARDED = "discarded";
@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
@Nullable
private final String externalPath;
@JsonProperty(FIELD_NAME_DISCARDED)
private final boolean discarded;
@JsonCreator
public CompletedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ID) long id,
@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
super(
id,
status,
savepoint,
triggerTimestamp,
latestAckTimestamp,
stateSize,
duration,
alignmentBuffered,
numSubtasks,
numAckSubtasks,
checkpointingStatisticsPerTask);
this.externalPath = externalPath;
this.discarded = discarded;
}
@Nullable
public String getExternalPath() {
return externalPath;
}
public boolean isDiscarded() {
return discarded;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
return discarded == that.discarded &&
Objects.equals(externalPath, that.externalPath);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), externalPath, discarded);
}
}
/**
* Statistics for a failed checkpoint.
*/
public static final class FailedCheckpointStatistics extends CheckpointStatistics {
public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
private final long failureTimestamp;
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
@Nullable
private final String failureMessage;
@JsonCreator
public FailedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ID) long id,
@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
super(
id,
status,
savepoint,
triggerTimestamp,
latestAckTimestamp,
stateSize,
duration,
alignmentBuffered,
numSubtasks,
numAckSubtasks,
checkpointingStatisticsPerTask);
this.failureTimestamp = failureTimestamp;
this.failureMessage = failureMessage;
}
public long getFailureTimestamp() {
return failureTimestamp;
}
@Nullable
public String getFailureMessage() {
return failureMessage;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
return failureTimestamp == that.failureTimestamp &&
Objects.equals(failureMessage, that.failureMessage);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
}
}
}
......@@ -16,16 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import javax.annotation.Nullable;
......@@ -33,9 +31,10 @@ import java.util.List;
import java.util.Objects;
/**
* Response of the {@link CheckpointStatisticsHandler}.
* Response of the {@link CheckpointingStatisticsHandler}. This class contains information about
* the checkpointing of a given job.
*/
public class CheckpointStatistics implements ResponseBody {
public class CheckpointingStatistics implements ResponseBody {
public static final String FIELD_NAME_COUNTS = "counts";
......@@ -55,14 +54,14 @@ public class CheckpointStatistics implements ResponseBody {
private final LatestCheckpoints latestCheckpoints;
@JsonProperty(FIELD_NAME_HISTORY)
private final List<BaseCheckpointStatistics> history;
private final List<CheckpointStatistics> history;
@JsonCreator
public CheckpointStatistics(
public CheckpointingStatistics(
@JsonProperty(FIELD_NAME_COUNTS) Counts counts,
@JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints,
@JsonProperty(FIELD_NAME_HISTORY) List<BaseCheckpointStatistics> history) {
@JsonProperty(FIELD_NAME_HISTORY) List<CheckpointStatistics> history) {
this.counts = Preconditions.checkNotNull(counts);
this.summary = Preconditions.checkNotNull(summary);
this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints);
......@@ -81,7 +80,7 @@ public class CheckpointStatistics implements ResponseBody {
return latestCheckpoints;
}
public List<BaseCheckpointStatistics> getHistory() {
public List<CheckpointStatistics> getHistory() {
return history;
}
......@@ -93,7 +92,7 @@ public class CheckpointStatistics implements ResponseBody {
if (o == null || getClass() != o.getClass()) {
return false;
}
CheckpointStatistics that = (CheckpointStatistics) o;
CheckpointingStatistics that = (CheckpointingStatistics) o;
return Objects.equals(counts, that.counts) &&
Objects.equals(summary, that.summary) &&
Objects.equals(latestCheckpoints, that.latestCheckpoints) &&
......@@ -334,15 +333,15 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_COMPLETED)
@Nullable
private final CompletedCheckpointStatistics completedCheckpointStatistics;
private final CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics;
@JsonProperty(FIELD_NAME_SAVEPOINT)
@Nullable
private final CompletedCheckpointStatistics savepointStatistics;
private final CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics;
@JsonProperty(FIELD_NAME_FAILED)
@Nullable
private final FailedCheckpointStatistics failedCheckpointStatistics;
private final CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics;
@JsonProperty(FIELD_NAME_RESTORED)
@Nullable
......@@ -350,9 +349,9 @@ public class CheckpointStatistics implements ResponseBody {
@JsonCreator
public LatestCheckpoints(
@JsonProperty(FIELD_NAME_COMPLETED) @Nullable CompletedCheckpointStatistics completedCheckpointStatistics,
@JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CompletedCheckpointStatistics savepointStatistics,
@JsonProperty(FIELD_NAME_FAILED) @Nullable FailedCheckpointStatistics failedCheckpointStatistics,
@JsonProperty(FIELD_NAME_COMPLETED) @Nullable CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics,
@JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics,
@JsonProperty(FIELD_NAME_FAILED) @Nullable CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics,
@JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) {
this.completedCheckpointStatistics = completedCheckpointStatistics;
this.savepointStatistics = savepointStatistics;
......@@ -361,17 +360,17 @@ public class CheckpointStatistics implements ResponseBody {
}
@Nullable
public CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
public CheckpointStatistics.CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
return completedCheckpointStatistics;
}
@Nullable
public CompletedCheckpointStatistics getSavepointStatistics() {
public CheckpointStatistics.CompletedCheckpointStatistics getSavepointStatistics() {
return savepointStatistics;
}
@Nullable
public FailedCheckpointStatistics getFailedCheckpointStatistics() {
public CheckpointStatistics.FailedCheckpointStatistics getFailedCheckpointStatistics() {
return failedCheckpointStatistics;
}
......@@ -401,290 +400,6 @@ public class CheckpointStatistics implements ResponseBody {
}
}
/**
* Statistics for a checkpoint.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
@JsonSubTypes({
@JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, name = "completed"),
@JsonSubTypes.Type(value = FailedCheckpointStatistics.class, name = "failed")})
public static class BaseCheckpointStatistics {
public static final String FIELD_NAME_ID = "id";
public static final String FIELD_NAME_STATUS = "status";
public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
public static final String FIELD_NAME_STATE_SIZE = "state_size";
public static final String FIELD_NAME_DURATION = "end_to_end_duration";
public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
@JsonProperty(FIELD_NAME_ID)
private final long id;
@JsonProperty(FIELD_NAME_STATUS)
private final CheckpointStatsStatus status;
@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
private final boolean savepoint;
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
private final long triggerTimestamp;
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
private final long latestAckTimestamp;
@JsonProperty(FIELD_NAME_STATE_SIZE)
private final long stateSize;
@JsonProperty(FIELD_NAME_DURATION)
private final long duration;
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
private final long alignmentBuffered;
@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
private final int numSubtasks;
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
@JsonCreator
protected BaseCheckpointStatistics(
@JsonProperty(FIELD_NAME_ID) long id,
@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
this.id = id;
this.status = Preconditions.checkNotNull(status);
this.savepoint = savepoint;
this.triggerTimestamp = triggerTimestamp;
this.latestAckTimestamp = latestAckTimestamp;
this.stateSize = stateSize;
this.duration = duration;
this.alignmentBuffered = alignmentBuffered;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
}
public long getId() {
return id;
}
public CheckpointStatsStatus getStatus() {
return status;
}
public boolean isSavepoint() {
return savepoint;
}
public long getTriggerTimestamp() {
return triggerTimestamp;
}
public long getLatestAckTimestamp() {
return latestAckTimestamp;
}
public long getStateSize() {
return stateSize;
}
public long getDuration() {
return duration;
}
public long getAlignmentBuffered() {
return alignmentBuffered;
}
public int getNumSubtasks() {
return numSubtasks;
}
public int getNumAckSubtasks() {
return numAckSubtasks;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BaseCheckpointStatistics that = (BaseCheckpointStatistics) o;
return id == that.id &&
savepoint == that.savepoint &&
triggerTimestamp == that.triggerTimestamp &&
latestAckTimestamp == that.latestAckTimestamp &&
stateSize == that.stateSize &&
duration == that.duration &&
alignmentBuffered == that.alignmentBuffered &&
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status;
}
@Override
public int hashCode() {
return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
}
}
/**
* Statistics for a completed checkpoint.
*/
public static final class CompletedCheckpointStatistics extends BaseCheckpointStatistics {
public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
public static final String FIELD_NAME_DISCARDED = "discarded";
@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
@Nullable
private final String externalPath;
@JsonProperty(FIELD_NAME_DISCARDED)
private final boolean discarded;
@JsonCreator
public CompletedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ID) long id,
@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
this.externalPath = externalPath;
this.discarded = discarded;
}
@Nullable
public String getExternalPath() {
return externalPath;
}
public boolean isDiscarded() {
return discarded;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
return discarded == that.discarded &&
Objects.equals(externalPath, that.externalPath);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), externalPath, discarded);
}
}
/**
* Statistics for a failed checkpoint.
*/
public static final class FailedCheckpointStatistics extends BaseCheckpointStatistics {
public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
private final long failureTimestamp;
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
@Nullable
private final String failureMessage;
@JsonCreator
public FailedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ID) long id,
@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
@JsonProperty(FIELD_NAME_DURATION) long duration,
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
this.failureTimestamp = failureTimestamp;
this.failureMessage = failureMessage;
}
public long getFailureTimestamp() {
return failureTimestamp;
}
@Nullable
public String getFailureMessage() {
return failureMessage;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
return failureTimestamp == that.failureTimestamp &&
Objects.equals(failureMessage, that.failureMessage);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
}
}
/**
* Statistics for a restored checkpoint.
*/
......
......@@ -16,19 +16,22 @@
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages;
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* Message headers for the {@link CheckpointStatisticsHandler}.
* Message headers for the {@link CheckpointingStatisticsHandler}.
*/
public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> {
public class CheckpointingStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> {
private static final CheckpointStatisticsHeaders INSTANCE = new CheckpointStatisticsHeaders();
private static final CheckpointingStatisticsHeaders INSTANCE = new CheckpointingStatisticsHeaders();
public static final String URL = "/jobs/:jobid/checkpoints";
......@@ -38,8 +41,8 @@ public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestB
}
@Override
public Class<CheckpointStatistics> getResponseClass() {
return CheckpointStatistics.class;
public Class<CheckpointingStatistics> getResponseClass() {
return CheckpointingStatistics.class;
}
@Override
......@@ -62,7 +65,7 @@ public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestB
return URL;
}
public static CheckpointStatisticsHeaders getInstance() {
public static CheckpointingStatisticsHeaders getInstance() {
return INSTANCE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.KeyDeserializer;
import java.io.IOException;
/**
* Jackson deserializer for {@link JobVertexID}.
*/
public class JobVertexIDDeserializer extends KeyDeserializer {
@Override
public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
return JobVertexID.fromHexString(key);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
/**
* Jackson serializer for {@link JobVertexID}.
*/
public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
private static final long serialVersionUID = 2970050507628933522L;
public JobVertexIDSerializer() {
super(JobVertexID.class);
}
@Override
public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeFieldName(value.toString());
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.junit.Test;
......
......@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
......
......@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
/**
* Tests for the {@link CheckpointConfigInfo}.
......
......@@ -19,27 +19,54 @@
package org.apache.flink.runtime.rest.handler.legacy.messages;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Tests for {@link CheckpointStatistics}.
* Tests for {@link CheckpointingStatistics}.
*/
public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<CheckpointStatistics> {
public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase<CheckpointingStatistics> {
@Override
protected Class<CheckpointStatistics> getTestResponseClass() {
return CheckpointStatistics.class;
protected Class<CheckpointingStatistics> getTestResponseClass() {
return CheckpointingStatistics.class;
}
@Override
protected CheckpointStatistics getTestResponseInstance() throws Exception {
protected CheckpointingStatistics getTestResponseInstance() throws Exception {
final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(1, 2, 3, 4, 5);
final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
new CheckpointStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
new CheckpointStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
new CheckpointStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5);
final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
new CheckpointingStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
new CheckpointingStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
new CheckpointingStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
final Map<JobVertexID, CheckpointStatistics.TaskCheckpointStatistics> checkpointStatisticsPerTask = new HashMap<>(2);
checkpointStatisticsPerTask.put(
new JobVertexID(),
new CheckpointStatistics.TaskCheckpointStatistics(
1L,
2L,
3L,
4L,
5,
6));
checkpointStatisticsPerTask.put(
new JobVertexID(),
new CheckpointStatistics.TaskCheckpointStatistics(
2L,
3L,
4L,
5L,
6,
7));
final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
1L,
......@@ -52,6 +79,7 @@ public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<Ch
0L,
10,
10,
Collections.emptyMap(),
null,
false);
......@@ -66,6 +94,7 @@ public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<Ch
0L,
9,
9,
checkpointStatisticsPerTask,
"externalPath",
false);
......@@ -80,22 +109,23 @@ public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<Ch
0L,
11,
9,
Collections.emptyMap(),
100L,
"Test failure");
CheckpointStatistics.RestoredCheckpointStatistics restored = new CheckpointStatistics.RestoredCheckpointStatistics(
CheckpointingStatistics.RestoredCheckpointStatistics restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
4L,
1445L,
true,
"foobar");
final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
completed,
savepoint,
failed,
restored);
return new CheckpointStatistics(
return new CheckpointingStatistics(
counts,
summary,
latestCheckpoints,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册