提交 6c6b17b4 编写于 作者: A Aljoscha Krettek

Remove polluting log message in ContinuousFileReaderOperator

Before, when snapshotting, we printed a log message about the file
input format not being checkpointable when the current split was
"null". Now, we only print the message when when appropriate.

This closes #2174
上级 a9733a9a
......@@ -65,6 +65,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
......@@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private FileInputFormat<OUT> format;
private TypeSerializer<OUT> serializer;
private Object checkpointLock;
private transient Object checkpointLock;
private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
......@@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
@SuppressWarnings("unchecked")
CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
(CheckpointableInputFormat<FileInputSplit, S>) this.format;
checkpointableFormat.reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
......@@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
S formatState = this.isSplitOpen ?
(S) ((CheckpointableInputFormat) format).getCurrentState() :
restoredFormatState;
return new Tuple3<>(snapshot, currentSplit, formatState);
if (this.currentSplit != null) {
if (this.format instanceof CheckpointableInputFormat) {
@SuppressWarnings("unchecked")
CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
(CheckpointableInputFormat<FileInputSplit, S>) this.format;
S formatState = this.isSplitOpen ?
checkpointableFormat.getCurrentState() :
restoredFormatState;
return new Tuple3<>(snapshot, currentSplit, formatState);
} else {
LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
}
} else {
LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
return new Tuple3<>(snapshot, null, null);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册