提交 77dce77d 编写于 作者: A Arvid Heise 提交者: Piotr Nowojski

[FLINK-16587][checkpointing] Fix abortion of ChannelStateWriter and add...

[FLINK-16587][checkpointing] Fix abortion of ChannelStateWriter and add test/IT cases for unaligned checkpoint.

In ChannelStateCheckpointWriter#fail, DataOutputStream flushes on close, which is not possible when checkpointStream has already been closed. That can happen either directly in fail or through a previous finishWriteAndResult.
The DataOutputStream does not need to be closed in either case as long as checkpointStream is closed.
checkpointStream#close is idempotent.
上级 5cebfb76
......@@ -197,7 +197,6 @@ class ChannelStateCheckpointWriter {
/**
 * Fails the pending write result with the given cause and closes the underlying streams.
 *
 * @param e the cause that is forwarded to {@code result.fail}
 * @throws Exception declared by the signature; the stream close calls may throw
 */
public void fail(Throwable e) throws Exception {
result.fail(e);
checkpointStream.close();
// NOTE(review): this hunk shrinks from 7 to 6 lines, and per the commit message this is the
// call being removed: closing the DataOutputStream flushes it, which cannot succeed once
// checkpointStream has already been closed.
dataStream.close();
}
}
......@@ -97,7 +97,7 @@ public class BufferOrEvent {
this.channelIndex = channelIndex;
}
boolean moreAvailable() {
public boolean moreAvailable() {
return moreAvailable;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint.channel;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
/**
 * A {@link ChannelStateWriter} test double that writes nothing. It only verifies that callers respect the
 * expected call order (a checkpoint has to be started before data is added or it is finished/aborted) and
 * recycles every buffer that is handed to it.
 */
public class MockChannelStateWriter implements ChannelStateWriter {

    /** If {@code true}, {@link #finishInput} and {@link #finishOutput} complete their future right away. */
    private final boolean autoComplete;

    /** Result of the most recently started checkpoint; {@code EMPTY} until the first {@link #start}. */
    private volatile ChannelStateWriteResult channelStateWriteResult = ChannelStateWriteResult.EMPTY;

    /** Id of the most recently started checkpoint; {@code -1} until the first {@link #start}. */
    private volatile long startedCheckpointId = -1;

    /** Creates a writer that completes the result futures automatically on finish. */
    public MockChannelStateWriter() {
        this(true);
    }

    /**
     * @param autoComplete whether finishing input/output should immediately complete the respective future
     */
    public MockChannelStateWriter(boolean autoComplete) {
        this.autoComplete = autoComplete;
    }

    /**
     * Registers a new checkpoint and replaces the current write result with a fresh one.
     *
     * @throws IllegalArgumentException if the id is smaller than the last started id
     * @throws IllegalStateException if the id equals the last started id
     */
    @Override
    public void start(long checkpointId, CheckpointOptions checkpointOptions) {
        if (checkpointId < startedCheckpointId) {
            throw new IllegalArgumentException(
                "Expected a larger checkpoint id than " + startedCheckpointId + " but got " + checkpointId);
        }
        if (checkpointId == startedCheckpointId) {
            throw new IllegalStateException("Already started " + checkpointId);
        }
        startedCheckpointId = checkpointId;
        channelStateWriteResult = new ChannelStateWriteResult();
    }

    /** Validates the checkpoint id and recycles the given buffers without storing them. */
    @Override
    public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
        checkCheckpointId(checkpointId);
        recycleAll(data);
    }

    /** Validates the checkpoint id and recycles the given buffers without storing them. */
    @Override
    public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
        checkCheckpointId(checkpointId);
        recycleAll(data);
    }

    /** Validates the checkpoint id; completes the input future only in auto-complete mode. */
    @Override
    public void finishInput(long checkpointId) {
        checkCheckpointId(checkpointId);
        if (!autoComplete) {
            return;
        }
        completeInput();
    }

    /** Completes the input channel state future of the current result with {@code null}. */
    public void completeInput() {
        channelStateWriteResult.getInputChannelStateHandles().complete(null);
    }

    /** Validates the checkpoint id; completes the output future only in auto-complete mode. */
    @Override
    public void finishOutput(long checkpointId) {
        checkCheckpointId(checkpointId);
        if (!autoComplete) {
            return;
        }
        completeOutput();
    }

    /** Completes the result subpartition state future of the current result with {@code null}. */
    public void completeOutput() {
        channelStateWriteResult.getResultSubpartitionStateHandles().complete(null);
    }

    /**
     * Ensures that the given id is the one passed to the latest {@link #start} call.
     *
     * @throws IllegalStateException if another (or no) checkpoint is currently started
     */
    protected void checkCheckpointId(long checkpointId) {
        if (checkpointId == startedCheckpointId) {
            return;
        }
        throw new IllegalStateException(
            "Need to have recently called #start with " + checkpointId
                + " but currently started checkpoint id is " + startedCheckpointId);
    }

    /** Returns the current write result; the given id is not validated. */
    @Override
    public ChannelStateWriteResult getWriteResult(long checkpointId) {
        return channelStateWriteResult;
    }

    /** Validates the checkpoint id and cancels both futures of the current result. */
    @Override
    public void abort(long checkpointId, Throwable cause) {
        checkCheckpointId(checkpointId);
        cancelCurrentResult();
    }

    /** Cancels both futures of the current result. */
    @Override
    public void close() {
        cancelCurrentResult();
    }

    /** No-op. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    /** Cancels the input and the output future of the current result without interruption. */
    private void cancelCurrentResult() {
        channelStateWriteResult.getInputChannelStateHandles().cancel(false);
        channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false);
    }

    /** Recycles every buffer in the given array. */
    private static void recycleAll(Buffer[] buffers) {
        for (final Buffer buffer : buffers) {
            buffer.recycleBuffer();
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint.channel;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.shaded.guava18.com.google.common.collect.LinkedListMultimap;
import org.apache.flink.shaded.guava18.com.google.common.collect.ListMultimap;
import java.util.Arrays;
/**
 * A {@link ChannelStateWriter} for unit tests that keeps every buffer it receives (grouped by channel /
 * subpartition) and remembers the last started and last completed checkpoint id, so that tests can
 * inspect them afterwards. The result futures are never completed automatically.
 */
public class RecordingChannelStateWriter extends MockChannelStateWriter {

    /** Buffers passed to {@link #addInputData}, in insertion order per channel. */
    private final ListMultimap<InputChannelInfo, Buffer> addedInput = LinkedListMultimap.create();

    /** Buffers passed to {@link #addOutputData}, in insertion order per subpartition. */
    private final ListMultimap<ResultSubpartitionInfo, Buffer> addedOutput = LinkedListMultimap.create();

    /** Id given to the latest {@link #start}; {@code -1} if none. */
    private long lastStartedCheckpointId = -1;

    /** Id given to the latest {@link #notifyCheckpointComplete}; {@code -1} if none. */
    private long lastFinishedCheckpointId = -1;

    /** Creates a recording writer with auto-completion switched off. */
    public RecordingChannelStateWriter() {
        super(false);
    }

    /** Forgets the recorded checkpoint ids and recycles and drops all recorded buffers. */
    public void reset() {
        lastStartedCheckpointId = -1;
        lastFinishedCheckpointId = -1;
        recycleAndClear(addedInput);
        recycleAndClear(addedOutput);
    }

    /** Delegates to the parent contract check and then records the started checkpoint id. */
    @Override
    public void start(long checkpointId, CheckpointOptions checkpointOptions) {
        super.start(checkpointId, checkpointOptions);
        lastStartedCheckpointId = checkpointId;
    }

    /** Validates the checkpoint id and records the buffers for the given channel without recycling them. */
    @Override
    public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, Buffer... data) {
        checkCheckpointId(checkpointId);
        addedInput.putAll(info, Arrays.asList(data));
    }

    /** Validates the checkpoint id and records the buffers for the given subpartition without recycling them. */
    @Override
    public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
        checkCheckpointId(checkpointId);
        addedOutput.putAll(info, Arrays.asList(data));
    }

    /** Records the id of the completed checkpoint. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        lastFinishedCheckpointId = checkpointId;
    }

    /** @return the id of the last started checkpoint, or {@code -1} */
    public long getLastStartedCheckpointId() {
        return lastStartedCheckpointId;
    }

    /** @return the id of the last completed checkpoint, or {@code -1} */
    public long getLastFinishedCheckpointId() {
        return lastFinishedCheckpointId;
    }

    /** @return the live multimap of recorded input buffers (not a copy) */
    public ListMultimap<InputChannelInfo, Buffer> getAddedInput() {
        return addedInput;
    }

    /** @return the live multimap of recorded output buffers (not a copy) */
    public ListMultimap<ResultSubpartitionInfo, Buffer> getAddedOutput() {
        return addedOutput;
    }

    /** Recycles every recorded buffer and then empties the multimap. */
    private static void recycleAndClear(ListMultimap<?, Buffer> recorded) {
        for (final Buffer buffer : recorded.values()) {
            buffer.recycleBuffer();
        }
        recorded.clear();
    }
}
......@@ -99,7 +99,7 @@ public class InputChannelBuilder {
return this;
}
InputChannelBuilder setupFromNettyShuffleEnvironment(NettyShuffleEnvironment network) {
public InputChannelBuilder setupFromNettyShuffleEnvironment(NettyShuffleEnvironment network) {
this.partitionManager = network.getResultPartitionManager();
this.connectionManager = network.getConnectionManager();
this.initialBackoff = network.getConfiguration().partitionRequestInitialBackoff();
......
......@@ -703,7 +703,7 @@ public abstract class CheckpointBarrierAlignerTestBase {
createCancellationBarrier(3L, 1),
createBuffer(0)
};
AbstractInvokable validator = new CheckpointSequenceValidator(-3);
AbstractInvokable validator = new ValidatingCheckpointHandler();
inputGate = createBarrierBuffer(2, sequence, validator);
for (BufferOrEvent boe : sequence) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
/**
 * Integration test that runs a small source -> map -> sink job with unaligned checkpoints enabled at
 * several parallelism levels and asserts that enough checkpoints were reported as complete.
 */
public class UnalignedCheckpointITCase extends TestLogger {

    /** Name of the accumulator in which the sources count their checkpoint-complete notifications. */
    public static final String NUM_COMPLETED_CHECKPOINTS = "numCompletedCheckpoints";

    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();

    /** Each test case is aborted after 90 seconds. */
    @Rule
    public final Timeout timeout = Timeout.builder()
        .withTimeout(90, TimeUnit.SECONDS)
        .build();

    @Test
    public void shouldPerformUnalignedCheckpointOnNonparallelTopology() throws Exception {
        execute(1);
    }

    @Test
    public void shouldPerformUnalignedCheckpointOnLocalChannelsOnly() throws Exception {
        execute(2);
    }

    @Test
    public void shouldPerformUnalignedCheckpointOnRemoteChannels() throws Exception {
        execute(10);
    }

    @Test
    public void shouldPerformUnalignedCheckpointMassivelyParallel() throws Exception {
        execute(20);
    }

    /**
     * Runs the test job with the given parallelism and checks that the accumulated number of
     * completed checkpoints, divided by the parallelism, reaches the required minimum.
     */
    private void execute(int parallelism) throws Exception {
        final long minCheckpoints = 30;

        final StreamExecutionEnvironment env = createEnv(parallelism);
        createDAG(env, minCheckpoints);

        final JobExecutionResult result = env.execute();
        final long completedPerSubtask =
            result.<Long>getAccumulatorResult(NUM_COMPLETED_CHECKPOINTS) / parallelism;
        assertThat(completedPerSubtask, Matchers.greaterThanOrEqualTo(minCheckpoints));
    }

    /**
     * Builds a local environment with 3 slots per task manager, as many task managers as needed to
     * host the parallelism, a filesystem state backend in a temporary folder, and unaligned
     * checkpoints triggered every 100 ms.
     */
    @Nonnull
    private LocalStreamEnvironment createEnv(final int parallelism) throws IOException {
        final int numSlots = 3;
        // ceiling division: enough task managers so that all subtasks get a slot
        final int numTaskManagers = (parallelism + numSlots - 1) / numSlots;
        final String checkpointDir = temp.newFolder().toURI().toString();

        final Configuration conf = new Configuration();
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
        conf.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, .9f);
        conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
        conf.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);

        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
        env.enableCheckpointing(100);
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        return env;
    }

    /** Wires source -> shuffle -> map (doubles each value) -> shuffle -> counting sink. */
    private void createDAG(final StreamExecutionEnvironment env, final long minCheckpoints) {
        final SingleOutputStreamOperator<Integer> numbers = env.addSource(new IntegerSource(minCheckpoints));
        final SingleOutputStreamOperator<Integer> doubled = numbers.shuffle().map(i -> 2 * i);
        doubled.shuffle().addSink(new CountingSink<>());
    }

    /**
     * Emits increasing integers until it has been notified of at least {@code minCheckpoints}
     * completed checkpoints; the notification count is exposed as an accumulator.
     */
    private static class IntegerSource extends RichParallelSourceFunction<Integer> implements CheckpointListener {

        private final long minCheckpoints;
        private volatile boolean running = true;
        private LongCounter numCompletedCheckpoints = new LongCounter();

        public IntegerSource(final long minCheckpoints) {
            this.minCheckpoints = minCheckpoints;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator(NUM_COMPLETED_CHECKPOINTS, numCompletedCheckpoints);
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            numCompletedCheckpoints.add(1);
        }

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            int next = 0;
            while (running) {
                ctx.collect(next++);
                final boolean enoughCheckpoints = numCompletedCheckpoints.getLocalValue() >= minCheckpoints;
                if (enoughCheckpoints) {
                    cancel();
                }
            }

            // linger so that the other instances can finish while checkpoints are still processed
            Thread.sleep(1000);
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Counts received records in the "outputs" accumulator and sleeps 1 ms after every 100th record. */
    private static class CountingSink<T> extends RichSinkFunction<T> {

        private LongCounter counter = new LongCounter();

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            getRuntimeContext().addAccumulator("outputs", counter);
        }

        @Override
        public void invoke(T value, Context context) throws Exception {
            counter.add(1);
            if (counter.getLocalValue() % 100 != 0) {
                return;
            }
            Thread.sleep(1);
        }
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册