diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java index ef9999faf99e2cdfb407e0e3dd371bfdfaf9c6de..ae23a494739b2b9f01ee800f6d4752b5aa1afe69 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java @@ -190,10 +190,12 @@ public abstract class HadoopInputFormatBase extends HadoopInputFormatCo @Override public void close() throws IOException { + if (this.recordReader != null) { - // enforce sequential close() calls - synchronized (CLOSE_MUTEX) { - this.recordReader.close(); + // enforce sequential close() calls + synchronized (CLOSE_MUTEX) { + this.recordReader.close(); + } } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java index 0335c23d22ed4a66a2d59421cbfe8d3d006e32d1..9d8a8c502bd0ea5424d9a8d5857dcfce8626b820 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java @@ -225,10 +225,12 @@ public abstract class HadoopInputFormatBase extends HadoopInputFormatCo @Override public void close() throws IOException { + if (this.recordReader != null) { - // enforce sequential close() calls - synchronized (CLOSE_MUTEX) { - this.recordReader.close(); + // enforce sequential close() calls + synchronized (CLOSE_MUTEX) { + this.recordReader.close(); + } } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java index 3b8d22759bf39bff8ba7084abd26418bc1b939d1..434ad15651504edd1c71749bfdcbce8ad6d6961b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java @@ -174,6 +174,13 @@ public class HadoopInputFormatTest { assertThat(tupleType, is(equalTo(expectedType))); } + @Test + public void testCloseWithoutOpen() throws Exception { + HadoopInputFormat hadoopInputFormat = new HadoopInputFormat<>( + new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf()); + hadoopInputFormat.close(); + } + private HadoopInputSplit getHadoopInputSplit() { return new HadoopInputSplit(1, getFileSplit(), new JobConf()); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java index d6ec484d201a5d64cd582a8a672ecc5a5753001c..4c9c0093172db1a8ea51c1e986a7605f59866893 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java @@ -98,6 +98,12 @@ public class HadoopInputFormatTest { verify(recordReader, times(1)).close(); } + @Test + public void testCloseWithoutOpen() throws Exception { + HadoopInputFormat hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, Job.getInstance()); + hadoopInputFormat.close(); + } + @Test public void testFetchNextInitialState() throws Exception { DummyRecordReader recordReader = new DummyRecordReader();