提交 e373c448 编写于 作者: Y Yu Li 提交者: Stephan Ewen

[FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest

Currently test cases will fail when trying to close the output stream if all data written
but ClosedByInterruptException occurs at the ending phase. This commit fixes it.

This closes #9235
上级 14afeea0
......@@ -19,6 +19,7 @@
package org.apache.flink.core.fs;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
......@@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final Path path = new Path(testDir, "part-0");
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = writer.open(path);
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
......@@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(testData1, fileContents.getValue());
}
} finally {
IOUtils.closeQuietly(stream);
}
}
......@@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final Path path = new Path(testDir, "part-0");
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = writer.open(path);
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.persist();
......@@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(testData1 + testData2, fileContents.getValue());
}
} finally {
IOUtils.closeQuietly(stream);
}
}
......@@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final RecoverableWriter initWriter = getNewFileSystemWriter();
final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = initWriter.open(path);
recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
......@@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
} finally {
IOUtils.closeQuietly(stream);
}
final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer();
......@@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) {
RecoverableFsDataOutputStream recoveredStream = null;
try {
recoveredStream = newWriter.recover(recoveredRecoverable);
// we expect the data to be truncated
Map<Path, String> files = getFileContentByPath(testDir);
......@@ -238,6 +253,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
Assert.assertEquals("part-0", fileContents.getKey().getName());
Assert.assertEquals(expectedFinalContents, fileContents.getValue());
}
} finally {
IOUtils.closeQuietly(recoveredStream);
}
}
......@@ -249,7 +266,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final RecoverableWriter initWriter = getNewFileSystemWriter();
final RecoverableWriter.CommitRecoverable recoverable;
try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = initWriter.open(path);
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.persist();
......@@ -259,6 +278,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
recoverable = stream.closeForCommit().getRecoverable();
} finally {
IOUtils.closeQuietly(stream);
}
final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable);
......@@ -289,12 +310,16 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final RecoverableWriter writer = getNewFileSystemWriter();
final Path path = new Path(testDir, "part-0");
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = writer.open(path);
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().getRecoverable();
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
fail();
} finally {
IOUtils.closeQuietly(stream);
}
}
......@@ -306,13 +331,17 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final Path path = new Path(testDir, "part-0");
RecoverableWriter.ResumeRecoverable recoverable;
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = writer.open(path);
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
recoverable = stream.persist();
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
stream.closeForCommit().commit();
} finally {
IOUtils.closeQuietly(stream);
}
// this should throw an exception as the file is already committed
......@@ -332,7 +361,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
final RecoverableWriter.ResumeRecoverable recoverable1;
final RecoverableWriter.ResumeRecoverable recoverable2;
try (final RecoverableFsDataOutputStream stream = writer.open(path)) {
RecoverableFsDataOutputStream stream = null;
try {
stream = writer.open(path);
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
recoverable1 = stream.persist();
......@@ -340,6 +371,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger {
recoverable2 = stream.persist();
stream.write(testData3.getBytes(StandardCharsets.UTF_8));
} finally {
IOUtils.closeQuietly(stream);
}
try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册