diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java index ef8323b3bb85afbdb0f3fbae2c2db50b26891e72..780001a873ba7e18522a1cb09f11a83c3c748345 100644 --- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java +++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java @@ -98,15 +98,21 @@ public class JDBCOutputFormat implements OutputFormat { @SuppressWarnings("unchecked") Class[] classes = new Class[this.fieldCount]; this.fieldClasses = classes; + + ClassLoader cl = getClass().getClassLoader(); - for (int i = 0; i < this.fieldCount; i++) { - @SuppressWarnings("unchecked") - Class clazz = (Class) parameters.getClass(FIELD_TYPE_KEY + i, null); - if (clazz == null) { - throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: " - + "No type class for parameter " + i); + try { + for (int i = 0; i < this.fieldCount; i++) { + Class clazz = parameters.getClass(FIELD_TYPE_KEY + i, null, cl); + if (clazz == null) { + throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: " + + "No type class for parameter " + i); + } + this.fieldClasses[i] = clazz; } - this.fieldClasses[i] = clazz; + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load data type classes.", e); } } diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java index 38294f6cce3234c44923e5376e4e354deddfbd8a..780bc94d6734c5688189ca349180be2f64bfe5f3 100644 --- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java +++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java @@ -189,16 +189,20 @@ public class SpargelIteration { public void open(Configuration parameters) throws Exception { // instantiate only the first time if (vertexUpdateFunction == null) { - Class vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class); - Class vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class); - Class messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class); + ClassLoader cl = getRuntimeContext().getUserCodeClassLoader(); + + Class vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl); + Class vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl); + Class messageClass = parameters.getClass(MESSAGE_PARAM, null, cl); vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class); vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class); messageIter = new MessageIterator(InstantiationUtil.instantiate(messageClass, Value.class)); + ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader(); + try { - this.vertexUpdateFunction = (VertexUpdateFunction) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader()); + this.vertexUpdateFunction = (VertexUpdateFunction) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl); } catch (Exception e) { String message = e.getMessage() == null ? "." : ": " + e.getMessage(); throw new Exception("Could not instantiate VertexUpdateFunction" + message, e); @@ -248,10 +252,12 @@ public class SpargelIteration { public void open(Configuration parameters) throws Exception { // instantiate only the first time if (messagingFunction == null) { - Class vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class); - Class vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class); + ClassLoader cl = getRuntimeContext().getUserCodeClassLoader(); + + Class vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl); + Class vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl); // Class messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class); - Class edgeClass = parameters.getClass(EDGE_PARAM, null, Value.class); + Class edgeClass = parameters.getClass(EDGE_PARAM, null, cl); vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class); vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class); @@ -259,8 +265,10 @@ public class SpargelIteration { K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class); E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class); + ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader(); + try { - this.messagingFunction = (MessagingFunction) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader()); + this.messagingFunction = (MessagingFunction) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl); } catch (Exception e) { String message = e.getMessage() == null ? "." : ": " + e.getMessage(); throw new Exception("Could not instantiate MessagingFunction" + message, e); diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 42c1adf86073ab330ef27309049235dc230e57a7..f1913d68a8f32fc01cb62e7802632f509e3f68df 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -309,9 +309,13 @@ public class StreamConfig { config.setClass("functionClass", functionClass); } - @SuppressWarnings("unchecked") - public Class getFunctionClass() { - return (Class) config.getClass("functionClass", null); + public Class getFunctionClass(ClassLoader cl) { + try { + return config.getClass("functionClass", null, cl); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load function class", e); + } } @SuppressWarnings("unchecked") diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java index a680f4d18e9afc3ef8f35875800f6584e1f1e259..9d0d526540bd8eb960a3299a06e0143f2622692f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java @@ -29,6 +29,7 @@ import java.net.MalformedURLException; import java.util.Map; import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; public class CliFrontendTestUtils { @@ -86,16 +87,19 @@ public class CliFrontendTestUtils { public static void clearGlobalConfiguration() { try { - Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("configuration"); - Field confDataMapField = GlobalConfiguration.class.getDeclaredField("confData"); + Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("SINGLETON"); + Field conf = GlobalConfiguration.class.getDeclaredField("config"); + Field map = Configuration.class.getDeclaredField("confData"); singletonInstanceField.setAccessible(true); - confDataMapField.setAccessible(true); + conf.setAccessible(true); + map.setAccessible(true); GlobalConfiguration gconf = (GlobalConfiguration) singletonInstanceField.get(null); if (gconf != null) { + Configuration confObject = (Configuration) conf.get(gconf); @SuppressWarnings("unchecked") - Map confData = (Map) confDataMapField.get(gconf); + Map confData = (Map) map.get(confObject); confData.clear(); } } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java index 744575928b68bfc004b5baaad5c33d25fd7e7ac8..0ff0aee02faaac2a40ee11b244b29c9dcfe3737d 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java @@ -94,7 +94,6 @@ import org.apache.flink.compiler.plan.WorksetIterationPlanNode; import org.apache.flink.compiler.plan.WorksetPlanNode; import org.apache.flink.compiler.postpass.OptimizerPostPass; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -410,11 +409,10 @@ public class PactCompiler { this.statistics = stats; this.costEstimator = estimator; - Configuration config = GlobalConfiguration.getConfiguration(); - // determine the default parallelization degree - this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, + this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE); + if (defaultDegreeOfParallelism < 1) { LOG.warn("Config value " + defaultDegreeOfParallelism + " for option " + ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1."); diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java index 6742581bc2828c20d1d08f2eb55b8f207d0008ff..3de4e03a373270389c9441af9e0d78f092dd2a92 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java @@ -109,7 +109,6 @@ public class DataSourceNode extends OptimizerNode { try { format = getPactContract().getFormatWrapper().getUserCodeObject(); Configuration config = getPactContract().getParameters(); - config.setClassLoader(format.getClass().getClassLoader()); format.configure(config); } catch (Throwable t) { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 63457faa0307e98e6ea22363ddab6d956b51b916..5f6aaa5d13dfde056a1ce4e639f5d39a63b2fee6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -60,6 +60,14 @@ public interface RuntimeContext { */ int getIndexOfThisSubtask(); + /** + * Gets the ClassLoader to load classes that were are not in system's classpath, but are part of the + * jar file of a user job. + * + * @return The ClassLoader for user code classes. + */ + ClassLoader getUserCodeClassLoader(); + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index a6f2cca48e42a87fd30132c563baff581ace2e0d..e798609b17992bbc6aaae540ca4e1f9f2db29056 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -16,140 +16,86 @@ * limitations under the License. */ - package org.apache.flink.configuration; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.io.StringRecord; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; - -import com.google.common.io.BaseEncoding; +import org.apache.flink.types.StringValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Lightweight configuration object which can store key/value pairs. Configuration objects - * can be extracted from or integrated into the {@link GlobalConfiguration} object. They can - * be transported via Nephele's IPC system to distribute configuration data at runtime. - * This class is thread-safe. - * + * Lightweight configuration object which can store key/value pairs. */ public class Configuration implements IOReadableWritable, java.io.Serializable { private static final long serialVersionUID = 1L; - - /** - * Stores the concrete key/value pairs of this configuration object. - */ - private final Map confData = new HashMap(); - - /** - * The class loader to be used for the getClass method. - */ - private transient ClassLoader classLoader; - - /** - * Constructs a new configuration object. - */ - public Configuration() { - this.classLoader = this.getClass().getClassLoader(); - } - - /** - * Constructs a new configuration object. - * - * @param classLoader - * the class loader to be use for the getClass method - */ - public Configuration(final ClassLoader classLoader) { - this.classLoader = classLoader; - } + private static final byte TYPE_STRING = 0; + private static final byte TYPE_INT = 1; + private static final byte TYPE_LONG = 2; + private static final byte TYPE_BOOLEAN = 3; + private static final byte TYPE_FLOAT = 4; + private static final byte TYPE_DOUBLE = 5; + private static final byte TYPE_BYTES = 6; + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(Configuration.class); - /** - * @return the class loader that knows where to locate user classes - */ - public ClassLoader getClassLoader() { - return this.classLoader; - } - - /** - * Sets the class loader that knows where to locate user classes - * - * @param classLoader - * the class loader to be use for the getClass method - */ - public void setClassLoader(ClassLoader classLoader) { - this.classLoader = classLoader; - } + /** Stores the concrete key/value pairs of this configuration object. */ + private final Map confData = new HashMap(); + // -------------------------------------------------------------------------------------------- - + + public Configuration() {} + + // -------------------------------------------------------------------------------------------- + /** * Returns the class associated with the given key as a string. * - * @param - * the ancestor of both the default value and the potential value - * @param key - * the key pointing to the associated value - * @param defaultValue - * the optional default value returned if no entry exists - * @param ancestor - * the ancestor of both the default value and the potential value - * @return the (default) value associated with the given key - * @throws IllegalStateException - * if the class identified by the associated value cannot be resolved - * @see #setClass(String, Class) + * @param The type of the class to return. + + * @param key The key pointing to the associated value + * @param defaultValue The optional default value returned if no entry exists + * @param classLoader The class loader used to resolve the class. + * + * @return The value associated with the given key, or the default value, if to entry for the key exists. */ @SuppressWarnings("unchecked") - public Class getClass(String key, Class defaultValue, Class ancestor) { - String className = getStringInternal(key); - if (className == null) { + public Class getClass(String key, Class defaultValue, ClassLoader classLoader) throws ClassNotFoundException { + Object o = getRawValue(key); + if (o == null) { return (Class) defaultValue; } - - try { - return (Class) Class.forName(className, true, this.classLoader); - } catch (ClassNotFoundException e) { - throw new IllegalStateException(e); + + if (o.getClass() == String.class) { + return (Class) Class.forName((String) o, true, classLoader); } - } - - /** - * Returns the class associated with the given key as a string. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - * @throws IllegalStateException - * if the class identified by the associated value cannot be resolved - * @see #setClass(String, Class) - */ - public Class getClass(String key, Class defaultValue) { - return getClass(key, defaultValue, Object.class); + + LOG.warn("Configuration cannot evaluate value " + o + " as a class name"); + return (Class) defaultValue; } /** * Adds the given key/value pair to the configuration object. The class can be retrieved by invoking - * {@link #getClass(String, Class, Class)} if it is in the scope of the class loader on the caller. + * {@link #getClass(String, Class, ClassLoader)} if it is in the scope of the class loader on the caller. * - * @param key - * the key of the pair to be added - * @param klazz - * the value of the pair to be added - * @see #getClass(String, Class) - * @see #getClass(String, Class, Class) + * @param key The key of the pair to be added + * @param klazz The value of the pair to be added + * @see #getClass(String, Class, ClassLoader) */ public void setClass(String key, Class klazz) { - setStringInternal(key, klazz.getName()); + setValueInternal(key, klazz.getName()); } /** @@ -162,8 +108,12 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the (default) value associated with the given key */ public String getString(String key, String defaultValue) { - String val = getStringInternal(key); - return val == null ? defaultValue : val; + Object o = getRawValue(key); + if (o == null) { + return defaultValue; + } else { + return o.toString(); + } } /** @@ -175,7 +125,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * the value of the key/value pair to be added */ public void setString(String key, String value) { - setStringInternal(key, value); + setValueInternal(key, value); } /** @@ -188,11 +138,31 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the (default) value associated with the given key */ public int getInteger(String key, int defaultValue) { - String val = getStringInternal(key); - if (val == null) { + Object o = getRawValue(key); + if (o == null) { return defaultValue; - } else { - return Integer.parseInt(val); + } + + if (o.getClass() == Integer.class) { + return (Integer) o; + } + else if (o.getClass() == Long.class) { + long value = ((Long) o).longValue(); + if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) { + return (int) value; + } else { + LOG.warn("Configuation value " + value + " overflows/underflows the integer type."); + return defaultValue; + } + } + else { + try { + return Integer.parseInt(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value " + o + " as an integer number"); + return defaultValue; + } } } @@ -205,7 +175,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * the value of the key/value pair to be added */ public void setInteger(String key, int value) { - setStringInternal(key, Integer.toString(value)); + setValueInternal(key, value); } /** @@ -218,11 +188,25 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the (default) value associated with the given key */ public long getLong(String key, long defaultValue) { - String val = getStringInternal(key); - if (val == null) { + Object o = getRawValue(key); + if (o == null) { return defaultValue; - } else { - return Long.parseLong(val); + } + + if (o.getClass() == Long.class) { + return (Long) o; + } + else if (o.getClass() == Integer.class) { + return ((Integer) o).longValue(); + } + else { + try { + return Long.parseLong(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number"); + return defaultValue; + } } } @@ -235,7 +219,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * the value of the key/value pair to be added */ public void setLong(String key, long value) { - setStringInternal(key, Long.toString(value)); + setValueInternal(key, value); } /** @@ -248,11 +232,16 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the (default) value associated with the given key */ public boolean getBoolean(String key, boolean defaultValue) { - String val = getStringInternal(key); - if (val == null) { + Object o = getRawValue(key); + if (o == null) { return defaultValue; - } else { - return Boolean.parseBoolean(val); + } + + if (o.getClass() == Boolean.class) { + return (Boolean) o; + } + else { + return Boolean.parseBoolean(o.toString()); } } @@ -264,8 +253,8 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @param value * the value of the key/value pair to be added */ - public void setBoolean(final String key, final boolean value) { - setStringInternal(key, Boolean.toString(value)); + public void setBoolean(String key, boolean value) { + setValueInternal(key, value); } /** @@ -277,12 +266,32 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * the default value which is returned in case there is no value associated with the given key * @return the (default) value associated with the given key */ - public float getFloat(final String key, final float defaultValue) { - String val = getStringInternal(key); - if (val == null) { + public float getFloat(String key, float defaultValue) { + Object o = getRawValue(key); + if (o == null) { return defaultValue; - } else { - return Float.parseFloat(val); + } + + if (o.getClass() == Float.class) { + return (Float) o; + } + else if (o.getClass() == Double.class) { + double value = ((Double) o); + if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) { + return (float) value; + } else { + LOG.warn("Configuation value " + value + " overflows/underflows the float type."); + return defaultValue; + } + } + else { + try { + return Float.parseFloat(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value " + o + " as a float value"); + return defaultValue; + } } } @@ -295,7 +304,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * the value of the key/value pair to be added */ public void setFloat(String key, float value) { - setStringInternal(key, Float.toString(value)); + setValueInternal(key, value); } /** @@ -308,11 +317,25 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the (default) value associated with the given key */ public double getDouble(String key, double defaultValue) { - String val = getStringInternal(key); - if (val == null) { + Object o = getRawValue(key); + if (o == null) { return defaultValue; - } else { - return Double.parseDouble(val); + } + + if (o.getClass() == Double.class) { + return (Double) o; + } + else if (o.getClass() == Float.class) { + return ((Float) o).doubleValue(); + } + else { + try { + return Double.parseDouble(o.toString()); + } + catch (NumberFormatException e) { + LOG.warn("Configuration cannot evaluate value " + o + " as a double value"); + return defaultValue; + } } } @@ -325,7 +348,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * the value of the key/value pair to be added */ public void setDouble(String key, double value) { - setStringInternal(key, Double.toString(value)); + setValueInternal(key, value); } /** @@ -338,11 +361,17 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the (default) value associated with the given key. */ public byte[] getBytes(String key, byte[] defaultValue) { - final String encoded = getStringInternal(key); - if (encoded == null) { + + Object o = getRawValue(key); + if (o == null) { + return defaultValue; + } + else if (o.getClass() == byte[].class) { + return (byte[]) o; + } + else { + LOG.warn("Configuration cannot evaluate value " + o + " as a byte[] value"); return defaultValue; - } else { - return BaseEncoding.base64().decode(encoded); } } @@ -355,10 +384,11 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * The bytes to be added. */ public void setBytes(String key, byte[] bytes) { - final String encoded = BaseEncoding.base64().encode(bytes); - setStringInternal(key, encoded); + setValueInternal(key, bytes); } + // -------------------------------------------------------------------------------------------- + /** * Returns the keys of all key/value pairs stored inside this * configuration object. @@ -366,19 +396,9 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { * @return the keys of all key/value pairs stored inside this configuration object */ public Set keySet() { - - // Copy key set, so return value is independent from the object's internal data structure - final Set retVal = new HashSet(); - synchronized (this.confData) { - - final Iterator it = this.confData.keySet().iterator(); - while (it.hasNext()) { - retVal.add(it.next()); - } + return new HashSet(this.confData.keySet()); } - - return retVal; } public void addAll(Configuration other) { @@ -405,7 +425,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { synchronized (this.confData) { synchronized (other.confData) { - for (Map.Entry entry : other.confData.entrySet()) { + for (Map.Entry entry : other.confData.entrySet()) { bld.setLength(pl); bld.append(entry.getKey()); this.confData.put(bld.toString(), entry.getValue()); @@ -428,93 +448,164 @@ public class Configuration implements IOReadableWritable, java.io.Serializable { // -------------------------------------------------------------------------------------------- - private String getStringInternal(String key) { + private void setValueInternal(String key, T value) { if (key == null) { throw new NullPointerException("Key must not be null."); } + if (value == null) { + throw new NullPointerException("Value must not be null."); + } synchronized (this.confData) { - return this.confData.get(key); + this.confData.put(key, value); } } - private void setStringInternal(String key, String value) { + private Object getRawValue(String key) { if (key == null) { throw new NullPointerException("Key must not be null."); } - if (value == null) { - throw new NullPointerException("Value must not be null."); - } - synchronized (this.confData) { - this.confData.put(key, value); + return this.confData.get(key); } } // -------------------------------------------------------------------------------------------- @Override - public void read(final DataInputView in) throws IOException { - + public void read(DataInputView in) throws IOException { synchronized (this.confData) { - final int numberOfProperties = in.readInt(); for (int i = 0; i < numberOfProperties; i++) { - final String key = StringRecord.readString(in); - final String value = StringRecord.readString(in); + String key = StringValue.readString(in); + Object value; + + byte type = in.readByte(); + switch (type) { + case TYPE_STRING: + value = StringValue.readString(in); + break; + case TYPE_INT: + value = in.readInt(); + break; + case TYPE_LONG: + value = in.readLong(); + break; + case TYPE_FLOAT: + value = in.readFloat(); + break; + case TYPE_DOUBLE: + value = in.readDouble(); + break; + case TYPE_BOOLEAN: + value = in.readBoolean(); + break; + case TYPE_BYTES: + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + value = bytes; + break; + default: + throw new IOException("Unrecognized type: " + type); + } + this.confData.put(key, value); } } } - @Override public void write(final DataOutputView out) throws IOException { - synchronized (this.confData) { - out.writeInt(this.confData.size()); - - final Iterator it = this.confData.keySet().iterator(); - while (it.hasNext()) { - final String key = it.next(); - final String value = this.confData.get(key); - StringRecord.writeString(out, key); - StringRecord.writeString(out, value); + + for (Map.Entry entry : this.confData.entrySet()) { + String key = entry.getKey(); + Object val = entry.getValue(); + + StringValue.writeString(key, out); + Class clazz = val.getClass(); + + if (clazz == String.class) { + out.write(TYPE_STRING); + StringValue.writeString((String) val, out); + } + else if (clazz == Integer.class) { + out.write(TYPE_INT); + out.writeInt((Integer) val); + } + else if (clazz == Long.class) { + out.write(TYPE_LONG); + out.writeLong((Long) val); + } + else if (clazz == Float.class) { + out.write(TYPE_FLOAT); + out.writeFloat((Float) val); + } + else if (clazz == Double.class) { + out.write(TYPE_DOUBLE); + out.writeDouble((Double) val); + } + else if (clazz == byte[].class) { + out.write(TYPE_BYTES); + byte[] bytes = (byte[]) val; + out.writeInt(bytes.length); + out.write(bytes); + } + else if (clazz == Boolean.class) { + out.write(TYPE_BOOLEAN); + out.writeBoolean((Boolean) val); + } + else { + throw new IllegalArgumentException("Unrecognized type"); + } } } } - private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - this.classLoader = getClass().getClassLoader(); - } - // -------------------------------------------------------------------------------------------- @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + confData.hashCode(); - return result; + int hash = 0; + for (String s : this.confData.keySet()) { + hash ^= s.hashCode(); + } + return hash; } @Override - public boolean equals(final Object obj) { + public boolean equals(Object obj) { if (this == obj) { return true; } - if (obj == null) { - return false; + else if (obj instanceof Configuration) { + Map otherConf = ((Configuration) obj).confData; + + for (Map.Entry e : this.confData.entrySet()) { + Object thisVal = e.getValue(); + Object otherVal = otherConf.get(e.getKey()); + + if (thisVal.getClass() != byte[].class) { + if (!thisVal.equals(otherVal)) { + return false; + } + } else if (otherVal.getClass() == byte[].class) { + if (!Arrays.equals((byte[]) thisVal, (byte[]) otherVal)) { + return false; + } + } else { + return false; + } + } + + return true; } - if (getClass() != obj.getClass()) { + else { return false; } - final Configuration other = (Configuration) obj; - return confData.equals(other.confData); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index 23846ca5846df423284dc5b03412b4b902a78dec..e57aa6cde76b81904435ed8b41ead27cceed4cca 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.configuration; import java.io.BufferedReader; @@ -25,119 +24,68 @@ import java.io.FileInputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.util.StringUtils; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.w3c.dom.Text; -import org.xml.sax.SAXException; /** - * Global configuration object in Nephele. Similar to Java properties configuration + * Global configuration object for Flink. Similar to Java properties configuration * objects it includes key-value pairs which represent the framework's configuration. - *

- * This class is thread-safe. */ public final class GlobalConfiguration { - /** - * The log object used for debugging. - */ + /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class); - /** - * The global configuration object accessible through a singleton pattern. - */ - private static GlobalConfiguration configuration = null; - - /** - * The key to the directory this configuration was read from. - */ - private static final String CONFIGDIRKEY = "config.dir"; + /** The global configuration object accessible through a singleton pattern. */ + private static GlobalConfiguration SINGLETON = null; - /** - * The internal map holding the key-value pairs the configuration consists of. - */ - private final Map confData = new HashMap(); + /** The internal map holding the key-value pairs the configuration consists of. */ + private final Configuration config = new Configuration(); + // -------------------------------------------------------------------------------------------- + /** * Retrieves the singleton object of the global configuration. * * @return the global configuration object */ - private static synchronized GlobalConfiguration get() { - - if (configuration == null) { - configuration = new GlobalConfiguration(); + private static GlobalConfiguration get() { + // lazy initialization currently only for testibility + synchronized (GlobalConfiguration.class) { + if (SINGLETON == null) { + SINGLETON = new GlobalConfiguration(); + } + return SINGLETON; } - - return configuration; } /** * The constructor used to construct the singleton instance of the global configuration. */ - private GlobalConfiguration() { - } - - /** - * Returns the value associated with the given key as a string. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - public static String getString(final String key, final String defaultValue) { - - return get().getStringInternal(key, defaultValue); - } + private GlobalConfiguration() {} + // -------------------------------------------------------------------------------------------- + /** * Returns the value associated with the given key as a string. * * @param key - * key the key pointing to the associated value - * @param defaultValue - * defaultValue the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - private String getStringInternal(final String key, final String defaultValue) { - - synchronized (this.confData) { - - if (!this.confData.containsKey(key)) { - return defaultValue; - } - - return this.confData.get(key); - } - } - - /** - * Returns the value associated with the given key as a long integer. - * - * @param key * the key pointing to the associated value * @param defaultValue * the default value which is returned in case there is no value associated with the given key * @return the (default) value associated with the given key */ - public static long getLong(final String key, final long defaultValue) { - - return get().getLongInternal(key, defaultValue); + public static String getString(String key, String defaultValue) { + return get().config.getString(key, defaultValue); } /** @@ -149,39 +97,8 @@ public final class GlobalConfiguration { * the default value which is returned in case there is no value associated with the given key * @return the (default) value associated with the given key */ - private long getLongInternal(final String key, final long defaultValue) { - - long retVal = defaultValue; - - try { - synchronized (this.confData) { - - if (this.confData.containsKey(key)) { - retVal = Long.parseLong(this.confData.get(key)); - } - } - } catch (NumberFormatException e) { - - if (LOG.isDebugEnabled()) { - LOG.debug(StringUtils.stringifyException(e)); - } - } - - return retVal; - } - - /** - * Returns the value associated with the given key as an integer. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - public static int getInteger(final String key, final int defaultValue) { - - return get().getIntegerInternal(key, defaultValue); + public static long getLong(String key, long defaultValue) { + return get().config.getLong(key, defaultValue); } /** @@ -193,25 +110,8 @@ public final class GlobalConfiguration { * the default value which is returned in case there is no value associated with the given key * @return the (default) value associated with the given key */ - private int getIntegerInternal(final String key, final int defaultValue) { - - int retVal = defaultValue; - - try { - synchronized (this.confData) { - - if (this.confData.containsKey(key)) { - retVal = Integer.parseInt(this.confData.get(key)); - } - } - } catch (NumberFormatException e) { - - if (LOG.isDebugEnabled()) { - LOG.debug(StringUtils.stringifyException(e)); - } - } - - return retVal; + public static int getInteger(String key, int defaultValue) { + return get().config.getInteger(key, defaultValue); } /** @@ -224,38 +124,7 @@ public final class GlobalConfiguration { * @return the (default) value associated with the given key */ public static float getFloat(String key, float defaultValue) { - - return get().getFloatInternal(key, defaultValue); - } - - /** - * Returns the value associated with the given key as an integer. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - private float getFloatInternal(String key, float defaultValue) { - - float retVal = defaultValue; - - try { - synchronized (this.confData) { - - if (this.confData.containsKey(key)) { - retVal = Float.parseFloat(this.confData.get(key)); - } - } - } catch (NumberFormatException e) { - - if (LOG.isDebugEnabled()) { - LOG.debug(StringUtils.stringifyException(e)); - } - } - - return retVal; + return get().config.getFloat(key, defaultValue); } /** @@ -267,33 +136,8 @@ public final class GlobalConfiguration { * the default value which is returned in case there is no value associated with the given key * @return the (default) value associated with the given key */ - public static boolean getBoolean(final String key, final boolean defaultValue) { - - return get().getBooleanInternal(key, defaultValue); - } - - /** - * Returns the value associated with the given key as a boolean. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - private boolean getBooleanInternal(final String key, final boolean defaultValue) { - - boolean retVal = defaultValue; - - synchronized (this.confData) { - - final String value = this.confData.get(key); - if (value != null) { - retVal = Boolean.parseBoolean(value); - } - } - - return retVal; + public static boolean getBoolean(String key, boolean defaultValue) { + return get().config.getBoolean(key, defaultValue); } /** @@ -319,7 +163,7 @@ public final class GlobalConfiguration { return; } - if(confDirFile.isFile()) { + if (confDirFile.isFile()) { final File file = new File(configDir); if(configDir.endsWith(".xml")) { get().loadXMLResource( file ); @@ -329,7 +173,6 @@ public final class GlobalConfiguration { LOG.warn("The given configuration has an unknown extension."); return; } - configuration.confData.put(CONFIGDIRKEY, file.getAbsolutePath() ); return; } @@ -352,11 +195,6 @@ public final class GlobalConfiguration { for (File f : yamlFiles) { get().loadYAMLResource(f); } - - // Store the path to the configuration directory itself - if (configuration != null) { - configuration.confData.put(CONFIGDIRKEY, configDir); - } } /** @@ -379,52 +217,57 @@ public final class GlobalConfiguration { * @param file the YAML file to read from * @see YAML 1.2 specification */ - private void loadYAMLResource(final File file) { - - BufferedReader reader = null; - try { - reader = new BufferedReader(new InputStreamReader(new FileInputStream(file))); + private void loadYAMLResource(File file) { - String line = null; - while ((line = reader.readLine()) != null) { - - // 1. check for comments - String[] comments = line.split("#", 2); - String conf = comments[0]; - - // 2. get key and value - if (conf.length() > 0) { - String[] kv = conf.split(": ", 2); - - // skip line with no valid key-value pair - if (kv.length == 1) { - LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line); - continue; - } + synchronized (getClass()) { - String key = kv[0].trim(); - String value = kv[1].trim(); - - // sanity check - if (key.length() == 0 || value.length() == 0) { - LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line); - continue; + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(file))); + + String line = null; + while ((line = reader.readLine()) != null) { + + // 1. check for comments + String[] comments = line.split("#", 2); + String conf = comments[0]; + + // 2. get key and value + if (conf.length() > 0) { + String[] kv = conf.split(": ", 2); + + // skip line with no valid key-value pair + if (kv.length == 1) { + LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line); + continue; + } + + String key = kv[0].trim(); + String value = kv[1].trim(); + + // sanity check + if (key.length() == 0 || value.length() == 0) { + LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line); + continue; + } + + LOG.debug("Loading configuration property: {}, {}", key, value); + + this.config.setString(key, value); } - - LOG.debug("Loading configuration property: {}, {}", key, value); - - this.confData.put(key, value); } } - } catch (IOException e) { - e.printStackTrace(); - } finally { - try { - if(reader != null) { - reader.close(); + catch (IOException e) { + LOG.error("Error parsing YAML configuration.", e); + } + finally { + try { + if(reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.warn("Cannot to close reader with IOException.", e); } - } catch (IOException e) { - LOG.warn("Cannot to close reader with IOException.", e); } } } @@ -435,7 +278,7 @@ public final class GlobalConfiguration { * @param file * the XML document file */ - private void loadXMLResource(final File file) { + private void loadXMLResource(File file) { final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); // Ignore comments in the XML file @@ -468,7 +311,7 @@ public final class GlobalConfiguration { final NodeList props = root.getChildNodes(); int propNumber = -1; - synchronized (this.confData) { + synchronized (getClass()) { for (int i = 0; i < props.getLength(); i++) { @@ -528,85 +371,28 @@ public final class GlobalConfiguration { if (key != null && value != null) { // Put key, value pair into the map LOG.debug("Loading configuration property: {}, {}", key, value); - this.confData.put(key, value); + this.config.setString(key, value); } else { LOG.warn("Error while reading configuration: Cannot read property " + propNumber); } } } - } catch (ParserConfigurationException e) { - LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e)); - } catch (IOException e) { - LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e)); - } catch (SAXException e) { - LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e)); + } + catch (Exception e) { + LOG.error("Cannot load configuration.", e); } } /** - * Copies the key/value pairs stored in the global configuration to - * a {@link Configuration} object and returns it. + * Gets a {@link Configuration} object with the values of this GlobalConfiguration * * @return the {@link Configuration} object including the key/value pairs */ public static Configuration getConfiguration() { - - return get().getConfigurationInternal(null); - } - - /** - * Copies a subset of the key/value pairs stored in the global configuration to - * a {@link Configuration} object and returns it. The subset is defined by the - * given array of keys. If keys is null, the entire - * global configuration is copied. - * - * @param keys - * array of keys specifying the subset of pairs to copy. - * @return the {@link Configuration} object including the key/value pairs - */ - public static Configuration getConfiguration(final String[] keys) { - - return get().getConfigurationInternal(keys); - } - - /** - * Internal non-static method to return configuration. - * - * @param keys - * array of keys specifying the subset of pairs to copy. - * @return the {@link Configuration} object including the key/value pairs - */ - private Configuration getConfigurationInternal(final String[] keys) { - - Configuration conf = new Configuration(); - - synchronized (this.confData) { - - final Iterator it = this.confData.keySet().iterator(); - - while (it.hasNext()) { - - final String key = it.next(); - boolean found = false; - if (keys != null) { - for (int i = 0; i < keys.length; i++) { - if (key.equals(keys[i])) { - found = true; - break; - } - } - - if (found) { - conf.setString(key, this.confData.get(key)); - } - } else { - conf.setString(key, this.confData.get(key)); - } - } - } - - return conf; + Configuration copy = new Configuration(); + copy.addAll(get().config); + return copy; } /** @@ -618,8 +404,7 @@ public final class GlobalConfiguration { * @param conf * the {@link Configuration} object to merge into the global configuration */ - public static void includeConfiguration(final Configuration conf) { - + public static void includeConfiguration(Configuration conf) { get().includeConfigurationInternal(conf); } @@ -629,25 +414,15 @@ public final class GlobalConfiguration { * @param conf * the {@link Configuration} object to merge into the global configuration */ - private void includeConfigurationInternal(final Configuration conf) { - - if (conf == null) { - LOG.error("Given configuration object is null, ignoring it..."); - return; - } - - synchronized (this.confData) { - - final Iterator it = conf.keySet().iterator(); - - while (it.hasNext()) { - - final String key = it.next(); - this.confData.put(key, conf.getString(key, "")); - } + private void includeConfigurationInternal(Configuration conf) { + // static synchronized + synchronized (getClass()) { + this.config.addAll(conf); } } + // -------------------------------------------------------------------------------------------- + /** * Filters files in directory which have the specified suffix (e.g. ".xml"). * diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index c30e488baf29d40b72c2483a9f818c91b14e41a5..e1318927d0262fa13eee47e849c02ed91126e6bb 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -16,15 +16,13 @@ * limitations under the License. */ - package org.apache.flink.configuration; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.IOException; - import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; @@ -34,31 +32,146 @@ import org.junit.Test; * objects is tested. */ public class ConfigurationTest { + + private static final byte[] EMPTY_BYTES = new byte[0]; + private static final long TOO_LONG = Integer.MAX_VALUE + 10L; + private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE; + /** * This test checks the serialization/deserialization of configuration objects. */ @Test - public void testConfigurationSerialization() { - - // First, create initial configuration object with some parameters - final Configuration orig = new Configuration(); - orig.setString("mykey", "myvalue"); - orig.setBoolean("shouldbetrue", true); - orig.setInteger("mynumber", 100); - orig.setClass("myclass", this.getClass()); - + public void testConfigurationSerializationAndGetters() { try { + final Configuration orig = new Configuration(); + orig.setString("mykey", "myvalue"); + orig.setInteger("mynumber", 100); + orig.setLong("longvalue", 478236947162389746L); + orig.setFloat("PI", 3.1415926f); + orig.setDouble("E", Math.E); + orig.setBoolean("shouldbetrue", true); + orig.setBytes("bytes sequence", new byte[] { 1, 2, 3, 4, 5 } ); + orig.setClass("myclass", this.getClass()); + final Configuration copy = (Configuration) CommonTestUtils.createCopy(orig); + assertEquals("myvalue", copy.getString("mykey", "null")); + assertEquals(100, copy.getInteger("mynumber", 0)); + assertEquals(478236947162389746L, copy.getLong("longvalue", 0L)); + assertEquals(3.1415926f, copy.getFloat("PI", 3.1415926f), 0.0); + assertEquals(Math.E, copy.getDouble("E", 0.0), 0.0); + assertEquals(true, copy.getBoolean("shouldbetrue", false)); + assertArrayEquals(new byte[] { 1, 2, 3, 4, 5 }, copy.getBytes("bytes sequence", null)); + assertEquals(getClass(), copy.getClass("myclass", null, getClass().getClassLoader())); + + assertEquals(orig, copy); + assertEquals(orig.keySet(), copy.keySet()); + assertEquals(orig.hashCode(), copy.hashCode()); - assertEquals(copy.getString("mykey", "null"), "myvalue"); - assertEquals(copy.getBoolean("shouldbetrue", false), true); - assertEquals(copy.getInteger("mynumber", 0), 100); - assertEquals(copy.getClass("myclass", null).toString(), this.getClass().toString()); - assertTrue(orig.equals(copy)); - assertTrue(orig.keySet().equals(copy.keySet())); - - } catch (IOException e) { + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConversions() { + try { + Configuration pc = new Configuration(); + + pc.setInteger("int", 5); + pc.setLong("long", 15); + pc.setLong("too_long", TOO_LONG); + pc.setFloat("float", 2.1456775f); + pc.setDouble("double", Math.PI); + pc.setDouble("too_long_double", TOO_LONG_DOUBLE); + pc.setString("string", "42"); + pc.setString("non_convertible_string", "bcdefg&&"); + pc.setBoolean("boolean", true); + + // as integer + assertEquals(5, pc.getInteger("int", 0)); + assertEquals(5L, pc.getLong("int", 0)); + assertEquals(5f, pc.getFloat("int", 0), 0.0); + assertEquals(5.0, pc.getDouble("int", 0), 0.0); + assertEquals(false, pc.getBoolean("int", true)); + assertEquals("5", pc.getString("int", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("int", EMPTY_BYTES)); + + // as long + assertEquals(15, pc.getInteger("long", 0)); + assertEquals(15L, pc.getLong("long", 0)); + assertEquals(15f, pc.getFloat("long", 0), 0.0); + assertEquals(15.0, pc.getDouble("long", 0), 0.0); + assertEquals(false, pc.getBoolean("long", true)); + assertEquals("15", pc.getString("long", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("long", EMPTY_BYTES)); + + // as too long + assertEquals(0, pc.getInteger("too_long", 0)); + assertEquals(TOO_LONG, pc.getLong("too_long", 0)); + assertEquals((float) TOO_LONG, pc.getFloat("too_long", 0), 10.0); + assertEquals((double) TOO_LONG, pc.getDouble("too_long", 0), 10.0); + assertEquals(false, pc.getBoolean("too_long", true)); + assertEquals(String.valueOf(TOO_LONG), pc.getString("too_long", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("too_long", EMPTY_BYTES)); + + // as float + assertEquals(0, pc.getInteger("float", 0)); + assertEquals(0L, pc.getLong("float", 0)); + assertEquals(2.1456775f, pc.getFloat("float", 0), 0.0); + assertEquals(2.1456775, pc.getDouble("float", 0), 0.0000001); + assertEquals(false, pc.getBoolean("float", true)); + assertTrue(pc.getString("float", "0").startsWith("2.145677")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("float", EMPTY_BYTES)); + + // as double + assertEquals(0, pc.getInteger("double", 0)); + assertEquals(0L, pc.getLong("double", 0)); + assertEquals(3.141592f, pc.getFloat("double", 0), 0.000001); + assertEquals(Math.PI, pc.getDouble("double", 0), 0.0); + assertEquals(false, pc.getBoolean("double", true)); + assertTrue(pc.getString("double", "0").startsWith("3.1415926535")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("double", EMPTY_BYTES)); + + // as too long double + assertEquals(0, pc.getInteger("too_long_double", 0)); + assertEquals(0L, pc.getLong("too_long_double", 0)); + assertEquals(0f, pc.getFloat("too_long_double", 0f), 0.000001); + assertEquals(TOO_LONG_DOUBLE, pc.getDouble("too_long_double", 0), 0.0); + assertEquals(false, pc.getBoolean("too_long_double", true)); + assertEquals(String.valueOf(TOO_LONG_DOUBLE), pc.getString("too_long_double", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("too_long_double", EMPTY_BYTES)); + + // as string + assertEquals(42, pc.getInteger("string", 0)); + assertEquals(42L, pc.getLong("string", 0)); + assertEquals(42f, pc.getFloat("string", 0f), 0.000001); + assertEquals(42.0, pc.getDouble("string", 0), 0.0); + assertEquals(false, pc.getBoolean("string", true)); + assertEquals("42", pc.getString("string", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("string", EMPTY_BYTES)); + + // as non convertible string + assertEquals(0, pc.getInteger("non_convertible_string", 0)); + assertEquals(0L, pc.getLong("non_convertible_string", 0)); + assertEquals(0f, pc.getFloat("non_convertible_string", 0f), 0.000001); + assertEquals(0.0, pc.getDouble("non_convertible_string", 0), 0.0); + assertEquals(false, pc.getBoolean("non_convertible_string", true)); + assertEquals("bcdefg&&", pc.getString("non_convertible_string", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("non_convertible_string", EMPTY_BYTES)); + + // as boolean + assertEquals(0, pc.getInteger("boolean", 0)); + assertEquals(0L, pc.getLong("boolean", 0)); + assertEquals(0f, pc.getFloat("boolean", 0f), 0.000001); + assertEquals(0.0, pc.getDouble("boolean", 0), 0.0); + assertEquals(true, pc.getBoolean("boolean", false)); + assertEquals("true", pc.getString("boolean", "0")); + assertArrayEquals(EMPTY_BYTES, pc.getBytes("boolean", EMPTY_BYTES)); + } + catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java index 18702c798b382a2e871b3b9a2721ff9d0217b17a..f207f783d67e22a4ad343e00b6dd7818ecc213a0 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.configuration; import static org.junit.Assert.assertEquals; @@ -40,7 +39,7 @@ public class GlobalConfigurationTest { public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException { // reset GlobalConfiguration between tests - Field instance = GlobalConfiguration.class.getDeclaredField("configuration"); + Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON"); instance.setAccessible(true); instance.set(null, null); } @@ -73,8 +72,8 @@ public class GlobalConfigurationTest { GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); Configuration conf = GlobalConfiguration.getConfiguration(); - // all distinct keys from confFile1 + confFile2 + 'config.dir' key - assertEquals(3 + 1, conf.keySet().size()); + // all distinct keys from confFile1 + confFile2key + assertEquals(3, conf.keySet().size()); // keys 1, 2, 3 should be OK and match the expected values // => configuration keys from YAML should overwrite keys from XML @@ -126,8 +125,8 @@ public class GlobalConfigurationTest { GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); Configuration conf = GlobalConfiguration.getConfiguration(); - // all distinct keys from confFile1 + confFile2 + 'config.dir' key - assertEquals(6 + 1, conf.keySet().size()); + // all distinct keys from confFile1 + confFile2 key + assertEquals(6, conf.keySet().size()); // keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected values assertEquals("myvalue1", conf.getString("mykey1", null)); @@ -202,14 +201,6 @@ public class GlobalConfigurationTest { newconf.setInteger("mynewinteger", 1000); GlobalConfiguration.includeConfiguration(newconf); assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000); - - // Test local "sub" configuration - final String[] configparams = { "mykey1", "mykey2" }; - Configuration newconf2 = GlobalConfiguration.getConfiguration(configparams); - - assertEquals(newconf2.keySet().size(), 2); - assertEquals(newconf2.getString("mykey1", "null"), "myvalue1"); - assertEquals(newconf2.getString("mykey2", "null"), "myvalue2"); } finally { // Remove temporary files confFile1.delete(); diff --git a/flink-core/src/test/resources/logback-test.xml b/flink-core/src/test/resources/logback-test.xml index 4f484cb1f66eb6e1f5f7394e71fff8af626a7862..1c4ea08c44a2085e1b0ba42ffb3f6ab72a3c3a5e 100644 --- a/flink-core/src/test/resources/logback-test.xml +++ b/flink-core/src/test/resources/logback-test.xml @@ -30,4 +30,5 @@ + \ No newline at end of file diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java index 3f42e879d3b6ab56323fff755bd7dea267c46e1b..391ef7f5efd10c99de0687206fa36aaf6b097d10 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java @@ -146,6 +146,11 @@ public abstract class WrappingFunction extends AbstractRichF public DistributedCache getDistributedCache() { return context.getDistributedCache(); } + + @Override + public ClassLoader getUserCodeClassLoader() { + return context.getUserCodeClassLoader(); + } } private static class WrappingIterationRuntimeContext extends WrappingRuntimeContext implements IterationRuntimeContext { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java index 85611332e5f619d26a4489addcb3eb98b65c3ea3..c7f86af815d1800b45d15850dd1565b0d4f88053 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java @@ -192,18 +192,25 @@ public class CsvInputFormat extends GenericCsvInputFormat { Class[] types = (Class[]) new Class[maxTextPos+1]; int[] targetPos = new int[maxTextPos+1]; + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + // set the fields - for (int i = 0; i < numConfigFields; i++) { - int pos = textPosIdx[i]; - - Class clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null).asSubclass(Value.class); - if (clazz == null) { - throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " + - "No field parser class for parameter " + i); + try { + for (int i = 0; i < numConfigFields; i++) { + int pos = textPosIdx[i]; + + Class clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl).asSubclass(Value.class); + if (clazz == null) { + throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " + + "No field parser class for parameter " + i); + } + + types[pos] = clazz; + targetPos[pos] = i; } - - types[pos] = clazz; - targetPos[pos] = i; + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Could not resolve type classes", e); } // update the field types diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java index 8760cd8eae839323cf650d2efffbe49d8af20bcc..2c514fe961f1283c705c5b9206fbc89b86395d5b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java @@ -206,14 +206,20 @@ public class CsvOutputFormat extends FileOutputFormat { Class[] arr = new Class[this.numFields]; this.classes = arr; - for (int i = 0; i < this.numFields; i++) { - @SuppressWarnings("unchecked") - Class clazz = (Class) parameters.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null); - if (clazz == null) { - throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i); + try { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + for (int i = 0; i < this.numFields; i++) { + Class clazz = parameters.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl); + if (clazz == null) { + throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i); + } + + this.classes[i] = clazz; } - - this.classes[i] = clazz; + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Could not resolve type classes", e); } this.recordPositions = new int[this.numFields]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java index 603965472bae796ed033e699cfebeb257da3f07d..b5cfb8b033b35af7f6966904610efba5548d10e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java @@ -158,7 +158,8 @@ public abstract class AbstractIterativePactTask extends @Override public RuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); - return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup()); + return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(), + env.getIndexInSubtaskGroup(), userCodeClassLoader); } // -------------------------------------------------------------------------------------------- @@ -347,8 +348,8 @@ public abstract class AbstractIterativePactTask extends private class IterativeRuntimeUdfContext extends RuntimeUDFContext implements IterationRuntimeContext { - public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex) { - super(name, numParallelSubtasks, subtaskIndex); + public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java index cca45599d04a481ce68de1675db6452f0f2320e6..0c04538c61745574a27ea55cfc4b354ec7754648 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java @@ -296,7 +296,7 @@ public class IterationHeadPactTask extends Abstrac } // instantiate all aggregators and register them at the iteration global registry - aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators()); + aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators(userCodeClassLoader)); IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry); DataInputView superstepResult = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java index 3ac08585b6b5145aa09f36008d76db8ed0ee3156..f75ba61ab70ed4e4f5a8c5ec74734b6c049d0f86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java @@ -86,13 +86,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen // store all aggregators this.aggregators = new HashMap>(); - for (AggregatorWithName aggWithName : taskConfig.getIterationAggregators()) { + for (AggregatorWithName aggWithName : taskConfig.getIterationAggregators(userCodeClassLoader)) { aggregators.put(aggWithName.getName(), aggWithName.getAggregator()); } // store the aggregator convergence criterion if (taskConfig.usesConvergenceCriterion()) { - convergenceCriterion = taskConfig.getConvergenceCriterion(); + convergenceCriterion = taskConfig.getConvergenceCriterion(userCodeClassLoader); convergenceAggregatorName = taskConfig.getConvergenceCriterionAggregatorName(); Preconditions.checkNotNull(convergenceAggregatorName); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 17918574879d2cc7f20b1aea2ef56493a12bbb87..2d7afc88acb1c5bde4e8c2a499cd08745b406320 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -275,7 +275,6 @@ public class DataSinkTask extends AbstractInvokable { } // obtain task configuration (including stub parameters) Configuration taskConf = getTaskConfiguration(); - taskConf.setClassLoader(this.userCodeClassLoader); this.config = new TaskConfig(taskConf); try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 5d5409e23a2974cee0c136c65d08653916da5698..af3eff302c45c2d2de8f6376e3fc5ce08b933830 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -310,7 +310,6 @@ l * // obtain task configuration (including stub parameters) Configuration taskConf = getTaskConfiguration(); - taskConf.setClassLoader(this.userCodeClassLoader); this.config = new TaskConfig(taskConf); try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 72d0dc00a73685bbcd85be9810f1df4aa94c9bd0..b7fa8729c3c059232f581a39cba18e362b40d0d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -243,7 +243,6 @@ public class RegularPactTask extends AbstractInvokable i // obtain task configuration (including stub parameters) Configuration taskConf = getTaskConfiguration(); - taskConf.setClassLoader(this.userCodeClassLoader); this.config = new TaskConfig(taskConf); // now get the operator class which drives the operation @@ -1066,7 +1065,7 @@ public class RegularPactTask extends AbstractInvokable i public RuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); - return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask()); + return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask()); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java index 3ebf45ced64d03b1b86c2ac1d1c0b363c92a6f22..482103c0ffefd1a697b41d8130f6365dfb09ad28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators.chaining; import org.apache.flink.api.common.functions.RichFunction; @@ -25,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.RegularPactTask; +@SuppressWarnings("deprecation") public class ChainedCollectorMapDriver extends ChainedDriver { private GenericCollectorMap mapper; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 4c0ac2d6d0bd5eff50ae575e902835799dce6ad1..679062a1508c095874f217e9bc1b17ec5e6f2307 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -57,7 +57,8 @@ public abstract class ChainedDriver implements Collector { this.udfContext = ((RegularPactTask) parent).createRuntimeContext(taskName); } else { Environment env = parent.getEnvironment(); - this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask()); + this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), + env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask()); } setup(parent); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java index a6678448e8fb956b4b9d9463a4acdbbe3b40d1d2..540cc89312e83a22f8da6ec036b6560fa46264cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java @@ -44,22 +44,26 @@ public class RuntimeUDFContext implements RuntimeContext { private final int subtaskIndex; - private DistributedCache distributedCache = new DistributedCache(); + private final ClassLoader userCodeClassLoader; + + private final DistributedCache distributedCache = new DistributedCache(); - private HashMap> accumulators = new HashMap>(); + private final HashMap> accumulators = new HashMap>(); - private HashMap> broadcastVars = new HashMap>(); + private final HashMap> broadcastVars = new HashMap>(); - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex) { + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) { this.name = name; this.numParallelSubtasks = numParallelSubtasks; this.subtaskIndex = subtaskIndex; + this.userCodeClassLoader = userCodeClassLoader; } - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, Map> cpTasks) { + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map> cpTasks) { this.name = name; this.numParallelSubtasks = numParallelSubtasks; this.subtaskIndex = subtaskIndex; + this.userCodeClassLoader = userCodeClassLoader; this.distributedCache.setCopyTasks(cpTasks); } @Override @@ -158,4 +162,9 @@ public class RuntimeUDFContext implements RuntimeContext { public DistributedCache getDistributedCache() { return this.distributedCache; } + + @Override + public ClassLoader getUserCodeClassLoader() { + return this.userCodeClassLoader; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java index 003b872e59c7a1529e2d60b23fcec1e4773ec4a2..9facb33497c9d58bcab9c1991753290b37f3cb87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java @@ -916,7 +916,7 @@ public class TaskConfig { } @SuppressWarnings("unchecked") - public Collection> getIterationAggregators() { + public Collection> getIterationAggregators(ClassLoader cl) { final int numAggs = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); if (numAggs == 0) { return Collections.emptyList(); @@ -927,7 +927,7 @@ public class TaskConfig { Aggregator aggObj; try { aggObj = (Aggregator) InstantiationUtil.readObjectFromConfig( - this.config, ITERATION_AGGREGATOR_PREFIX + i, getConfiguration().getClassLoader()); + this.config, ITERATION_AGGREGATOR_PREFIX + i, cl); } catch (IOException e) { throw new RuntimeException("Error while reading the aggregator object from the task configuration."); } catch (ClassNotFoundException e) { @@ -956,11 +956,11 @@ public class TaskConfig { } @SuppressWarnings("unchecked") - public ConvergenceCriterion getConvergenceCriterion() { + public ConvergenceCriterion getConvergenceCriterion(ClassLoader cl) { ConvergenceCriterion convCriterionObj = null; try { convCriterionObj = (ConvergenceCriterion) InstantiationUtil.readObjectFromConfig( - this.config, ITERATION_CONVERGENCE_CRITERION, getConfiguration().getClassLoader()); + this.config, ITERATION_CONVERGENCE_CRITERION, cl); } catch (IOException e) { throw new RuntimeException("Error while reading the covergence criterion object from the task configuration."); } catch (ClassNotFoundException e) { @@ -974,7 +974,7 @@ public class TaskConfig { } public boolean usesConvergenceCriterion() { - return config.getString(ITERATION_CONVERGENCE_CRITERION, null) != null; + return config.getBytes(ITERATION_CONVERGENCE_CRITERION, null) != null; } public String getConvergenceCriterionAggregatorName() { @@ -1174,18 +1174,8 @@ public class TaskConfig { } @Override - public Class getClass(String key, Class defaultValue, Class ancestor) { - return this.backingConfig.getClass(this.prefix + key, defaultValue, ancestor); - } - - @Override - public ClassLoader getClassLoader() { - return this.backingConfig.getClassLoader(); - } - - @Override - public Class getClass(String key, Class defaultValue) { - return this.backingConfig.getClass(this.prefix + key, defaultValue); + public Class getClass(String key, Class defaultValue, ClassLoader classLoader) throws ClassNotFoundException { + return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader); } @Override @@ -1268,11 +1258,6 @@ public class TaskConfig { return backingConfig.toString(); } - @Override - public void setClassLoader(ClassLoader classLoader) { - backingConfig.setClassLoader(classLoader); - } - @Override public Set keySet() { final HashSet set = new HashSet(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java index 0c6f0a53a18d0ee9a1b806766294cc87c0b197c6..39617b4a3742b54d6e42ddfe749e3ab8a19f9406 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java @@ -16,34 +16,19 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; -import java.util.Collection; - import org.apache.flink.api.common.Plan; -import org.apache.flink.configuration.Configuration; import org.apache.flink.test.iterative.nephele.DanglingPageRankNepheleITCase; import org.apache.flink.test.recordJobs.graph.DanglingPageRank; import org.apache.flink.test.util.RecordAPITestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -@RunWith(Parameterized.class) public class DanglingPageRankITCase extends RecordAPITestBase { protected String pagesPath; protected String edgesPath; protected String resultPath; - - public DanglingPageRankITCase(Configuration config) { - super(config); - setTaskManagerNumSlots(DOP); - } - - @Override protected void preSubmit() throws Exception { pagesPath = createTempFile("pages.txt", DanglingPageRankNepheleITCase.TEST_VERTICES); @@ -55,22 +40,13 @@ public class DanglingPageRankITCase extends RecordAPITestBase { protected Plan getTestJob() { DanglingPageRank pr = new DanglingPageRank(); Plan plan = pr.getPlan( - config.getString("PageRankITCase#NoSubtasks", "1"), + String.valueOf(DOP), pagesPath, edgesPath, resultPath, - config.getString("PageRankITCase#NumIterations", "25"), // max iterations + "25", // max iterations "5", // num vertices "1"); // num dangling vertices return plan; } - - - @Parameters - public static Collection getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("PageRankITCase#NoSubtasks", DOP); - config1.setString("PageRankITCase#NumIterations", "25"); - return toParameterList(config1); - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java index 96d2b15b245139755452e5d1b068a90b929636fd..8e42dd79eeac6862535306c0aeaa1edaba34f920 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java @@ -16,20 +16,12 @@ * limitations under the License. */ - package org.apache.flink.test.iterative; -import java.util.Collection; - import org.apache.flink.api.common.Plan; -import org.apache.flink.configuration.Configuration; import org.apache.flink.test.recordJobs.graph.SimplePageRank; import org.apache.flink.test.util.RecordAPITestBase; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -@RunWith(Parameterized.class) public class PageRankITCase extends RecordAPITestBase { private static final String VERTICES = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n"; @@ -40,12 +32,6 @@ public class PageRankITCase extends RecordAPITestBase { protected String edgesPath; protected String resultPath; - - public PageRankITCase(Configuration config) { - super(config); - setTaskManagerNumSlots(DOP); - } - @Override protected void preSubmit() throws Exception { pagesPath = createTempFile("pages.txt", VERTICES); @@ -57,21 +43,12 @@ public class PageRankITCase extends RecordAPITestBase { protected Plan getTestJob() { SimplePageRank pr = new SimplePageRank(); Plan plan = pr.getPlan( - config.getString("NumSubtasks", "1"), + String.valueOf(DOP), pagesPath, edgesPath, resultPath, - config.getString("NumIterations", "5"), // max iterations + "5", // max iterations "10"); // num vertices return plan; } - - - @Parameters - public static Collection getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("NumSubtasks", DOP); - config1.setString("NumIterations", "5"); - return toParameterList(config1); - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java index 8b4bbc81e29b3be8abd0d95c5fbd976648a0dcea..8227f970786696a7dd4ce7c6ca47b38e7319234e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.nephele; import java.io.BufferedReader; diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java index 2ebe86fb6fdda0f97b33266dc0b37823d6ae69fc..36962254a31e0ce55232e7b73a3cc3cd58283af0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.nephele; import org.apache.flink.runtime.jobgraph.JobGraph; diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java index a357b13e9e05c689c61308f85618133db7e40bef..76438aaf85238e13662283075d7b86c05c2fa1ba 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.nephele; import org.apache.flink.runtime.jobgraph.JobGraph; diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/ComputeEdgeDegreesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/ComputeEdgeDegreesITCase.java index ac543463208b9379761eaf98e1dd8f9f2ea1a725..0a6d29d6c16e80645239df493e4e241f577a1141 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/ComputeEdgeDegreesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/ComputeEdgeDegreesITCase.java @@ -52,7 +52,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase { @Override protected Plan getTestJob() { ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees(); - return computeDegrees.getPlan(config.getString("ComputeEdgeDegreesTest#NumSubtasks", "4"), + return computeDegrees.getPlan(String.valueOf(config.getInteger("NumSubtasks", 4)), edgesPath, resultPath); } @@ -64,7 +64,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase { @Parameters public static Collection getConfigurations() { Configuration config = new Configuration(); - config.setInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP); + config.setInteger("NumSubtasks", DOP); return toParameterList(config); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java index 1c25f371d67a97b266b31cfc48b6c2b84ce0e28c..c758f324838b57ded7a85d2eb6d820d6ef6a5dfd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesOnEdgesWithDegreesITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import java.util.Collection; @@ -54,7 +53,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase { protected Plan getTestJob() { EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees(); return enumTriangles.getPlan( - config.getString("EnumTrianglesTest#NumSubtasks", "4"), + String.valueOf(config.getInteger("NumSubtasks", 4)), edgesPath, resultPath); } @@ -66,7 +65,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase { @Parameters public static Collection getConfigurations() { Configuration config = new Configuration(); - config.setInteger("EnumTrianglesTest#NumSubtasks", DOP); + config.setInteger("NumSubtasks", DOP); return toParameterList(config); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesRDFITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesRDFITCase.java index a45fe44708edf4fe5c77d9b5438ec319707db255..f0a1288cbe250c3741414e3534dabd4b6dc037d9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesRDFITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/EnumTrianglesRDFITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import org.apache.flink.api.common.Plan; @@ -60,7 +59,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase { protected Plan getTestJob() { EnumTrianglesRdfFoaf enumTriangles = new EnumTrianglesRdfFoaf(); return enumTriangles.getPlan( - config.getString("EnumTrianglesTest#NoSubtasks", new Integer(DOP).toString()), edgesPath, resultPath); + String.valueOf(config.getInteger("NumSubtasks", DOP)), edgesPath, resultPath); } @Override @@ -71,7 +70,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase { @Parameters public static Collection getConfigurations() { Configuration config = new Configuration(); - config.setInteger("EnumTrianglesTest#NoSubtasks", DOP); + config.setInteger("NumSubtasks", DOP); return toParameterList(config); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/MergeOnlyJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/MergeOnlyJoinITCase.java index e5a1ef3967e315710e4ca0e24cfe51cfa3af23d7..ae3b8b7367b3b6492fb6236e9a519a2a0fb80eb9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/MergeOnlyJoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/MergeOnlyJoinITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import org.apache.flink.api.common.Plan; @@ -82,11 +81,11 @@ public class MergeOnlyJoinITCase extends RecordAPITestBase { protected Plan getTestJob() { MergeOnlyJoin mergeOnlyJoin = new MergeOnlyJoin(); return mergeOnlyJoin.getPlan( - config.getString("MergeOnlyJoinTest#NoSubtasks", "1"), + String.valueOf(config.getInteger("MergeOnlyJoinTest#NoSubtasks", 1)), input1Path, input2Path, resultPath, - config.getString("MergeOnlyJoinTest#NoSubtasksInput2", "1")); + String.valueOf(config.getInteger("MergeOnlyJoinTest#NoSubtasksInput2", 1))); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/PairwiseSPITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/PairwiseSPITCase.java index 28b504622007508d31144fa411c75010e299c6e4..231f196a75fd5755cf26d26475c9ecd93943e78e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/PairwiseSPITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/PairwiseSPITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import java.util.Collection; @@ -71,7 +70,8 @@ public class PairwiseSPITCase extends RecordAPITestBase { @Override protected Plan getTestJob() { PairwiseSP a2aSP = new PairwiseSP(); - return a2aSP.getPlan(config.getString("All2AllSPTest#NoSubtasks", new Integer(DOP).toString()), + return a2aSP.getPlan( + String.valueOf(config.getInteger("All2AllSPTest#NoSubtasks", DOP)), rdfDataPath, resultPath, "true"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java index ffa1cefb0dd9939e346808985137b7df63b3ed9b..a6a98974b6b59ddfb388f651dbe999f4899a0a2d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import org.apache.flink.api.common.Plan; @@ -181,7 +180,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase { protected Plan getTestJob() { TPCHQuery10 tpchq10 = new TPCHQuery10(); return tpchq10.getPlan( - config.getString("TPCHQuery10Test#NoSubtasks", "1"), + String.valueOf(config.getInteger("TPCHQuery10Test#NoSubtasks", 1)), ordersPath, lineitemsPath, customersPath, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java index 89e0f42f8b64f10ded930e1accce76595a0b443f..461d6c0fff3e981fdccaf303fdbc09d309747309 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.recordJobTests; import java.util.Collection; @@ -142,7 +141,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase { TPCHQuery3 tpch3 = new TPCHQuery3(); return tpch3.getPlan( - config.getString("dop", "1"), + String.valueOf(config.getInteger("dop", 1)), ordersPath, lineitemsPath, resultPath);