提交 1ddec930 编写于 作者: S Stephan Ewen

[FLINK-1115] Local file streams retry file creation on FileNotFoundException...

[FLINK-1115] Local file streams retry file creation on FileNotFoundException to increase resilience against spurious failures in tests
上级 cf80d862
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.core.fs.local; package org.apache.flink.core.fs.local;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
...@@ -28,14 +28,15 @@ import org.apache.flink.core.fs.FSDataOutputStream; ...@@ -28,14 +28,15 @@ import org.apache.flink.core.fs.FSDataOutputStream;
/** /**
* The <code>LocalDataOutputStream</code> class is a wrapper class for a data * The <code>LocalDataOutputStream</code> class is a wrapper class for a data
* output stream to the local file system. * output stream to the local file system.
*
*/ */
public class LocalDataOutputStream extends FSDataOutputStream { public class LocalDataOutputStream extends FSDataOutputStream {
private static final int MAX_OPEN_TRIES = 3;
/** /**
* The file output stream used to write data. * The file output stream used to write data.
*/ */
private FileOutputStream fos = null; private FileOutputStream fos;
/** /**
* Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object. * Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
...@@ -46,26 +47,34 @@ public class LocalDataOutputStream extends FSDataOutputStream { ...@@ -46,26 +47,34 @@ public class LocalDataOutputStream extends FSDataOutputStream {
* thrown if the data output stream cannot be created * thrown if the data output stream cannot be created
*/ */
public LocalDataOutputStream(final File file) throws IOException { public LocalDataOutputStream(final File file) throws IOException {
// we allow multiple tries to create the file, to increase resilience against spurious I/O failures
FileNotFoundException lastException = null;
for (int attempt = 0; attempt < MAX_OPEN_TRIES; attempt++) {
try {
this.fos = new FileOutputStream(file); this.fos = new FileOutputStream(file);
return;
}
catch (FileNotFoundException e) {
lastException = e;
}
}
throw lastException;
} }
@Override @Override
public void write(final int b) throws IOException { public void write(final int b) throws IOException {
fos.write(b); fos.write(b);
} }
@Override @Override
public void write(final byte[] b, final int off, final int len) throws IOException { public void write(final byte[] b, final int off, final int len) throws IOException {
fos.write(b, off, len); fos.write(b, off, len);
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
fos.close(); fos.close();
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册