提交 5a81d9de 编写于 作者: R Robert Metzger

Expose WriteMode in JAPI. #781

Pass Configuration parameters to runtime operators in JAPI (+test)
上级 1fff5137
......@@ -320,6 +320,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
......
......@@ -120,7 +120,6 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
this.outputFilePath = outputPath;
}
public void setOutputFilePath(Path path) {
if (path == null) {
throw new IllegalArgumentException("Output file path may not be null.");
......
......@@ -630,6 +630,22 @@ public abstract class DataSet<T> {
return output(new TextOutputFormat<T>(new Path(filePath)));
}
/**
 * Writes a DataSet as a text file to the specified location.<br/>
 * For each element of the DataSet the result of {@link Object#toString()} is written.
 *
 * @param filePath The path pointing to the location the text file is written to.
 * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE.
 * @return The DataSink that writes the DataSet.
 *
 * @see TextOutputFormat
 */
public DataSink<T> writeAsText(String filePath, WriteMode writeMode) {
	// Build the text format for the target path and apply the requested
	// behavior for pre-existing files before handing it to the sink.
	TextOutputFormat<T> format = new TextOutputFormat<T>(new Path(filePath));
	format.setWriteMode(writeMode);
	return output(format);
}
/**
* Writes a {@link Tuple} DataSet as a CSV file to the specified location.<br/>
* <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br/>
......@@ -660,13 +676,34 @@ public abstract class DataSet<T> {
* @see CsvOutputFormat
*/
public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter) {
	// Validate early so callers get a clear message for non-tuple data sets.
	Validate.isTrue(this.type.isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
	// Fix: the flattened diff left both the old and the new return statement in
	// place (two returns in a row, which does not compile). Keep only the new
	// form: delegate with a null WriteMode, meaning "use the default behavior
	// for existing files".
	return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, null);
}
/**
 * Writes a {@link Tuple} DataSet as a CSV file to the specified location with the specified field and line delimiters.<br/>
 * <b>Note: Only a Tuple DataSet can written as a CSV file.</b><br/>
 * For each Tuple field the result of {@link Object#toString()} is written.
 *
 * @param filePath The path pointing to the location the CSV file is written to.
 * @param rowDelimiter The row delimiter to separate Tuples.
 * @param fieldDelimiter The field delimiter to separate Tuple fields.
 * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE.
 *
 * @see Tuple
 * @see CsvOutputFormat
 */
public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
	// Thin convenience overload: wrap the path and forward everything,
	// including the explicit write mode, to the internal helper.
	Path targetPath = new Path(filePath);
	return internalWriteAsCsv(targetPath, rowDelimiter, fieldDelimiter, writeMode);
}
@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter) {
return output((OutputFormat<T>) new CsvOutputFormat<X>(filePath, rowDelimiter, fieldDelimiter));
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
Validate.isTrue(this.type.isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
CsvOutputFormat<X> of = new CsvOutputFormat<X>(filePath, rowDelimiter, fieldDelimiter);
if(wm != null) {
of.setWriteMode(wm);
}
return output((OutputFormat<T>) of);
}
/**
......
......@@ -26,6 +26,7 @@ import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.operators.translation.JavaPlan;
import eu.stratosphere.api.java.operators.translation.PlanBulkIterationOperator;
import eu.stratosphere.api.java.operators.translation.PlanDeltaIterationOperator;
import eu.stratosphere.configuration.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -84,6 +85,13 @@ public class OperatorTranslation {
Operator input = translate(op.getInput());
// translate the operation itself and connect it to the input
dataFlowOp = op.translateToDataFlow(input);
if(dataSet instanceof UdfOperator<?> ) {
Configuration opParams = ((UdfOperator<?>) op).getParameters();
if(opParams != null) {
dataFlowOp.getParameters().addAll(opParams);
}
}
}
else if (dataSet instanceof TwoInputOperator) {
TwoInputOperator<?, ?, ?, ?> op = (TwoInputOperator<?, ?, ?, ?>) dataSet;
......@@ -94,6 +102,14 @@ public class OperatorTranslation {
// translate the operation itself and connect it to the inputs
dataFlowOp = op.translateToDataFlow(input1, input2);
// set configuration params
if(dataSet instanceof UdfOperator<?> ) {
Configuration opParams = ((UdfOperator<?>) op).getParameters();
if(opParams != null) {
dataFlowOp.getParameters().addAll(opParams);
}
}
}
else if (dataSet instanceof BulkIterationResultSet<?>) {
dataFlowOp = translateBulkIteration((BulkIterationResultSet<?>) dataSet);
......
......@@ -19,6 +19,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import junit.framework.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
......@@ -35,7 +37,7 @@ import eu.stratosphere.test.util.JavaProgramTestBase;
@RunWith(Parameterized.class)
public class MapITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 8;
private static int NUM_PROGRAMS = 9;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
......@@ -439,6 +441,43 @@ public class MapITCase extends JavaProgramTestBase {
"55,6,Comment#14\n" +
"55,6,Comment#15\n";
}
case 9: {
/*
* Test passing configuration object.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
Configuration conf = new Configuration();
final String testKey = "testVariable";
final int testValue = 666;
conf.setInteger(testKey, testValue);
DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
@Override
public void open(Configuration config) {
int val = config.getInteger(testKey, -1);
Assert.assertEquals(testValue, val);
}
@Override
public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
throws Exception {
return value;
}
}).withParameters(conf);
bcMapDs.writeAsCsv(resultPath);
env.execute();
// return expected result
return "1,1,Hi\n"
+ "2,2,Hello\n"
+ "3,2,Hello world";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册