diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java index d204907b537bb57c94bf652a2be1f9175e498032..a1692a8872342d271ac5274bbae960582cba027c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/Bzip2InputStreamFactory.java @@ -15,26 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.io.compression; import org.apache.flink.annotation.Internal; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; + import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.Collections; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; - +/** + * Factory for Bzip2 decompressors. + */ @Internal public class Bzip2InputStreamFactory implements InflaterInputStreamFactory { - private static Bzip2InputStreamFactory INSTANCE = null; + private static final Bzip2InputStreamFactory INSTANCE = new Bzip2InputStreamFactory(); public static Bzip2InputStreamFactory getInstance() { - if (INSTANCE == null) { - INSTANCE = new Bzip2InputStreamFactory(); - } return INSTANCE; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java index b5051e6b42241a9dd82d1b277880feb7a6622a12..33f7d5145fb3a0e80eda49dbd347e5e97e01346b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.io.compression; import org.apache.flink.annotation.Internal; @@ -31,12 +32,9 @@ import java.util.zip.InflaterInputStream; @Internal public class DeflateInflaterInputStreamFactory implements InflaterInputStreamFactory { - private static DeflateInflaterInputStreamFactory INSTANCE = null; + private static final DeflateInflaterInputStreamFactory INSTANCE = new DeflateInflaterInputStreamFactory(); public static DeflateInflaterInputStreamFactory getInstance() { - if (INSTANCE == null) { - INSTANCE = new DeflateInflaterInputStreamFactory(); - } return INSTANCE; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java index 478eb2d4fdb8c9e4270025a39f6ea196b1cac7aa..335e365b93e674748cec4f8de66de087a63fedbb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.io.compression; import org.apache.flink.annotation.Internal; @@ -31,14 +32,12 @@ import java.util.zip.GZIPInputStream; @Internal public class GzipInflaterInputStreamFactory implements InflaterInputStreamFactory { - private static GzipInflaterInputStreamFactory INSTANCE = null; + private static final GzipInflaterInputStreamFactory INSTANCE = new GzipInflaterInputStreamFactory(); public static GzipInflaterInputStreamFactory getInstance() { - if (INSTANCE == null) { - INSTANCE = new GzipInflaterInputStreamFactory(); - } return INSTANCE; } + @Override public GZIPInputStream create(InputStream in) throws IOException { return new GZIPInputStream(in); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java index c80de401b3adb2b178007d2d59862f9e11d2b855..0802ab6f37d9b380e09c179831a1e8aa7277d989 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/XZInputStreamFactory.java @@ -15,25 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.io.compression; -import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; import org.apache.flink.annotation.Internal; +import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; + import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.Collections; +/** + * Factory for XZ decompressors. + */ @Internal public class XZInputStreamFactory implements InflaterInputStreamFactory { - private static XZInputStreamFactory INSTANCE = null; + private static final XZInputStreamFactory INSTANCE = new XZInputStreamFactory(); public static XZInputStreamFactory getInstance() { - if (INSTANCE == null) { - INSTANCE = new XZInputStreamFactory(); - } return INSTANCE; }