提交 cd1f858b 编写于 作者: A Arvid Heise 提交者: Piotr Nowojski

[FLINK-17218][tests] Adding recoverable failures and correctness checks to...

[FLINK-17218][tests] Adding recoverable failures and correctness checks to UnalignedCheckpointITCase.
上级 89446fab
......@@ -21,38 +21,96 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.apache.commons.lang3.ArrayUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
* Integration test for performing the unaligned checkpoint.
* <p>This tests checks for completeness and orderness of results after recovery. In particular, the following
* topology is used:
* <ol>
* <li>A source that generates unique, monotonically increasing, equidistant longs. Given parallelism=4, the first
* subtask generates [0, 4, ...], the second subtask [1, 5, ...].</li>
* <li>A shuffle that shifts all outputs of partition i to the input of partition i+1 mod n, such that the
* order of records are retained.</li>
* <li>A failing map that fails during map, snapshotState, initializeState, and close on certain checkpoints /
* run attempts (more see below).</li>
* <li>A shuffle that fairly distributes the records deterministically, such that duplicates can be detected.</li>
* <li>A verifying sink that counts all anomalies and exposes counters for verification in the calling test case.</li>
* </ol>
* <p>The tests are executed in a certain degree of parallelism until a given number of checkpoints have been
* successfully taken. Tests time out to guard against infinite failure loops or if no successful checkpoint has been
* taken for other reasons.
* <p>Failures are triggered on certain checkpoints to spread them evenly and run attempts to avoid infinite failure
* loops. For the tests, the failures are induced in the 1. subtask after m&lt;n successful checkpoints with n the
* number of successful checkpoints needed to pass the test:
* <ol>
* <li>After {@code m=1/4*n}, map fails.</li>
* <li>After {@code m=1/2*n}, snapshotState fails.</li>
* <li>After {@code m=3/4*n}, map fails and the corresponding recovery fails once.</li>
* <li>At the end, close fails once.</li>
* </ol>
* <p>The following verifications are performed.
* <ul>
* <li>The number of outputs should be the number of inputs (no lost records).</li>
* <li>No record of source subtask {@code i} can overtake a record of the same subtask (orderness).</li>
* <li>No record should arrive at the sink twice (e.g., being emitted or recovered multiple times), which tests
* exactly once.</li>
* <li>The number of successful checkpoints is indeed {@code >=n}.</li>
* </ul>
public class UnalignedCheckpointITCase extends TestLogger {
public static final String NUM_COMPLETED_CHECKPOINTS = "numCompletedCheckpoints";
public static final String NUM_INPUTS = "inputs";
public static final String NUM_OUTPUTS = "outputs";
private static final String NUM_OUT_OF_ORDER = "outOfOrder";
private static final String NUM_DUPLICATES = "duplicates";
private static final String NUM_LOST = "lost";
private static final Logger LOG = LoggerFactory.getLogger(UnalignedCheckpointITCase.class);
public ErrorCollector collector = new ErrorCollector();
public final TemporaryFolder temp = new TemporaryFolder();
......@@ -63,118 +121,347 @@ public class UnalignedCheckpointITCase extends TestLogger {
public void shouldPerformUnalignedCheckpointOnNonparallelTopology() throws Exception {
public void shouldPerformUnalignedCheckpointOnNonParallelLocalChannel() throws Exception {
execute(1, 1, true);
public void shouldPerformUnalignedCheckpointOnLocalChannelsOnly() throws Exception {
public void shouldPerformUnalignedCheckpointOnParallelLocalChannel() throws Exception {
execute(5, 5, true);
public void shouldPerformUnalignedCheckpointOnRemoteChannels() throws Exception {
public void shouldPerformUnalignedCheckpointOnNonParallelRemoteChannel() throws Exception {
execute(1, 1, false);
public void shouldPerformUnalignedCheckpointOnParallelRemoteChannel() throws Exception {
execute(5, 1, false);
public void shouldPerformUnalignedCheckpointOnLocalAndRemoteChannel() throws Exception {
execute(5, 3, true);
public void shouldPerformUnalignedCheckpointMassivelyParallel() throws Exception {
execute(20, 20, true);
private void execute(int paralellism) throws Exception {
StreamExecutionEnvironment env = createEnv(paralellism);
private void execute(int parallelism, int slotsPerTaskManager, boolean slotSharing) throws Exception {
StreamExecutionEnvironment env = createEnv(parallelism, slotsPerTaskManager, slotSharing);
long minCheckpoints = 10;
createDAG(env, minCheckpoints, slotSharing);
final JobExecutionResult result = env.execute();
createDAG(env, 30);
final JobExecutionResult executionResult = env.execute();
collector.checkThat(result.<Long>getAccumulatorResult(NUM_OUT_OF_ORDER), equalTo(0L));
collector.checkThat(result.<Long>getAccumulatorResult(NUM_DUPLICATES), equalTo(0L));
collector.checkThat(result.<Long>getAccumulatorResult(NUM_LOST), equalTo(0L));
assertThat(executionResult.<Long>getAccumulatorResult(NUM_COMPLETED_CHECKPOINTS) / paralellism,
// at this point, there is no way that #input != #output, but still perform these sanity checks
Long inputs = result.<Long>getAccumulatorResult(NUM_INPUTS);
collector.checkThat(inputs, greaterThan(0L));
collector.checkThat(result.<Long>getAccumulatorResult(NUM_OUTPUTS), equalTo(inputs));
private LocalStreamEnvironment createEnv(final int parallelism) throws IOException {
private LocalStreamEnvironment createEnv(int parallelism, int slotsPerTaskManager, boolean slotSharing) throws IOException {
Configuration conf = new Configuration();
final int numSlots = 3;
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
conf.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, .9f);
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, (parallelism + numSlots - 1) / numSlots);
slotSharing ? (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager : parallelism * 3);
conf.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temp.newFolder().toURI().toString());
final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
// keep in sync with FailingMapper in #createDAG
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(100)));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
return env;
private void createDAG(final StreamExecutionEnvironment env, final long minCheckpoints) {
final SingleOutputStreamOperator<Integer> source = env.addSource(new IntegerSource(minCheckpoints));
final SingleOutputStreamOperator<Integer> transform = source.shuffle().map(i -> 2 * i);
transform.shuffle().addSink(new CountingSink<>());
private void createDAG(StreamExecutionEnvironment env, long minCheckpoints, boolean slotSharing) {
env.addSource(new LongSource(minCheckpoints))
.slotSharingGroup(slotSharing ? "default" : "source")
// shifts records from one partition to another evenly to retain order
.partitionCustom(new ShiftingPartitioner(), l -> l)
.map(new FailingMapper(state -> state.completedCheckpoints == minCheckpoints / 4 && state.runNumber == 0
|| state.completedCheckpoints == minCheckpoints * 3 / 4 && state.runNumber == 2,
state -> state.completedCheckpoints == minCheckpoints / 2 && state.runNumber == 1,
state -> state.runNumber == 3,
state -> state.runNumber == 4))
.slotSharingGroup(slotSharing ? "default" : "map")
.partitionCustom(new DistributingPartitioner(), l -> l)
.addSink(new VerifyingSink(minCheckpoints))
.slotSharingGroup(slotSharing ? "default" : "sink");
private static class IntegerSource extends RichParallelSourceFunction<Integer> implements CheckpointListener {
private static class LongSource extends RichParallelSourceFunction<Long> implements CheckpointListener,
CheckpointedFunction {
private final long minCheckpoints;
private volatile boolean running = true;
private LongCounter numCompletedCheckpoints = new LongCounter();
private static final ListStateDescriptor<State> STATE_DESCRIPTOR =
new ListStateDescriptor<>("state", State.class);
private final LongCounter numInputsCounter = new LongCounter();
private ListState<State> stateList;
private State state;
public IntegerSource(final long minCheckpoints) {
public LongSource(final long minCheckpoints) {
this.minCheckpoints = minCheckpoints;
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator(NUM_COMPLETED_CHECKPOINTS, numCompletedCheckpoints);
getRuntimeContext().addAccumulator(NUM_INPUTS, numInputsCounter);
public void initializeState(FunctionInitializationContext context) throws Exception {
stateList = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
state = getOnlyElement(stateList.get(), new State(0, getRuntimeContext().getIndexOfThisSubtask()));
public void snapshotState(FunctionSnapshotContext context) throws Exception {
info("Snapshotted next input {}", state.nextNumber);
private void info(String description, Object... args) {
UnalignedCheckpointITCase.info(getRuntimeContext(), description, args);
public void notifyCheckpointComplete(long checkpointId) throws Exception {
public void notifyCheckpointComplete(long checkpointId) {
public void run(SourceContext<Integer> ctx) throws Exception {
int counter = 0;
public void run(SourceContext<Long> ctx) throws Exception {
int increment = getRuntimeContext().getNumberOfParallelSubtasks();
info("First emitted input {}", state.nextNumber);
while (running) {
synchronized (ctx.getCheckpointLock()) {
state.nextNumber += increment;
if (numCompletedCheckpoints.getLocalValue() >= minCheckpoints) {
if (state.numCompletedCheckpoints >= minCheckpoints) {
// wait for all instances to finish, such that checkpoints are still processed
numInputsCounter.add(state.nextNumber / increment);
info("Last emitted input {} = {} total emits", state.nextNumber - increment, numInputsCounter.getLocalValue());
public void cancel() {
running = false;
private static class State {
private long numCompletedCheckpoints;
private long nextNumber;
private State(long numCompletedCheckpoints, long nextNumber) {
this.numCompletedCheckpoints = numCompletedCheckpoints;
this.nextNumber = nextNumber;
static void info(RuntimeContext runtimeContext, String description, Object[] args) {
LOG.info(description + " @ {} subtask ({} attempt)",
ArrayUtils.addAll(args, new Object[]{runtimeContext.getIndexOfThisSubtask(), runtimeContext.getAttemptNumber()}));
private static class CountingSink<T> extends RichSinkFunction<T> {
private LongCounter counter = new LongCounter();
private static class VerifyingSink extends RichSinkFunction<Long> implements CheckpointedFunction {
private final LongCounter numOutputCounter = new LongCounter();
private final LongCounter outOfOrderCounter = new LongCounter();
private final LongCounter lostCounter = new LongCounter();
private final LongCounter duplicatesCounter = new LongCounter();
private static final ListStateDescriptor<State> STATE_DESCRIPTOR =
new ListStateDescriptor<>("state", State.class);
private ListState<State> stateList;
private State state;
private final long minCheckpoints;
private VerifyingSink(long minCheckpoints) {
this.minCheckpoints = minCheckpoints;
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("outputs", counter);
getRuntimeContext().addAccumulator(NUM_OUTPUTS, numOutputCounter);
getRuntimeContext().addAccumulator(NUM_OUT_OF_ORDER, outOfOrderCounter);
getRuntimeContext().addAccumulator(NUM_DUPLICATES, duplicatesCounter);
getRuntimeContext().addAccumulator(NUM_LOST, lostCounter);
public void initializeState(FunctionInitializationContext context) throws Exception {
stateList = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
state = getOnlyElement(stateList.get(), new State(getRuntimeContext().getNumberOfParallelSubtasks()));
info("Initialized last snapshotted records {}", Arrays.asList(state.lastRecordInPartitions));
private void info(String description, Object... args) {
UnalignedCheckpointITCase.info(getRuntimeContext(), description, args);
public void snapshotState(FunctionSnapshotContext context) throws Exception {
info("Last snapshotted records {}", Arrays.asList(state.lastRecordInPartitions));
public void close() throws Exception {
info("Last received records {}", Arrays.asList(state.lastRecordInPartitions));
public void invoke(Long value, Context context) throws Exception {
int parallelism = state.lastRecordInPartitions.length;
int partition = (int) (value % parallelism);
long lastRecord = state.lastRecordInPartitions[partition];
if (value < lastRecord) {
info("Out of order records current={} and last={}", value, lastRecord);
} else if (value == lastRecord) {
info("Duplicate record {}", value);
} else if (lastRecord != -1) {
long expectedValue = lastRecord + parallelism * parallelism;
if (value != expectedValue) {
info("Lost records {}-{}", expectedValue, value);
state.lastRecordInPartitions[partition] = value;
private static class State {
private long numOutOfOrderness;
private long numLostValues;
private long numDuplicates;
private long numOutput = 0;
private long[] lastRecordInPartitions;
private State(int numberOfParallelSubtasks) {
lastRecordInPartitions = new long[numberOfParallelSubtasks];
for (int index = 0; index < lastRecordInPartitions.length; index++) {
lastRecordInPartitions[index] = -1;
private static class ShiftingPartitioner implements Partitioner<Long> {
public void invoke(T value, Context context) throws Exception {
if (counter.getLocalValue() % 100 == 0) {
public int partition(Long key, int numPartitions) {
return (int) ((key + 1) % numPartitions);
private static class DistributingPartitioner implements Partitioner<Long> {
public int partition(Long key, int numPartitions) {
return (int) ((key / numPartitions) % numPartitions);
private static class FailingMapperState {
private long completedCheckpoints;
private long runNumber;
private FailingMapperState(long completedCheckpoints, long runNumber) {
this.completedCheckpoints = completedCheckpoints;
this.runNumber = runNumber;
private static class FailingMapper extends RichMapFunction<Long, Long> implements CheckpointedFunction, CheckpointListener {
private static final ListStateDescriptor<FailingMapperState> FAILING_MAPPER_STATE_DESCRIPTOR =
new ListStateDescriptor<>("state", FailingMapperState.class);
private ListState<FailingMapperState> listState;
private FailingMapperState state;
private final FilterFunction<FailingMapperState> failDuringMap;
private final FilterFunction<FailingMapperState> failDuringSnapshot;
private final FilterFunction<FailingMapperState> failDuringRecovery;
private final FilterFunction<FailingMapperState> failDuringClose;
private long lastValue;
private FailingMapper(
FilterFunction<FailingMapperState> failDuringMap,
FilterFunction<FailingMapperState> failDuringSnapshot,
FilterFunction<FailingMapperState> failDuringRecovery,
FilterFunction<FailingMapperState> failDuringClose) {
this.failDuringMap = failDuringMap;
this.failDuringSnapshot = failDuringSnapshot;
this.failDuringRecovery = failDuringRecovery;
this.failDuringClose = failDuringClose;
public Long map(Long value) throws Exception {
lastValue = value;
checkFail(failDuringMap, "map");
return value;
public void checkFail(FilterFunction<FailingMapperState> failFunction, String description) throws Exception {
if (getRuntimeContext().getIndexOfThisSubtask() == 0 && failFunction.filter(state)) {
private void failMapper(String description) throws Exception {
throw new Exception("Failing " + description + " @ " + state.completedCheckpoints + " (" + state.runNumber + " attempt); last value " + lastValue);
public void notifyCheckpointComplete(long checkpointId) {
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkFail(failDuringSnapshot, "snapshotState");
public void close() throws Exception {
checkFail(failDuringClose, "close");
public void initializeState(FunctionInitializationContext context) throws Exception {
listState = context.getOperatorStateStore().getListState(FAILING_MAPPER_STATE_DESCRIPTOR);
state = getOnlyElement(listState.get(), new FailingMapperState(0, 0));
state.runNumber = getRuntimeContext().getAttemptNumber();
checkFail(failDuringRecovery, "initializeState");
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册