提交 ece899a9 编写于 作者: J Jakub Havlik 提交者: Fabian Hueske

[FLINK-5518] [hadoopCompat] Add null check to HadoopInputFormatBase.close().

This closes #3133
This closes #243 // closing stale PR
上级 e187b5ee
......@@ -190,10 +190,12 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void close() throws IOException {
if (this.recordReader != null) {
// enforce sequential close() calls
synchronized (CLOSE_MUTEX) {
this.recordReader.close();
// enforce sequential close() calls
synchronized (CLOSE_MUTEX) {
this.recordReader.close();
}
}
}
......
......@@ -225,10 +225,12 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
@Override
public void close() throws IOException {
if (this.recordReader != null) {
// enforce sequential close() calls
synchronized (CLOSE_MUTEX) {
this.recordReader.close();
// enforce sequential close() calls
synchronized (CLOSE_MUTEX) {
this.recordReader.close();
}
}
}
......
......@@ -174,6 +174,13 @@ public class HadoopInputFormatTest {
assertThat(tupleType, is(equalTo(expectedType)));
}
@Test
public void testCloseWithoutOpen() throws Exception {
	// Closing an input format that was never opened must not throw (FLINK-5518).
	final HadoopInputFormat<Void, Long> format = new HadoopInputFormat<>(
			new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
	format.close();
}
/** Builds the HadoopInputSplit fixture (split number 1) around the test file split. */
private HadoopInputSplit getHadoopInputSplit() {
	final HadoopInputSplit split = new HadoopInputSplit(1, getFileSplit(), new JobConf());
	return split;
}
......
......@@ -98,6 +98,12 @@ public class HadoopInputFormatTest {
verify(recordReader, times(1)).close();
}
@Test
public void testCloseWithoutOpen() throws Exception {
	// Closing an input format that was never opened must not throw (FLINK-5518).
	final HadoopInputFormat<String, Long> format = new HadoopInputFormat<>(
			new DummyInputFormat(), String.class, Long.class, Job.getInstance());
	format.close();
}
@Test
public void testFetchNextInitialState() throws Exception {
DummyRecordReader recordReader = new DummyRecordReader();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册