提交 9720e563 编写于 作者: J Jiangjie (Becket) Qin

[FLINK-20781] Avoid NPE after SourceOperator is closed.

上级 049a61a5
...@@ -126,6 +126,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr ...@@ -126,6 +126,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
*/ */
private TimestampsAndWatermarks<OUT> eventTimeLogic; private TimestampsAndWatermarks<OUT> eventTimeLogic;
/** Indicating whether the source operator has been closed. */
private boolean closed;
public SourceOperator( public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory, readerFactory,
...@@ -247,9 +250,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr ...@@ -247,9 +250,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
} }
if (sourceReader != null) { if (sourceReader != null) {
sourceReader.close(); sourceReader.close();
// Set the field to null so the reader won't be closed again in dispose().
sourceReader = null;
} }
closed = true;
super.close(); super.close();
} }
...@@ -257,7 +259,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr ...@@ -257,7 +259,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
public void dispose() throws Exception { public void dispose() throws Exception {
// We also need to close the source reader to make sure the resources // We also need to close the source reader to make sure the resources
// are released if the task does not finish normally. // are released if the task does not finish normally.
if (sourceReader != null) { if (!closed && sourceReader != null) {
sourceReader.close(); sourceReader.close();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册