提交 fc4abd7f 编写于 作者: K kl0u 提交者: Aljoscha Krettek

[FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis

上级 ec6d9752
......@@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
private transient long recordsReadSinceLastSync;
private transient long lastSync = -1l;
private long lastSync = -1l;
public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
......@@ -186,18 +186,21 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
this.open(split);
if (state.f0 != -1) {
try {
this.open(split);
} finally {
if (state.f0 != -1) {
lastSync = state.f0;
recordsReadSinceLastSync = state.f1;
}
}
// go to the block we stopped
lastSync = state.f0;
if (lastSync != -1) {
// open and read until the record we were before
// the checkpoint and discard the values
dataFileReader.seek(lastSync);
// read until the record we were before the checkpoint and discard the values
long recordsToDiscard = state.f1;
for(int i = 0; i < recordsToDiscard; i++) {
for(int i = 0; i < recordsReadSinceLastSync; i++) {
dataFileReader.next(null);
recordsReadSinceLastSync++;
}
}
}
......
......@@ -390,14 +390,17 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
this.open(split);
this.blockInfo = this.createAndReadBlockInfo();
try {
this.open(split);
} finally {
this.blockInfo = this.createAndReadBlockInfo();
long blockPos = state.f0;
this.readRecords = state.f1;
long blockPos = state.f0;
this.readRecords = state.f1;
this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
this.stream.seek(this.splitStart + blockPos);
this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
}
}
}
......@@ -144,7 +144,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
private transient boolean end;
private transient long offset = -1;
private long offset = -1;
// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
......@@ -638,9 +638,15 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
public void reopen(FileInputSplit split, Long state) throws IOException {
Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
Preconditions.checkArgument(state == -1 || state >= split.getStart(),
" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
try {
this.open(split);
} finally {
this.offset = state;
}
this.open(split);
this.offset = state;
if (state > this.splitStart + split.getLength()) {
this.end = true;
} else if (state > split.getStart()) {
......
......@@ -188,6 +188,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private S restoredFormatState = null;
private volatile boolean isSplitOpen = false;
SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
TimestampedCollector<OT> collector,
......@@ -271,6 +273,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
this.format.open(currentSplit);
}
this.isSplitOpen = true;
}
LOG.info("Reading split: " + currentSplit);
......@@ -290,8 +293,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
} finally {
// close and prepare for the next iteration
this.format.close();
this.currentSplit = null;
synchronized (checkpointLock) {
this.format.close();
this.isSplitOpen = false;
this.currentSplit = null;
}
}
}
......@@ -303,8 +309,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
} finally {
synchronized (checkpointLock) {
LOG.info("Reader terminated, and exiting...");
this.format.closeInputFormat();
this.isSplitOpen = false;
this.currentSplit = null;
this.isRunning = false;
checkpointLock.notifyAll();
}
}
......@@ -321,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
if (this.format instanceof CheckpointableInputFormat) {
if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册