提交 f9a6352a 编写于 作者: R Roman Khachatryan 提交者: Piotr Nowojski

[FLINK-20654][checkpointing] Decline checkpoints until restored channel state is consumed

In scenarios with multiple inputs (e.g. co-group, but not union), one input may receive a
checkpoint barrier while the other input is still restoring its channel state. That restored
(previous) state is currently not included in the snapshot, so the snapshot would be incomplete.
上级 3daee463
......@@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY;
/** An {@link InputGate} with a specific index. */
public abstract class IndexedInputGate extends InputGate implements CheckpointableInput {
/** Returns the index of this input gate. Only supported on */
......@@ -30,6 +32,9 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab
@Override
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
if (!getStateConsumedFuture().isDone()) {
throw new CheckpointException(CHECKPOINT_DECLINED_TASK_NOT_READY);
}
for (int index = 0, numChannels = getNumberOfInputChannels();
index < numChannels;
index++) {
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
......@@ -32,6 +33,7 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
......@@ -70,11 +72,15 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout;
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
import static org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
......@@ -88,6 +94,14 @@ import static org.junit.Assert.fail;
/** Tests for {@link SingleInputGate}. */
public class SingleInputGateTest extends InputGateTestBase {
/**
 * Verifies that starting a checkpoint on a gate whose state-consumed future has not completed
 * is rejected with a {@link CheckpointException}.
 */
@Test(expected = CheckpointException.class)
public void testCheckpointsDeclinedUnlessStateConsumed() throws CheckpointException {
    final SingleInputGate inputGate = createInputGate(createNettyShuffleEnvironment());

    // Precondition of the scenario: the gate must not report its state as consumed yet.
    final boolean stateConsumed = inputGate.getStateConsumedFuture().isDone();
    checkState(!stateConsumed);

    final CheckpointBarrier barrier =
            new CheckpointBarrier(1L, 1L, alignedNoTimeout(CHECKPOINT, getDefault()));
    inputGate.checkpointStarted(barrier);
}
/**
* Tests {@link InputGate#setup()} should create the respective {@link BufferPool} and assign
* exclusive buffers for {@link RemoteInputChannel}s, but should not request partitions.
......
......@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -125,8 +124,9 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
},
new Object[] {"Parallel cogroup, p = 5", createCogroupSettings(5)},
new Object[] {"Parallel cogroup, p = 10", createCogroupSettings(10)},
new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
// todo: enable after completely fixing FLINK-20654
// new Object[] {"Parallel union, p = 5", createUnionSettings(5)},
// new Object[] {"Parallel union, p = 10", createUnionSettings(10)},
};
}
......@@ -189,7 +189,6 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase {
}
/**
 * Test entry point: delegates to {@code execute(settings)}, passing the {@code settings}
 * value of this test instance.
 *
 * @throws Exception if the delegated execution fails
 */
@Test
@Ignore
public void execute() throws Exception {
execute(settings);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册