diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index df67464f50761617c3a976b0e87e244e860fe468..e0ea3889309d688ed341f06c607397a76ef1c989 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -126,6 +126,9 @@ public class SourceOperator extends AbstractStr */ private TimestampsAndWatermarks eventTimeLogic; + /** Indicating whether the source operator has been closed. */ + private boolean closed; + public SourceOperator( FunctionWithException, Exception> readerFactory, @@ -247,9 +250,8 @@ public class SourceOperator extends AbstractStr } if (sourceReader != null) { sourceReader.close(); - // Set the field to null so the reader won't be closed again in dispose(). - sourceReader = null; } + closed = true; super.close(); } @@ -257,7 +259,7 @@ public class SourceOperator extends AbstractStr public void dispose() throws Exception { // We also need to close the source reader to make sure the resources // are released if the task does not finish normally. - if (sourceReader != null) { + if (!closed && sourceReader != null) { sourceReader.close(); } }