提交 755b576b 编写于 作者: P Piotr Nowojski 提交者: Piotr Nowojski

[FLINK-17315][checkpointing] Fix NPE in unaligned checkpoint after EndOfPartition events

Released deserializers cause NPE in StreamTaskNetworkInput#prepareSnapshot.
上级 84574fbc
......@@ -248,7 +248,8 @@ public abstract class AbstractInvokable {
ThrowingRunnable<E> runnable,
String descriptionFormat,
Object... descriptionArgs) throws E {
throw new UnsupportedOperationException(String.format("runInTaskThread not supported by %s", this.getClass().getName()));
throw new UnsupportedOperationException(
String.format("executeInTaskThread not supported by %s", getClass().getName()));
}
/**
......
......@@ -20,11 +20,8 @@ package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import javax.annotation.Nullable;
import org.apache.flink.util.function.ThrowingRunnable;
/**
* An invokable that does nothing.
......@@ -57,4 +54,12 @@ public class DummyInvokable extends AbstractInvokable {
public ExecutionConfig getExecutionConfig() {
return new ExecutionConfig();
}
@Override
public <E extends Exception> void executeInTaskThread(
ThrowingRunnable<E> runnable,
String descriptionFormat,
Object... descriptionArgs) throws E {
runnable.run();
}
}
......@@ -210,12 +210,15 @@ public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
// Assumption for retrieving buffers = one concurrent checkpoint
recordDeserializers[channelIndex].getUnconsumedBuffer().ifPresent(buffer ->
channelStateWriter.addInputData(
checkpointId,
channel.getChannelInfo(),
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
buffer));
RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
if (deserializer != null) {
deserializer.getUnconsumedBuffer().ifPresent(buffer ->
channelStateWriter.addInputData(
checkpointId,
channel.getChannelInfo(),
ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
buffer));
}
checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter);
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
......@@ -52,6 +53,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
......@@ -106,6 +109,50 @@ public class StreamTaskNetworkInputTest {
assertEquals(0, output.getNumberOfEmittedRecords());
}
@Test
public void testSnapshotAfterEndOfPartition() throws Exception {
int numInputChannels = 1;
int channelId = 0;
int checkpointId = 0;
VerifyRecordsDataOutput<Long> output = new VerifyRecordsDataOutput<>();
LongSerializer inSerializer = LongSerializer.INSTANCE;
StreamTestSingleInputGate<Long> inputGate = new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
TestRecordDeserializer[] deserializers = IntStream.range(0, numInputChannels)
.mapToObj(index -> new TestRecordDeserializer(ioManager.getSpillingDirectoriesPaths()))
.toArray(TestRecordDeserializer[]::new);
StreamTaskNetworkInput<Long> input = new StreamTaskNetworkInput<>(
new CheckpointedInputGate(
inputGate.getInputGate(),
new CheckpointBarrierUnaligner(
new int[] { numInputChannels },
ChannelStateWriter.NO_OP,
"test",
new DummyCheckpointInvokable())),
inSerializer,
new StatusWatermarkValve(numInputChannels, output),
0,
deserializers);
inputGate.sendEvent(
new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()),
channelId);
inputGate.sendElement(new StreamRecord<>(42L), channelId);
assertHasNextElement(input, output);
assertHasNextElement(input, output);
assertEquals(1, output.getNumberOfEmittedRecords());
// send EndOfPartitionEvent and ensure that deserializer has been released
inputGate.sendEvent(EndOfPartitionEvent.INSTANCE, channelId);
input.emitNext(output);
assertNull(deserializers[channelId]);
// now snapshot all inflight buffers
CompletableFuture<Void> completableFuture = input.prepareSnapshot(ChannelStateWriter.NO_OP, checkpointId);
completableFuture.join();
}
@Test
public void testReleasingDeserializerTimely()
throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册