From f414e55e1c82b3238c0f2e994523e31b76cc1e26 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Sun, 13 Dec 2015 16:56:02 +0100 Subject: [PATCH] [hotfix] Minor cleanups in classes FileSystem, OperatingSystem, CopyableValue, GenericCsvInputFormat --- .../api/common/io/GenericCsvInputFormat.java | 17 ++- .../org/apache/flink/core/fs/FileSystem.java | 76 ++----------- .../org/apache/flink/types/CopyableValue.java | 8 +- .../org/apache/flink/util/ClassUtils.java | 105 ------------------ .../apache/flink/util/InstantiationUtil.java | 53 +++++---- .../apache/flink/util/OperatingSystem.java | 12 +- 6 files changed, 48 insertions(+), 223 deletions(-) delete mode 100644 flink-core/src/main/java/org/apache/flink/util/ClassUtils.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index e68d2710de6..89caf2fe5f9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -46,7 +46,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat private static final long serialVersionUID = 1L; - private static final Class[] EMPTY_TYPES = new Class[0]; + private static final Class[] EMPTY_TYPES = new Class[0]; private static final boolean[] EMPTY_INCLUDED = new boolean[0]; @@ -240,9 +240,8 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat fieldIncluded[i] = true; } } - - Class[] denseTypeArray = (Class[]) types.toArray(new Class[types.size()]); - this.fieldTypes = denseTypeArray; + + this.fieldTypes = types.toArray(new Class[types.size()]); } protected void setFieldsGeneric(int[] sourceFieldIndices, Class[] fieldTypes) { @@ -275,8 +274,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat } } - Class[] denseTypeArray = (Class[]) types.toArray(new Class[types.size()]); - this.fieldTypes = denseTypeArray; + this.fieldTypes = types.toArray(new Class[types.size()]); } protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { @@ -307,8 +305,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat } } - Class[] denseTypeArray = (Class[]) types.toArray(new Class[types.size()]); - this.fieldTypes = denseTypeArray; + this.fieldTypes = types.toArray(new Class[types.size()]); this.fieldIncluded = includedMask; } @@ -321,7 +318,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat super.open(split); // instantiate the parsers - FieldParser[] parsers = new FieldParser[fieldTypes.length]; + FieldParser[] parsers = new FieldParser[fieldTypes.length]; for (int i = 0; i < fieldTypes.length; i++) { if (fieldTypes[i] != null) { @@ -441,7 +438,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat final int delimLimit = limit - delim.length + 1; - if(quotedStringParsing == true && bytes[i] == quoteCharacter) { + if (quotedStringParsing && bytes[i] == quoteCharacter) { // quoted string parsing enabled and field is quoted // search for ending quote character, continue when it is escaped diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 7dc92c617ff..03ca6a221dc 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -34,9 +34,7 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; -import org.apache.flink.util.ClassUtils; import org.apache.flink.util.OperatingSystem; -import org.apache.flink.util.StringUtils; /** * An abstract base class for a fairly generic file system. It @@ -262,37 +260,26 @@ public abstract class FileSystem { //Class must implement Hadoop FileSystem interface. The class is not avaiable in 'flink-core'. private static FileSystem instantiateHadoopFileSystemWrapper(Class wrappedFileSystem) throws IOException { - FileSystem fs = null; - Class fsClass; try { - fsClass = ClassUtils.getFileSystemByName(HADOOP_WRAPPER_FILESYSTEM_CLASS); + Class fsClass = getFileSystemByName(HADOOP_WRAPPER_FILESYSTEM_CLASS); Constructor fsClassCtor = fsClass.getConstructor(Class.class); - fs = fsClassCtor.newInstance(wrappedFileSystem); + return fsClassCtor.newInstance(wrappedFileSystem); } catch (Throwable e) { throw new IOException("Error loading Hadoop FS wrapper", e); } - return fs; } private static FileSystem instantiateFileSystem(String className) throws IOException { - FileSystem fs = null; - Class fsClass; try { - fsClass = ClassUtils.getFileSystemByName(className); - } catch (ClassNotFoundException e1) { - throw new IOException(StringUtils.stringifyException(e1)); + Class fsClass = getFileSystemByName(className); + return fsClass.newInstance(); } - - try { - fs = fsClass.newInstance(); + catch (ClassNotFoundException e) { + throw new IOException("Could not load file system class '" + className + '\'', e); } - catch (InstantiationException e) { + catch (InstantiationException | IllegalAccessException e) { throw new IOException("Could not instantiate file system class: " + e.getMessage(), e); } - catch (IllegalAccessException e) { - throw new IOException("Could not instantiate file system class: " + e.getMessage(), e); - } - return fs; } private static HadoopFileSystemWrapper hadoopWrapper; @@ -450,6 +437,7 @@ public abstract class FileSystem { * @param replication * required block replication for the file. * @param blockSize + * the size of the file blocks * @throws IOException */ public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, @@ -723,52 +711,8 @@ public abstract class FileSystem { */ public abstract boolean isDistributedFS(); - /** - * Returns the number of blocks this file/directory consists of - * assuming the file system's standard block size. - * - * @param file - * the file - * @return the number of block's the file/directory consists of - * @throws IOException - */ - public int getNumberOfBlocks(final FileStatus file) throws IOException { - - int numberOfBlocks = 0; - - if (file == null) { - return 0; - } - // For a file, this is easy - if (!file.isDir()) { - return getNumberOfBlocks(file.getLen(), file.getBlockSize()); - } - - // file is a directory - final FileStatus[] files = this.listStatus(file.getPath()); - for (FileStatus file1 : files) { - if (!file1.isDir()) { - numberOfBlocks += getNumberOfBlocks(file1.getLen(), file1.getBlockSize()); - } - } - - return numberOfBlocks; - } - - private int getNumberOfBlocks(final long length, final long blocksize) { - - if (blocksize != 0) { - int numberOfBlocks; - numberOfBlocks = (int) (length / blocksize); - - if ((length % blocksize) != 0) { - numberOfBlocks++; - } - - return numberOfBlocks; - } else { - return 1; - } + private static Class getFileSystemByName(String className) throws ClassNotFoundException { + return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class); } } diff --git a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java index 3974cb2d9f9..95498fe82f8 100644 --- a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java +++ b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java @@ -44,10 +44,10 @@ public interface CopyableValue extends Value { /** * Performs a deep copy of this object into a new instance. - * - * This method is useful for generic user-defined functions to clone a - * {@link CopyableValue} when storing multiple objects. With object reuse - * a deep copy must be created and type erasure prevents calling new. + * + * This method is useful for generic user-defined functions to clone a + * {@link CopyableValue} when storing multiple objects. With object reuse + * a deep copy must be created and type erasure prevents calling new. * * @return New object with copied fields. */ diff --git a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java deleted file mode 100644 index a57e0040b15..00000000000 --- a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.util; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.core.fs.FileSystem; - -/** - * Utility class which provides various methods for dynamic class loading. - */ -public final class ClassUtils { - - /** - * Private constructor used to overwrite public one. - */ - private ClassUtils() {} - - /** - * Searches for a file system class by its name and attempts to load it. - * - * @param className - * the name of the file system class - * @return an instance of the file system class - * @throws ClassNotFoundException - * thrown if no class with such a name can be found - */ - public static Class getFileSystemByName(final String className) throws ClassNotFoundException { - return Class.forName(className, true, getClassLoader()).asSubclass(FileSystem.class); - } - - - private static ClassLoader getClassLoader() { - return ClassUtils.class.getClassLoader(); - } - - public static Class resolveClassPrimitiveAware(String className) throws ClassNotFoundException { - if (className == null) { - throw new NullPointerException(); - } - - Class primClass = PRIMITIVE_TYPES.get(className); - if (primClass != null) { - return primClass; - } else { - return Class.forName(className); - } - } - - public static boolean isPrimitiveOrBoxedOrString(Class clazz) { - return clazz != null && (clazz.isPrimitive() || ClassUtils.isBoxedTypeOrString(clazz)); - } - - public static boolean isBoxedTypeOrString(Class clazz) { - return BOXED_TYPES.contains(clazz); - } - - // -------------------------------------------------------------------------------------------- - - private static final Map> PRIMITIVE_TYPES = new HashMap>(9); - - private static final Set> BOXED_TYPES = new HashSet>(); - - static { - PRIMITIVE_TYPES.put("byte", byte.class); - PRIMITIVE_TYPES.put("short", short.class); - PRIMITIVE_TYPES.put("int", int.class); - PRIMITIVE_TYPES.put("long", long.class); - PRIMITIVE_TYPES.put("float", float.class); - PRIMITIVE_TYPES.put("double", double.class); - PRIMITIVE_TYPES.put("boolean", boolean.class); - PRIMITIVE_TYPES.put("char", char.class); - PRIMITIVE_TYPES.put("void", void.class); - - BOXED_TYPES.add(Byte.class); - BOXED_TYPES.add(Short.class); - BOXED_TYPES.add(Integer.class); - BOXED_TYPES.add(Long.class); - BOXED_TYPES.add(Float.class); - BOXED_TYPES.add(Double.class); - BOXED_TYPES.add(Boolean.class); - BOXED_TYPES.add(Character.class); - BOXED_TYPES.add(Void.class); - BOXED_TYPES.add(String.class); - } -} diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 4f6ee32492d..b1ef35dfea9 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -47,20 +47,12 @@ public final class InstantiationUtil { * */ public static class ClassLoaderObjectInputStream extends ObjectInputStream { - private ClassLoader classLoader; - private static final HashMap> primitiveClasses - = new HashMap>(8, 1.0F); - static { - primitiveClasses.put("boolean", boolean.class); - primitiveClasses.put("byte", byte.class); - primitiveClasses.put("char", char.class); - primitiveClasses.put("short", short.class); - primitiveClasses.put("int", int.class); - primitiveClasses.put("long", long.class); - primitiveClasses.put("float", float.class); - primitiveClasses.put("double", double.class); - primitiveClasses.put("void", void.class); + private final ClassLoader classLoader; + + public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException { + super(in); + this.classLoader = classLoader; } @Override @@ -84,10 +76,21 @@ public final class InstantiationUtil { return super.resolveClass(desc); } + + // ------------------------------------------------ - public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException { - super(in); - this.classLoader = classLoader; + private static final HashMap> primitiveClasses = new HashMap<>(9); + + static { + primitiveClasses.put("boolean", boolean.class); + primitiveClasses.put("byte", byte.class); + primitiveClasses.put("char", char.class); + primitiveClasses.put("short", short.class); + primitiveClasses.put("int", int.class); + primitiveClasses.put("long", long.class); + primitiveClasses.put("float", float.class); + primitiveClasses.put("double", double.class); + primitiveClasses.put("void", void.class); } } @@ -293,28 +296,22 @@ public final class InstantiationUtil { @SuppressWarnings("unchecked") public static T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException { - ObjectInputStream oois = null; final ClassLoader old = Thread.currentThread().getContextClassLoader(); - try { + try (ObjectInputStream oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl)) { Thread.currentThread().setContextClassLoader(cl); - oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl); return (T) oois.readObject(); - } finally { + } + finally { Thread.currentThread().setContextClassLoader(old); - if (oois != null) { - oois.close(); - } } } public static byte[] serializeObject(Object o) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(o); + return baos.toByteArray(); } - - return baos.toByteArray(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/OperatingSystem.java b/flink-core/src/main/java/org/apache/flink/util/OperatingSystem.java index 4aa4eed2cad..eed1fc9ecd1 100644 --- a/flink-core/src/main/java/org/apache/flink/util/OperatingSystem.java +++ b/flink-core/src/main/java/org/apache/flink/util/OperatingSystem.java @@ -19,13 +19,9 @@ package org.apache.flink.util; /** - * An enumeration indicating the operating system that the engine runs on. + * An enumeration indicating the operating system that the JVM runs on. */ public enum OperatingSystem { - - // -------------------------------------------------------------------------------------------- - // Constants to extract the OS type from the java environment - // -------------------------------------------------------------------------------------------- LINUX, WINDOWS, @@ -33,9 +29,7 @@ public enum OperatingSystem { FREE_BSD, UNKNOWN; - // -------------------------------------------------------------------------------------------- - // Constants to extract the OS type from the java environment - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ /** * Gets the operating system that the JVM runs on from the java system properties. @@ -117,8 +111,6 @@ public enum OperatingSystem { return UNKNOWN; } - - // -------------------------------------------------------------------------------------------- // Constants to extract the OS type from the java environment // -------------------------------------------------------------------------------------------- -- GitLab