提交 7a912e60 编写于 作者: S Stephan Ewen

[FLINK-9751] [filesystem] Use FileChannel directly in LocalRecoverableFsDataOutputStream

上级 9d238e1a
......@@ -27,10 +27,14 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -44,12 +48,16 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
private final File tempFile;
private final FileOutputStream fos;
private final FileChannel fileChannel;
private final OutputStream fos;
LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
this.fos = new FileOutputStream(tempFile);
this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
this.fos = Channels.newOutputStream(fileChannel);
}
LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
......@@ -57,15 +65,15 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
this.tempFile = checkNotNull(resumable.tempFile());
if (!tempFile.exists()) {
throw new FileNotFoundException("File Not Found: " + tempFile);
throw new FileNotFoundException("File Not Found: " + tempFile.getAbsolutePath());
}
if (tempFile.length() < resumable.offset()) {
throw new IOException("Missing data in tmp file: " + tempFile);
this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND);
if (this.fileChannel.position() < resumable.offset()) {
throw new IOException("Missing data in tmp file: " + tempFile.getAbsolutePath());
}
this.fos = new FileOutputStream(this.tempFile, true);
this.fos.getChannel().truncate(resumable.offset());
this.fileChannel.truncate(resumable.offset());
this.fos = Channels.newOutputStream(fileChannel);
}
@Override
......@@ -85,12 +93,12 @@ class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
@Override
public void sync() throws IOException {
fos.getFD().sync();
fileChannel.force(true);
}
@Override
public long getPos() throws IOException {
return fos.getChannel().position();
return fileChannel.position();
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册