Commit 1fff5137 authored by StephanEwen

Add Javadoc comments to ExecutionEnvironment (final), DataSet (parts), InputTypeConfigurable, and ResultTypeQueryable.
Parent 98001d92
......@@ -582,6 +582,13 @@ public abstract class DataSet<T> {
// -------------------------------------------------------------------------------------------
/**
* Runs a {@link CustomUnaryOperation} on the data set. Custom operations are typically complex
* operators that are composed of multiple steps.
*
* @param operation The operation to run.
* @return The data set produced by the operation.
*/
public <X> DataSet<X> runOperation(CustomUnaryOperation<T, X> operation) {
Validate.notNull(operation, "The custom operator must not be null.");
operation.setInput(this);
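For illustration, a minimal usage sketch (not part of this commit): env stands for any ExecutionEnvironment, and MyOperation is a hypothetical CustomUnaryOperation<String, String> implementation.

    // MyOperation is a hypothetical CustomUnaryOperation<String, String>.
    DataSet<String> input = env.fromElements("a", "b", "a");

    // runOperation() null-checks the operation, wires this data set in as its
    // input via setInput(), and returns the operation's result data set.
    DataSet<String> result = input.runOperation(new MyOperation());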
......@@ -684,6 +691,7 @@ public abstract class DataSet<T> {
/**
* Writes a DataSet using a {@link FileOutputFormat} to a specified location.
* This method adds a data sink to the program.
*
* @param outputFormat The FileOutputFormat to write the DataSet.
* @param filePath The path to the location where the DataSet is written.
......@@ -701,6 +709,7 @@ public abstract class DataSet<T> {
/**
* Writes a DataSet using a {@link FileOutputFormat} to a specified location.
* This method adds a data sink to the program.
*
* @param outputFormat The FileOutputFormat to write the DataSet.
* @param filePath The path to the location where the DataSet is written.
......@@ -720,10 +729,12 @@ public abstract class DataSet<T> {
}
/**
* Processes a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
* Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
* or transformations) at the same time.
*
* @param outputFormat The OutputFormat to process the DataSet.
* @return The DataSink that processes the DataSet.
*
* @see OutputFormat
* @see DataSink
......
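A hedged sketch of output(...) with multiple sinks on one DataSet, as the Javadoc above describes; MyOutputFormat is a hypothetical OutputFormat<String> implementation and env any ExecutionEnvironment:

    DataSet<String> words = env.fromElements("hello", "world");

    // First sink: a custom OutputFormat (hypothetical class).
    words.output(new MyOutputFormat());

    // Second consumer of the same DataSet: a text file sink.
    words.writeAsText("file:///tmp/words");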
......@@ -52,14 +52,23 @@ import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.NumberSequenceIterator;
import eu.stratosphere.util.SplittableIterator;
/**
* The ExecutionEnvironment is the context in which a program is executed. A
* {@link LocalEnvironment} will cause execution in the current JVM, a
* {@link RemoteEnvironment} will cause execution on a remote setup.
* <p>
* The environment provides methods to control the job execution (such as setting the parallelism)
* and to interact with the outside world (data access).
* <p>
* Please note that the execution environment needs strong type information for the input and return types
* of all operations that are executed. This means that the environment needs to know that the return
* value of an operation is, for example, a Tuple of String and Integer.
* Because the Java compiler throws much of the generic type information away, most methods attempt to
* re-obtain that information using reflection. In certain cases, it may be necessary to manually supply
* that information to some of the methods.
*
* @see LocalEnvironment
* @see RemoteEnvironment
*/
public abstract class ExecutionEnvironment {
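As a sketch of the two execution modes (assuming a createLocalEnvironment() factory method, which is not shown in this diff):

    // Runs the program in the current JVM; a RemoteEnvironment would instead
    // submit it to a remote setup. The factory method name is an assumption.
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();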
......@@ -229,16 +238,36 @@ public abstract class ExecutionEnvironment {
// ----------------------------------- CSV Input Format ---------------------------------------
public CsvReader readCsvFile(Path filePath) {
return new CsvReader(filePath, this);
}
/**
* Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
* define parameters and field types and will eventually produce the DataSet that corresponds to
* the read and parsed CSV input.
*
* @param filePath The path of the CSV file.
* @return A CsvReader that can be used to configure the CSV input.
*/
public CsvReader readCsvFile(String filePath) {
return new CsvReader(filePath, this);
}
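A hedged sketch of the CsvReader flow; the fieldDelimiter(...) and types(...) configuration methods are assumed from typical CsvReader usage and are not part of this hunk:

    // Configures the reader, then materializes a typed DataSet of 2-tuples.
    DataSet<Tuple2<String, Integer>> csvInput = env
            .readCsvFile("file:///path/to/file.csv")
            .fieldDelimiter('|')                      // assumed option
            .types(String.class, Integer.class);      // assumed terminal call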
// ----------------------------------- Generic Input Format ---------------------------------------
/**
* Generic method to create an input DataSet with an {@link InputFormat}. The DataSet will not be
* immediately created - instead, this method returns a DataSet that will be lazily created from
* the input format once the program is executed.
* <p>
* Since all data sets need specific information about their types, this method needs to determine
* the type of the data produced by the input format. It will attempt to determine the data type
* by reflection, unless the input format implements the {@link ResultTypeQueryable} interface.
* In the latter case, this method will invoke the {@link ResultTypeQueryable#getProducedType()}
* method to determine the data type produced by the input format.
*
* @param inputFormat The input format used to create the data set.
* @return A DataSet that represents the data created by the input format.
*
* @see #createInput(InputFormat, TypeInformation)
*/
public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.");
......@@ -257,7 +286,21 @@ public abstract class ExecutionEnvironment {
"Please specify the TypeInformation of the produced type explicitly.");
}
}
/**
* Generic method to create an input DataSet with an {@link InputFormat}. The DataSet will not be
* immediately created - instead, this method returns a DataSet that will be lazily created from
* the input format once the program is executed.
* <p>
* The data set is typed to the given TypeInformation. This method is intended for input formats
* where the return type cannot be determined by reflection analysis, and that do not implement the
* {@link ResultTypeQueryable} interface.
*
* @param inputFormat The input format used to create the data set.
* @param producedType The TypeInformation for the type produced by the input format.
* @return A DataSet that represents the data created by the input format.
*
* @see #createInput(InputFormat)
*/
public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat, TypeInformation<X> producedType) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.");
......@@ -272,6 +315,24 @@ public abstract class ExecutionEnvironment {
// ----------------------------------- Collection ---------------------------------------
/**
* Creates a DataSet from the given non-empty collection. The type of the data set is that
* of the elements in the collection. The elements need to be serializable (as defined by
* {@link java.io.Serializable}), because the framework may move the elements into the cluster
* if needed.
* <p>
* The framework will try to determine the exact type from the collection elements.
* In case of generic elements, it may be necessary to manually supply the type information
* via {@link #fromCollection(Collection, TypeInformation)}.
* <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a degree of parallelism of one.
*
* @param data The collection of elements to create the data set from.
* @return A DataSet representing the given collection.
*
* @see #fromCollection(Collection, TypeInformation)
*/
public <X> DataSource<X> fromCollection(Collection<X> data) {
if (data == null) {
throw new IllegalArgumentException("The data must not be null.");
......@@ -285,17 +346,70 @@ public abstract class ExecutionEnvironment {
return fromCollection(data, TypeExtractor.getForObject(firstValue));
}
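A short usage sketch, assuming an ExecutionEnvironment env:

    // Strings are serializable, so the framework may ship them to the cluster.
    List<String> names = java.util.Arrays.asList("alice", "bob", "carol");

    // Non-parallel source (degree of parallelism one); the element type is
    // reflected from the first element.
    DataSet<String> nameSet = env.fromCollection(names);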
/**
* Creates a DataSet from the given non-empty collection. The type of the data set is that
* of the elements in the collection. The elements need to be serializable (as defined by
* {@link java.io.Serializable}), because the framework may move the elements into the cluster
* if needed.
* <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a degree of parallelism of one.
* <p>
* The returned DataSet is typed to the given TypeInformation.
*
* @param data The collection of elements to create the data set from.
* @param type The TypeInformation for the produced data set.
* @return A DataSet representing the given collection.
*
* @see #fromCollection(Collection)
*/
public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
CollectionInputFormat.checkCollection(data, type.getTypeClass());
return new DataSource<X>(this, new CollectionInputFormat<X>(data), type);
}
/**
* Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
* the actual execution happens, the type of data returned by the iterator must be given
* explicitly in the form of the type class (this is due to the fact that the Java compiler
* erases the generic type information).
* <p>
* The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
* framework may move it to a remote environment, if needed.
* <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a degree of parallelism of one.
*
* @param data The iterator that provides the elements for the data set.
* @param type The class of the data produced by the iterator. Must not be a generic class.
* @return A DataSet representing the elements in the iterator.
*
* @see #fromCollection(Iterator, TypeInformation)
*/
public <X> DataSource<X> fromCollection(Iterator<X> data, Class<X> type) {
return fromCollection(data, TypeExtractor.getForClass(type));
}
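A sketch of a serializable iterator source; CountingIterator is an illustrative class, not part of the API:

    // The iterator must implement both Iterator and Serializable, because it
    // is shipped to the runtime and only consumed when the program executes.
    class CountingIterator implements Iterator<Integer>, java.io.Serializable {
        private int next = 0;
        public boolean hasNext() { return next < 100; }
        public Integer next()   { return next++; }
        public void remove()    { throw new UnsupportedOperationException(); }
    }

    // Integer is not generic, so the plain type class suffices here.
    DataSet<Integer> numbers = env.fromCollection(new CountingIterator(), Integer.class);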
/**
* Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
* the actual execution happens, the type of data returned by the iterator must be given
* explicitly in the form of the type information. This method is useful for cases where the type
* is generic. In that case, the type class (as given in {@link #fromCollection(Iterator, Class)})
* does not supply all type information.
* <p>
* The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
* framework may move it to a remote environment, if needed.
* <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a degree of parallelism of one.
*
* @param data The iterator that provides the elements for the data set.
* @param type The TypeInformation for the produced data set.
* @return A DataSet representing the elements in the iterator.
*
* @see #fromCollection(Iterator, Class)
*/
public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
if (!(data instanceof Serializable)) {
throw new IllegalArgumentException("The iterator must be serializable.");
......@@ -310,9 +424,16 @@ public abstract class ExecutionEnvironment {
* for example, all of type {@link String} or {@link Integer}. The sequence of elements must not be empty.
* Furthermore, the elements must be serializable (as defined in {@link java.io.Serializable}), because the
* execution environment may ship the elements into the cluster.
* <p>
* The framework will try to determine the exact type from the collection elements.
* In case of generic elements, it may be necessary to manually supply the type information
* via {@link #fromCollection(Collection, TypeInformation)}.
* <p>
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a degree of parallelism of one.
*
* @param data The elements to make up the data set.
* @return A DataSet representing the given list of elements.
*/
public <X> DataSource<X> fromElements(X... data) {
if (data == null) {
......@@ -326,16 +447,55 @@ public abstract class ExecutionEnvironment {
}
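A one-line usage sketch, assuming an ExecutionEnvironment env:

    // Varargs variant; the element type (String) is extracted from the first value.
    DataSet<String> letters = env.fromElements("a", "b", "c");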
/**
* Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data source that returns the elements in the iterator.
* The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
* execution environment may ship the elements into the cluster.
* <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data
* returned by the iterator must be given explicitly in the form of the type class (this is due to the
* fact that the Java compiler erases the generic type information).
*
* @param iterator The iterator that produces the elements of the data set.
* @param type The class of the data produced by the iterator. Must not be a generic class.
* @return A DataSet representing the elements in the iterator.
*
* @see #fromParallelCollection(SplittableIterator, TypeInformation)
*/
public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) {
return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
}
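A sketch using NumberSequenceIterator, which this class itself uses as a SplittableIterator in generateSequence(long, long) below:

    // The splittable iterator lets the framework create a parallel source.
    DataSet<Long> longs = env.fromParallelCollection(
            new NumberSequenceIterator(1L, 1000L), Long.class);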
/**
* Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
* framework to create a parallel data source that returns the elements in the iterator.
* The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
* execution environment may ship the elements into the cluster.
* <p>
* Because the iterator will remain unmodified until the actual execution happens, the type of data
* returned by the iterator must be given explicitly in the form of the type information.
* This method is useful for cases where the type is generic. In that case, the type class
* (as given in {@link #fromParallelCollection(SplittableIterator, Class)}) does not supply all type information.
*
* @param iterator The iterator that produces the elements of the data set.
* @param type The TypeInformation for the produced data set.
* @return A DataSet representing the elements in the iterator.
*
* @see #fromParallelCollection(SplittableIterator, Class)
*/
public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
return new DataSource<X>(this, new ParallelIteratorInputFormat<X>(iterator), type);
}
/**
* Creates a new data set that contains a sequence of numbers. The data set will be created in parallel,
* so there is no guarantee about the order of the elements.
*
* @param from The number to start at (inclusive).
* @param to The number to stop at (inclusive).
* @return A DataSet, containing all numbers in the {@code [from, to]} interval.
*/
public DataSource<Long> generateSequence(long from, long to) {
return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO);
}
......@@ -344,28 +504,100 @@ public abstract class ExecutionEnvironment {
// Executing
// --------------------------------------------------------------------------------------------
/**
* Triggers the program execution. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
* writing results (e.g. {@link DataSet#writeAsText(String)},
* {@link DataSet#write(eu.stratosphere.api.common.io.FileOutputFormat, String)}, or other generic
* data sinks created with {@link DataSet#output(eu.stratosphere.api.common.io.OutputFormat)}.
* <p>
* The program execution will be logged and displayed with a generated default name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception Thrown, if the program execution fails.
*/
public JobExecutionResult execute() throws Exception {
return execute(getDefaultName());
}
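A minimal end-to-end sketch; the createLocalEnvironment() factory is an assumption, while generateSequence(...), print(), and execute(String) appear in this commit:

    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

    // Source and sink are only declared here; nothing runs yet.
    env.generateSequence(1, 10).print();

    // Triggers execution of everything that ends in a sink.
    JobExecutionResult result = env.execute("sequence example");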
/**
* Triggers the program execution. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
* writing results (e.g. {@link DataSet#writeAsText(String)},
* {@link DataSet#write(eu.stratosphere.api.common.io.FileOutputFormat, String)}, or other generic
* data sinks created with {@link DataSet#output(eu.stratosphere.api.common.io.OutputFormat)}.
* <p>
* The program execution will be logged and displayed with the given job name.
*
* @param jobName The name under which the job execution is logged and displayed.
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception Thrown, if the program execution fails.
*/
public abstract JobExecutionResult execute(String jobName) throws Exception;
/**
* Creates the plan with which the system will execute the program, and returns it as
* a String using a JSON representation of the execution data flow graph.
*
* @return The execution plan of the program, as a JSON String.
* @throws Exception Thrown, if the compiler could not be instantiated, or the master could not
* be contacted to retrieve information relevant to the execution planning.
*/
public abstract String getExecutionPlan() throws Exception;
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files, or files in a distributed file system. The runtime will copy the files
* temporarily to a local cache, if needed.
* <p>
* The {@link eu.stratosphere.api.common.functions.RuntimeContext} can be obtained inside UDFs via
* {@link eu.stratosphere.api.common.functions.Function#getRuntimeContext()} and provides access to the
* {@link eu.stratosphere.api.common.cache.DistributedCache} via
* {@link eu.stratosphere.api.common.functions.RuntimeContext#getDistributedCache()}.
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
*/
public void registerCachedFile(String filePath, String name){
this.cacheFile.add(new Tuple2<String, String>(filePath, name));
}
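A hedged sketch of the cache round trip; the getFile(String) accessor on DistributedCache is an assumption, as it is not shown in this diff:

    // Register before execution; the runtime copies the file to each node.
    env.registerCachedFile("hdfs://host:port/path/stopwords.txt", "stopwords");

    // Later, inside a user-defined function (assumed accessor):
    // java.io.File f = getRuntimeContext().getDistributedCache().getFile("stopwords");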
/**
* Registers all files that were registered at this execution environment's cache registry with the
* given plan's cache registry.
*
* @param p The plan to register files at.
* @throws IOException Thrown if checks for existence and sanity fail.
*/
protected void registerCachedFilesWithPlan(Plan p) throws IOException {
for (Tuple2<String, String> entry : cacheFile) {
p.registerCachedFile(entry.f0, entry.f1);
}
}
/**
* Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
* and operations and how they interact, as an isolated unit that can be executed with a
* {@link eu.stratosphere.api.common.PlanExecutor}. Obtaining a plan and starting it with an
* executor is an alternative way to run a program and is only possible if the program consists
* only of distributed operations.
*
* @return The program's plan.
*/
public JavaPlan createProgramPlan() {
return createProgramPlan(null);
}
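A sketch of the plan-based execution path; how a PlanExecutor instance is obtained is implementation-specific and left open here:

    // Requires at least one sink, otherwise createProgramPlan() throws.
    JavaPlan plan = env.createProgramPlan("my job");
    // Hand the plan to a eu.stratosphere.api.common.PlanExecutor to run it.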
/**
* Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
* and operations and how they interact, as an isolated unit that can be executed with a
* {@link eu.stratosphere.api.common.PlanExecutor}. Obtaining a plan and starting it with an
* executor is an alternative way to run a program and is only possible if the program consists
* only of distributed operations.
*
* @param jobName The name attached to the plan (displayed in logs and monitoring).
* @return The program's plan.
*/
public JavaPlan createProgramPlan(String jobName) {
if (this.sinks.isEmpty()) {
throw new RuntimeException("No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.");
......
......@@ -14,11 +14,19 @@
**********************************************************************************************************************/
package eu.stratosphere.api.java.typeutils;
/**
*
* {@link eu.stratosphere.api.common.io.OutputFormat}s can implement this interface to be configured
* with the data type they will operate on. The method {@link #setInputType(TypeInformation)} will be
* called when the output format is used with an output method such as
* {@link eu.stratosphere.api.java.DataSet#output(eu.stratosphere.api.common.io.OutputFormat)}.
*/
public interface InputTypeConfigurable {
/**
* Method that is called on an {@link eu.stratosphere.api.common.io.OutputFormat} when it is passed to
* the DataSet's output method. May be used to configure the output format based on the data type.
*
* @param type The data type of the input.
*/
void setInputType(TypeInformation<?> type);
}
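A hedged sketch of an implementing class; MyFormat is hypothetical and its OutputFormat methods are elided:

    // An output format that captures the type of the data it will receive.
    // setInputType() is invoked when the format is passed to DataSet#output(...).
    public class MyFormat<T> implements InputTypeConfigurable /*, OutputFormat<T> */ {

        private TypeInformation<?> inputType;

        @Override
        public void setInputType(TypeInformation<?> type) {
            // E.g. select a serializer for the concrete type; stored here.
            this.inputType = type;
        }
    }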
......@@ -15,10 +15,17 @@
package eu.stratosphere.api.java.typeutils;
/**
* This interface can be implemented by functions and input formats to tell the framework
* about their produced data type. It acts as an alternative to the reflection analysis
* that is otherwise performed and is useful in situations where the produced data type may vary
* depending on parameterization.
*/
public interface ResultTypeQueryable<T> {
/**
* Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
*
* @return The data type produced by this function or input format.
*/
TypeInformation<T> getProducedType();
}
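A sketch of an implementing class; MyParameterizedFormat is hypothetical, and its InputFormat methods are elided:

    // An input format whose produced type depends on a constructor parameter,
    // so reflection alone could not determine it.
    public class MyParameterizedFormat<T> implements ResultTypeQueryable<T> /*, InputFormat<T, ?> */ {

        private final TypeInformation<T> producedType;

        public MyParameterizedFormat(TypeInformation<T> producedType) {
            this.producedType = producedType;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            // createInput(InputFormat) calls this instead of using reflection.
            return producedType;
        }
    }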