Commit 304139d4 authored by Stephan Ewen

[FLINK-2580] [runtime] Expose more methods from Hadoop output streams and expose wrapped input and output streams.

Parent b18e410b
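As a quick illustration of the change, a hypothetical caller can now reach the underlying Hadoop streams without casting. The class and method names below are illustrative assumptions, not part of this commit:

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

public class WrappedStreamExample {

public static void copyFirstBytes(HadoopFileSystem fs, Path src, Path dst) throws java.io.IOException {
// open() and create() now return the concrete wrapper types,
// so the wrapped Hadoop streams are reachable without a cast
HadoopDataInputStream in = fs.open(src);
HadoopDataOutputStream out = fs.create(dst, true);
try {
byte[] buf = new byte[1024];
int read = in.getHadoopInputStream().read(buf, 0, buf.length);
if (read > 0) {
out.write(buf, 0, read);
}
out.flush(); // delegates to Hadoop's hflush() where available
} finally {
in.close();
out.close();
}
}
}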
@@ -16,14 +16,17 @@
* limitations under the License.
*/
package org.apache.flink.core.fs;
import java.io.IOException;
import java.io.OutputStream;
/**
 * Abstract base class for data output streams to a file on a {@link FileSystem}.
 */
public abstract class FSDataOutputStream extends OutputStream {
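/**
 * Flushes the stream, writing any data currently buffered by the stream
 * implementation to the target output stream.
 */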
public abstract void flush() throws IOException;
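/**
 * Flushes the data all the way to the underlying persistent storage device
 * (e.g., forces the operating system to write the data to disk).
 */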
public abstract void sync() throws IOException;
}
@@ -77,4 +77,15 @@ public class LocalDataOutputStream extends FSDataOutputStream {
public void close() throws IOException {
fos.close();
}
@Override
public void flush() throws IOException {
fos.flush();
}
@Override
public void sync() throws IOException {
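// FileDescriptor.sync() forces all buffered data for this file to be written to the storage device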
fos.getFD().sync();
}
}
@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import java.io.IOException;
@@ -26,11 +25,10 @@ import org.apache.flink.core.fs.FSDataInputStream;
/**
* Concrete implementation of the {@link FSDataInputStream} for the
* Hadoop Distributed File System.
 */
public final class HadoopDataInputStream extends FSDataInputStream {
private org.apache.hadoop.fs.FSDataInputStream fsDataInputStream = null;
private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
/**
* Creates a new data input stream from the given HDFS input stream
@@ -39,13 +37,15 @@ public final class HadoopDataInputStream extends FSDataInputStream {
* the HDFS input stream
*/
public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
if (fsDataInputStream == null) {
throw new NullPointerException();
}
this.fsDataInputStream = fsDataInputStream;
}
@Override
public synchronized void seek(long desired) throws IOException {
fsDataInputStream.seek(desired);
}
@@ -68,17 +68,22 @@ public final class HadoopDataInputStream extends FSDataInputStream {
public int read(byte[] buffer, int offset, int length) throws IOException {
return fsDataInputStream.read(buffer, offset, length);
}
@Override
public int available() throws IOException {
return fsDataInputStream.available();
}
@Override
public long skip(long n) throws IOException {
return fsDataInputStream.skip(n);
}
/**
* Gets the wrapped Hadoop input stream.
* @return The wrapped Hadoop input stream.
*/
public org.apache.hadoop.fs.FSDataInputStream getHadoopInputStream() {
return fsDataInputStream;
}
}
@@ -16,24 +16,27 @@
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.flink.core.fs.FSDataOutputStream;
public final class HadoopDataOutputStream extends FSDataOutputStream {
public class HadoopDataOutputStream extends FSDataOutputStream {
private org.apache.hadoop.fs.FSDataOutputStream fdos;
private final org.apache.hadoop.fs.FSDataOutputStream fdos;
public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
if (fdos == null) {
throw new NullPointerException();
}
this.fdos = fdos;
}
@Override
public void write(int b) throws IOException {
fdos.write(b);
}
@@ -47,4 +50,125 @@ public final class HadoopDataOutputStream extends FSDataOutputStream {
fdos.close();
}
@Override
public void flush() throws IOException {
if (HFLUSH_METHOD != null) {
try {
HFLUSH_METHOD.invoke(fdos);
}
catch (InvocationTargetException e) {
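// unwrap the reflective invocation exception and rethrow the original cause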
Throwable cause = e.getTargetException();
if (cause instanceof IOException) {
throw (IOException) cause;
}
else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
else if (cause instanceof Error) {
throw (Error) cause;
}
else {
throw new IOException("Exception while invoking hflush()", cause);
}
}
catch (IllegalAccessException e) {
throw new IOException("Cannot invoke hflush()", e);
}
}
else if (HFLUSH_ERROR != null) {
if (HFLUSH_ERROR instanceof NoSuchMethodException) {
throw new UnsupportedOperationException("hflush() method is not available in this version of Hadoop.");
}
else {
throw new IOException("Cannot access hflush() method", HFLUSH_ERROR);
}
}
else {
throw new UnsupportedOperationException("hflush() is not available in this version of Hadoop.");
}
}
@Override
public void sync() throws IOException {
if (HSYNC_METHOD != null) {
try {
HSYNC_METHOD.invoke(fdos);
}
catch (InvocationTargetException e) {
Throwable cause = e.getTargetException();
if (cause instanceof IOException) {
throw (IOException) cause;
}
else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
else if (cause instanceof Error) {
throw (Error) cause;
}
else {
throw new IOException("Exception while invoking hsync()", cause);
}
}
catch (IllegalAccessException e) {
throw new IOException("Cannot invoke hsync()", e);
}
}
else if (HSYNC_ERROR != null) {
if (HSYNC_ERROR instanceof NoSuchMethodException) {
throw new UnsupportedOperationException("hsync() method is not available in this version of Hadoop.");
}
else {
throw new IOException("Cannot access hsync() method", HSYNC_ERROR);
}
}
else {
throw new UnsupportedOperationException("hsync() is not available in this version of Hadoop.");
}
}
/**
* Gets the wrapped Hadoop output stream.
* @return The wrapped Hadoop output stream.
*/
public org.apache.hadoop.fs.FSDataOutputStream getHadoopOutputStream() {
return fdos;
}
// ------------------------------------------------------------------------
// utilities to bridge hsync and hflush to Hadoop, even though it is not supported in Hadoop 1
// ------------------------------------------------------------------------
private static final Method HFLUSH_METHOD;
private static final Method HSYNC_METHOD;
private static final Throwable HFLUSH_ERROR;
private static final Throwable HSYNC_ERROR;
static {
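// look up hflush() and hsync() reflectively: they exist in Hadoop 2.x but not in Hadoop 1.x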
Method hflush = null;
Method hsync = null;
Throwable flushError = null;
Throwable syncError = null;
try {
hflush = org.apache.hadoop.fs.FSDataOutputStream.class.getMethod("hflush");
}
catch (Throwable t) {
flushError = t;
}
try {
hsync = org.apache.hadoop.fs.FSDataOutputStream.class.getMethod("hsync");
}
catch (Throwable t) {
syncError = t;
}
HFLUSH_METHOD = hflush;
HSYNC_METHOD = hsync;
HFLUSH_ERROR = flushError;
HSYNC_ERROR = syncError;
}
}
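Because flush() and sync() only work when the running Hadoop version provides hflush()/hsync(), a caller on Hadoop 1 may see an UnsupportedOperationException. A minimal, hypothetical caller-side pattern (the helper name is illustrative):

// Hypothetical helper: attempt a durable sync, degrading gracefully on Hadoop 1,
// where hsync()/hflush() do not exist and sync() throws UnsupportedOperationException
static void syncIfSupported(org.apache.flink.core.fs.FSDataOutputStream out) throws java.io.IOException {
try {
out.sync(); // hsync() on Hadoop 2.x
}
catch (UnsupportedOperationException e) {
// Hadoop 1.x: no sync primitive; durability is only guaranteed by close()
}
}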
@@ -23,18 +23,18 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.net.UnknownHostException;
import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.core.fs.HadoopFileSystemWrapper;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
/**
@@ -264,6 +264,14 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSystemWrapper {
return fs.getUri();
}
/**
* Gets the underlying Hadoop FileSystem.
* @return The underlying Hadoop FileSystem.
*/
public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
return this.fs;
}
@Override
public void initialize(URI path) throws IOException {
@@ -367,21 +375,21 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSystemWrapper {
}
@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
return new HadoopDataInputStream(fdis);
}
@Override
public FSDataInputStream open(final Path f) throws IOException {
public HadoopDataInputStream open(final Path f) throws IOException {
final org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(f.toString());
final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
return new HadoopDataInputStream(fdis);
}
@Override
public FSDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize)
throws IOException
{
@@ -392,7 +400,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSystemWrapper {
@Override
public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
public HadoopDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
return new HadoopDataOutputStream(fsDataOutputStream);
@@ -69,6 +69,7 @@ public final class S3DataOutputStream extends FSDataOutputStream {
private int partNumber = 1; // First valid upload part number is 1.
private int bytesWritten = 0;
private final class InternalUploadInputStream extends InputStream {
@@ -83,12 +84,9 @@ public final class S3DataOutputStream extends FSDataOutputStream {
this.length = length;
}
/**
* {@inheritDoc}
*/
@Override
public int read() throws IOException {
if (this.length - this.bytesRead == 0) {
return -1;
}
@@ -96,21 +94,14 @@ public final class S3DataOutputStream extends FSDataOutputStream {
return (int) this.srcBuf[this.bytesRead++];
}
/**
* {@inheritDoc}
*/
@Override
public int read(final byte[] buf) throws IOException {
return read(buf, 0, buf.length);
}
/**
* {@inheritDoc}
*/
@Override
public int read(final byte[] buf, final int off, final int len) throws IOException {
if (this.length - this.bytesRead == 0) {
return -1;
}
@@ -122,18 +113,12 @@ public final class S3DataOutputStream extends FSDataOutputStream {
return bytesToCopy;
}
/**
* {@inheritDoc}
*/
@Override
public int available() throws IOException {
return (this.length - bytesRead);
}
/**
* {@inheritDoc}
*/
@Override
public long skip(final long n) throws IOException {
@@ -159,7 +144,6 @@ public final class S3DataOutputStream extends FSDataOutputStream {
@Override
public void write(final int b) throws IOException {
// Upload buffer to S3
if (this.bytesWritten == this.buf.length) {
uploadPartAndFlushBuffer();
@@ -171,11 +155,9 @@ public final class S3DataOutputStream extends FSDataOutputStream {
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
int nextPos = off;
while (nextPos < len) {
// Upload buffer to S3
if (this.bytesWritten == this.buf.length) {
uploadPartAndFlushBuffer();
@@ -191,14 +173,12 @@ public final class S3DataOutputStream extends FSDataOutputStream {
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void close() throws IOException {
if (this.uploadId == null) {
// This is not a multipart upload
@@ -251,10 +231,13 @@ public final class S3DataOutputStream extends FSDataOutputStream {
}
}
@Override
public void sync() throws IOException {
// nothing we can do here: S3 offers no sync primitive
}
@Override
public void flush() throws IOException {
// Flush does nothing in this implementation, since we always have to transfer at least 5 MB in a multipart upload
}
@@ -267,7 +250,6 @@ public final class S3DataOutputStream extends FSDataOutputStream {
}
try {
if (this.partNumber >= MAX_PART_NUMBER) {
throw new IOException("Cannot upload any more data: maximum part number reached");
}
@@ -287,9 +269,11 @@ public final class S3DataOutputStream extends FSDataOutputStream {
this.bytesWritten = 0;
operationSuccessful = true;
} catch (AmazonServiceException e) {
throw new IOException(StringUtils.stringifyException(e));
} finally {
}
catch (AmazonServiceException e) {
throw new IOException(e.getMessage(), e);
}
finally {
if (!operationSuccessful) {
abortUpload();
}
@@ -312,9 +296,11 @@ public final class S3DataOutputStream extends FSDataOutputStream {
operationSuccessful = true;
return result.getUploadId();
} catch (AmazonServiceException e) {
throw new IOException(StringUtils.stringifyException(e));
} finally {
}
catch (AmazonServiceException e) {
throw new IOException(e.getMessage(), e);
}
finally {
if (!operationSuccessful) {
abortUpload();
}
@@ -322,7 +308,6 @@ public final class S3DataOutputStream extends FSDataOutputStream {
}
private void abortUpload() {
if (this.uploadId == null) {
// This is not a multipart upload, nothing to do here
return;
@@ -332,7 +317,8 @@ public final class S3DataOutputStream extends FSDataOutputStream {
final AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(this.bucket, this.object,
this.uploadId);
this.s3Client.abortMultipartUpload(request);
} catch (AmazonServiceException e) {
}
catch (AmazonServiceException e) {
// Ignore exception
}
}
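For context, the multipart protocol this class drives boils down to three AWS SDK calls (initiate, upload parts, complete), with abort as the failure path. A condensed, hypothetical sketch against the AWS SDK for Java v1 used here (class, bucket, and key names are illustrative):

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class MultipartSketch {

public static void uploadInParts(AmazonS3 s3, String bucket, String key, byte[][] parts) throws java.io.IOException {
// 1) initiate: S3 returns an upload id that ties all parts together
String uploadId = s3.initiateMultipartUpload(
new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
List<PartETag> etags = new ArrayList<PartETag>();
boolean successful = false;
try {
// 2) upload parts: part numbers run from 1 upward, and every part
// except the last must be at least 5 MB (hence the no-op flush() above)
for (int i = 0; i < parts.length; i++) {
etags.add(s3.uploadPart(new UploadPartRequest()
.withBucketName(bucket).withKey(key)
.withUploadId(uploadId)
.withPartNumber(i + 1)
.withInputStream(new ByteArrayInputStream(parts[i]))
.withPartSize(parts[i].length)).getPartETag());
}
// 3) complete: S3 assembles the uploaded parts into a single object
s3.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket, key, uploadId, etags));
successful = true;
}
catch (AmazonServiceException e) {
throw new java.io.IOException(e.getMessage(), e);
}
finally {
if (!successful) {
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
}
}
}
}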