提交 04a7cd4d 编写于 作者: S sihuazhou 提交者: zentol

[FLINK-9584][connector] Properly close output streams in Bucketing-/RollingSink

This closes #6164.
上级 5dbb6dda
......@@ -533,12 +533,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
}
// verify that truncate actually works
FSDataOutputStream outputStream;
Path testPath = new Path(UUID.randomUUID().toString());
try {
outputStream = fs.create(testPath);
try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
outputStream.close();
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works.", e);
......@@ -702,9 +699,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
lengthFileOut.close();
try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
}
}
}
......
......@@ -631,12 +631,9 @@ public class BucketingSink<T>
}
// verify that truncate actually works
FSDataOutputStream outputStream;
Path testPath = new Path(UUID.randomUUID().toString());
try {
outputStream = fs.create(testPath);
try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
outputStream.close();
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works. " +
......@@ -880,9 +877,9 @@ public class BucketingSink<T>
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
lengthFileOut.writeUTF(Long.toString(validLength));
lengthFileOut.close();
try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
lengthFileOut.writeUTF(Long.toString(validLength));
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册