diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/localDistributed/LocalDistributedExecutor.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/localDistributed/LocalDistributedExecutor.java index 81181c4e9975b938978b584f6c0224e431ee4900..7b39a340384174fe1f021e5dc6dadc436e57449f 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/localDistributed/LocalDistributedExecutor.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/localDistributed/LocalDistributedExecutor.java @@ -84,7 +84,7 @@ public class LocalDistributedExecutor implements PlanExecutor { } Configuration conf = NepheleMiniCluster.getMiniclusterDefaultConfig( - JOB_MANAGER_RPC_PORT, 6500, 7501, null, true); + JOB_MANAGER_RPC_PORT, 6500, 7501, null, true, true, false); GlobalConfiguration.includeConfiguration(conf); // start job manager diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java index 53833b9c5e0cac826c2f7989317e19467e8cd968..0cd644e6ef0646ee08d415cfc7a1a60a46231267 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java @@ -51,6 +51,10 @@ public class NepheleMiniCluster { private String hdfsConfigFile; private boolean visualizerEnabled = DEFAULT_VISUALIZER_ENABLED; + + private boolean defaultOverwriteFiles = false; + + private boolean defaultAlwaysCreateDirectory = false; private Thread runner; @@ -109,6 +113,22 @@ public class NepheleMiniCluster { this.visualizerEnabled = visualizerEnabled; } + public boolean isDefaultOverwriteFiles() { + return defaultOverwriteFiles; + } + + public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) { + this.defaultOverwriteFiles = defaultOverwriteFiles; + } + + public boolean isDefaultAlwaysCreateDirectory() { + return defaultAlwaysCreateDirectory; + } + + public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) { + this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory; + } + // ------------------------------------------------------------------------ // Life cycle and Job Submission @@ -128,7 +148,7 @@ public class NepheleMiniCluster { GlobalConfiguration.loadConfiguration(configDir); } else { Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort, - taskManagerDataPort, hdfsConfigFile, visualizerEnabled); + taskManagerDataPort, hdfsConfigFile, visualizerEnabled, defaultOverwriteFiles, defaultAlwaysCreateDirectory); GlobalConfiguration.includeConfiguration(conf); } @@ -188,7 +208,8 @@ public class NepheleMiniCluster { } public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort, - int taskManagerDataPort, String hdfsConfigFile, boolean visualization) + int taskManagerDataPort, String hdfsConfigFile, boolean visualization, + boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory) { final Configuration config = new Configuration(); @@ -209,8 +230,13 @@ public class NepheleMiniCluster { // hdfs if (hdfsConfigFile != null) { - config.setString("fs.hdfs.hdfsdefault", hdfsConfigFile); + config.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, hdfsConfigFile); } + + // file system behavior + config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, defaultOverwriteFiles); + config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, defaultAlwaysCreateDirectory); + return config; } } \ No newline at end of file diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java index 9ec802b797de4e5ade3ba7206490f27da2bbcaed..967a59e757340e1860eb6f29dbc9e70dde06d91a 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java @@ -13,63 +13,73 @@ package eu.stratosphere.api.common.io; - import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.stratosphere.api.common.operators.FileDataSink; +import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.core.fs.FSDataOutputStream; import eu.stratosphere.core.fs.FileSystem; import eu.stratosphere.core.fs.FileSystem.WriteMode; import eu.stratosphere.core.fs.Path; - /** * The abstract base class for all output formats that are file based. Contains the logic to open/close the target * file streams. */ public abstract class FileOutputFormat implements OutputFormat { private static final long serialVersionUID = 1L; + + // -------------------------------------------------------------------------------------------- /** - * The LOG for logging messages in this class. - */ - private static final Log LOG = LogFactory.getLog(FileOutputFormat.class); - - /** - * The key under which the name of the target path is stored in the configuration. + * Defines the behavior for creating output directories. + * */ - public static final String FILE_PARAMETER_KEY = "stratosphere.output.file"; + public static enum OutputDirectoryMode { + + /** A directory is always created, regardless of number of write tasks. */ + ALWAYS, + + /** A directory is only created for parallel output tasks, i.e., number of output tasks > 1. + * If number of output tasks = 1, the output is written to a single file. */ + PARONLY + } - /** - * The key under which the write mode is stored in the configuration - */ - public static final String WRITEMODE_PARAMETER_KEY = "stratosphere.output.writemode"; + // -------------------------------------------------------------------------------------------- + private static final WriteMode DEFAULT_WRITE_MODE; + + private static final OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE; + + + static { + final boolean overwrite = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, + ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE); + + DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.CREATE; + + final boolean alwaysCreateDirectory = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, + ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY); + + DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY; + } + + // -------------------------------------------------------------------------------------------- + /** - * Value keys for the write modes + * The LOG for logging messages in this class. */ - public static final String WRITEMODE_CREATE = "stratosphere.output.writemode.create"; - public static final String WRITEMODE_OVERWRITE = "stratosphere.output.writemode.overwrite"; + private static final Log LOG = LogFactory.getLog(FileOutputFormat.class); /** - * The key under which the output directory mode parameter is stored in the configuration - */ - public static final String OUT_DIRECTORY_PARAMETER_KEY = "stratosphere.output.directory"; - - /** - * Value keys for the output directory modes - */ - public static final String OUT_DIRECTORY_ALWAYS = "stratosphere.output.directory.always"; - public static final String OUT_DIRECTORY_PARONLY = "stratosphere.output.directory.paronly"; - - /** - * The config parameter for the opening timeout in milliseconds. + * The key under which the name of the target path is stored in the configuration. */ - public static final String OUTPUT_STREAM_OPEN_TIMEOUT_KEY = "stratosphere.output.file.timeout"; + public static final String FILE_PARAMETER_KEY = "stratosphere.output.file"; /** * The path of the file to be written. @@ -79,78 +89,58 @@ public abstract class FileOutputFormat implements OutputFormat { /** * The write mode of the output. */ - protected WriteMode writeMode; + private WriteMode writeMode; /** * The output directory mode */ - protected OutputDirectoryMode outDirMode; + private OutputDirectoryMode outputDirectoryMode; /** - * The stream to which the data is written; + * Stream opening timeout. */ - protected FSDataOutputStream stream; + private long openTimeout = -1; + + // -------------------------------------------------------------------------------------------- /** - * Stream opening timeout. + * The stream to which the data is written; */ - private long openTimeout; + protected transient FSDataOutputStream stream; // -------------------------------------------------------------------------------------------- - - /** - * Defines the behavior for creating output directories. - * - */ - public static enum OutputDirectoryMode { - ALWAYS, // A directory is always created, regardless of number of write tasks - PARONLY // A directory is only created for parallel output tasks, i.e., number of output tasks > 1. - // If number of output tasks = 1, the output is written to a single file. - } @Override public void configure(Configuration parameters) { - // get the file parameter - String filePath = parameters.getString(FILE_PARAMETER_KEY, null); - if (filePath == null) { - throw new IllegalArgumentException("Configuration file FileOutputFormat does not contain the file path."); - } - try { - this.outputFilePath = new Path(filePath); - } - catch (RuntimeException rex) { - throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage()); + // get the output file path, if it was not yet set + if (this.outputFilePath == null) { + // get the file parameter + String filePath = parameters.getString(FILE_PARAMETER_KEY, null); + if (filePath == null) { + throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" + + ", nor via the Configuration."); + } + + try { + this.outputFilePath = new Path(filePath); + } + catch (RuntimeException rex) { + throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage()); + } } - // get the write mode parameter - String writeModeParam = parameters.getString(WRITEMODE_PARAMETER_KEY, WRITEMODE_OVERWRITE); - if(writeModeParam.equals(WRITEMODE_OVERWRITE)) { - this.writeMode = WriteMode.OVERWRITE; - } else if(writeModeParam.equals(WRITEMODE_CREATE)) { - this.writeMode = WriteMode.CREATE; - } else { - throw new RuntimeException("Invalid write mode configuration: "+writeModeParam); + // check if have not been set and use the defaults in that case + if (this.writeMode == null) { + this.writeMode = DEFAULT_WRITE_MODE; } - // get the output directory parameter - String outDirParam = parameters.getString(OUT_DIRECTORY_PARAMETER_KEY, OUT_DIRECTORY_PARONLY); - if(outDirParam.equals(OUT_DIRECTORY_ALWAYS)) { - this.outDirMode = OutputDirectoryMode.ALWAYS; - } else if(outDirParam.equals(OUT_DIRECTORY_PARONLY)) { - this.outDirMode = OutputDirectoryMode.PARONLY; - } else { - throw new RuntimeException("Invalid output directory mode configuration: "+outDirParam); + if (this.outputDirectoryMode == null) { + this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE; } - // get timeout for stream opening - this.openTimeout = parameters.getLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, FileInputFormat.DEFAULT_OPENING_TIMEOUT); - if (this.openTimeout < 0) { + if (this.openTimeout == -1) { this.openTimeout = FileInputFormat.DEFAULT_OPENING_TIMEOUT; - if (LOG.isWarnEnabled()) - LOG.warn("Ignoring invalid parameter for stream opening timeout (requires a positive value or zero=infinite): " + this.openTimeout); - } else if (this.openTimeout == 0) { - this.openTimeout = Long.MAX_VALUE; } } @@ -158,7 +148,12 @@ public abstract class FileOutputFormat implements OutputFormat { @Override public void open(int taskNumber, int numTasks) throws IOException { - // obtain FSDataOutputStream asynchronously, since HDFS client can not handle InterruptedExceptions + + if (LOG.isDebugEnabled()) + LOG.debug("Openint stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode + + ", OutputDirectoryMode=" + outputDirectoryMode + ", timeout=" + openTimeout); + + // obtain FSDataOutputStream asynchronously, since HDFS client is vulnerable to InterruptedExceptions OutputPathOpenThread opot = new OutputPathOpenThread(this, (taskNumber + 1), numTasks); opot.start(); @@ -182,16 +177,50 @@ public abstract class FileOutputFormat implements OutputFormat { } } + public void setOutputFilePath(Path path) { + if (path == null) + throw new IllegalArgumentException("Output file path may not be null."); + + this.outputFilePath = path; + } + public Path getOutputFilePath() { return this.outputFilePath; } + + + public void setWriteMode(WriteMode mode) { + if (mode == null) { + throw new NullPointerException(); + } + + this.writeMode = mode; + } public WriteMode getWriteMode() { return this.writeMode; } + - public OutputDirectoryMode getOutDirMode() { - return this.outDirMode; + public void setOutputDirectoryMode(OutputDirectoryMode mode) { + if (mode == null) { + throw new NullPointerException(); + } + + this.outputDirectoryMode = mode; + } + + public OutputDirectoryMode getOutputDirectoryMode() { + return this.outputDirectoryMode; + } + + + public void setOpenTimeout(long timeout) { + if (timeout < 0) { + throw new IllegalArgumentException("The timeout must be a nonnegative numer of milliseconds (zero for infinite)."); + } + + this.openTimeout = (timeout == 0) ? Long.MAX_VALUE : timeout; } public long getOpenTimeout() { @@ -224,7 +253,7 @@ public abstract class FileOutputFormat implements OutputFormat { public OutputPathOpenThread(FileOutputFormat fof, int taskIndex, int numTasks) { this.path = fof.getOutputFilePath(); this.writeMode = fof.getWriteMode(); - this.outDirMode = fof.getOutDirMode(); + this.outDirMode = fof.getOutputDirectoryMode(); this.timeoutMillies = fof.getOpenTimeout(); this.taskIndex = taskIndex; this.numTasks = numTasks; @@ -232,7 +261,7 @@ public abstract class FileOutputFormat implements OutputFormat { @Override public void run() { - + try { Path p = this.path; final FileSystem fs = p.getFileSystem(); @@ -381,22 +410,6 @@ public abstract class FileOutputFormat implements OutputFormat { protected AbstractConfigBuilder(Configuration targetConfig) { this.config = targetConfig; } - - // -------------------------------------------------------------------- - - /** - * Sets the timeout after which the output format will abort the opening of the output stream, - * if the stream has not responded until then. - * - * @param timeoutInMillies The timeout, in milliseconds, or 0 for infinite. - * @return The builder itself. - */ - public T openingTimeout(int timeoutInMillies) { - this.config.setLong(OUTPUT_STREAM_OPEN_TIMEOUT_KEY, timeoutInMillies); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } } /** diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java index 6ede898b8f30110eb495a2ec7f5216b280cfa973..103e3eae995b5c17d8aa9ffa3fa02aa1e9349f5c 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java @@ -145,13 +145,15 @@ public class FormatUtil { * if an I/O error occurred while accessing the file or initializing the OutputFormat. */ public static > F openOutput( - Class outputFormatClass, String path, Configuration configuration) throws IOException { + Class outputFormatClass, String path, Configuration configuration) + throws IOException + { final F outputFormat = ReflectionUtil.newInstance(outputFormatClass); - + outputFormat.setOutputFilePath(new Path(path)); + outputFormat.setOpenTimeout(0); + configuration = configuration == null ? new Configuration() : configuration; - - configuration.setString(FileOutputFormat.FILE_PARAMETER_KEY, path); - configuration.setLong(FileOutputFormat.OUTPUT_STREAM_OPEN_TIMEOUT_KEY, 0); + outputFormat.configure(configuration); outputFormat.open(0, 1); return outputFormat; diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/CollectionDataSource.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/CollectionDataSource.java index 0683308c43f64e2057cf2dff60758202ff65483d..3d8228dd10cb9347af7da0ecd0de6751e2773e76 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/CollectionDataSource.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/CollectionDataSource.java @@ -104,7 +104,8 @@ public class CollectionDataSource extends GenericDataSource)data[0]); diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java index 8f846bd5b6d1fe8909ee2fbf771175f992939c2a..f9a26d25c02bfe7be50819ec3d6a6c0862bfb9bb 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java @@ -142,7 +142,19 @@ public final class ConfigConstants { * Path to Hadoop configuration */ public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf"; + + + // ------------------------ File System Bahavior ------------------------ + + /** + * Key to specify whether the file systems should simply overwrite existing files. + */ + public static final String FILESYSTEM_DEFAULT_OVERWRITE_KEY = "fs.overwrite-files"; + /** + * Key to specify whether the file systems should always create a directory for the output, even with a parallelism of one. + */ + public static final String FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY = "fs.output.always-create-directory"; // ---------------------------- Compiler ------------------------------- @@ -312,6 +324,19 @@ public final class ConfigConstants { public static final boolean DEFAULT_USE_MULTICAST_FOR_BROADCAST = false; + // ------------------------ File System Bahavior ------------------------ + + /** + * The default behavior with respect to overwriting existing files (= not overwrite) + */ + public static final boolean DEFAULT_FILESYSTEM_OVERWRITE = false; + + /** + * The default behavior for output directory creating (create only directory when parallelism > 1). + */ + public static final boolean DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY = false; + + // ---------------------------- Compiler ------------------------------- /** diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java index 41bbeef1719225e580ca45d039ede1e41dc9ce89..3a0e0ff088c50db064d619807a79af2bcf10d4fe 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java @@ -43,9 +43,8 @@ public abstract class FileSystem { private static final String S3_FILESYSTEM_CLASS = "eu.stratosphere.runtime.fs.s3.S3FileSystem"; - /** - * Object used to protect calls to specific methods. - */ + + /** Object used to protect calls to specific methods.*/ private static final Object SYNCHRONIZATION_OBJECT = new Object(); /** @@ -53,14 +52,16 @@ public abstract class FileSystem { * */ public static enum WriteMode { - CREATE, // creates write path if it does not exist. Does not overwrite existing files and directories. - OVERWRITE // creates write path if it does not exist. Overwrites existing files and directories. + + /** Creates write path if it does not exist. Does not overwrite existing files and directories. */ + CREATE, + + /** creates write path if it does not exist. Overwrites existing files and directories. */ + OVERWRITE } /** - * An auxiliary class to identify a file system by its scheme - * and its authority. - * + * An auxiliary class to identify a file system by its scheme and its authority. */ public static class FSKey { @@ -149,7 +150,6 @@ public abstract class FileSystem { private static final Map FSDIRECTORY = new HashMap(); static { - // TODO: Use configuration to retrieve this mapping FSDIRECTORY.put("hdfs", DISTRIBUTED_FILESYSTEM_CLASS); FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS); FSDIRECTORY.put("s3", S3_FILESYSTEM_CLASS); @@ -219,10 +219,12 @@ public abstract class FileSystem { try { fs = fsClass.newInstance(); - } catch (InstantiationException e) { - throw new IOException(StringUtils.stringifyException(e)); - } catch (IllegalAccessException e) { - throw new IOException(StringUtils.stringifyException(e)); + } + catch (InstantiationException e) { + throw new IOException("Could not instantiate file system class: " + e.getMessage(), e); + } + catch (IllegalAccessException e) { + throw new IOException("Could not instantiate file system class: " + e.getMessage(), e); } // Initialize new file system object diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/FileOutputFormatTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/FileOutputFormatTest.java index 27eda6f71ea8e6aa133810052a5a5a2ea2a263d5..6543845ff15dc4297235f98ba60af355954b763f 100644 --- a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/FileOutputFormatTest.java +++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/FileOutputFormatTest.java @@ -8,7 +8,10 @@ import junit.framework.Assert; import org.junit.BeforeClass; import org.junit.Test; +import eu.stratosphere.api.common.io.FileOutputFormat.OutputDirectoryMode; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.FileSystem.WriteMode; +import eu.stratosphere.core.fs.Path; import eu.stratosphere.types.IntValue; import eu.stratosphere.util.LogUtils; @@ -32,15 +35,14 @@ public class FileOutputFormatTest { } String tmpFilePath = tmpOutPath.toURI().toString(); - - Configuration config = new Configuration(); - config.setString(FileOutputFormat.FILE_PARAMETER_KEY, tmpFilePath); - config.setString(FileOutputFormat.WRITEMODE_PARAMETER_KEY, FileOutputFormat.WRITEMODE_CREATE); - config.setString(FileOutputFormat.OUT_DIRECTORY_PARAMETER_KEY, FileOutputFormat.OUT_DIRECTORY_PARONLY); // check fail if file exists DummyFileOutputFormat dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); boolean exception = false; try { @@ -56,7 +58,11 @@ public class FileOutputFormatTest { Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir()); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -71,7 +77,11 @@ public class FileOutputFormatTest { tmpOutPath.delete(); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -85,11 +95,13 @@ public class FileOutputFormatTest { // ----------- test again with always directory mode - config.setString(FileOutputFormat.OUT_DIRECTORY_PARAMETER_KEY, FileOutputFormat.OUT_DIRECTORY_ALWAYS); - // check fail if file exists dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -105,7 +117,11 @@ public class FileOutputFormatTest { Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir()); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -120,7 +136,11 @@ public class FileOutputFormatTest { // check fail if file in directory exists dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -137,7 +157,11 @@ public class FileOutputFormatTest { tmpOutPath.delete(); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -169,14 +193,14 @@ public class FileOutputFormatTest { } String tmpFilePath = tmpOutPath.toURI().toString(); - - Configuration config = new Configuration(); - config.setString(FileOutputFormat.FILE_PARAMETER_KEY, tmpFilePath); - config.setString(FileOutputFormat.WRITEMODE_PARAMETER_KEY, FileOutputFormat.WRITEMODE_CREATE); // check fail if file exists DummyFileOutputFormat dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); boolean exception = false; try { @@ -192,7 +216,11 @@ public class FileOutputFormatTest { Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir()); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -207,7 +235,11 @@ public class FileOutputFormatTest { // check fail if file in directory exists dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -224,7 +256,11 @@ public class FileOutputFormatTest { tmpOutPath.delete(); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.CREATE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -255,15 +291,14 @@ public class FileOutputFormatTest { } String tmpFilePath = tmpOutPath.toURI().toString(); - - Configuration config = new Configuration(); - config.setString(FileOutputFormat.FILE_PARAMETER_KEY, tmpFilePath); - config.setString(FileOutputFormat.WRITEMODE_PARAMETER_KEY, FileOutputFormat.WRITEMODE_OVERWRITE); - config.setString(FileOutputFormat.OUT_DIRECTORY_PARAMETER_KEY, FileOutputFormat.OUT_DIRECTORY_PARONLY); // check success if file exists DummyFileOutputFormat dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); boolean exception = false; try { @@ -280,7 +315,11 @@ public class FileOutputFormatTest { Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir()); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -296,7 +335,11 @@ public class FileOutputFormatTest { tmpOutPath.delete(); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -310,11 +353,13 @@ public class FileOutputFormatTest { // ----------- test again with always directory mode - config.setString(FileOutputFormat.OUT_DIRECTORY_PARAMETER_KEY, FileOutputFormat.OUT_DIRECTORY_ALWAYS); - // check success if file exists dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -333,7 +378,11 @@ public class FileOutputFormatTest { Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir()); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -348,7 +397,11 @@ public class FileOutputFormatTest { // check success if file in directory exists dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -367,7 +420,11 @@ public class FileOutputFormatTest { tmpOutPath.delete(); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS); + + dfof.configure(new Configuration()); exception = false; try { @@ -399,14 +456,14 @@ public class FileOutputFormatTest { } String tmpFilePath = tmpOutPath.toURI().toString(); - - Configuration config = new Configuration(); - config.setString(FileOutputFormat.FILE_PARAMETER_KEY, tmpFilePath); - config.setString(FileOutputFormat.WRITEMODE_PARAMETER_KEY, FileOutputFormat.WRITEMODE_OVERWRITE); // check success if file exists DummyFileOutputFormat dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); boolean exception = false; try { @@ -425,7 +482,11 @@ public class FileOutputFormatTest { Assert.assertTrue("Directory could not be created.", tmpOutPath.mkdir()); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -440,7 +501,11 @@ public class FileOutputFormatTest { // check success if file in directory exists dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -459,7 +524,11 @@ public class FileOutputFormatTest { tmpOutPath.delete(); dfof = new DummyFileOutputFormat(); - dfof.configure(config); + dfof.setOutputFilePath(new Path(tmpFilePath)); + dfof.setWriteMode(WriteMode.OVERWRITE); + dfof.setOutputDirectoryMode(OutputDirectoryMode.PARONLY); + + dfof.configure(new Configuration()); exception = false; try { @@ -487,7 +556,6 @@ public class FileOutputFormatTest { public void writeRecord(IntValue record) throws IOException { // DO NOTHING } - } } diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/record/io/CsvOutputFormatTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/record/io/CsvOutputFormatTest.java index 33b8c042b2160fc60c37db8b8722f51a4eb30b7e..97ef789186b903315c708c6fddef2a209eba643c 100644 --- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/record/io/CsvOutputFormatTest.java +++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/record/io/CsvOutputFormatTest.java @@ -16,7 +16,6 @@ package eu.stratosphere.api.java.record.io; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.MockitoAnnotations.initMocks; import java.io.BufferedReader; import java.io.File; @@ -28,17 +27,16 @@ import junit.framework.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import eu.stratosphere.api.java.record.io.CsvOutputFormat; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.Path; import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Record; import eu.stratosphere.types.StringValue; public class CsvOutputFormatTest { - @Mock protected Configuration config; protected File tempFile; @@ -49,8 +47,8 @@ public class CsvOutputFormatTest { @Before public void setup() throws IOException { - initMocks(this); this.tempFile = File.createTempFile("test_output", "tmp"); + this.format.setOutputFilePath(new Path(tempFile.toURI())); } @After @@ -68,7 +66,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); // check missing number of fields boolean validConfig = true; @@ -166,7 +163,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); @@ -214,7 +210,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); @@ -261,7 +256,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); @@ -310,7 +304,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); @@ -362,7 +355,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); @@ -414,7 +406,6 @@ public class CsvOutputFormatTest { { try { Configuration config = new Configuration(); - config.setString(CsvOutputFormat.FILE_PARAMETER_KEY, this.tempFile.toURI().toString()); config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|"); config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2); config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, StringValue.class); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java index 4735e17d065df47273f4db7a1a74b87d8760fe84..80d19c8025f7c6449c1a78f9280100cca6d0a345 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java @@ -360,29 +360,18 @@ public class DataSinkTask extends AbstractOutputTask @Override - public int getMaximumNumberOfSubtasks() - { + public int getMaximumNumberOfSubtasks() { if (!(this.format instanceof FileOutputFormat)) { return -1; } - // ----------------- This code applies only to file inputs ------------------ - - final String pathName = this.config.getStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, null); - final WriteMode writeMode = ((FileOutputFormat)this.format).getWriteMode(); - final OutputDirectoryMode outDirMode = ((FileOutputFormat)this.format).getOutDirMode(); - final Path path; + final FileOutputFormat fileOutputFormat = (FileOutputFormat) this.format; - if (pathName == null) { - return 0; - } + // ----------------- This code applies only to file inputs ------------------ - try { - path = new Path(pathName); - } - catch (Throwable t) { - return 0; - } + final Path path = fileOutputFormat.getOutputFilePath(); + final WriteMode writeMode = fileOutputFormat.getWriteMode(); + final OutputDirectoryMode outDirMode = fileOutputFormat.getOutputDirectoryMode(); // Prepare output path and determine max DOP try { diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java index 05fbf296c194ea86d8e54c74cd9cce22bf3fab3d..9bc696f2c74b082f04379f6b789718dffd199f01 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java @@ -24,6 +24,7 @@ import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; import eu.stratosphere.api.java.record.io.DelimitedInputFormat; import eu.stratosphere.api.java.record.io.FileOutputFormat; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.Path; import eu.stratosphere.nephele.services.memorymanager.MemoryManager; import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.nephele.template.AbstractOutputTask; @@ -36,6 +37,7 @@ import eu.stratosphere.pact.runtime.task.PactDriver; import eu.stratosphere.pact.runtime.task.RegularPactTask; import eu.stratosphere.pact.runtime.task.util.TaskConfig; import eu.stratosphere.types.Record; +import eu.stratosphere.util.InstantiationUtil; import eu.stratosphere.util.MutableObjectIterator; public abstract class TaskTestBase { @@ -93,13 +95,17 @@ public abstract class TaskTestBase { task.registerInputOutput(); } - public void registerFileOutputTask(AbstractOutputTask outTask, - Class stubClass, String outPath) + public void registerFileOutputTask(AbstractOutputTask outTask, Class stubClass, String outPath) { + registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath); + } + + public void registerFileOutputTask(AbstractOutputTask outTask, FileOutputFormat outputFormat, String outPath) { TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration()); + + outputFormat.setOutputFilePath(new Path(outPath)); - dsConfig.setStubWrapper(new UserCodeClassWrapper(stubClass)); - dsConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outPath); + dsConfig.setStubWrapper(new UserCodeObjectWrapper(outputFormat)); outTask.setEnvironment(this.mockEnv); diff --git a/stratosphere-tests/src/main/java/eu/stratosphere/test/util/TestBase2.java b/stratosphere-tests/src/main/java/eu/stratosphere/test/util/TestBase2.java index 5a325b2d8e03c2a2357dbf8c427b36110c78a2e5..c7a7fac7cf3ede74d87160fc15f4734a31d72beb 100644 --- a/stratosphere-tests/src/main/java/eu/stratosphere/test/util/TestBase2.java +++ b/stratosphere-tests/src/main/java/eu/stratosphere/test/util/TestBase2.java @@ -91,6 +91,8 @@ public abstract class TestBase2 { @Before public void startCluster() throws Exception { this.executor = new NepheleMiniCluster(); + this.executor.setDefaultOverwriteFiles(true); + this.executor.start(); } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java index bfbcc0add1a390f2ad7bb392a7ddd22e4568579f..a96b4efc29e0e4dd51cb280093bec487909b4cbf 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/BroadcastVarsNepheleITCase.java @@ -22,13 +22,14 @@ import java.util.regex.Pattern; import org.junit.Assert; -import eu.stratosphere.api.common.io.FileOutputFormat; import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper; +import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; import eu.stratosphere.api.common.typeutils.TypeSerializerFactory; import eu.stratosphere.api.java.record.functions.MapFunction; import eu.stratosphere.api.java.record.io.CsvInputFormat; import eu.stratosphere.api.java.record.io.CsvOutputFormat; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.Path; import eu.stratosphere.nephele.io.DistributionPattern; import eu.stratosphere.nephele.io.channels.ChannelType; import eu.stratosphere.nephele.jobgraph.JobGraph; @@ -278,19 +279,11 @@ public class BroadcastVarsNepheleITCase extends TestBase2 { taskConfig.addInputToGroup(0); taskConfig.setInputSerializer(serializer, 0); - taskConfig.setStubWrapper(new UserCodeClassWrapper(CsvOutputFormat.class)); - taskConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, resultPath); - - Configuration stubConfig = taskConfig.getStubParameters(); - stubConfig.setString(CsvOutputFormat.RECORD_DELIMITER_PARAMETER, "\n"); - stubConfig.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, " "); - stubConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, LongValue.class); - stubConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0); - stubConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, LongValue.class); - stubConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1); - stubConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, LongValue.class); - stubConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 2); - stubConfig.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 3); + @SuppressWarnings("unchecked") + CsvOutputFormat outFormat = new CsvOutputFormat("\n", " ", LongValue.class, LongValue.class, LongValue.class); + outFormat.setOutputFilePath(new Path(resultPath)); + + taskConfig.setStubWrapper(new UserCodeObjectWrapper(outFormat)); } return output; diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java index 5d72720388ec7c3f1ccd8ee3aa4fdbd1a4553083..6be110df1afa8d27bf89719e707a01001600a790 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/broadcastvars/KMeansIterativeNepheleITCase.java @@ -14,12 +14,12 @@ **********************************************************************************************************************/ package eu.stratosphere.test.broadcastvars; -import eu.stratosphere.api.common.io.FileOutputFormat; import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; import eu.stratosphere.api.common.typeutils.TypeComparatorFactory; import eu.stratosphere.api.common.typeutils.TypeSerializerFactory; import eu.stratosphere.api.java.record.io.CsvInputFormat; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.Path; import eu.stratosphere.example.java.record.kmeans.KMeans.PointBuilder; import eu.stratosphere.example.java.record.kmeans.KMeans.PointOutFormat; import eu.stratosphere.example.java.record.kmeans.KMeans.RecomputeClusterCenter; @@ -134,8 +134,10 @@ public class KMeansIterativeNepheleITCase extends TestBase2 { taskConfig.addInputToGroup(0); taskConfig.setInputSerializer(serializer, 0); - taskConfig.setStubWrapper(new UserCodeObjectWrapper(new PointOutFormat())); - taskConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, resultPath); + PointOutFormat outFormat = new PointOutFormat(); + outFormat.setOutputFilePath(new Path(resultPath)); + + taskConfig.setStubWrapper(new UserCodeObjectWrapper(outFormat)); } return output; diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/minicluster/LocalClusterProvider.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/minicluster/LocalClusterProvider.java index 8af833d7bbfa0c86e7d60fd1a28924417eb7edae..344fbfd14c2895b317d2d431c3cedade1396fb4e 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/minicluster/LocalClusterProvider.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/minicluster/LocalClusterProvider.java @@ -65,6 +65,7 @@ public class LocalClusterProvider extends ClusterProvider { } this.nephele = new NepheleMiniCluster(); + this.nephele.setDefaultOverwriteFiles(true); this.nephele.start(); this.nepheleRunning = true; }