提交 02314adc 编写于 作者: S Stephan Ewen

[FLINK-12] Clean up configuration object

  - Remove class loader (was inconsistently used and set)
  - Objects are stored in their type, rather than as a string
上级 da3e507d
......@@ -98,15 +98,21 @@ public class JDBCOutputFormat implements OutputFormat<Record> {
@SuppressWarnings("unchecked")
Class<Value>[] classes = new Class[this.fieldCount];
this.fieldClasses = classes;
ClassLoader cl = getClass().getClassLoader();
for (int i = 0; i < this.fieldCount; i++) {
@SuppressWarnings("unchecked")
Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_KEY + i, null);
if (clazz == null) {
throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
+ "No type class for parameter " + i);
try {
for (int i = 0; i < this.fieldCount; i++) {
Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_KEY + i, null, cl);
if (clazz == null) {
throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
+ "No type class for parameter " + i);
}
this.fieldClasses[i] = clazz;
}
this.fieldClasses[i] = clazz;
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load data type classes.", e);
}
}
......
......@@ -189,16 +189,20 @@ public class SpargelIteration {
public void open(Configuration parameters) throws Exception {
// instantiate only the first time
if (vertexUpdateFunction == null) {
Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, cl);
vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
try {
this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
} catch (Exception e) {
String message = e.getMessage() == null ? "." : ": " + e.getMessage();
throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
......@@ -248,10 +252,12 @@ public class SpargelIteration {
public void open(Configuration parameters) throws Exception {
// instantiate only the first time
if (messagingFunction == null) {
Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
// Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, Value.class);
Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, cl);
vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
......@@ -259,8 +265,10 @@ public class SpargelIteration {
K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
try {
this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
} catch (Exception e) {
String message = e.getMessage() == null ? "." : ": " + e.getMessage();
throw new Exception("Could not instantiate MessagingFunction" + message, e);
......
......@@ -309,9 +309,13 @@ public class StreamConfig {
config.setClass("functionClass", functionClass);
}
@SuppressWarnings("unchecked")
public Class<? extends AbstractRichFunction> getFunctionClass() {
return (Class<? extends AbstractRichFunction>) config.getClass("functionClass", null);
public Class<? extends AbstractRichFunction> getFunctionClass(ClassLoader cl) {
try {
return config.getClass("functionClass", null, cl);
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load function class", e);
}
}
@SuppressWarnings("unchecked")
......
......@@ -29,6 +29,7 @@ import java.net.MalformedURLException;
import java.util.Map;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
public class CliFrontendTestUtils {
......@@ -86,16 +87,19 @@ public class CliFrontendTestUtils {
public static void clearGlobalConfiguration() {
try {
Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("configuration");
Field confDataMapField = GlobalConfiguration.class.getDeclaredField("confData");
Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("SINGLETON");
Field conf = GlobalConfiguration.class.getDeclaredField("config");
Field map = Configuration.class.getDeclaredField("confData");
singletonInstanceField.setAccessible(true);
confDataMapField.setAccessible(true);
conf.setAccessible(true);
map.setAccessible(true);
GlobalConfiguration gconf = (GlobalConfiguration) singletonInstanceField.get(null);
if (gconf != null) {
Configuration confObject = (Configuration) conf.get(gconf);
@SuppressWarnings("unchecked")
Map<String, String> confData = (Map<String, String>) confDataMapField.get(gconf);
Map<String, Object> confData = (Map<String, Object>) map.get(confObject);
confData.clear();
}
}
......
......@@ -94,7 +94,6 @@ import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plan.WorksetPlanNode;
import org.apache.flink.compiler.postpass.OptimizerPostPass;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
......@@ -410,11 +409,10 @@ public class PactCompiler {
this.statistics = stats;
this.costEstimator = estimator;
Configuration config = GlobalConfiguration.getConfiguration();
// determine the default parallelization degree
this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
if (defaultDegreeOfParallelism < 1) {
LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");
......
......@@ -109,7 +109,6 @@ public class DataSourceNode extends OptimizerNode {
try {
format = getPactContract().getFormatWrapper().getUserCodeObject();
Configuration config = getPactContract().getParameters();
config.setClassLoader(format.getClass().getClassLoader());
format.configure(config);
}
catch (Throwable t) {
......
......@@ -60,6 +60,14 @@ public interface RuntimeContext {
*/
int getIndexOfThisSubtask();
/**
* Gets the ClassLoader to load classes that were are not in system's classpath, but are part of the
* jar file of a user job.
*
* @return The ClassLoader for user code classes.
*/
ClassLoader getUserCodeClassLoader();
// --------------------------------------------------------------------------------------------
/**
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.configuration;
import java.io.BufferedReader;
......@@ -25,119 +24,68 @@ import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.util.StringUtils;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
/**
* Global configuration object in Nephele. Similar to Java properties configuration
* Global configuration object for Flink. Similar to Java properties configuration
* objects it includes key-value pairs which represent the framework's configuration.
* <p>
* This class is thread-safe.
*/
public final class GlobalConfiguration {
/**
* The log object used for debugging.
*/
/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);
/**
* The global configuration object accessible through a singleton pattern.
*/
private static GlobalConfiguration configuration = null;
/**
* The key to the directory this configuration was read from.
*/
private static final String CONFIGDIRKEY = "config.dir";
/** The global configuration object accessible through a singleton pattern. */
private static GlobalConfiguration SINGLETON = null;
/**
* The internal map holding the key-value pairs the configuration consists of.
*/
private final Map<String, String> confData = new HashMap<String, String>();
/** The internal map holding the key-value pairs the configuration consists of. */
private final Configuration config = new Configuration();
// --------------------------------------------------------------------------------------------
/**
* Retrieves the singleton object of the global configuration.
*
* @return the global configuration object
*/
private static synchronized GlobalConfiguration get() {
if (configuration == null) {
configuration = new GlobalConfiguration();
private static GlobalConfiguration get() {
// lazy initialization currently only for testibility
synchronized (GlobalConfiguration.class) {
if (SINGLETON == null) {
SINGLETON = new GlobalConfiguration();
}
return SINGLETON;
}
return configuration;
}
/**
* The constructor used to construct the singleton instance of the global configuration.
*/
private GlobalConfiguration() {
}
/**
* Returns the value associated with the given key as a string.
*
* @param key
* the key pointing to the associated value
* @param defaultValue
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
public static String getString(final String key, final String defaultValue) {
return get().getStringInternal(key, defaultValue);
}
private GlobalConfiguration() {}
// --------------------------------------------------------------------------------------------
/**
* Returns the value associated with the given key as a string.
*
* @param key
* key the key pointing to the associated value
* @param defaultValue
* defaultValue the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
private String getStringInternal(final String key, final String defaultValue) {
synchronized (this.confData) {
if (!this.confData.containsKey(key)) {
return defaultValue;
}
return this.confData.get(key);
}
}
/**
* Returns the value associated with the given key as a long integer.
*
* @param key
* the key pointing to the associated value
* @param defaultValue
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
public static long getLong(final String key, final long defaultValue) {
return get().getLongInternal(key, defaultValue);
public static String getString(String key, String defaultValue) {
return get().config.getString(key, defaultValue);
}
/**
......@@ -149,39 +97,8 @@ public final class GlobalConfiguration {
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
private long getLongInternal(final String key, final long defaultValue) {
long retVal = defaultValue;
try {
synchronized (this.confData) {
if (this.confData.containsKey(key)) {
retVal = Long.parseLong(this.confData.get(key));
}
}
} catch (NumberFormatException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(StringUtils.stringifyException(e));
}
}
return retVal;
}
/**
* Returns the value associated with the given key as an integer.
*
* @param key
* the key pointing to the associated value
* @param defaultValue
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
public static int getInteger(final String key, final int defaultValue) {
return get().getIntegerInternal(key, defaultValue);
public static long getLong(String key, long defaultValue) {
return get().config.getLong(key, defaultValue);
}
/**
......@@ -193,25 +110,8 @@ public final class GlobalConfiguration {
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
private int getIntegerInternal(final String key, final int defaultValue) {
int retVal = defaultValue;
try {
synchronized (this.confData) {
if (this.confData.containsKey(key)) {
retVal = Integer.parseInt(this.confData.get(key));
}
}
} catch (NumberFormatException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(StringUtils.stringifyException(e));
}
}
return retVal;
public static int getInteger(String key, int defaultValue) {
return get().config.getInteger(key, defaultValue);
}
/**
......@@ -224,38 +124,7 @@ public final class GlobalConfiguration {
* @return the (default) value associated with the given key
*/
public static float getFloat(String key, float defaultValue) {
return get().getFloatInternal(key, defaultValue);
}
/**
* Returns the value associated with the given key as an integer.
*
* @param key
* the key pointing to the associated value
* @param defaultValue
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
private float getFloatInternal(String key, float defaultValue) {
float retVal = defaultValue;
try {
synchronized (this.confData) {
if (this.confData.containsKey(key)) {
retVal = Float.parseFloat(this.confData.get(key));
}
}
} catch (NumberFormatException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(StringUtils.stringifyException(e));
}
}
return retVal;
return get().config.getFloat(key, defaultValue);
}
/**
......@@ -267,33 +136,8 @@ public final class GlobalConfiguration {
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
public static boolean getBoolean(final String key, final boolean defaultValue) {
return get().getBooleanInternal(key, defaultValue);
}
/**
* Returns the value associated with the given key as a boolean.
*
* @param key
* the key pointing to the associated value
* @param defaultValue
* the default value which is returned in case there is no value associated with the given key
* @return the (default) value associated with the given key
*/
private boolean getBooleanInternal(final String key, final boolean defaultValue) {
boolean retVal = defaultValue;
synchronized (this.confData) {
final String value = this.confData.get(key);
if (value != null) {
retVal = Boolean.parseBoolean(value);
}
}
return retVal;
public static boolean getBoolean(String key, boolean defaultValue) {
return get().config.getBoolean(key, defaultValue);
}
/**
......@@ -319,7 +163,7 @@ public final class GlobalConfiguration {
return;
}
if(confDirFile.isFile()) {
if (confDirFile.isFile()) {
final File file = new File(configDir);
if(configDir.endsWith(".xml")) {
get().loadXMLResource( file );
......@@ -329,7 +173,6 @@ public final class GlobalConfiguration {
LOG.warn("The given configuration has an unknown extension.");
return;
}
configuration.confData.put(CONFIGDIRKEY, file.getAbsolutePath() );
return;
}
......@@ -352,11 +195,6 @@ public final class GlobalConfiguration {
for (File f : yamlFiles) {
get().loadYAMLResource(f);
}
// Store the path to the configuration directory itself
if (configuration != null) {
configuration.confData.put(CONFIGDIRKEY, configDir);
}
}
/**
......@@ -379,52 +217,57 @@ public final class GlobalConfiguration {
* @param file the YAML file to read from
* @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
*/
private void loadYAMLResource(final File file) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
private void loadYAMLResource(File file) {
String line = null;
while ((line = reader.readLine()) != null) {
// 1. check for comments
String[] comments = line.split("#", 2);
String conf = comments[0];
// 2. get key and value
if (conf.length() > 0) {
String[] kv = conf.split(": ", 2);
// skip line with no valid key-value pair
if (kv.length == 1) {
LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
continue;
}
synchronized (getClass()) {
String key = kv[0].trim();
String value = kv[1].trim();
// sanity check
if (key.length() == 0 || value.length() == 0) {
LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
continue;
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
String line = null;
while ((line = reader.readLine()) != null) {
// 1. check for comments
String[] comments = line.split("#", 2);
String conf = comments[0];
// 2. get key and value
if (conf.length() > 0) {
String[] kv = conf.split(": ", 2);
// skip line with no valid key-value pair
if (kv.length == 1) {
LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
continue;
}
String key = kv[0].trim();
String value = kv[1].trim();
// sanity check
if (key.length() == 0 || value.length() == 0) {
LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
continue;
}
LOG.debug("Loading configuration property: {}, {}", key, value);
this.config.setString(key, value);
}
LOG.debug("Loading configuration property: {}, {}", key, value);
this.confData.put(key, value);
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(reader != null) {
reader.close();
catch (IOException e) {
LOG.error("Error parsing YAML configuration.", e);
}
finally {
try {
if(reader != null) {
reader.close();
}
} catch (IOException e) {
LOG.warn("Cannot to close reader with IOException.", e);
}
} catch (IOException e) {
LOG.warn("Cannot to close reader with IOException.", e);
}
}
}
......@@ -435,7 +278,7 @@ public final class GlobalConfiguration {
* @param file
* the XML document file
*/
private void loadXMLResource(final File file) {
private void loadXMLResource(File file) {
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
// Ignore comments in the XML file
......@@ -468,7 +311,7 @@ public final class GlobalConfiguration {
final NodeList props = root.getChildNodes();
int propNumber = -1;
synchronized (this.confData) {
synchronized (getClass()) {
for (int i = 0; i < props.getLength(); i++) {
......@@ -528,85 +371,28 @@ public final class GlobalConfiguration {
if (key != null && value != null) {
// Put key, value pair into the map
LOG.debug("Loading configuration property: {}, {}", key, value);
this.confData.put(key, value);
this.config.setString(key, value);
} else {
LOG.warn("Error while reading configuration: Cannot read property " + propNumber);
}
}
}
} catch (ParserConfigurationException e) {
LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e));
} catch (IOException e) {
LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e));
} catch (SAXException e) {
LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e));
}
catch (Exception e) {
LOG.error("Cannot load configuration.", e);
}
}
/**
* Copies the key/value pairs stored in the global configuration to
* a {@link Configuration} object and returns it.
* Gets a {@link Configuration} object with the values of this GlobalConfiguration
*
* @return the {@link Configuration} object including the key/value pairs
*/
public static Configuration getConfiguration() {
return get().getConfigurationInternal(null);
}
/**
* Copies a subset of the key/value pairs stored in the global configuration to
* a {@link Configuration} object and returns it. The subset is defined by the
* given array of keys. If <code>keys</code> is <code>null</code>, the entire
* global configuration is copied.
*
* @param keys
* array of keys specifying the subset of pairs to copy.
* @return the {@link Configuration} object including the key/value pairs
*/
public static Configuration getConfiguration(final String[] keys) {
return get().getConfigurationInternal(keys);
}
/**
* Internal non-static method to return configuration.
*
* @param keys
* array of keys specifying the subset of pairs to copy.
* @return the {@link Configuration} object including the key/value pairs
*/
private Configuration getConfigurationInternal(final String[] keys) {
Configuration conf = new Configuration();
synchronized (this.confData) {
final Iterator<String> it = this.confData.keySet().iterator();
while (it.hasNext()) {
final String key = it.next();
boolean found = false;
if (keys != null) {
for (int i = 0; i < keys.length; i++) {
if (key.equals(keys[i])) {
found = true;
break;
}
}
if (found) {
conf.setString(key, this.confData.get(key));
}
} else {
conf.setString(key, this.confData.get(key));
}
}
}
return conf;
Configuration copy = new Configuration();
copy.addAll(get().config);
return copy;
}
/**
......@@ -618,8 +404,7 @@ public final class GlobalConfiguration {
* @param conf
* the {@link Configuration} object to merge into the global configuration
*/
public static void includeConfiguration(final Configuration conf) {
public static void includeConfiguration(Configuration conf) {
get().includeConfigurationInternal(conf);
}
......@@ -629,25 +414,15 @@ public final class GlobalConfiguration {
* @param conf
* the {@link Configuration} object to merge into the global configuration
*/
private void includeConfigurationInternal(final Configuration conf) {
if (conf == null) {
LOG.error("Given configuration object is null, ignoring it...");
return;
}
synchronized (this.confData) {
final Iterator<String> it = conf.keySet().iterator();
while (it.hasNext()) {
final String key = it.next();
this.confData.put(key, conf.getString(key, ""));
}
private void includeConfigurationInternal(Configuration conf) {
// static synchronized
synchronized (getClass()) {
this.config.addAll(conf);
}
}
// --------------------------------------------------------------------------------------------
/**
* Filters files in directory which have the specified suffix (e.g. ".xml").
*
......
......@@ -16,15 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.configuration;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
......@@ -34,31 +32,146 @@ import org.junit.Test;
* objects is tested.
*/
public class ConfigurationTest {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final long TOO_LONG = Integer.MAX_VALUE + 10L;
private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE;
/**
* This test checks the serialization/deserialization of configuration objects.
*/
@Test
public void testConfigurationSerialization() {
// First, create initial configuration object with some parameters
final Configuration orig = new Configuration();
orig.setString("mykey", "myvalue");
orig.setBoolean("shouldbetrue", true);
orig.setInteger("mynumber", 100);
orig.setClass("myclass", this.getClass());
public void testConfigurationSerializationAndGetters() {
try {
final Configuration orig = new Configuration();
orig.setString("mykey", "myvalue");
orig.setInteger("mynumber", 100);
orig.setLong("longvalue", 478236947162389746L);
orig.setFloat("PI", 3.1415926f);
orig.setDouble("E", Math.E);
orig.setBoolean("shouldbetrue", true);
orig.setBytes("bytes sequence", new byte[] { 1, 2, 3, 4, 5 } );
orig.setClass("myclass", this.getClass());
final Configuration copy = (Configuration) CommonTestUtils.createCopy(orig);
assertEquals("myvalue", copy.getString("mykey", "null"));
assertEquals(100, copy.getInteger("mynumber", 0));
assertEquals(478236947162389746L, copy.getLong("longvalue", 0L));
assertEquals(3.1415926f, copy.getFloat("PI", 3.1415926f), 0.0);
assertEquals(Math.E, copy.getDouble("E", 0.0), 0.0);
assertEquals(true, copy.getBoolean("shouldbetrue", false));
assertArrayEquals(new byte[] { 1, 2, 3, 4, 5 }, copy.getBytes("bytes sequence", null));
assertEquals(getClass(), copy.getClass("myclass", null, getClass().getClassLoader()));
assertEquals(orig, copy);
assertEquals(orig.keySet(), copy.keySet());
assertEquals(orig.hashCode(), copy.hashCode());
assertEquals(copy.getString("mykey", "null"), "myvalue");
assertEquals(copy.getBoolean("shouldbetrue", false), true);
assertEquals(copy.getInteger("mynumber", 0), 100);
assertEquals(copy.getClass("myclass", null).toString(), this.getClass().toString());
assertTrue(orig.equals(copy));
assertTrue(orig.keySet().equals(copy.keySet()));
} catch (IOException e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testConversions() {
try {
Configuration pc = new Configuration();
pc.setInteger("int", 5);
pc.setLong("long", 15);
pc.setLong("too_long", TOO_LONG);
pc.setFloat("float", 2.1456775f);
pc.setDouble("double", Math.PI);
pc.setDouble("too_long_double", TOO_LONG_DOUBLE);
pc.setString("string", "42");
pc.setString("non_convertible_string", "bcdefg&&");
pc.setBoolean("boolean", true);
// as integer
assertEquals(5, pc.getInteger("int", 0));
assertEquals(5L, pc.getLong("int", 0));
assertEquals(5f, pc.getFloat("int", 0), 0.0);
assertEquals(5.0, pc.getDouble("int", 0), 0.0);
assertEquals(false, pc.getBoolean("int", true));
assertEquals("5", pc.getString("int", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("int", EMPTY_BYTES));
// as long
assertEquals(15, pc.getInteger("long", 0));
assertEquals(15L, pc.getLong("long", 0));
assertEquals(15f, pc.getFloat("long", 0), 0.0);
assertEquals(15.0, pc.getDouble("long", 0), 0.0);
assertEquals(false, pc.getBoolean("long", true));
assertEquals("15", pc.getString("long", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("long", EMPTY_BYTES));
// as too long
assertEquals(0, pc.getInteger("too_long", 0));
assertEquals(TOO_LONG, pc.getLong("too_long", 0));
assertEquals((float) TOO_LONG, pc.getFloat("too_long", 0), 10.0);
assertEquals((double) TOO_LONG, pc.getDouble("too_long", 0), 10.0);
assertEquals(false, pc.getBoolean("too_long", true));
assertEquals(String.valueOf(TOO_LONG), pc.getString("too_long", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("too_long", EMPTY_BYTES));
// as float
assertEquals(0, pc.getInteger("float", 0));
assertEquals(0L, pc.getLong("float", 0));
assertEquals(2.1456775f, pc.getFloat("float", 0), 0.0);
assertEquals(2.1456775, pc.getDouble("float", 0), 0.0000001);
assertEquals(false, pc.getBoolean("float", true));
assertTrue(pc.getString("float", "0").startsWith("2.145677"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("float", EMPTY_BYTES));
// as double
assertEquals(0, pc.getInteger("double", 0));
assertEquals(0L, pc.getLong("double", 0));
assertEquals(3.141592f, pc.getFloat("double", 0), 0.000001);
assertEquals(Math.PI, pc.getDouble("double", 0), 0.0);
assertEquals(false, pc.getBoolean("double", true));
assertTrue(pc.getString("double", "0").startsWith("3.1415926535"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("double", EMPTY_BYTES));
// as too long double
assertEquals(0, pc.getInteger("too_long_double", 0));
assertEquals(0L, pc.getLong("too_long_double", 0));
assertEquals(0f, pc.getFloat("too_long_double", 0f), 0.000001);
assertEquals(TOO_LONG_DOUBLE, pc.getDouble("too_long_double", 0), 0.0);
assertEquals(false, pc.getBoolean("too_long_double", true));
assertEquals(String.valueOf(TOO_LONG_DOUBLE), pc.getString("too_long_double", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("too_long_double", EMPTY_BYTES));
// as string
assertEquals(42, pc.getInteger("string", 0));
assertEquals(42L, pc.getLong("string", 0));
assertEquals(42f, pc.getFloat("string", 0f), 0.000001);
assertEquals(42.0, pc.getDouble("string", 0), 0.0);
assertEquals(false, pc.getBoolean("string", true));
assertEquals("42", pc.getString("string", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("string", EMPTY_BYTES));
// as non convertible string
assertEquals(0, pc.getInteger("non_convertible_string", 0));
assertEquals(0L, pc.getLong("non_convertible_string", 0));
assertEquals(0f, pc.getFloat("non_convertible_string", 0f), 0.000001);
assertEquals(0.0, pc.getDouble("non_convertible_string", 0), 0.0);
assertEquals(false, pc.getBoolean("non_convertible_string", true));
assertEquals("bcdefg&&", pc.getString("non_convertible_string", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("non_convertible_string", EMPTY_BYTES));
// as boolean
assertEquals(0, pc.getInteger("boolean", 0));
assertEquals(0L, pc.getLong("boolean", 0));
assertEquals(0f, pc.getFloat("boolean", 0f), 0.000001);
assertEquals(0.0, pc.getDouble("boolean", 0), 0.0);
assertEquals(true, pc.getBoolean("boolean", false));
assertEquals("true", pc.getString("boolean", "0"));
assertArrayEquals(EMPTY_BYTES, pc.getBytes("boolean", EMPTY_BYTES));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.configuration;
import static org.junit.Assert.assertEquals;
......@@ -40,7 +39,7 @@ public class GlobalConfigurationTest {
public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
IllegalAccessException {
// reset GlobalConfiguration between tests
Field instance = GlobalConfiguration.class.getDeclaredField("configuration");
Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
instance.setAccessible(true);
instance.set(null, null);
}
......@@ -73,8 +72,8 @@ public class GlobalConfigurationTest {
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
Configuration conf = GlobalConfiguration.getConfiguration();
// all distinct keys from confFile1 + confFile2 + 'config.dir' key
assertEquals(3 + 1, conf.keySet().size());
// all distinct keys from confFile1 + confFile2key
assertEquals(3, conf.keySet().size());
// keys 1, 2, 3 should be OK and match the expected values
// => configuration keys from YAML should overwrite keys from XML
......@@ -126,8 +125,8 @@ public class GlobalConfigurationTest {
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
Configuration conf = GlobalConfiguration.getConfiguration();
// all distinct keys from confFile1 + confFile2 + 'config.dir' key
assertEquals(6 + 1, conf.keySet().size());
// all distinct keys from confFile1 + confFile2 key
assertEquals(6, conf.keySet().size());
// keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected values
assertEquals("myvalue1", conf.getString("mykey1", null));
......@@ -202,14 +201,6 @@ public class GlobalConfigurationTest {
newconf.setInteger("mynewinteger", 1000);
GlobalConfiguration.includeConfiguration(newconf);
assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000);
// Test local "sub" configuration
final String[] configparams = { "mykey1", "mykey2" };
Configuration newconf2 = GlobalConfiguration.getConfiguration(configparams);
assertEquals(newconf2.keySet().size(), 2);
assertEquals(newconf2.getString("mykey1", "null"), "myvalue1");
assertEquals(newconf2.getString("mykey2", "null"), "myvalue2");
} finally {
// Remove temporary files
confFile1.delete();
......
......@@ -30,4 +30,5 @@
<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
</configuration>
\ No newline at end of file
......@@ -146,6 +146,11 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
public DistributedCache getDistributedCache() {
return context.getDistributedCache();
}
@Override
public ClassLoader getUserCodeClassLoader() {
return context.getUserCodeClassLoader();
}
}
private static class WrappingIterationRuntimeContext extends WrappingRuntimeContext implements IterationRuntimeContext {
......
......@@ -192,18 +192,25 @@ public class CsvInputFormat extends GenericCsvInputFormat<Record> {
Class<? extends Value>[] types = (Class<? extends Value>[]) new Class[maxTextPos+1];
int[] targetPos = new int[maxTextPos+1];
ClassLoader cl = Thread.currentThread().getContextClassLoader();
// set the fields
for (int i = 0; i < numConfigFields; i++) {
int pos = textPosIdx[i];
Class<? extends Value> clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null).asSubclass(Value.class);
if (clazz == null) {
throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " +
"No field parser class for parameter " + i);
try {
for (int i = 0; i < numConfigFields; i++) {
int pos = textPosIdx[i];
Class<? extends Value> clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl).asSubclass(Value.class);
if (clazz == null) {
throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " +
"No field parser class for parameter " + i);
}
types[pos] = clazz;
targetPos[pos] = i;
}
types[pos] = clazz;
targetPos[pos] = i;
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not resolve type classes", e);
}
// update the field types
......
......@@ -206,14 +206,20 @@ public class CsvOutputFormat extends FileOutputFormat {
Class<Value>[] arr = new Class[this.numFields];
this.classes = arr;
for (int i = 0; i < this.numFields; i++) {
@SuppressWarnings("unchecked")
Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null);
if (clazz == null) {
throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i);
try {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
for (int i = 0; i < this.numFields; i++) {
Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl);
if (clazz == null) {
throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i);
}
this.classes[i] = clazz;
}
this.classes[i] = clazz;
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Could not resolve type classes", e);
}
this.recordPositions = new int[this.numFields];
......
......@@ -158,7 +158,8 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
@Override
public RuntimeUDFContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup());
return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(),
env.getIndexInSubtaskGroup(), userCodeClassLoader);
}
// --------------------------------------------------------------------------------------------
......@@ -347,8 +348,8 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
private class IterativeRuntimeUdfContext extends RuntimeUDFContext implements IterationRuntimeContext {
public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex) {
super(name, numParallelSubtasks, subtaskIndex);
public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
}
@Override
......
......@@ -296,7 +296,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
}
// instantiate all aggregators and register them at the iteration global registry
aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators());
aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators(userCodeClassLoader));
IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
DataInputView superstepResult = null;
......
......@@ -86,13 +86,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
// store all aggregators
this.aggregators = new HashMap<String, Aggregator<?>>();
for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators()) {
for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators(userCodeClassLoader)) {
aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
}
// store the aggregator convergence criterion
if (taskConfig.usesConvergenceCriterion()) {
convergenceCriterion = taskConfig.getConvergenceCriterion();
convergenceCriterion = taskConfig.getConvergenceCriterion(userCodeClassLoader);
convergenceAggregatorName = taskConfig.getConvergenceCriterionAggregatorName();
Preconditions.checkNotNull(convergenceAggregatorName);
}
......
......@@ -275,7 +275,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
}
// obtain task configuration (including stub parameters)
Configuration taskConf = getTaskConfiguration();
taskConf.setClassLoader(this.userCodeClassLoader);
this.config = new TaskConfig(taskConf);
try {
......
......@@ -310,7 +310,6 @@ l *
// obtain task configuration (including stub parameters)
Configuration taskConf = getTaskConfiguration();
taskConf.setClassLoader(this.userCodeClassLoader);
this.config = new TaskConfig(taskConf);
try {
......
......@@ -243,7 +243,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// obtain task configuration (including stub parameters)
Configuration taskConf = getTaskConfiguration();
taskConf.setClassLoader(this.userCodeClassLoader);
this.config = new TaskConfig(taskConf);
// now get the operator class which drives the operation
......@@ -1066,7 +1065,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
public RuntimeUDFContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask());
}
// --------------------------------------------------------------------------------------------
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.operators.chaining;
import org.apache.flink.api.common.functions.RichFunction;
......@@ -25,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
@SuppressWarnings("deprecation")
public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
private GenericCollectorMap<IT, OT> mapper;
......
......@@ -57,7 +57,8 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName);
} else {
Environment env = parent.getEnvironment();
this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(),
env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask());
}
setup(parent);
......
......@@ -44,22 +44,26 @@ public class RuntimeUDFContext implements RuntimeContext {
private final int subtaskIndex;
private DistributedCache distributedCache = new DistributedCache();
private final ClassLoader userCodeClassLoader;
private final DistributedCache distributedCache = new DistributedCache();
private HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
private HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
private final HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex) {
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
this.name = name;
this.numParallelSubtasks = numParallelSubtasks;
this.subtaskIndex = subtaskIndex;
this.userCodeClassLoader = userCodeClassLoader;
}
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, Map<String, FutureTask<Path>> cpTasks) {
public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) {
this.name = name;
this.numParallelSubtasks = numParallelSubtasks;
this.subtaskIndex = subtaskIndex;
this.userCodeClassLoader = userCodeClassLoader;
this.distributedCache.setCopyTasks(cpTasks);
}
@Override
......@@ -158,4 +162,9 @@ public class RuntimeUDFContext implements RuntimeContext {
public DistributedCache getDistributedCache() {
return this.distributedCache;
}
@Override
public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader;
}
}
......@@ -916,7 +916,7 @@ public class TaskConfig {
}
@SuppressWarnings("unchecked")
public Collection<AggregatorWithName<?>> getIterationAggregators() {
public Collection<AggregatorWithName<?>> getIterationAggregators(ClassLoader cl) {
final int numAggs = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0);
if (numAggs == 0) {
return Collections.emptyList();
......@@ -927,7 +927,7 @@ public class TaskConfig {
Aggregator<Value> aggObj;
try {
aggObj = (Aggregator<Value>) InstantiationUtil.readObjectFromConfig(
this.config, ITERATION_AGGREGATOR_PREFIX + i, getConfiguration().getClassLoader());
this.config, ITERATION_AGGREGATOR_PREFIX + i, cl);
} catch (IOException e) {
throw new RuntimeException("Error while reading the aggregator object from the task configuration.");
} catch (ClassNotFoundException e) {
......@@ -956,11 +956,11 @@ public class TaskConfig {
}
@SuppressWarnings("unchecked")
public <T extends Value> ConvergenceCriterion<T> getConvergenceCriterion() {
public <T extends Value> ConvergenceCriterion<T> getConvergenceCriterion(ClassLoader cl) {
ConvergenceCriterion<T> convCriterionObj = null;
try {
convCriterionObj = (ConvergenceCriterion<T>) InstantiationUtil.readObjectFromConfig(
this.config, ITERATION_CONVERGENCE_CRITERION, getConfiguration().getClassLoader());
this.config, ITERATION_CONVERGENCE_CRITERION, cl);
} catch (IOException e) {
throw new RuntimeException("Error while reading the covergence criterion object from the task configuration.");
} catch (ClassNotFoundException e) {
......@@ -974,7 +974,7 @@ public class TaskConfig {
}
public boolean usesConvergenceCriterion() {
return config.getString(ITERATION_CONVERGENCE_CRITERION, null) != null;
return config.getBytes(ITERATION_CONVERGENCE_CRITERION, null) != null;
}
public String getConvergenceCriterionAggregatorName() {
......@@ -1174,18 +1174,8 @@ public class TaskConfig {
}
@Override
public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, Class<? super T> ancestor) {
return this.backingConfig.getClass(this.prefix + key, defaultValue, ancestor);
}
@Override
public ClassLoader getClassLoader() {
return this.backingConfig.getClassLoader();
}
@Override
public Class<?> getClass(String key, Class<?> defaultValue) {
return this.backingConfig.getClass(this.prefix + key, defaultValue);
public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader);
}
@Override
......@@ -1268,11 +1258,6 @@ public class TaskConfig {
return backingConfig.toString();
}
@Override
public void setClassLoader(ClassLoader classLoader) {
backingConfig.setClassLoader(classLoader);
}
@Override
public Set<String> keySet() {
final HashSet<String> set = new HashSet<String>();
......
......@@ -16,34 +16,19 @@
* limitations under the License.
*/
package org.apache.flink.test.iterative;
import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.DanglingPageRankNepheleITCase;
import org.apache.flink.test.recordJobs.graph.DanglingPageRank;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class DanglingPageRankITCase extends RecordAPITestBase {
protected String pagesPath;
protected String edgesPath;
protected String resultPath;
public DanglingPageRankITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(DOP);
}
@Override
protected void preSubmit() throws Exception {
pagesPath = createTempFile("pages.txt", DanglingPageRankNepheleITCase.TEST_VERTICES);
......@@ -55,22 +40,13 @@ public class DanglingPageRankITCase extends RecordAPITestBase {
protected Plan getTestJob() {
DanglingPageRank pr = new DanglingPageRank();
Plan plan = pr.getPlan(
config.getString("PageRankITCase#NoSubtasks", "1"),
String.valueOf(DOP),
pagesPath,
edgesPath,
resultPath,
config.getString("PageRankITCase#NumIterations", "25"), // max iterations
"25", // max iterations
"5", // num vertices
"1"); // num dangling vertices
return plan;
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
config1.setInteger("PageRankITCase#NoSubtasks", DOP);
config1.setString("PageRankITCase#NumIterations", "25");
return toParameterList(config1);
}
}
......@@ -16,20 +16,12 @@
* limitations under the License.
*/
package org.apache.flink.test.iterative;
import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.SimplePageRank;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PageRankITCase extends RecordAPITestBase {
private static final String VERTICES = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n";
......@@ -40,12 +32,6 @@ public class PageRankITCase extends RecordAPITestBase {
protected String edgesPath;
protected String resultPath;
public PageRankITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(DOP);
}
@Override
protected void preSubmit() throws Exception {
pagesPath = createTempFile("pages.txt", VERTICES);
......@@ -57,21 +43,12 @@ public class PageRankITCase extends RecordAPITestBase {
protected Plan getTestJob() {
SimplePageRank pr = new SimplePageRank();
Plan plan = pr.getPlan(
config.getString("NumSubtasks", "1"),
String.valueOf(DOP),
pagesPath,
edgesPath,
resultPath,
config.getString("NumIterations", "5"), // max iterations
"5", // max iterations
"10"); // num vertices
return plan;
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config1 = new Configuration();
config1.setInteger("NumSubtasks", DOP);
config1.setString("NumIterations", "5");
return toParameterList(config1);
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.iterative.nephele;
import java.io.BufferedReader;
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.iterative.nephele;
import org.apache.flink.runtime.jobgraph.JobGraph;
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.iterative.nephele;
import org.apache.flink.runtime.jobgraph.JobGraph;
......
......@@ -52,7 +52,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees();
return computeDegrees.getPlan(config.getString("ComputeEdgeDegreesTest#NumSubtasks", "4"),
return computeDegrees.getPlan(String.valueOf(config.getInteger("NumSubtasks", 4)),
edgesPath, resultPath);
}
......@@ -64,7 +64,7 @@ public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP);
config.setInteger("NumSubtasks", DOP);
return toParameterList(config);
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
......@@ -54,7 +53,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
protected Plan getTestJob() {
EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees();
return enumTriangles.getPlan(
config.getString("EnumTrianglesTest#NumSubtasks", "4"),
String.valueOf(config.getInteger("NumSubtasks", 4)),
edgesPath, resultPath);
}
......@@ -66,7 +65,7 @@ public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("EnumTrianglesTest#NumSubtasks", DOP);
config.setInteger("NumSubtasks", DOP);
return toParameterList(config);
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
......@@ -60,7 +59,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase {
protected Plan getTestJob() {
EnumTrianglesRdfFoaf enumTriangles = new EnumTrianglesRdfFoaf();
return enumTriangles.getPlan(
config.getString("EnumTrianglesTest#NoSubtasks", new Integer(DOP).toString()), edgesPath, resultPath);
String.valueOf(config.getInteger("NumSubtasks", DOP)), edgesPath, resultPath);
}
@Override
......@@ -71,7 +70,7 @@ public class EnumTrianglesRDFITCase extends RecordAPITestBase {
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("EnumTrianglesTest#NoSubtasks", DOP);
config.setInteger("NumSubtasks", DOP);
return toParameterList(config);
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
......@@ -82,11 +81,11 @@ public class MergeOnlyJoinITCase extends RecordAPITestBase {
protected Plan getTestJob() {
MergeOnlyJoin mergeOnlyJoin = new MergeOnlyJoin();
return mergeOnlyJoin.getPlan(
config.getString("MergeOnlyJoinTest#NoSubtasks", "1"),
String.valueOf(config.getInteger("MergeOnlyJoinTest#NoSubtasks", 1)),
input1Path,
input2Path,
resultPath,
config.getString("MergeOnlyJoinTest#NoSubtasksInput2", "1"));
String.valueOf(config.getInteger("MergeOnlyJoinTest#NoSubtasksInput2", 1)));
}
@Override
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
......@@ -71,7 +70,8 @@ public class PairwiseSPITCase extends RecordAPITestBase {
@Override
protected Plan getTestJob() {
PairwiseSP a2aSP = new PairwiseSP();
return a2aSP.getPlan(config.getString("All2AllSPTest#NoSubtasks", new Integer(DOP).toString()),
return a2aSP.getPlan(
String.valueOf(config.getInteger("All2AllSPTest#NoSubtasks", DOP)),
rdfDataPath,
resultPath,
"true");
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
......@@ -181,7 +180,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase {
protected Plan getTestJob() {
TPCHQuery10 tpchq10 = new TPCHQuery10();
return tpchq10.getPlan(
config.getString("TPCHQuery10Test#NoSubtasks", "1"),
String.valueOf(config.getInteger("TPCHQuery10Test#NoSubtasks", 1)),
ordersPath,
lineitemsPath,
customersPath,
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
......@@ -142,7 +141,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase {
TPCHQuery3 tpch3 = new TPCHQuery3();
return tpch3.getPlan(
config.getString("dop", "1"),
String.valueOf(config.getInteger("dop", 1)),
ordersPath,
lineitemsPath,
resultPath);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册