diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 0bf6552e07750ca688947542dfe77da643ed7084..1a6178f78b39e8822c31d9a66a785b583694b3c4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rest.handler.WebHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; @@ -58,7 +59,6 @@ import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler; import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler; import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index d64e6493ac78ab8bdf42b35cd2ae3925d84e8ea2..2a2d9be3cad39c4564e4723b39f1df3cf60cee9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -32,7 +32,9 @@ import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; @@ -43,8 +45,6 @@ import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo; import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion; -import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders; -import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders; import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders; @@ -52,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.JobConfigHeaders; import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.FileUtils; @@ -78,6 +81,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { private final Executor executor; private final ExecutionGraphCache executionGraphCache; + private final CheckpointStatsCache checkpointStatsCache; public DispatcherRestEndpoint( RestServerEndpointConfiguration endpointConfiguration, @@ -94,6 +98,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { this.executionGraphCache = new ExecutionGraphCache( restConfiguration.getTimeout(), Time.milliseconds(restConfiguration.getRefreshInterval())); + + this.checkpointStatsCache = new CheckpointStatsCache( + restConfiguration.getMaxCheckpointStatisticCacheEntries()); } @Override @@ -162,14 +169,23 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { executionGraphCache, executor); - CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler( + CheckpointingStatisticsHandler checkpointStatisticsHandler = new CheckpointingStatisticsHandler( restAddressFuture, leaderRetriever, timeout, - CheckpointStatisticsHeaders.getInstance(), + CheckpointingStatisticsHeaders.getInstance(), executionGraphCache, executor); + CheckpointStatisticDetailsHandler checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler( + restAddressFuture, + leaderRetriever, + timeout, + CheckpointStatisticDetailsHeaders.getInstance(), + executionGraphCache, + executor, + checkpointStatsCache); + final File tmpDir = restConfiguration.getTmpDir(); Optional> optWebContent; @@ -192,7 +208,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler)); handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler)); handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler)); - handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); + handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); + handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler)); BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java index 9220bd9eaf9f384a5c665a5cfe521048014ed3c0..034459702f27fa9a5ee6424ae54fd1851b6a6236 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -32,14 +32,22 @@ public class RestHandlerConfiguration { private final long refreshInterval; + private final int maxCheckpointStatisticCacheEntries; + private final Time timeout; private final File tmpDir; - public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) { + public RestHandlerConfiguration( + long refreshInterval, + int maxCheckpointStatisticCacheEntries, + Time timeout, + File tmpDir) { Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); this.refreshInterval = refreshInterval; + this.maxCheckpointStatisticCacheEntries = maxCheckpointStatisticCacheEntries; + this.timeout = Preconditions.checkNotNull(timeout); this.tmpDir = Preconditions.checkNotNull(tmpDir); } @@ -48,6 +56,10 @@ public class RestHandlerConfiguration { return refreshInterval; } + public int getMaxCheckpointStatisticCacheEntries() { + return maxCheckpointStatisticCacheEntries; + } + public Time getTimeout() { return timeout; } @@ -59,10 +71,12 @@ public class RestHandlerConfiguration { public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); + final int maxCheckpointStatisticCacheEntries = configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); + final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR)); - return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir); + return new RestHandlerConfiguration(refreshInterval, maxCheckpointStatisticCacheEntries, timeout, tmpDir); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java index f2b1ac82ad98cfe158b88291dcdf2f30e5cbf70b..5348b55acbaff96b306664611d403cb4fa42b2ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java @@ -45,7 +45,7 @@ import java.util.concurrent.Executor; * * @param response type */ -public abstract class AbstractExecutionGraphHandler extends AbstractRestHandler { +public abstract class AbstractExecutionGraphHandler extends AbstractRestHandler { private final ExecutionGraphCache executionGraphCache; @@ -55,7 +55,7 @@ public abstract class AbstractExecutionGraphHandler exte CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - MessageHeaders messageHeaders, + MessageHeaders messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { super(localRestAddress, leaderRetriever, timeout, messageHeaders); @@ -65,7 +65,7 @@ public abstract class AbstractExecutionGraphHandler exte } @Override - protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) throws RestHandlerException { JobID jobId = request.getPathParameter(JobIDPathParameter.class); CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, gateway); @@ -73,7 +73,7 @@ public abstract class AbstractExecutionGraphHandler exte return executionGraphFuture.thenApplyAsync( executionGraph -> { try { - return handleRequest(executionGraph); + return handleRequest(request, executionGraph); } catch (RestHandlerException rhe) { throw new CompletionException(rhe); } @@ -81,5 +81,5 @@ public abstract class AbstractExecutionGraphHandler exte executor); } - protected abstract R handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException; + protected abstract R handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java index bbe4eef829fb49dcfa56859714306acbd28356aa..f27d84fde1a37a207bdc51c5e12df824f7be4a2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobConfigInfo; @@ -35,7 +36,7 @@ import java.util.concurrent.Executor; /** * Handler serving the job configuration. */ -public class JobConfigHandler extends AbstractExecutionGraphHandler { +public class JobConfigHandler extends AbstractExecutionGraphHandler { public JobConfigHandler( CompletableFuture localRestAddress, @@ -55,7 +56,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler request, AccessExecutionGraph executionGraph) { final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig(); final JobConfigInfo.ExecutionConfigInfo executionConfigInfo; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..62ed1a4e11467f42f8cd9c1b3d50600e18af6962 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * Base class for checkpoint related REST handler. + * + * @param type of the response + */ +public abstract class AbstractCheckpointHandler extends AbstractExecutionGraphHandler { + + private final CheckpointStatsCache checkpointStatsCache; + + protected AbstractCheckpointHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + CheckpointStatsCache checkpointStatsCache) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + + this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache); + } + + @Override + protected R handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { + final long checkpointId = request.getPathParameter(CheckpointIdPathParameter.class); + + final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); + + if (checkpointStatsSnapshot != null) { + AbstractCheckpointStats checkpointStats = checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId); + + if (checkpointStats != null) { + checkpointStatsCache.tryAdd(checkpointStats); + } else { + checkpointStats = checkpointStatsCache.tryGet(checkpointId); + } + + if (checkpointStats != null) { + return handleCheckpointRequest(checkpointStats); + } else { + throw new RestHandlerException("Could not find checkpointing statistics for checkpoint " + checkpointId + '.', HttpResponseStatus.NOT_FOUND); + } + } else { + throw new RestHandlerException("Checkpointing was not enabled for job " + executionGraph.getJobID() + '.', HttpResponseStatus.NOT_FOUND); + } + } + + protected abstract R handleCheckpointRequest(AbstractCheckpointStats checkpointStats); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java index 94646eb9772f67b8f513b76d3efbccbb3594f07d..1efa7af1beb957af215adf6959e1b46a19ea5c76 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java @@ -22,13 +22,14 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; -import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -40,7 +41,7 @@ import java.util.concurrent.Executor; /** * Handler which serves the checkpoint configuration. */ -public class CheckpointConfigHandler extends AbstractExecutionGraphHandler { +public class CheckpointConfigHandler extends AbstractExecutionGraphHandler { public CheckpointConfigHandler( CompletableFuture localRestAddress, @@ -59,7 +60,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler request, AccessExecutionGraph executionGraph) throws RestHandlerException { final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration(); if (checkpointCoordinatorConfiguration == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..2fc3008493ab84db89eeaa60ddce5ab5975b073e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.checkpoints; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * REST handler which returns the details for a checkpoint. + */ +public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler { + + public CheckpointStatisticDetailsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + MessageHeaders messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor, + CheckpointStatsCache checkpointStatsCache) { + super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache); + } + + @Override + protected CheckpointStatistics handleCheckpointRequest(AbstractCheckpointStats checkpointStats) { + return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java similarity index 93% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java index f21fc76e272291a1292e6ffc66bb1a6be988337c..dcd36b053c5a57340cedc68c889f5b5bc5554662 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.handler.legacy.checkpoints; +package org.apache.flink.runtime.rest.handler.job.checkpoints; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; @@ -56,7 +56,7 @@ public class CheckpointStatsCache { * * @param checkpoint Checkpoint to be added. */ - void tryAdd(AbstractCheckpointStats checkpoint) { + public void tryAdd(AbstractCheckpointStats checkpoint) { // Don't add in progress checkpoints as they will be replaced by their // completed/failed version eventually. if (cache != null && checkpoint != null && !checkpoint.getStatus().isInProgress()) { @@ -70,7 +70,7 @@ public class CheckpointStatsCache { * @param checkpointId ID of the checkpoint to look up. * @return The checkpoint or null if checkpoint not found. */ - AbstractCheckpointStats tryGet(long checkpointId) { + public AbstractCheckpointStats tryGet(long checkpointId) { if (cache != null) { return cache.getIfPresent(checkpointId); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java similarity index 56% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java index 21ded78c448c3ac768ea7fae749615a87a4e5aab..1c5762e647bb4753c6649562cd9640237e1cd2fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java @@ -23,19 +23,19 @@ import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; -import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary; -import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; -import org.apache.flink.runtime.rest.messages.CheckpointStatistics; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -49,20 +49,20 @@ import java.util.concurrent.Executor; /** * Handler which serves the checkpoint statistics. */ -public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler { +public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler { - public CheckpointStatisticsHandler( + public CheckpointingStatisticsHandler( CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout, - MessageHeaders messageHeaders, + MessageHeaders messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); } @Override - protected CheckpointStatistics handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException { + protected CheckpointingStatistics handleRequest(HandlerRequest request, AccessExecutionGraph executionGraph) throws RestHandlerException { final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); @@ -71,7 +71,7 @@ public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler history = new ArrayList<>(16); + final List history = new ArrayList<>(16); for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) { - history.add(generateCheckpointStatistics(abstractCheckpointStats)); + history.add(CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats, false)); } - return new CheckpointStatistics( + return new CheckpointingStatistics( counts, summary, latestCheckpoints, history); } } - - private static CheckpointStatistics.BaseCheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) { - if (checkpointStats != null) { - if (checkpointStats instanceof CompletedCheckpointStats) { - final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats); - - return new CheckpointStatistics.CompletedCheckpointStatistics( - completedCheckpointStats.getCheckpointId(), - completedCheckpointStats.getStatus(), - completedCheckpointStats.getProperties().isSavepoint(), - completedCheckpointStats.getTriggerTimestamp(), - completedCheckpointStats.getLatestAckTimestamp(), - completedCheckpointStats.getStateSize(), - completedCheckpointStats.getEndToEndDuration(), - completedCheckpointStats.getAlignmentBuffered(), - completedCheckpointStats.getNumberOfSubtasks(), - completedCheckpointStats.getNumberOfAcknowledgedSubtasks(), - completedCheckpointStats.getExternalPath(), - completedCheckpointStats.isDiscarded()); - } else if (checkpointStats instanceof FailedCheckpointStats) { - final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats); - - return new CheckpointStatistics.FailedCheckpointStatistics( - failedCheckpointStats.getCheckpointId(), - failedCheckpointStats.getStatus(), - failedCheckpointStats.getProperties().isSavepoint(), - failedCheckpointStats.getTriggerTimestamp(), - failedCheckpointStats.getLatestAckTimestamp(), - failedCheckpointStats.getStateSize(), - failedCheckpointStats.getEndToEndDuration(), - failedCheckpointStats.getAlignmentBuffered(), - failedCheckpointStats.getNumberOfSubtasks(), - failedCheckpointStats.getNumberOfAcknowledgedSubtasks(), - failedCheckpointStats.getFailureTimestamp(), - failedCheckpointStats.getFailureMessage()); - } else { - throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted."); - } - } else { - return null; - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java index f50c42d3ed92509ae09b9ccd36dcdffd914717f0..60b979981fe1672ce063e83a3cf942dd7f9369b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; -import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java index e27797189c09c03baa31386ffd720adebb186ec1..dce1641b448d9cc79841b0bd0b987fd49d610d22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index 5420cf4c22d3a024c1388b9fd9505d903ba65266..1421fb24b47704809c2c64a15ddbfb64c47ed9e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java index 5b35c7fb289933f1e7b58f00b6f3ab2af25e060d..b6c86beb39252c6aa345abd75e417721c2de2d18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java @@ -31,7 +31,8 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; -import org.apache.flink.runtime.rest.messages.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.FlinkException; @@ -129,37 +130,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler } private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException { - gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints()); - gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints()); + gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_COUNTS); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints()); + gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints()); gen.writeEndObject(); } private static void writeSummary( JsonGenerator gen, CompletedCheckpointStatsSummary summary) throws IOException { - gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY); - gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE); + gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_SUMMARY); + gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_STATE_SIZE); writeMinMaxAvg(gen, summary.getStateSizeStats()); gen.writeEndObject(); - gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION); + gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_DURATION); writeMinMaxAvg(gen, summary.getEndToEndDurationStats()); gen.writeEndObject(); - gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED); + gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED); writeMinMaxAvg(gen, summary.getAlignmentBufferedStats()); gen.writeEndObject(); gen.writeEndObject(); } static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { - gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum()); - gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum()); - gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage()); + gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum()); + gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum()); + gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage()); } private static void writeLatestCheckpoints( @@ -169,10 +170,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler @Nullable FailedCheckpointStats failed, @Nullable RestoredCheckpointStats restored) throws IOException { - gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS); + gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS); // Completed checkpoint if (completed != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED); writeCheckpoint(gen, completed); String externalPath = completed.getExternalPath(); @@ -185,7 +186,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler // Completed savepoint if (savepoint != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT); writeCheckpoint(gen, savepoint); String externalPath = savepoint.getExternalPath(); @@ -197,7 +198,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler // Failed checkpoint if (failed != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_FAILED); writeCheckpoint(gen, failed); gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp()); @@ -210,14 +211,14 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler // Restored checkpoint if (restored != null) { - gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED); - gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId()); - gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp()); - gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint()); + gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED); + gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId()); + gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp()); + gen.writeBooleanField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint()); String externalPath = restored.getExternalPath(); if (externalPath != null) { - gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath); + gen.writeStringField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath); } gen.writeEndObject(); } @@ -225,29 +226,29 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler } private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException { - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); } private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException { - gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY); + gen.writeArrayFieldStart(CheckpointingStatistics.FIELD_NAME_HISTORY); for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { gen.writeStartObject(); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); - gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString()); - gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks()); - gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId()); + gen.writeStringField(CheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString()); + gen.writeBooleanField(CheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks()); + gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks()); if (checkpoint.getStatus().isCompleted()) { // --- Completed --- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java index 9d74c95ecd880f8ffdcc3ba035b89820a5f2d6c1..1155892a6176932a432472edabdcaa8af29d4bbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java @@ -26,7 +26,7 @@ import java.util.Collections; */ public class JobMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + protected final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); @Override public Collection> getPathParameters() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java similarity index 89% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java index bfc0b7a331d9b9bd026fc1962d5eae615a00e46c..f0526a0710d84771b57b02162d9c4e8dd4c38d7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java @@ -16,10 +16,13 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.messages; +package org.apache.flink.runtime.rest.messages.checkpoints; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java index fbda12a2aa662a88c50e52d3283180547c068b3a..797d3a5adbb7f755cf5383a981664fe2e1faa67e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.messages; +package org.apache.flink.runtime.rest.messages.checkpoints; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.annotation.JsonCreator; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java new file mode 100644 index 0000000000000000000000000000000000000000..c08cc82acb11c399cbcf4d12847d7a60b917aa72 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +/** + * Path parameter for the checkpoint id of type {@link Long}. + */ +public class CheckpointIdPathParameter extends MessagePathParameter { + + public static final String KEY = "checkpointid"; + + protected CheckpointIdPathParameter() { + super(KEY); + } + + @Override + protected Long convertFromString(String value) throws ConversionException { + try { + return Long.parseLong(value); + } catch (NumberFormatException nfe) { + throw new ConversionException("Could not parse long from " + value + '.', nfe); + } + } + + @Override + protected String convertToString(Long value) { + return value.toString(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java new file mode 100644 index 0000000000000000000000000000000000000000..040aa87987cc7ddaeefa37503e205af28579ceff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Message parameters for checkpoint related messages. + */ +public class CheckpointMessageParameters extends JobMessageParameters { + + protected final CheckpointIdPathParameter checkpointIdPathParameter = new CheckpointIdPathParameter(); + + @Override + public Collection> getPathParameters() { + return Arrays.asList(jobPathParameter, checkpointIdPathParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java new file mode 100644 index 0000000000000000000000000000000000000000..3d7ba2bfbfdb722b04e3b0cb060fe9283801fd9a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Headers for the {@link CheckpointStatisticDetailsHandler}. + */ +public class CheckpointStatisticDetailsHeaders implements MessageHeaders { + + private static final CheckpointStatisticDetailsHeaders INSTANCE = new CheckpointStatisticDetailsHeaders(); + + public static final String URL = "/jobs/:jobid/checkpoints/:checkpointid"; + + private CheckpointStatisticDetailsHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return CheckpointStatistics.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public CheckpointMessageParameters getUnresolvedMessageParameters() { + return new CheckpointMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static CheckpointStatisticDetailsHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java new file mode 100644 index 0000000000000000000000000000000000000000..9fb1094a706645353f1ae071fc2556c9879ca399 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Statistics for a checkpoint. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") +@JsonSubTypes({ + @JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"), + @JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")}) +public class CheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_ID = "id"; + + public static final String FIELD_NAME_STATUS = "status"; + + public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; + + public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp"; + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + public static final String FIELD_NAME_TASKS = "tasks"; + + @JsonProperty(FIELD_NAME_ID) + private final long id; + + @JsonProperty(FIELD_NAME_STATUS) + private final CheckpointStatsStatus status; + + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) + private final boolean savepoint; + + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) + private final long triggerTimestamp; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) + private final int numSubtasks; + + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) + private final int numAckSubtasks; + + @JsonProperty(FIELD_NAME_TASKS) + @JsonSerialize(keyUsing = JobVertexIDSerializer.class) + private final Map checkpointStatisticsPerTask; + + @JsonCreator + private CheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map checkpointStatisticsPerTask) { + this.id = id; + this.status = Preconditions.checkNotNull(status); + this.savepoint = savepoint; + this.triggerTimestamp = triggerTimestamp; + this.latestAckTimestamp = latestAckTimestamp; + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + this.numSubtasks = numSubtasks; + this.numAckSubtasks = numAckSubtasks; + this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask); + } + + public long getId() { + return id; + } + + public CheckpointStatsStatus getStatus() { + return status; + } + + public boolean isSavepoint() { + return savepoint; + } + + public long getTriggerTimestamp() { + return triggerTimestamp; + } + + public long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public long getStateSize() { + return stateSize; + } + + public long getDuration() { + return duration; + } + + public long getAlignmentBuffered() { + return alignmentBuffered; + } + + public int getNumSubtasks() { + return numSubtasks; + } + + public int getNumAckSubtasks() { + return numAckSubtasks; + } + + @Nullable + public Map getCheckpointStatisticsPerTask() { + return checkpointStatisticsPerTask; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointStatistics that = (CheckpointStatistics) o; + return id == that.id && + savepoint == that.savepoint && + triggerTimestamp == that.triggerTimestamp && + latestAckTimestamp == that.latestAckTimestamp && + stateSize == that.stateSize && + duration == that.duration && + alignmentBuffered == that.alignmentBuffered && + numSubtasks == that.numSubtasks && + numAckSubtasks == that.numAckSubtasks && + status == that.status && + Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask); + } + + @Override + public int hashCode() { + return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask); + } + + // ------------------------------------------------------------------------- + // Static factory methods + // ------------------------------------------------------------------------- + + public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) { + Preconditions.checkNotNull(checkpointStats); + + Map checkpointStatisticsPerTask; + + if (includeTaskCheckpointStatistics) { + Collection taskStateStats = checkpointStats.getAllTaskStateStats(); + + checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size()); + + for (TaskStateStats taskStateStat : taskStateStats) { + checkpointStatisticsPerTask.put( + taskStateStat.getJobVertexId(), + new TaskCheckpointStatistics( + taskStateStat.getLatestAckTimestamp(), + taskStateStat.getStateSize(), + taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()), + taskStateStat.getAlignmentBuffered(), + taskStateStat.getNumberOfSubtasks(), + taskStateStat.getNumberOfAcknowledgedSubtasks())); + } + } else { + checkpointStatisticsPerTask = Collections.emptyMap(); + } + + if (checkpointStats instanceof CompletedCheckpointStats) { + final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats); + + return new CheckpointStatistics.CompletedCheckpointStatistics( + completedCheckpointStats.getCheckpointId(), + completedCheckpointStats.getStatus(), + completedCheckpointStats.getProperties().isSavepoint(), + completedCheckpointStats.getTriggerTimestamp(), + completedCheckpointStats.getLatestAckTimestamp(), + completedCheckpointStats.getStateSize(), + completedCheckpointStats.getEndToEndDuration(), + completedCheckpointStats.getAlignmentBuffered(), + completedCheckpointStats.getNumberOfSubtasks(), + completedCheckpointStats.getNumberOfAcknowledgedSubtasks(), + checkpointStatisticsPerTask, + completedCheckpointStats.getExternalPath(), + completedCheckpointStats.isDiscarded()); + } else if (checkpointStats instanceof FailedCheckpointStats) { + final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats); + + return new CheckpointStatistics.FailedCheckpointStatistics( + failedCheckpointStats.getCheckpointId(), + failedCheckpointStats.getStatus(), + failedCheckpointStats.getProperties().isSavepoint(), + failedCheckpointStats.getTriggerTimestamp(), + failedCheckpointStats.getLatestAckTimestamp(), + failedCheckpointStats.getStateSize(), + failedCheckpointStats.getEndToEndDuration(), + failedCheckpointStats.getAlignmentBuffered(), + failedCheckpointStats.getNumberOfSubtasks(), + failedCheckpointStats.getNumberOfAcknowledgedSubtasks(), + checkpointStatisticsPerTask, + failedCheckpointStats.getFailureTimestamp(), + failedCheckpointStats.getFailureMessage()); + } else { + throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted."); + } + } + + // --------------------------------------------------------------------- + // Static inner classes + // --------------------------------------------------------------------- + + /** + * Checkpoint statistics for a single task. + */ + public static final class TaskCheckpointStatistics { + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) + private final int numSubtasks; + + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) + private final int numAckSubtasks; + + @JsonCreator + public TaskCheckpointStatistics( + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) { + this.latestAckTimestamp = latestAckTimestamp; + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + this.numSubtasks = numSubtasks; + this.numAckSubtasks = numAckSubtasks; + } + + public long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public long getStateSize() { + return stateSize; + } + + public long getDuration() { + return duration; + } + + public long getAlignmentBuffered() { + return alignmentBuffered; + } + + public int getNumSubtasks() { + return numSubtasks; + } + + public int getNumAckSubtasks() { + return numAckSubtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskCheckpointStatistics that = (TaskCheckpointStatistics) o; + return latestAckTimestamp == that.latestAckTimestamp && + stateSize == that.stateSize && + duration == that.duration && + alignmentBuffered == that.alignmentBuffered && + numSubtasks == that.numSubtasks && + numAckSubtasks == that.numAckSubtasks; + } + + @Override + public int hashCode() { + return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); + } + } + + /** + * Statistics for a completed checkpoint. + */ + public static final class CompletedCheckpointStatistics extends CheckpointStatistics { + + public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; + + public static final String FIELD_NAME_DISCARDED = "discarded"; + + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) + @Nullable + private final String externalPath; + + @JsonProperty(FIELD_NAME_DISCARDED) + private final boolean discarded; + + @JsonCreator + public CompletedCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map checkpointingStatisticsPerTask, + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath, + @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) { + super( + id, + status, + savepoint, + triggerTimestamp, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + numSubtasks, + numAckSubtasks, + checkpointingStatisticsPerTask); + + this.externalPath = externalPath; + this.discarded = discarded; + } + + @Nullable + public String getExternalPath() { + return externalPath; + } + + public boolean isDiscarded() { + return discarded; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o; + return discarded == that.discarded && + Objects.equals(externalPath, that.externalPath); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), externalPath, discarded); + } + } + + /** + * Statistics for a failed checkpoint. + */ + public static final class FailedCheckpointStatistics extends CheckpointStatistics { + + public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp"; + + public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message"; + + @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) + private final long failureTimestamp; + + @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) + @Nullable + private final String failureMessage; + + @JsonCreator + public FailedCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map checkpointingStatisticsPerTask, + @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp, + @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) { + super( + id, + status, + savepoint, + triggerTimestamp, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + numSubtasks, + numAckSubtasks, + checkpointingStatisticsPerTask); + + this.failureTimestamp = failureTimestamp; + this.failureMessage = failureMessage; + } + + public long getFailureTimestamp() { + return failureTimestamp; + } + + @Nullable + public String getFailureMessage() { + return failureMessage; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + FailedCheckpointStatistics that = (FailedCheckpointStatistics) o; + return failureTimestamp == that.failureTimestamp && + Objects.equals(failureMessage, that.failureMessage); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), failureTimestamp, failureMessage); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java similarity index 55% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java index ade8c7a2357f60a831ad44208d8e21d06cc2b64a..1f00fcc7c75ec0656bffa3a59cea3ef339b87171 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java @@ -16,16 +16,14 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.messages; +package org.apache.flink.runtime.rest.messages.checkpoints; -import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import javax.annotation.Nullable; @@ -33,9 +31,10 @@ import java.util.List; import java.util.Objects; /** - * Response of the {@link CheckpointStatisticsHandler}. + * Response of the {@link CheckpointingStatisticsHandler}. This class contains information about + * the checkpointing of a given job. */ -public class CheckpointStatistics implements ResponseBody { +public class CheckpointingStatistics implements ResponseBody { public static final String FIELD_NAME_COUNTS = "counts"; @@ -55,14 +54,14 @@ public class CheckpointStatistics implements ResponseBody { private final LatestCheckpoints latestCheckpoints; @JsonProperty(FIELD_NAME_HISTORY) - private final List history; + private final List history; @JsonCreator - public CheckpointStatistics( + public CheckpointingStatistics( @JsonProperty(FIELD_NAME_COUNTS) Counts counts, @JsonProperty(FIELD_NAME_SUMMARY) Summary summary, @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints, - @JsonProperty(FIELD_NAME_HISTORY) List history) { + @JsonProperty(FIELD_NAME_HISTORY) List history) { this.counts = Preconditions.checkNotNull(counts); this.summary = Preconditions.checkNotNull(summary); this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints); @@ -81,7 +80,7 @@ public class CheckpointStatistics implements ResponseBody { return latestCheckpoints; } - public List getHistory() { + public List getHistory() { return history; } @@ -93,7 +92,7 @@ public class CheckpointStatistics implements ResponseBody { if (o == null || getClass() != o.getClass()) { return false; } - CheckpointStatistics that = (CheckpointStatistics) o; + CheckpointingStatistics that = (CheckpointingStatistics) o; return Objects.equals(counts, that.counts) && Objects.equals(summary, that.summary) && Objects.equals(latestCheckpoints, that.latestCheckpoints) && @@ -334,15 +333,15 @@ public class CheckpointStatistics implements ResponseBody { @JsonProperty(FIELD_NAME_COMPLETED) @Nullable - private final CompletedCheckpointStatistics completedCheckpointStatistics; + private final CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics; @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable - private final CompletedCheckpointStatistics savepointStatistics; + private final CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics; @JsonProperty(FIELD_NAME_FAILED) @Nullable - private final FailedCheckpointStatistics failedCheckpointStatistics; + private final CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics; @JsonProperty(FIELD_NAME_RESTORED) @Nullable @@ -350,9 +349,9 @@ public class CheckpointStatistics implements ResponseBody { @JsonCreator public LatestCheckpoints( - @JsonProperty(FIELD_NAME_COMPLETED) @Nullable CompletedCheckpointStatistics completedCheckpointStatistics, - @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CompletedCheckpointStatistics savepointStatistics, - @JsonProperty(FIELD_NAME_FAILED) @Nullable FailedCheckpointStatistics failedCheckpointStatistics, + @JsonProperty(FIELD_NAME_COMPLETED) @Nullable CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics, + @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics, + @JsonProperty(FIELD_NAME_FAILED) @Nullable CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics, @JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) { this.completedCheckpointStatistics = completedCheckpointStatistics; this.savepointStatistics = savepointStatistics; @@ -361,17 +360,17 @@ public class CheckpointStatistics implements ResponseBody { } @Nullable - public CompletedCheckpointStatistics getCompletedCheckpointStatistics() { + public CheckpointStatistics.CompletedCheckpointStatistics getCompletedCheckpointStatistics() { return completedCheckpointStatistics; } @Nullable - public CompletedCheckpointStatistics getSavepointStatistics() { + public CheckpointStatistics.CompletedCheckpointStatistics getSavepointStatistics() { return savepointStatistics; } @Nullable - public FailedCheckpointStatistics getFailedCheckpointStatistics() { + public CheckpointStatistics.FailedCheckpointStatistics getFailedCheckpointStatistics() { return failedCheckpointStatistics; } @@ -401,290 +400,6 @@ public class CheckpointStatistics implements ResponseBody { } } - /** - * Statistics for a checkpoint. - */ - @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") - @JsonSubTypes({ - @JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, name = "completed"), - @JsonSubTypes.Type(value = FailedCheckpointStatistics.class, name = "failed")}) - public static class BaseCheckpointStatistics { - - public static final String FIELD_NAME_ID = "id"; - - public static final String FIELD_NAME_STATUS = "status"; - - public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; - - public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp"; - - public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; - - public static final String FIELD_NAME_STATE_SIZE = "state_size"; - - public static final String FIELD_NAME_DURATION = "end_to_end_duration"; - - public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; - - public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; - - public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; - - @JsonProperty(FIELD_NAME_ID) - private final long id; - - @JsonProperty(FIELD_NAME_STATUS) - private final CheckpointStatsStatus status; - - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) - private final boolean savepoint; - - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) - private final long triggerTimestamp; - - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) - private final long latestAckTimestamp; - - @JsonProperty(FIELD_NAME_STATE_SIZE) - private final long stateSize; - - @JsonProperty(FIELD_NAME_DURATION) - private final long duration; - - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) - private final long alignmentBuffered; - - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) - private final int numSubtasks; - - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) - private final int numAckSubtasks; - - @JsonCreator - protected BaseCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) { - this.id = id; - this.status = Preconditions.checkNotNull(status); - this.savepoint = savepoint; - this.triggerTimestamp = triggerTimestamp; - this.latestAckTimestamp = latestAckTimestamp; - this.stateSize = stateSize; - this.duration = duration; - this.alignmentBuffered = alignmentBuffered; - this.numSubtasks = numSubtasks; - this.numAckSubtasks = numAckSubtasks; - } - - public long getId() { - return id; - } - - public CheckpointStatsStatus getStatus() { - return status; - } - - public boolean isSavepoint() { - return savepoint; - } - - public long getTriggerTimestamp() { - return triggerTimestamp; - } - - public long getLatestAckTimestamp() { - return latestAckTimestamp; - } - - public long getStateSize() { - return stateSize; - } - - public long getDuration() { - return duration; - } - - public long getAlignmentBuffered() { - return alignmentBuffered; - } - - public int getNumSubtasks() { - return numSubtasks; - } - - public int getNumAckSubtasks() { - return numAckSubtasks; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BaseCheckpointStatistics that = (BaseCheckpointStatistics) o; - return id == that.id && - savepoint == that.savepoint && - triggerTimestamp == that.triggerTimestamp && - latestAckTimestamp == that.latestAckTimestamp && - stateSize == that.stateSize && - duration == that.duration && - alignmentBuffered == that.alignmentBuffered && - numSubtasks == that.numSubtasks && - numAckSubtasks == that.numAckSubtasks && - status == that.status; - } - - @Override - public int hashCode() { - return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - } - } - - /** - * Statistics for a completed checkpoint. - */ - public static final class CompletedCheckpointStatistics extends BaseCheckpointStatistics { - - public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; - - public static final String FIELD_NAME_DISCARDED = "discarded"; - - @JsonProperty(FIELD_NAME_EXTERNAL_PATH) - @Nullable - private final String externalPath; - - @JsonProperty(FIELD_NAME_DISCARDED) - private final boolean discarded; - - @JsonCreator - public CompletedCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, - @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath, - @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) { - super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - - this.externalPath = externalPath; - this.discarded = discarded; - } - - @Nullable - public String getExternalPath() { - return externalPath; - } - - public boolean isDiscarded() { - return discarded; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o; - return discarded == that.discarded && - Objects.equals(externalPath, that.externalPath); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), externalPath, discarded); - } - } - - /** - * Statistics for a failed checkpoint. - */ - public static final class FailedCheckpointStatistics extends BaseCheckpointStatistics { - - public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp"; - - public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message"; - - @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) - private final long failureTimestamp; - - @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) - @Nullable - private final String failureMessage; - - @JsonCreator - public FailedCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, - @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp, - @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) { - super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - - this.failureTimestamp = failureTimestamp; - this.failureMessage = failureMessage; - } - - public long getFailureTimestamp() { - return failureTimestamp; - } - - @Nullable - public String getFailureMessage() { - return failureMessage; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - FailedCheckpointStatistics that = (FailedCheckpointStatistics) o; - return failureTimestamp == that.failureTimestamp && - Objects.equals(failureMessage, that.failureMessage); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), failureTimestamp, failureMessage); - } - } - /** * Statistics for a restored checkpoint. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java similarity index 68% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java index b062d0dff2fc7599e2257227f1c5f7a6f95ea6ec..ce809e7ce0c48fd2015242f3e7be7d9c189f64cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java @@ -16,19 +16,22 @@ * limitations under the License. */ -package org.apache.flink.runtime.rest.messages; +package org.apache.flink.runtime.rest.messages.checkpoints; import org.apache.flink.runtime.rest.HttpMethodWrapper; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; /** - * Message headers for the {@link CheckpointStatisticsHandler}. + * Message headers for the {@link CheckpointingStatisticsHandler}. */ -public class CheckpointStatisticsHeaders implements MessageHeaders { +public class CheckpointingStatisticsHeaders implements MessageHeaders { - private static final CheckpointStatisticsHeaders INSTANCE = new CheckpointStatisticsHeaders(); + private static final CheckpointingStatisticsHeaders INSTANCE = new CheckpointingStatisticsHeaders(); public static final String URL = "/jobs/:jobid/checkpoints"; @@ -38,8 +41,8 @@ public class CheckpointStatisticsHeaders implements MessageHeaders getResponseClass() { - return CheckpointStatistics.class; + public Class getResponseClass() { + return CheckpointingStatistics.class; } @Override @@ -62,7 +65,7 @@ public class CheckpointStatisticsHeaders implements MessageHeaders { + + private static final long serialVersionUID = 2970050507628933522L; + + public JobVertexIDSerializer() { + super(JobVertexID.class); + } + + @Override + public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeFieldName(value.toString()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java index 04b1c556d394fc3f30eb2316fa3ca9f7042e6294..73377725d7587a42bf271c7cb139560d84f9b40a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.junit.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java index 1eac20b337e2ff508aceaced31621a8954ccdb12..263117a1ff9b79202c95c0011e4566a503d0eba1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java index b352baec989638ccbdfeaf97c7cc920c9bb59dd4..bcb13d08e15e72463aeb298c724bd8e0747bf72d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java index 709c21d28238b109e1cc8c794c45e12843697918..deffaaeb82c4ce3b27b7cf1974b89968541366d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; -import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; /** * Tests for the {@link CheckpointConfigInfo}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java similarity index 51% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java index 8e8a50f668c5e5fb85f83e9e2aa55ba974936124..8521d34fe4562753fa9279f5fff3aecfba487299 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java @@ -19,27 +19,54 @@ package org.apache.flink.runtime.rest.handler.legacy.messages; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.rest.messages.CheckpointStatistics; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** - * Tests for {@link CheckpointStatistics}. + * Tests for {@link CheckpointingStatistics}. */ -public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase { +public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase { @Override - protected Class getTestResponseClass() { - return CheckpointStatistics.class; + protected Class getTestResponseClass() { + return CheckpointingStatistics.class; } @Override - protected CheckpointStatistics getTestResponseInstance() throws Exception { + protected CheckpointingStatistics getTestResponseInstance() throws Exception { - final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(1, 2, 3, 4, 5); - final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary( - new CheckpointStatistics.MinMaxAvgStatistics(1L, 1L, 1L), - new CheckpointStatistics.MinMaxAvgStatistics(2L, 2L, 2L), - new CheckpointStatistics.MinMaxAvgStatistics(3L, 3L, 3L)); + final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5); + final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary( + new CheckpointingStatistics.MinMaxAvgStatistics(1L, 1L, 1L), + new CheckpointingStatistics.MinMaxAvgStatistics(2L, 2L, 2L), + new CheckpointingStatistics.MinMaxAvgStatistics(3L, 3L, 3L)); + + final Map checkpointStatisticsPerTask = new HashMap<>(2); + + checkpointStatisticsPerTask.put( + new JobVertexID(), + new CheckpointStatistics.TaskCheckpointStatistics( + 1L, + 2L, + 3L, + 4L, + 5, + 6)); + + checkpointStatisticsPerTask.put( + new JobVertexID(), + new CheckpointStatistics.TaskCheckpointStatistics( + 2L, + 3L, + 4L, + 5L, + 6, + 7)); final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics( 1L, @@ -52,6 +79,7 @@ public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase