diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java index 541f4ee106733ef72cfb90b64d2ca88606eed006..56839db03a3870aad83316da405af6523ded4a3d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; @@ -43,6 +44,8 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.IntStream; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier * and keeping track of the number of received barriers and consumed barriers. @@ -86,6 +89,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { CheckpointBarrierUnaligner( int[] numberOfInputChannelsPerGate, + ChannelStateWriter channelStateWriter, String taskName, AbstractInvokable toNotifyOnCheckpoint) { super(toNotifyOnCheckpoint); @@ -108,7 +112,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { .flatMap(Function.identity()) .toArray(InputChannelInfo[]::new); - threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, this); + threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this); } @Override @@ -315,12 +319,16 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { /** The number of opened channels. */ private int numOpenChannels; + private final ChannelStateWriter channelStateWriter; + private final CheckpointBarrierUnaligner handler; public ThreadSafeUnaligner( int totalNumChannels, + ChannelStateWriter channelStateWriter, CheckpointBarrierUnaligner handler) { storeNewBuffers = new boolean[totalNumChannels]; + this.channelStateWriter = channelStateWriter; this.handler = handler; numOpenChannels = totalNumChannels; } @@ -350,7 +358,15 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { @Override public synchronized void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) { - buffer.recycleBuffer(); + if (storeNewBuffers[handler.getFlattenedChannelIndex(channelInfo)]) { + channelStateWriter.addInputData( + currentReceivedCheckpointId, + channelInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + buffer); + } else { + buffer.recycleBuffer(); + } } @Override @@ -382,6 +398,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { Arrays.fill(storeNewBuffers, true); numBarriersReceived = 0; allBarriersReceivedFuture = new CompletableFuture<>(); + channelStateWriter.start(barrierId, barrier.getCheckpointOptions()); } public synchronized void resetReceivedBarriers(long checkpointId) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java index cfcba48aa686c8fa7d1c9b04c6b1cc2d82ec15ac..1388dd145e451b7355aeae39e599364080bd8627 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java @@ -184,6 +184,10 @@ public class CheckpointedInputGate implements PullingAsyncDataInput getAllBarriersReceivedFuture(long checkpointId) { + return ((CheckpointBarrierUnaligner) barrierHandler).getAllBarriersReceivedFuture(checkpointId); + } + private int offsetChannelIndex(int channelIndex) { return channelIndex + channelIndexOffset; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 42ad81d6217223a1c1b299e29da6e3e29b9052c4..27156578e0521445139e1462dc207a01cb73681c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.metrics.MetricNames; @@ -41,6 +42,7 @@ public class InputProcessorUtil { public static CheckpointedInputGate createCheckpointedInputGate( AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, + ChannelStateWriter channelStateWriter, InputGate inputGate, Configuration taskManagerConfig, TaskIOMetricGroup taskIOMetricGroup, @@ -52,6 +54,7 @@ public class InputProcessorUtil { CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler( config, IntStream.of(inputGate.getNumberOfInputChannels()), + channelStateWriter, taskName, toNotifyOnCheckpoint); registerCheckpointMetrics(taskIOMetricGroup, barrierHandler); @@ -68,6 +71,7 @@ public class InputProcessorUtil { public static CheckpointedInputGate[] createCheckpointedInputGatePair( AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, + ChannelStateWriter channelStateWriter, Configuration taskManagerConfig, TaskIOMetricGroup taskIOMetricGroup, String taskName, @@ -92,6 +96,7 @@ public class InputProcessorUtil { CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler( config, Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels), + channelStateWriter, taskName, toNotifyOnCheckpoint); registerCheckpointMetrics(taskIOMetricGroup, barrierHandler); @@ -125,6 +130,7 @@ public class InputProcessorUtil { private static CheckpointBarrierHandler createCheckpointBarrierHandler( StreamConfig config, IntStream numberOfInputChannelsPerGate, + ChannelStateWriter channelStateWriter, String taskName, AbstractInvokable toNotifyOnCheckpoint) { switch (config.getCheckpointMode()) { @@ -132,6 +138,7 @@ public class InputProcessorUtil { if (config.isUnalignedCheckpointsEnabled()) { return new CheckpointBarrierUnaligner( numberOfInputChannelsPerGate.toArray(), + channelStateWriter, taskName, toNotifyOnCheckpoint); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 3c062392957872723c4e2d24d91653ee584cae27..699037984baad15aeca2f85710decdba5ede72d1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -19,9 +19,12 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.AvailabilityProvider; import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. @@ -34,4 +37,6 @@ public interface StreamInputProcessor extends AvailabilityProvider, Closeable { * state and/or {@link #getAvailableFuture()}. */ InputStatus processInput() throws Exception; + + CompletableFuture prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java index 3505f75584335f8e06fc6d0cfbee928d24f15f3d..a16cc9518c1c6a55ed33aa892e31205df667fe8b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InputSelection; @@ -176,6 +177,17 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor } } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + CompletableFuture[] inputFutures = new CompletableFuture[inputProcessors.length]; + for (int index = 0; index < inputFutures.length; index++) { + inputFutures[index] = inputProcessors[index].prepareSnapshot(channelStateWriter, checkpointId); + } + return CompletableFuture.allOf(inputFutures); + } + private int selectNextReadingInputIndex() { if (!inputSelectionHandler.isAnyInputAvailable()) { fullCheckAndSetAvailable(); @@ -238,6 +250,12 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor public void close() throws IOException { networkInput.close(); } + + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + return networkInput.prepareSnapshot(channelStateWriter, checkpointId); + } } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java index 1f6ca27a6c8c3d075c57a7a6ae0f72ba640eabb1..2ca78cef136a02e7125c3715f8beb2e42e12637f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; import org.apache.flink.streaming.runtime.tasks.OperatorChain; @@ -71,6 +72,13 @@ public final class StreamOneInputProcessor implements StreamInputProcessor { return status; } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + return input.prepareSnapshot(channelStateWriter, checkpointId); + } + @Override public void close() throws IOException { input.close(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java index 484d9cd8d883c10cd2b1c722afec41b5d2097b0e..a645051e1f01fa16f2e9898f7a022786a860263d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java @@ -18,8 +18,11 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Basic interface for inputs of stream operators. @@ -32,4 +35,9 @@ public interface StreamTaskInput extends PushingAsyncDataInput, Closeable * Returns the input index of this input. */ int getInputIndex(); + + /** + * Prepares to spill the in-flight input buffers as checkpoint snapshot. + */ + CompletableFuture prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java index 84fa8aef7723efd22d0aa17c65c0114b3d5f1015..8572404c40600ead900d21d2c16911b1c60c822f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer. import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.streaming.api.watermark.Watermark; @@ -197,6 +199,30 @@ public final class StreamTaskNetworkInput implements StreamTaskInput { return checkpointedInputGate.getAvailableFuture(); } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) { + final InputChannel channel = checkpointedInputGate.getChannel(channelIndex); + + // Assumption for retrieving buffers = one concurrent checkpoint + recordDeserializers[channelIndex].getUnconsumedBuffer().ifPresent(buffer -> + channelStateWriter.addInputData( + checkpointId, + channel.getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + buffer)); + + channelStateWriter.addInputData( + checkpointId, + channel.getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + checkpointedInputGate.requestInflightBuffers(checkpointId, channelIndex).toArray(new Buffer[0])); + } + return checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId); + } + @Override public void close() throws IOException { // release the deserializers . this part should not ever fail diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java index a213558296f1f962fdce1079f6a1c06428defddf..95ce9bb5ed76b7da3ac98a94c2d390e276346057 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.streaming.api.operators.SourceReaderOperator; import org.apache.flink.util.IOUtils; @@ -62,5 +63,12 @@ public final class StreamTaskSourceInput implements StreamTaskInput { public void close() { IOUtils.closeQuietly(operator::close); } + + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) { + return CompletableFuture.completedFuture(null); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index aad275374722e569bba269193b80d56e8798f3f9..071c5d6966abb5fd78b46b67d1831e150bba2a4f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -184,6 +185,15 @@ public final class StreamTwoInputProcessor implements StreamInputProce return getInputStatus(); } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + return CompletableFuture.allOf( + input1.prepareSnapshot(channelStateWriter, checkpointId), + input2.prepareSnapshot(channelStateWriter, checkpointId)); + } + private int selectFirstReadingInputIndex() throws IOException { // Note: the first call to nextSelection () on the operator must be made after this operator // is opened to ensure that any changes about the input selection in its open() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java index 098384a89cfe571f0fe996914fb4888045418ed8..b09c5a936d13adef0e74bb376cf932a1ec7e08c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -58,6 +59,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { private final Map operatorSnapshotsInProgress; private final CheckpointMetaData checkpointMetaData; private final CheckpointMetrics checkpointMetrics; + private final Future channelWrittenFuture; private final long asyncStartNanos; private final AtomicReference asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING); @@ -65,6 +67,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { Map operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, + Future channelWrittenFuture, long asyncStartNanos, String taskName, CloseableRegistry closeableRegistry, @@ -74,6 +77,7 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress); this.checkpointMetaData = checkNotNull(checkpointMetaData); this.checkpointMetrics = checkNotNull(checkpointMetrics); + this.channelWrittenFuture = checkNotNull(channelWrittenFuture); this.asyncStartNanos = asyncStartNanos; this.taskName = checkNotNull(taskName); this.closeableRegistry = checkNotNull(closeableRegistry); @@ -113,6 +117,8 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); + channelWrittenFuture.get(); + if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) { reportCompletedSnapshotStates( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 535389252aa5f3fe880fe067e7b39323addce5ae..69afaafdce26e79720e1621ce71727ab7fe502da 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -101,6 +101,7 @@ public class MultipleInputStreamTask extends StreamTask extends StreamTask> getAsyncOperationsThreadPool(), getEnvironment(), this, - false); // todo: pass true if unaligned checkpoints enabled + configuration.isUnalignedCheckpointsEnabled(), + this::prepareInputSnapshot); // if the clock is not already set, then assign a default TimeServiceProvider if (timerService == null) { @@ -292,6 +295,17 @@ public abstract class StreamTask> } } + private CompletableFuture prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException { + if (inputProcessor == null) { + return FutureUtils.completedVoidFuture(); + } + return inputProcessor.prepareSnapshot(channelStateWriter, checkpointId); + } + + protected ChannelStateWriter getChannelStateWriter() { + return subtaskCheckpointCoordinator.getChannelStateWriter(); + } + // ------------------------------------------------------------------------ // Life cycle methods for specific implementations // ------------------------------------------------------------------------ @@ -726,6 +740,7 @@ public abstract class StreamTask> .setBytesBufferedInAlignment(0L) .setAlignmentDurationNanos(0L); + subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointMetaData.getCheckpointId(), checkpointOptions); boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime); if (!success) { declineCheckpoint(checkpointMetaData.getCheckpointId()); @@ -865,6 +880,7 @@ public abstract class StreamTask> LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName()); } + subtaskCheckpointCoordinator.getChannelStateWriter().notifyCheckpointComplete(checkpointId); getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId); if (isRunning && isSynchronousSavepointId(checkpointId)) { finishTask(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 528e80ac4c7f4f398050bd7c8e07c5e7c0bfba9b..1125838b3e36760598bb84ca51ae10fe4a89a8ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -25,9 +25,13 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; @@ -36,6 +40,7 @@ import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.BiFunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +48,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.function.Supplier; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; @@ -62,6 +69,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { private final AsyncExceptionHandler asyncExceptionHandler; private final ChannelStateWriter channelStateWriter; private final StreamTaskActionExecutor actionExecutor; + private final boolean unalignedCheckpointEnabled; + private final BiFunctionWithException, IOException> prepareInputSnapshot; SubtaskCheckpointCoordinatorImpl( CheckpointStorageWorkerView checkpointStorage, @@ -71,7 +80,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { ExecutorService executorService, Environment env, AsyncExceptionHandler asyncExceptionHandler, - boolean sendChannelState) throws IOException { + boolean unalignedCheckpointEnabled, + BiFunctionWithException, IOException> prepareInputSnapshot) throws IOException { this.checkpointStorage = new CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage)); this.taskName = checkNotNull(taskName); this.closeableRegistry = checkNotNull(closeableRegistry); @@ -79,11 +89,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { this.env = checkNotNull(env); this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); this.actionExecutor = checkNotNull(actionExecutor); - this.channelStateWriter = sendChannelState ? openChannelStateWriter() : ChannelStateWriter.NO_OP; + this.channelStateWriter = unalignedCheckpointEnabled ? openChannelStateWriter() : ChannelStateWriter.NO_OP; + this.unalignedCheckpointEnabled = unalignedCheckpointEnabled; + this.prepareInputSnapshot = prepareInputSnapshot; this.closeableRegistry.registerCloseable(this); } - private ChannelStateWriterImpl openChannelStateWriter() { + private ChannelStateWriter openChannelStateWriter() { ChannelStateWriterImpl writer = new ChannelStateWriterImpl(this.checkpointStorage); writer.open(); return writer; @@ -136,9 +148,16 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { // Step (2): Send the checkpoint barrier downstream operatorChain.broadcastEvent( - new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options)); + new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options), + unalignedCheckpointEnabled); - // Step (3): Take the state snapshot. This should be largely asynchronous, to not impact progress of the streaming topology + // Step (3): Prepare to spill the in-flight buffers for input and output + if (unalignedCheckpointEnabled) { + prepareInflightDataSnapshot(metadata.getCheckpointId()); + } + + // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the + // streaming topology Map snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators()); try { @@ -184,12 +203,40 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } } + private void prepareInflightDataSnapshot(long checkpointId) throws IOException { + prepareInputSnapshot.apply(channelStateWriter, checkpointId) + .thenAccept(unused -> channelStateWriter.finishInput(checkpointId)); + + ResultPartitionWriter[] writers = env.getAllWriters(); + for (ResultPartitionWriter writer : writers) { + for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) { + ResultSubpartition subpartition = writer.getSubpartition(i); + channelStateWriter.addOutputData( + checkpointId, + subpartition.getSubpartitionInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + subpartition.requestInflightBufferSnapshot().toArray(new Buffer[0])); + } + } + channelStateWriter.finishOutput(checkpointId); + } + private void finishAndReportAsync(Map snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics) { + final Future channelWrittenFuture; + if (unalignedCheckpointEnabled) { + ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId()); + channelWrittenFuture = CompletableFuture.allOf( + writeResult.getInputChannelStateHandles(), + writeResult.getResultSubpartitionStateHandles()); + } else { + channelWrittenFuture = FutureUtils.completedVoidFuture(); + } // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit executorService.execute(new AsyncCheckpointRunnable( snapshotFutures, metadata, metrics, + channelWrittenFuture, System.nanoTime(), taskName, closeableRegistry, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 78078bca93433a859ceb9e701ef00d29e3c57906..37d550f34771e33aac7b2c809c48bb5d951850af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -62,6 +62,7 @@ public class TwoInputStreamTask extends AbstractTwoInputStreamTas CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair( this, getConfiguration(), + getChannelStateWriter(), getEnvironment().getTaskManagerInfo().getConfiguration(), getEnvironment().getMetricGroup().getIOMetricGroup(), getTaskNameWithSubtaskAndId(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index ae6cb8e4ac45f15514ef6b0a7b4e5bb0d6eb215f..8d4fae2aca81716586edd6394b1e2cdb862cfe1c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -59,6 +59,7 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; @@ -114,6 +115,7 @@ public class LocalStateForwardingTest extends TestLogger { snapshots, checkpointMetaData, checkpointMetrics, + CompletableFuture.completedFuture(null), 0L, testStreamTask.getName(), testStreamTask.getCancelables(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index b5e8430f4d2472a8f8dcda7a06f22afc8b2ce257..cbc39f728d9006494eb09ab521d87b6440a5d780 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler; import org.apache.flink.runtime.execution.CancelTaskException; @@ -1173,6 +1174,13 @@ public class StreamTaskTest extends TestLogger { return ++currentNumProcessCalls < totalProcessCalls ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT; } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + final long checkpointId) { + return FutureUtils.completedVoidFuture(); + } + @Override public void close() throws IOException { } @@ -1338,6 +1346,11 @@ public class StreamTaskTest extends TestLogger { return isFinished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE; } + @Override + public CompletableFuture prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) { + return FutureUtils.completedVoidFuture(); + } + @Override public void close() throws IOException { }