diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java index 3e9eb11abbb0ec375886f84b0beeb0583bd7edd5..0a3a60362f1663f8206f0c44db8d731807750e36 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java @@ -18,38 +18,24 @@ package org.apache.flink.streaming.connectors.fs; -import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; - import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.EnumSet; /** * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}. */ public abstract class StreamWriterBase implements Writer { - private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class); + private static final long serialVersionUID = 2L; /** * The {@code FSDataOutputStream} for the current part file. */ private transient FSDataOutputStream outStream; - /** - * We use reflection to get the hflush method or use sync as a fallback. - * The idea for this and the code comes from the Flume HDFS Sink. - */ - private transient Method refHflushOrSync; - /** * Returns the current output stream, if the stream is open. */ @@ -60,74 +46,12 @@ public abstract class StreamWriterBase implements Writer { return outStream; } - /** - * If hflush is available in this version of HDFS, then this method calls - * hflush, else it calls sync. - * - *

Note: This code comes from Flume - * - * @param os - The stream to flush/sync - * @throws java.io.IOException - */ - protected void hflushOrSync(FSDataOutputStream os) throws IOException { - try { - // At this point the refHflushOrSync cannot be null, - // since register method would have thrown if it was. - this.refHflushOrSync.invoke(os); - - if (os instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - } - } catch (InvocationTargetException e) { - String msg = "Error while trying to hflushOrSync!"; - LOG.error(msg + " " + e.getCause()); - Throwable cause = e.getCause(); - if (cause != null && cause instanceof IOException) { - throw (IOException) cause; - } - throw new RuntimeException(msg, e); - } catch (Exception e) { - String msg = "Error while trying to hflushOrSync!"; - LOG.error(msg + " " + e); - throw new RuntimeException(msg, e); - } - } - - /** - * Gets the hflush call using reflection. Fallback to sync if hflush is not available. - * - *

Note: This code comes from Flume - */ - private Method reflectHflushOrSync(FSDataOutputStream os) { - Method m = null; - if (os != null) { - Class fsDataOutputStreamClass = os.getClass(); - try { - m = fsDataOutputStreamClass.getMethod("hflush"); - } catch (NoSuchMethodException ex) { - LOG.debug("HFlush not found. Will use sync() instead"); - try { - m = fsDataOutputStreamClass.getMethod("sync"); - } catch (Exception ex1) { - String msg = "Neither hflush not sync were found. That seems to be " + - "a problem!"; - LOG.error(msg); - throw new RuntimeException(msg, ex1); - } - } - } - return m; - } - @Override public void open(FileSystem fs, Path path) throws IOException { if (outStream != null) { throw new IllegalStateException("Writer has already been opened"); } outStream = fs.create(path, false); - if (refHflushOrSync == null) { - refHflushOrSync = reflectHflushOrSync(outStream); - } } @Override @@ -135,7 +59,7 @@ public abstract class StreamWriterBase implements Writer { if (outStream == null) { throw new IllegalStateException("Writer is not open"); } - hflushOrSync(outStream); + outStream.hflush(); return outStream.getPos(); } @@ -155,5 +79,4 @@ public abstract class StreamWriterBase implements Writer { outStream = null; } } - }