提交 f414e55e 编写于 作者: S Stephan Ewen

[hotfix] Minor cleanups in classes FileSystem, OperatingSystem, CopyableValue,...

[hotfix] Minor cleanups in classes FileSystem, OperatingSystem, CopyableValue, GenericCsvInputFormat
上级 44eb58e0
......@@ -46,7 +46,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private static final long serialVersionUID = 1L;
private static final Class<?>[] EMPTY_TYPES = new Class[0];
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
private static final boolean[] EMPTY_INCLUDED = new boolean[0];
......@@ -240,9 +240,8 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
fieldIncluded[i] = true;
}
}
Class<?>[] denseTypeArray = (Class<?>[]) types.toArray(new Class[types.size()]);
this.fieldTypes = denseTypeArray;
this.fieldTypes = types.toArray(new Class<?>[types.size()]);
}
protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
......@@ -275,8 +274,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
Class<?>[] denseTypeArray = (Class<?>[]) types.toArray(new Class[types.size()]);
this.fieldTypes = denseTypeArray;
this.fieldTypes = types.toArray(new Class<?>[types.size()]);
}
protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
......@@ -307,8 +305,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
Class<?>[] denseTypeArray = (Class<?>[]) types.toArray(new Class[types.size()]);
this.fieldTypes = denseTypeArray;
this.fieldTypes = types.toArray(new Class<?>[types.size()]);
this.fieldIncluded = includedMask;
}
......@@ -321,7 +318,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
super.open(split);
// instantiate the parsers
FieldParser<?>[] parsers = new FieldParser[fieldTypes.length];
FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i] != null) {
......@@ -441,7 +438,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
final int delimLimit = limit - delim.length + 1;
if(quotedStringParsing == true && bytes[i] == quoteCharacter) {
if (quotedStringParsing && bytes[i] == quoteCharacter) {
// quoted string parsing enabled and field is quoted
// search for ending quote character, continue when it is escaped
......
......@@ -34,9 +34,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.util.ClassUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.StringUtils;
/**
* An abstract base class for a fairly generic file system. It
......@@ -262,37 +260,26 @@ public abstract class FileSystem {
//Class must implement Hadoop FileSystem interface. The class is not avaiable in 'flink-core'.
private static FileSystem instantiateHadoopFileSystemWrapper(Class<?> wrappedFileSystem) throws IOException {
FileSystem fs = null;
Class<? extends FileSystem> fsClass;
try {
fsClass = ClassUtils.getFileSystemByName(HADOOP_WRAPPER_FILESYSTEM_CLASS);
Class<? extends FileSystem> fsClass = getFileSystemByName(HADOOP_WRAPPER_FILESYSTEM_CLASS);
Constructor<? extends FileSystem> fsClassCtor = fsClass.getConstructor(Class.class);
fs = fsClassCtor.newInstance(wrappedFileSystem);
return fsClassCtor.newInstance(wrappedFileSystem);
} catch (Throwable e) {
throw new IOException("Error loading Hadoop FS wrapper", e);
}
return fs;
}
private static FileSystem instantiateFileSystem(String className) throws IOException {
FileSystem fs = null;
Class<? extends FileSystem> fsClass;
try {
fsClass = ClassUtils.getFileSystemByName(className);
} catch (ClassNotFoundException e1) {
throw new IOException(StringUtils.stringifyException(e1));
Class<? extends FileSystem> fsClass = getFileSystemByName(className);
return fsClass.newInstance();
}
try {
fs = fsClass.newInstance();
catch (ClassNotFoundException e) {
throw new IOException("Could not load file system class '" + className + '\'', e);
}
catch (InstantiationException e) {
catch (InstantiationException | IllegalAccessException e) {
throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
}
catch (IllegalAccessException e) {
throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
}
return fs;
}
private static HadoopFileSystemWrapper hadoopWrapper;
......@@ -450,6 +437,7 @@ public abstract class FileSystem {
* @param replication
* required block replication for the file.
* @param blockSize
* the size of the file blocks
* @throws IOException
*/
public abstract FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
......@@ -723,52 +711,8 @@ public abstract class FileSystem {
*/
public abstract boolean isDistributedFS();
/**
* Returns the number of blocks this file/directory consists of
* assuming the file system's standard block size.
*
* @param file
* the file
* @return the number of block's the file/directory consists of
* @throws IOException
*/
public int getNumberOfBlocks(final FileStatus file) throws IOException {
int numberOfBlocks = 0;
if (file == null) {
return 0;
}
// For a file, this is easy
if (!file.isDir()) {
return getNumberOfBlocks(file.getLen(), file.getBlockSize());
}
// file is a directory
final FileStatus[] files = this.listStatus(file.getPath());
for (FileStatus file1 : files) {
if (!file1.isDir()) {
numberOfBlocks += getNumberOfBlocks(file1.getLen(), file1.getBlockSize());
}
}
return numberOfBlocks;
}
private int getNumberOfBlocks(final long length, final long blocksize) {
if (blocksize != 0) {
int numberOfBlocks;
numberOfBlocks = (int) (length / blocksize);
if ((length % blocksize) != 0) {
numberOfBlocks++;
}
return numberOfBlocks;
} else {
return 1;
}
private static Class<? extends FileSystem> getFileSystemByName(String className) throws ClassNotFoundException {
return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class);
}
}
......@@ -44,10 +44,10 @@ public interface CopyableValue<T> extends Value {
/**
* Performs a deep copy of this object into a new instance.
*
* This method is useful for generic user-defined functions to clone a
* {@link CopyableValue} when storing multiple objects. With object reuse
* a deep copy must be created and type erasure prevents calling new.
*
* This method is useful for generic user-defined functions to clone a
* {@link CopyableValue} when storing multiple objects. With object reuse
* a deep copy must be created and type erasure prevents calling new.
*
* @return New object with copied fields.
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.util;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.core.fs.FileSystem;
/**
* Utility class which provides various methods for dynamic class loading.
*/
public final class ClassUtils {
/**
* Private constructor used to overwrite public one.
*/
private ClassUtils() {}
/**
* Searches for a file system class by its name and attempts to load it.
*
* @param className
* the name of the file system class
* @return an instance of the file system class
* @throws ClassNotFoundException
* thrown if no class with such a name can be found
*/
public static Class<? extends FileSystem> getFileSystemByName(final String className) throws ClassNotFoundException {
return Class.forName(className, true, getClassLoader()).asSubclass(FileSystem.class);
}
private static ClassLoader getClassLoader() {
return ClassUtils.class.getClassLoader();
}
public static Class<?> resolveClassPrimitiveAware(String className) throws ClassNotFoundException {
if (className == null) {
throw new NullPointerException();
}
Class<?> primClass = PRIMITIVE_TYPES.get(className);
if (primClass != null) {
return primClass;
} else {
return Class.forName(className);
}
}
public static boolean isPrimitiveOrBoxedOrString(Class<?> clazz) {
return clazz != null && (clazz.isPrimitive() || ClassUtils.isBoxedTypeOrString(clazz));
}
public static boolean isBoxedTypeOrString(Class<?> clazz) {
return BOXED_TYPES.contains(clazz);
}
// --------------------------------------------------------------------------------------------
private static final Map<String, Class<?>> PRIMITIVE_TYPES = new HashMap<String, Class<?>>(9);
private static final Set<Class<?>> BOXED_TYPES = new HashSet<Class<?>>();
static {
PRIMITIVE_TYPES.put("byte", byte.class);
PRIMITIVE_TYPES.put("short", short.class);
PRIMITIVE_TYPES.put("int", int.class);
PRIMITIVE_TYPES.put("long", long.class);
PRIMITIVE_TYPES.put("float", float.class);
PRIMITIVE_TYPES.put("double", double.class);
PRIMITIVE_TYPES.put("boolean", boolean.class);
PRIMITIVE_TYPES.put("char", char.class);
PRIMITIVE_TYPES.put("void", void.class);
BOXED_TYPES.add(Byte.class);
BOXED_TYPES.add(Short.class);
BOXED_TYPES.add(Integer.class);
BOXED_TYPES.add(Long.class);
BOXED_TYPES.add(Float.class);
BOXED_TYPES.add(Double.class);
BOXED_TYPES.add(Boolean.class);
BOXED_TYPES.add(Character.class);
BOXED_TYPES.add(Void.class);
BOXED_TYPES.add(String.class);
}
}
......@@ -47,20 +47,12 @@ public final class InstantiationUtil {
*
*/
public static class ClassLoaderObjectInputStream extends ObjectInputStream {
private ClassLoader classLoader;
private static final HashMap<String, Class<?>> primitiveClasses
= new HashMap<String, Class<?>>(8, 1.0F);
static {
primitiveClasses.put("boolean", boolean.class);
primitiveClasses.put("byte", byte.class);
primitiveClasses.put("char", char.class);
primitiveClasses.put("short", short.class);
primitiveClasses.put("int", int.class);
primitiveClasses.put("long", long.class);
primitiveClasses.put("float", float.class);
primitiveClasses.put("double", double.class);
primitiveClasses.put("void", void.class);
private final ClassLoader classLoader;
public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
super(in);
this.classLoader = classLoader;
}
@Override
......@@ -84,10 +76,21 @@ public final class InstantiationUtil {
return super.resolveClass(desc);
}
// ------------------------------------------------
public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
super(in);
this.classLoader = classLoader;
private static final HashMap<String, Class<?>> primitiveClasses = new HashMap<>(9);
static {
primitiveClasses.put("boolean", boolean.class);
primitiveClasses.put("byte", byte.class);
primitiveClasses.put("char", char.class);
primitiveClasses.put("short", short.class);
primitiveClasses.put("int", int.class);
primitiveClasses.put("long", long.class);
primitiveClasses.put("float", float.class);
primitiveClasses.put("double", double.class);
primitiveClasses.put("void", void.class);
}
}
......@@ -293,28 +296,22 @@ public final class InstantiationUtil {
@SuppressWarnings("unchecked")
public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
ObjectInputStream oois = null;
final ClassLoader old = Thread.currentThread().getContextClassLoader();
try {
try (ObjectInputStream oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl)) {
Thread.currentThread().setContextClassLoader(cl);
oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl);
return (T) oois.readObject();
} finally {
}
finally {
Thread.currentThread().setContextClassLoader(old);
if (oois != null) {
oois.close();
}
}
}
public static byte[] serializeObject(Object o) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(o);
return baos.toByteArray();
}
return baos.toByteArray();
}
// --------------------------------------------------------------------------------------------
......
......@@ -19,13 +19,9 @@
package org.apache.flink.util;
/**
* An enumeration indicating the operating system that the engine runs on.
* An enumeration indicating the operating system that the JVM runs on.
*/
public enum OperatingSystem {
// --------------------------------------------------------------------------------------------
// Constants to extract the OS type from the java environment
// --------------------------------------------------------------------------------------------
LINUX,
WINDOWS,
......@@ -33,9 +29,7 @@ public enum OperatingSystem {
FREE_BSD,
UNKNOWN;
// --------------------------------------------------------------------------------------------
// Constants to extract the OS type from the java environment
// --------------------------------------------------------------------------------------------
// ------------------------------------------------------------------------
/**
* Gets the operating system that the JVM runs on from the java system properties.
......@@ -117,8 +111,6 @@ public enum OperatingSystem {
return UNKNOWN;
}
// --------------------------------------------------------------------------------------------
// Constants to extract the OS type from the java environment
// --------------------------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册