Commit bd273a8f authored by kl0u, committed by Aljoscha Krettek

[FLINK-4075] Fix unstable ContinuousFileProcessingCheckpointITCase

Parent 5709bf69
......@@ -166,9 +166,9 @@ public class ContinuousFileMonitoringTest {
content.add(element.getValue() +"\n");
}
Assert.assertEquals(actualFileContents.size(), expectedFileContents.size());
Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
for (Integer fileIdx: expectedFileContents.keySet()) {
Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
List<String> cntnt = actualFileContents.get(fileIdx);
Collections.sort(cntnt, new Comparator<String>() {
......@@ -182,7 +182,7 @@ public class ContinuousFileMonitoringTest {
for (String line: cntnt) {
cntntStr.append(line);
}
Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx));
Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
}
for(org.apache.hadoop.fs.Path file: filesCreated) {
......
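The assertion changes in this hunk swap the argument order so that the expected value comes first: JUnit's Assert.assertEquals(expected, actual) builds its failure message around that order, and the added message on assertTrue names the missing file when this intermittently failing test trips. A minimal sketch of the corrected pattern (class name, helper, and values below are illustrative, not part of the patch):

```java
import org.junit.Assert;
import org.junit.Test;

public class AssertOrderExample {

    @Test
    public void expectedComesFirst() {
        String expected = "line1\nline2\n";
        String actual = readFileContentSomehow(); // hypothetical helper

        // JUnit reports "expected:<...> but was:<...>", so the expected value
        // must be the first argument for the failure message to make sense.
        Assert.assertEquals(expected, actual);

        // A message, as added in the patch for the missing-file check,
        // makes intermittent CI failures much easier to diagnose.
        Assert.assertTrue("file not found", actual != null);
    }

    private String readFileContentSomehow() {
        return "line1\nline2\n";
    }
}
```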
......@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
......@@ -53,7 +52,7 @@ import java.util.Map;
*/
@Internal
public class ContinuousFileMonitoringFunction<OUT>
extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> {
private static final long serialVersionUID = 1L;
......@@ -80,14 +79,12 @@ public class ContinuousFileMonitoringFunction<OUT>
/** Which new data to process (see {@link FileProcessingMode}. */
private final FileProcessingMode watchType;
private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
private long globalModificationTime;
private Long globalModificationTime;
private FilePathFilter pathFilter;
private transient Object checkpointLock;
private volatile boolean isRunning = true;
public ContinuousFileMonitoringFunction(
......@@ -113,7 +110,7 @@ public class ContinuousFileMonitoringFunction<OUT>
@SuppressWarnings("unchecked")
public void open(Configuration parameters) throws Exception {
LOG.info("Opening File Monitoring Source.");
super.open(parameters);
format.configure(parameters);
}
......@@ -122,17 +119,28 @@ public class ContinuousFileMonitoringFunction<OUT>
public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
FileSystem fileSystem = FileSystem.get(new URI(path));
checkpointLock = context.getCheckpointLock();
switch (watchType) {
case PROCESS_CONTINUOUSLY:
while (isRunning) {
monitorDirAndForwardSplits(fileSystem, context);
synchronized (checkpointLock) {
monitorDirAndForwardSplits(fileSystem, context);
}
Thread.sleep(interval);
}
isRunning = false;
// here we do not need to set the running to false and the
// globalModificationTime to Long.MAX_VALUE because to arrive here,
// either close() or cancel() have already been called, so this
// is already done.
break;
case PROCESS_ONCE:
monitorDirAndForwardSplits(fileSystem, context);
isRunning = false;
synchronized (checkpointLock) {
monitorDirAndForwardSplits(fileSystem, context);
globalModificationTime = Long.MAX_VALUE;
isRunning = false;
}
break;
default:
isRunning = false;
......@@ -141,41 +149,22 @@ public class ContinuousFileMonitoringFunction<OUT>
}
private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
final Object lock = context.getCheckpointLock();
assert (Thread.holdsLock(checkpointLock));
// it may be non-null in the case of a recovery after a failure.
if (currentSplitsToFwd != null) {
synchronized (lock) {
forwardSplits(currentSplitsToFwd, context);
}
}
currentSplitsToFwd = null;
// it may be non-null in the case of a recovery after a failure.
if (splitsToFwdOrderedAscByModTime == null) {
splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
}
Iterator<Tuple2<Long, List<FileInputSplit>>> it =
splitsToFwdOrderedAscByModTime.iterator();
List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = getInputSplitSortedOnModTime(fs);
Iterator<Tuple2<Long, List<FileInputSplit>>> it = splitsByModTime.iterator();
while (it.hasNext()) {
synchronized (lock) {
currentSplitsToFwd = it.next();
it.remove();
forwardSplits(currentSplitsToFwd, context);
}
forwardSplits(it.next(), context);
it.remove();
}
// set them to null to distinguish from a restore.
splitsToFwdOrderedAscByModTime = null;
currentSplitsToFwd = null;
}
private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
currentSplitsToFwd = splitsToFwd;
Long modTime = currentSplitsToFwd.f0;
List<FileInputSplit> splits = currentSplitsToFwd.f1;
assert (Thread.holdsLock(checkpointLock));
Long modTime = splitsToFwd.f0;
List<FileInputSplit> splits = splitsToFwd.f1;
Iterator<FileInputSplit> it = splits.iterator();
while (it.hasNext()) {
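The reworked run() / monitorDirAndForwardSplits() above now performs the directory scan, the split forwarding, and the update of globalModificationTime while holding the source context's checkpoint lock, so a checkpoint can no longer record a modification time ahead of the splits that were actually emitted. A hedged sketch of that lock-holding pattern, with split discovery stubbed out (the helper methods and the interval value are placeholders, not the real implementation):

```java
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Collections;
import java.util.List;

// Illustrative only: a stripped-down monitoring source showing the
// lock-holding pattern from the patch; split discovery is stubbed out.
public class LockedEmitSketch extends RichSourceFunction<FileInputSplit> {

    private volatile boolean isRunning = true;
    private long globalModificationTime = Long.MIN_VALUE;
    private final long interval = 1000L; // placeholder polling interval

    @Override
    public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
        final Object lock = context.getCheckpointLock();
        while (isRunning) {
            synchronized (lock) {
                // Emitting splits and advancing the modification-time watermark
                // happen atomically with respect to snapshotState(), which the
                // runtime also invokes while holding this same lock.
                for (FileInputSplit split : discoverNewSplits()) {
                    context.collect(split);
                    globalModificationTime = Math.max(globalModificationTime, modTimeOf(split));
                }
            }
            Thread.sleep(interval);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // Placeholders standing in for the real directory scan of the patched class.
    private List<FileInputSplit> discoverNewSplits() { return Collections.emptyList(); }
    private long modTimeOf(FileInputSplit split) { return 0L; }
}
```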
......@@ -284,6 +273,7 @@ public class ContinuousFileMonitoringFunction<OUT>
* is the time of the most recent modification found in any of the already processed files.
*/
private boolean shouldIgnore(Path filePath, long modificationTime) {
assert (Thread.holdsLock(checkpointLock));
boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
if (shouldIgnore) {
LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
......@@ -294,35 +284,36 @@ public class ContinuousFileMonitoringFunction<OUT>
@Override
public void close() throws Exception {
super.close();
isRunning = false;
synchronized (checkpointLock) {
globalModificationTime = Long.MAX_VALUE;
isRunning = false;
}
LOG.info("Closed File Monitoring Source.");
}
@Override
public void cancel() {
isRunning = false;
if (checkpointLock != null) {
// this is to cover the case where cancel() is called before the run()
synchronized (checkpointLock) {
globalModificationTime = Long.MAX_VALUE;
isRunning = false;
}
} else {
globalModificationTime = Long.MAX_VALUE;
isRunning = false;
}
}
// --------------------- Checkpointing --------------------------
@Override
public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
if (!isRunning) {
LOG.debug("snapshotState() called on closed source");
return null;
}
return new Tuple3<>(splitsToFwdOrderedAscByModTime,
currentSplitsToFwd, globalModificationTime);
public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return globalModificationTime;
}
@Override
public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>,
Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception {
this.splitsToFwdOrderedAscByModTime = state.f0;
this.currentSplitsToFwd = state.f1;
this.globalModificationTime = state.f2;
public void restoreState(Long state) throws Exception {
this.globalModificationTime = state;
}
}
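Because splits are now forwarded eagerly under the checkpoint lock, the monitoring function no longer has to checkpoint pending splits; its state shrinks to the single Long globalModificationTime, as the new snapshotState/restoreState bodies show. A minimal sketch of that Checkpointed&lt;Long&gt; surface, using the pre-1.2 Checkpointed interface this class implements (everything else elided):

```java
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

// Illustrative sketch: only the checkpointing surface of the patched
// monitoring function; the directory-scanning logic is elided.
public abstract class ModTimeCheckpointSketch implements Checkpointed<Long> {

    // The only state that still needs to survive a failure: the largest
    // modification timestamp whose files have already been forwarded.
    private volatile Long globalModificationTime = Long.MIN_VALUE;

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return globalModificationTime;
    }

    @Override
    public void restoreState(Long state) {
        this.globalModificationTime = state;
    }
}
```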
......@@ -104,7 +104,14 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.collector = new TimestampedCollector<>(output);
this.checkpointLock = getContainingTask().getCheckpointLock();
Preconditions.checkState(reader == null, "The reader is already initialized.");
this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
// the readerState is needed for the initialization of the reader
// when recovering from a failure. So after the initialization,
// we can set it to null.
this.readerState = null;
this.reader.start();
}
......@@ -191,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private volatile boolean isSplitOpen = false;
SplitReader(FileInputFormat<OT> format,
private SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
TimestampedCollector<OT> collector,
Object checkpointLock,
......@@ -212,18 +219,19 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = restoredState.f2;
for (FileInputSplit split : pending) {
Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
pendingSplits.add(split);
}
this.currentSplit = current;
this.restoredFormatState = formatState;
}
ContinuousFileReaderOperator.this.readerState = null;
}
void addSplit(FileInputSplit split) {
private void addSplit(FileInputSplit split) {
Preconditions.checkNotNull(split);
synchronized (checkpointLock) {
Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
this.pendingSplits.add(split);
}
}
......@@ -323,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
}
Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
for (FileInputSplit split: this.pendingSplits) {
snapshot.add(split);
......@@ -334,9 +342,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
S formatState = this.isSplitOpen ?
(S) ((CheckpointableInputFormat) format).getCurrentState() :
restoredFormatState;
return new Tuple3<>(snapshot, currentSplit, formatState);
} else {
LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
......@@ -405,6 +415,9 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = (S) ois.readObject();
// set the whole reader state for the open() to find.
Preconditions.checkState(this.readerState == null,
"The reader state has already been initialized.");
this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
div.close();
}
......
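On the reader side, the patch keeps the restored format state (restoredFormatState) around until the current split is actually reopened, so a checkpoint taken before reading resumes snapshots the restored offset instead of querying an unopened format. A reduced, illustrative sketch of that getReaderState() decision (field names mirror the patch, but this class is not the real SplitReader):

```java
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative reduction of the SplitReader snapshot logic; not the real operator.
class ReaderStateSketch<OT, S extends Serializable> {

    private final FileInputFormat<OT> format;
    private final Queue<FileInputSplit> pendingSplits = new ConcurrentLinkedQueue<>();

    // Maintained by the (elided) reading loop in the real operator.
    private FileInputSplit currentSplit;
    private volatile boolean isSplitOpen;

    // Kept after a restore until the split is reopened.
    private S restoredFormatState;

    ReaderStateSketch(FileInputFormat<OT> format, S restoredFormatState) {
        this.format = format;
        this.restoredFormatState = restoredFormatState;
    }

    @SuppressWarnings("unchecked")
    Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
        List<FileInputSplit> snapshot = new ArrayList<>(pendingSplits);
        if (format instanceof CheckpointableInputFormat && currentSplit != null) {
            // If the split has not been reopened since the restore, snapshot the
            // restored offset instead of asking the still-unopened format.
            S formatState = isSplitOpen
                    ? (S) ((CheckpointableInputFormat) format).getCurrentState()
                    : restoredFormatState;
            return new Tuple3<>(snapshot, currentSplit, formatState);
        }
        // Non-checkpointable formats simply restart the current split on recovery.
        return new Tuple3<>(snapshot, currentSplit, null);
    }
}
```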
......@@ -85,7 +85,11 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
while (isRunning && !format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
ctx.collect(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
} else {
break;
}
}
format.close();
completedSplitsCounter.inc();
......
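The InputFormatSourceFunction change guards against nextRecord() returning null, which an input format may do when the current split is exhausted even though reachedEnd() has not flipped yet; emitting the null would fail downstream. A small sketch of the guarded read loop, wrapped in a standalone helper for illustration (the helper itself is not part of Flink):

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative only: the guarded read loop from the patch, extracted into a helper.
final class GuardedReadLoop {

    private GuardedReadLoop() {}

    static <OUT, S extends InputSplit> void readSplit(
            InputFormat<OUT, S> format,
            SourceFunction.SourceContext<OUT> ctx,
            OUT reuse) throws Exception {

        OUT nextElement = reuse;
        while (!format.reachedEnd()) {
            nextElement = format.nextRecord(nextElement);
            if (nextElement != null) {
                ctx.collect(nextElement);
            } else {
                // A null record means the format has nothing more for this split;
                // stop instead of emitting null.
                break;
            }
        }
        format.close();
    }
}
```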
......@@ -219,6 +219,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
stream.write(line.getBytes());
}
stream.close();
Assert.assertTrue("Result file present", !fs.exists(file));
fs.rename(tmp, file);
Assert.assertTrue("No result file present", fs.exists(file));
return new Tuple2<>(file, str.toString());
......
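The test fix writes each result file to a temporary path and renames it into place, asserting that the destination is absent before the rename and present afterwards, so the monitoring source never picks up a half-written file. A hedged sketch of that write-then-rename idiom against the Hadoop FileSystem API (class name, paths, and content are placeholders):

```java
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;

import java.nio.charset.StandardCharsets;

// Illustrative helper: publish a file atomically so that a directory
// monitor never observes partial content.
final class AtomicFileWriteSketch {

    private AtomicFileWriteSketch() {}

    static void writeAndPublish(FileSystem fs, Path tmp, Path file, String content) throws Exception {
        // Write the full content to a temporary path first.
        try (FSDataOutputStream stream = fs.create(tmp, true)) {
            stream.write(content.getBytes(StandardCharsets.UTF_8));
        }
        // The destination must not exist yet; otherwise rename semantics differ
        // across file systems and the test becomes flaky.
        Assert.assertTrue("Result file present", !fs.exists(file));
        fs.rename(tmp, file);
        Assert.assertTrue("No result file present", fs.exists(file));
    }
}
```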