Commit b96db729 authored by mbalassi, committed by Stephan Ewen

[streaming] IterativeDataStream Prototype

Parent 5b05ca63
......@@ -33,7 +33,7 @@
<modules>
<module>stratosphere-streaming-core</module>
<module>stratosphere-streaming-examples</module>
-<module>stratosphere-streaming-addons</module>
+<module>stratosphere-streaming-connectors</module>
</modules>
<properties>
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.util.Collector;
// TODO: add link to ExecutionEnvironment
/**
* ExecutionEnvironment for streaming jobs. An instance of it is necessary to
* construct streaming topologies.
*
*/
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
private float clusterSize = 1;
/**
* General constructor specifying the batch size in which the tuples are
* transmitted and their timeout boundary.
*
* @param defaultBatchSize
* number of tuples in a batch
* @param defaultBatchTimeoutMillis
* timeout boundary in milliseconds
*/
public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) {
if (defaultBatchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
}
if (defaultBatchTimeoutMillis < 1) {
throw new IllegalArgumentException("Batch timeout must be positive.");
}
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE,
defaultBatchSize, defaultBatchTimeoutMillis);
}
/**
* Constructor for transmitting tuples individually with a 1 second timeout.
*/
public StreamExecutionEnvironment() {
this(1, 1000);
}
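// A minimal usage sketch (the values are hypothetical): batch 100 tuples per
// transfer, flushing at least every 500 ms even when a batch is not full.
//
//   StreamExecutionEnvironment env = new StreamExecutionEnvironment(100, 500);
//   env.print(env.readTextFile("file:///some/local/file"));
//   env.execute();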
/**
* Set the number of machines in the executing cluster. Used for setting
* task parallelism.
*
* @param clusterSize
* cluster size
* @return environment
*/
public StreamExecutionEnvironment setClusterSize(int clusterSize) {
this.clusterSize = clusterSize;
return this;
}
/**
* Partitioning strategy on the stream.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
}
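// These map one-to-one onto JobGraphBuilder.shuffleConnect, broadcastConnect
// and fieldsConnect; see connectGraph below.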
/**
* Sets the batch size of the data stream in which the tuples are
* transmitted.
*
* @param inputStream
* input data stream
* @param <T>
* type of the input stream
*/
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
jobGraphBuilder.setBatchSize(inputStream.connectIDs.get(i),
inputStream.batchSizes.get(i));
}
}
// TODO: Link to JobGraph & JobGraphBuilder
/**
* Internal function for assembling the underlying JobGraph of the job.
*
* @param inputStream
* input data stream
* @param outputID
* ID of the output
* @param <T>
* type of the input stream
*/
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
ConnectionType type = inputStream.ctypes.get(i);
String input = inputStream.connectIDs.get(i);
int param = inputStream.cparams.get(i);
switch (type) {
case SHUFFLE:
jobGraphBuilder.shuffleConnect(input, outputID);
break;
case BROADCAST:
jobGraphBuilder.broadcastConnect(input, outputID);
break;
case FIELD:
jobGraphBuilder.fieldsConnect(input, outputID, param);
break;
}
}
this.setBatchSize(inputStream);
}
// TODO: link to JobGraph, JobVertex
/**
* Internal function for passing the user defined functions to the JobGraph
* of the job.
*
* @param functionName
* name of the function
* @param inputStream
* input data stream
* @param function
* the user defined function
* @param functionInvokable
* the wrapping JobVertex instance
* @param parallelism
* number of parallel instances of the function
* @param <T>
* type of the input stream
* @param <R>
* type of the return stream
* @return the data stream constructed
*/
<T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
DataStream<T> inputStream, final AbstractFunction function,
UserTaskInvokable<T, R> functionInvokable, int parallelism) {
DataStream<R> returnStream = new DataStream<R>(this, functionName);
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
serializeToByteArray(function), parallelism,
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
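// Operators defined on DataStream (e.g. the flatMap used in PrintTest) are
// expected to funnel into addFunction with a matching invokable; a sketch of
// the call shape (names are illustrative):
//
//   DataStream<R> out = addFunction("flatMap", input, userFunction, invokable, 1);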
/**
* Adds a sink to the data stream, closing it.
*
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism,
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
/**
* Adds a sink to the data stream, closing it.
*
* @param streamId
* the stream id that identifies the stream.
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(String streamId, DataStream<T> inputStream,
		SinkFunction<T> sinkFunction, int parallelism) {
	// Note: streamId is currently unused; the sink is registered under a generated id.
	DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism,
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
/**
* Creates a new DataStream that contains a sequence of numbers.
*
* @param from
* The number to start at (inclusive).
* @param to
* The number to stop at (inclusive).
* @return A DataStream containing all numbers in the [from, to] interval.
*/
public DataStream<Tuple1<Long>> generateSequence(long from, long to) {
return addSource(new SequenceSource(from, to), 1);
}
/**
* Source Function used to generate the number sequence
*
*/
private static final class SequenceSource extends SourceFunction<Tuple1<Long>> {
private static final long serialVersionUID = 1L;
long from;
long to;
Tuple1<Long> outTuple = new Tuple1<Long>();
public SequenceSource(long from, long to) {
this.from = from;
this.to = to;
}
@Override
public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
for (long i = from; i <= to; i++) {
outTuple.f0 = i;
collector.collect(outTuple);
}
}
}
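// Example sketch: this source backs generateSequence, so the following prints
// the tuples (1) through (10) on the mini cluster.
//
//   StreamExecutionEnvironment env = new StreamExecutionEnvironment();
//   env.print(env.generateSequence(1, 10));
//   env.execute();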
/**
* Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all Strings or all Integers.
* The sequence of elements must not be empty. Furthermore, the elements
* must be serializable (as defined in java.io.Serializable), because the
* execution environment may ship the elements into the cluster.
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data[0]), 1, 1);
return returnStream.copy();
}
/**
* Creates a DataStream from the given non-empty collection. The type of the
* DataStream is that of the elements in the collection. The elements need
* to be serializable (as defined by java.io.Serializable), because the
* framework may move the elements into the cluster if needed.
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1, 1);
return returnStream.copy();
}
/**
* SourceFunction used by fromElements and fromCollection.
*
* @param <T>
*            type of the elements
*/
private static class FromElementsSource<T> extends SourceFunction<Tuple1<T>> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
Tuple1<T> outTuple = new Tuple1<T>();
public FromElementsSource(T... elements) {
this.iterable = (Iterable<T>) Arrays.asList(elements);
}
public FromElementsSource(Collection<T> elements) {
this.iterable = (Iterable<T>) elements;
}
@Override
public void invoke(Collector<Tuple1<T>> collector) throws Exception {
for (T element : iterable) {
outTuple.f0 = element;
collector.collect(outTuple);
}
}
}
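// Both entry points wrap each element into a Tuple1 through this source. A
// short sketch (the element values are illustrative):
//
//   env.fromElements("a", "b", "c");              // DataStream<Tuple1<String>>
//   env.fromCollection(Arrays.asList(1, 2, 3));   // DataStream<Tuple1<Integer>>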
/**
* Adds a sink to the data stream, closing it. The parallelism is defaulted
* to 1.
*
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
return addSink(inputStream, sinkFunction, 1);
}
/**
* Adds a sink to the data stream, closing it. The parallelism is defaulted
* to 1.
* @param streamId
* the stream id that identifies the stream
* @param inputStream
* input data stream
* @param sinkFunction
* the user defined function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSink(String streamId, DataStream<T> inputStream,
		SinkFunction<T> sinkFunction) {
	// Note: streamId is currently unused; see the overload above.
	return addSink(inputStream, sinkFunction, 1);
}
/**
* Dummy implementation of the SinkFunction writing every tuple to the
* standard output. Used for print.
*
* @param <IN>
* Input tuple type
*/
private static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(IN tuple) {
System.out.println(tuple);
}
}
/**
* Disk implementation of the SinkFunction writing every tuple to the
* local disk.
*
* @param <IN>
* Input tuple type
*/
private static final class DiskSink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
private String filename;
private BufferedWriter writer = null;
public DiskSink(String filename) {
this.filename = filename;
}
@Override
public void invoke(IN tuple) {
try {
if (writer == null) {
writer = new BufferedWriter(new FileWriter(filename));
}
writer.write(tuple + "\n");
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* Prints the tuples of the data stream to the standard output.
*
* @param inputStream
* the input data stream
*
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new DummySink<T>());
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return returnStream;
}
/**
* Dump the tuples of the data stream to the local disk.
*
* @param inputStream
* the input data stream
* @param filename
* the name of the output file
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> dumpDisk(DataStream<T> inputStream, String filename) {
DataStream<T> returnStream = addSink(inputStream, new DiskSink<T>(filename));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return returnStream;
}
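// Example sketch (the output path is hypothetical):
//
//   env.dumpDisk(env.generateSequence(1, 100), "/tmp/sequence.txt");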
// TODO: Link to JobGraph and ClusterUtil
/**
* Executes the JobGraph of the job on a mini cluster using ClusterUtil.
*/
public void execute() {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
public void executeCluster() {
	// Note: the JobManager address and port are currently hardcoded.
	ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), "10.1.3.150", 6123);
}
// TODO: Link to DataStream
/**
* Adds a data source, thus opening a data stream.
*
* @param sourceFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism,
(int) Math.ceil(parallelism / clusterSize));
return returnStream.copy();
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line by line. The file will be read with the system's default
* character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
public DataStream<Tuple1<String>> readTextFile(String filePath) {
return addSource(new FileSourceFunction(filePath), 1);
}
public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
return addSource(new FileSourceFunction(filePath), parallelism);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line by line, repeatedly (an infinite stream). The file will
* be read with the system's default character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
public DataStream<Tuple1<String>> readTextStream(String filePath) {
return addSource(new FileStreamFunction(filePath), 1);
}
public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
return addSource(new FileStreamFunction(filePath), parallelism);
}
/**
* Converts an object to a byte array using default Java serialization.
*
* @param object
* Object to be serialized
* @return Serialized object
*/
private byte[] serializeToByteArray(Object object) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
} catch (IOException e) {
e.printStackTrace();
}
return baos.toByteArray();
}
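// For reference, the read side is expected to invert this with standard Java
// deserialization (a sketch, not part of this class):
//
//   ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
//   Object function = in.readObject();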
// TODO: Add link to JobGraphBuilder
/**
* Getter of the JobGraphBuilder of the streaming job.
*
* @return the JobGraphBuilder of the job
*/
public JobGraphBuilder jobGB() {
return jobGraphBuilder;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedList;
public class StateManager implements Runnable, Serializable {
private static final long serialVersionUID = 1L;
private LinkedList<Object> stateList = new LinkedList<Object>();
private long checkpointInterval;
private String filename;
public StateManager(String filename, long checkpointIntervalMS) {
this.filename = filename;
this.checkpointInterval = checkpointIntervalMS;
}
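// Intended usage sketch (filename and interval are hypothetical): register the
// operator state, run the manager on its own thread for periodic snapshots,
// and call restoreState() on recovery.
//
//   StateManager manager = new StateManager("/tmp/state.checkpoint", 1000);
//   manager.registerState(state);
//   new Thread(manager).start();
//   // after a failure: manager.restoreState();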
public void registerState(Object state) {
stateList.add(state);
}
public void restoreState() {
	ObjectInputStream ois = null;
	try {
		ois = new ObjectInputStream(new FileInputStream(filename));
	} catch (Exception e) {
		e.printStackTrace();
	}
	// Replace each registered state with its checkpointed snapshot. Assigning
	// to a for-each loop variable would not update the list, so write back by index.
	for (int i = 0; i < stateList.size(); i++) {
		try {
			stateList.set(i, ois.readObject());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
// Periodically snapshot every registered state to disk.
@Override
public void run() {
	ObjectOutputStream oos = null;
	try {
		oos = new ObjectOutputStream(new FileOutputStream(filename));
	} catch (Exception e) {
		e.printStackTrace();
	}
	// Take a snapshot of every registered state at each checkpoint interval.
	while (true) {
		try {
			Thread.sleep(checkpointInterval);
			// reset() clears the stream's reference cache so mutated state
			// objects are re-serialized in full instead of as back-references.
			oos.reset();
			for (Object state : stateList) {
				oos.writeObject(state);
				oos.flush();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
}
......@@ -11,8 +11,8 @@
<relativePath>..</relativePath>
</parent>
-<artifactId>stratosphere-streaming-addons</artifactId>
-<name>stratosphere-streaming-addons</name>
+<artifactId>stratosphere-streaming-connectors</artifactId>
+<name>stratosphere-streaming-connectors</name>
<packaging>jar</packaging>
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.kafka;
+package eu.stratosphere.streaming.connectors.kafka;
import java.io.BufferedReader;
import java.io.FileReader;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.kafka;
+package eu.stratosphere.streaming.connectors.kafka;
import java.util.HashMap;
import java.util.List;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.kafka;
+package eu.stratosphere.streaming.connectors.kafka;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.rabbitmq;
+package eu.stratosphere.streaming.connectors.rabbitmq;
import java.io.IOException;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.rabbitmq;
+package eu.stratosphere.streaming.connectors.rabbitmq;
import java.io.IOException;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.rabbitmq;
+package eu.stratosphere.streaming.connectors.rabbitmq;
import static org.junit.Assert.*;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
-package eu.stratosphere.streaming.addons.rabbitmq;
+package eu.stratosphere.streaming.connectors.rabbitmq;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
......
......@@ -344,6 +344,11 @@ public class DataStream<T extends Tuple> {
return environment.print(this.copy());
}
public IterativeDataStream<T> iterate() {
	environment.iterate();
	return new IterativeDataStream<T>(environment);
}
/**
* Set the type parameter.
*
......
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
public class IterativeDataStream<T extends Tuple> {
private final StreamExecutionEnvironment environment;
public IterativeDataStream(StreamExecutionEnvironment environment) {
this.environment = environment;
}
public DataStream<T> closeWith(DataStream<T> iterationResult) {
return environment.closeIteration(iterationResult.copy());
}
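// Usage sketch, mirroring the PrintTest below: obtain the handle from the
// stream that starts the loop, build the loop body, then feed the body's
// result back with closeWith().
//
//   IterativeDataStream<Tuple1<Integer>> head = source.iterate();
//   DataStream<Tuple1<Integer>> body = source.flatMap(new Increment());
//   head.closeWith(body).print();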
}
\ No newline at end of file
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -61,6 +62,8 @@ public class JobGraphBuilder {
protected Map<String, Integer> numberOfInstances;
protected Map<String, List<String>> edgeList;
protected Map<String, List<Class<? extends ChannelSelector<StreamRecord>>>> connectionTypes;
protected boolean iterationStart;
protected Stack<String> iterationStartPoints;
protected String maxParallelismVertexName;
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
......@@ -75,12 +78,14 @@ public class JobGraphBuilder {
* @param faultToleranceType
* Fault tolerance type
*/
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
edgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<Class<? extends ChannelSelector<StreamRecord>>>>();
iterationStartPoints = new Stack<String>();
maxParallelismVertexName = "";
maxParallelism = 0;
if (log.isDebugEnabled()) {
......@@ -111,15 +116,16 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
		String operatorName, byte[] serializedFunction, int parallelism) {
	final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
	source.setInvokableClass(StreamSource.class);
	setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
			parallelism);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
......@@ -140,14 +146,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
public void setTask(String taskName,
		UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
		String operatorName, byte[] serializedFunction, int parallelism) {
	final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
	task.setInvokableClass(StreamTask.class);
	setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
			parallelism);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
......@@ -168,12 +175,14 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
		String operatorName, byte[] serializedFunction, int parallelism) {
	final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
	sink.setInvokableClass(StreamSink.class);
	setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, parallelism);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
......@@ -199,18 +208,24 @@ public class JobGraphBuilder {
* @param subtasksPerInstance
* Number of parallel instances on one task manager
*/
private void setComponent(String componentName, AbstractJobVertex component,
		Serializable InvokableObject, String operatorName, byte[] serializedFunction,
		int parallelism) {
	component.setNumberOfSubtasks(parallelism);

	// Record the first component added after iterate() as an iteration start point.
	if (iterationStart) {
		iterationStartPoints.push(componentName);
		iterationStart = false;
	}

	if (parallelism > maxParallelism) {
		maxParallelism = parallelism;
		maxParallelismVertexName = componentName;
	}

	Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableObject.getClass());
config.setString("componentName", componentName);
config.setInteger("batchSize", batchSize);
......@@ -233,7 +248,8 @@ public class JobGraphBuilder {
* JobVertex configuration to which the serialized invokable will
* be added
*/
private void addSerializedObject(Serializable InvokableObject, Configuration config) {
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
......@@ -247,7 +263,8 @@ public class JobGraphBuilder {
config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) {
e.printStackTrace();
System.out.println("Serialization error " + InvokableObject.getClass());
System.out.println("Serialization error "
+ InvokableObject.getClass());
}
}
......@@ -263,7 +280,8 @@ public class JobGraphBuilder {
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
		+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
}
/**
......@@ -296,9 +314,12 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
	connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class);
	log.info("Broadcastconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -316,52 +337,65 @@ public class JobGraphBuilder {
* @param keyPosition
* Position of key in the tuple
*/
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName,
		int keyPosition) {

	AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
	AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);

	addToEdges(upStreamComponentName, downStreamComponentName, FieldsPartitioner.class);

	try {
		upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);

		Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
				.getConfiguration();

		config.setClass(
				"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
				FieldsPartitioner.class);

		config.setInteger(
				"partitionerIntParam_"
						+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);

		if (log.isDebugEnabled()) {
			log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
					+ downStreamComponentName + ", KEY: " + keyPosition);
		}
	} catch (JobGraphDefinitionException e) {
		if (log.isErrorEnabled()) {
			log.error("Cannot connect components by field: " + upStreamComponentName + " to "
					+ downStreamComponentName, e);
		}
	}

	log.info("Fieldsconnected " + upStreamComponentName + " to " + downStreamComponentName
			+ " on " + keyPosition);
}
private void addToEdges(String upStreamComponentName, String downStreamComponentName,
		Class<?> ctype) {
	// Note: ctype is currently unused; FieldsPartitioner is always recorded here.
	if (edgeList.containsKey(upStreamComponentName)) {
		connectionTypes.get(upStreamComponentName).add(FieldsPartitioner.class);
		edgeList.get(upStreamComponentName).add(downStreamComponentName);
	} else {
		connectionTypes.put(upStreamComponentName,
				new ArrayList<Class<? extends ChannelSelector<StreamRecord>>>());
		connectionTypes.get(upStreamComponentName).add(FieldsPartitioner.class);
		edgeList.put(upStreamComponentName, new ArrayList<String>());
		edgeList.get(upStreamComponentName).add(downStreamComponentName);
......@@ -379,9 +413,12 @@ public class JobGraphBuilder {
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
*/
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
	connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
	log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
......@@ -396,9 +433,12 @@ public class JobGraphBuilder {
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
*/
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
	connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
	log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -412,29 +452,40 @@ public class JobGraphBuilder {
* @param PartitionerClass
* Class of the partitioner
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
		Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {

	AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
	AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);

	addToEdges(upStreamComponentName, downStreamComponentName, PartitionerClass);

	try {
		upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
		Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
				.getConfiguration();
		config.setClass(
				"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
				PartitionerClass);

		if (log.isDebugEnabled()) {
			log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
					+ upStreamComponentName + " -> " + downStreamComponentName);
		}
	} catch (JobGraphDefinitionException e) {
		if (log.isErrorEnabled()) {
			log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
					+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
}
......@@ -451,7 +502,8 @@ public class JobGraphBuilder {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setString("operatorName",
fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
......@@ -476,11 +528,13 @@ public class JobGraphBuilder {
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);

for (String componentName : components.keySet()) {
	if (!componentName.equals(maxParallelismVertexName)) {
		components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
......@@ -516,9 +570,10 @@ public class JobGraphBuilder {
int i = 0;
for (Class<?> ctype : connectionTypes.get(componentName)) {
if (ctype.equals(FieldsPartitioner.class)) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("numOfOutputs_" + i,
		numberOfInstances.get(edgeList.get(componentName).get(i)));
}
i++;
}
......
......@@ -21,7 +21,6 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.client.program.ProgramInvocationException;
......@@ -29,19 +28,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.jobgraph.JobGraph;
/**
* An {@link StreamExecutionEnvironment} that sends programs
* to a cluster for execution. Note that all file paths used in the program must be accessible from the
* cluster. The execution will use the cluster's default degree of parallelism, unless the parallelism is
* set explicitly via {@link ExecutionEnvironment#setDegreeOfParallelism(int)}.
*
* @param host The host name or address of the master (JobManager), where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Log log = LogFactory.getLog(RemoteStreamEnvironment.class);
......@@ -74,7 +60,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
JobWithJars.checkJarFile(file);
jobGraph.addJar(new Path(file.getAbsolutePath()));
}
Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(host, port), configuration);
......@@ -83,20 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
} catch (IOException e) {
if (log.isErrorEnabled()) {
log.error(e.getMessage());
e.printStackTrace();
}
} catch (ProgramInvocationException e) {
if (log.isErrorEnabled()) {
log.error(e.getMessage());
e.printStackTrace();
}
}
}
@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - DOP = " +
(getDegreeOfParallelism() == -1 ? "default" : getDegreeOfParallelism()) + ")";
}
}
......@@ -54,7 +54,8 @@ public abstract class StreamExecutionEnvironment {
/** flag to disable local executor when using the ContextEnvironment */
private static boolean allowLocalExecution = true;
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = 1;
......@@ -77,11 +78,13 @@ public abstract class StreamExecutionEnvironment {
* Constructor for creating StreamExecutionEnvironment
*/
protected StreamExecutionEnvironment() {
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE);
jobGraphBuilder = new JobGraphBuilder("jobGraph",
FaultToleranceType.NONE);
}
public int getExecutionParallelism() {
return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
}
/**
......@@ -111,7 +114,8 @@ public abstract class StreamExecutionEnvironment {
*/
protected void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1)
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
throw new IllegalArgumentException(
"Degree of parallelism must be at least one.");
this.degreeOfParallelism = degreeOfParallelism;
}
......@@ -125,7 +129,8 @@ public abstract class StreamExecutionEnvironment {
*/
public void setExecutionParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1)
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
throw new IllegalArgumentException(
"Degree of parallelism must be at least one.");
this.executionParallelism = degreeOfParallelism;
}
......@@ -140,7 +145,8 @@ public abstract class StreamExecutionEnvironment {
public void setBatchTimeout(int timeout) {
if (timeout < 1) {
throw new IllegalArgumentException("Batch timeout must be positive.");
throw new IllegalArgumentException(
"Batch timeout must be positive.");
} else {
jobGraphBuilder.setBatchTimeout(timeout);
}
......@@ -164,7 +170,8 @@ public abstract class StreamExecutionEnvironment {
return addSource(new FileSourceFunction(filePath), 1);
}
public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
return addSource(new FileSourceFunction(filePath), parallelism);
}
......@@ -182,7 +189,8 @@ public abstract class StreamExecutionEnvironment {
return addSource(new FileStreamFunction(filePath), 1);
}
public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
return addSource(new FileStreamFunction(filePath), parallelism);
}
......@@ -200,10 +208,12 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this,
"elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsFunction<X>(data),
"elements", serializeToByteArray(data[0]), 1);
jobGraphBuilder.setSource(returnStream.getId(),
new FromElementsFunction<X>(data), "elements",
serializeToByteArray(data[0]), 1);
return returnStream.copy();
}
......@@ -221,10 +231,12 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this,
"elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsFunction<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1);
jobGraphBuilder.setSource(returnStream.getId(),
new FromElementsFunction<X>(data), "elements",
serializeToByteArray(data.toArray()[0]), 1);
return returnStream.copy();
}
......@@ -254,17 +266,18 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
		int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism);
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction,
"source", serializeToByteArray(sourceFunction), parallelism);
return returnStream.copy();
}
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
return addSource(sourceFunction, 1);
}
......@@ -290,13 +303,15 @@ public abstract class StreamExecutionEnvironment {
* type of the return stream
* @return the data stream constructed
*/
protected <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
		DataStream<T> inputStream, final AbstractFunction function,
UserTaskInvokable<T, R> functionInvokable) {
DataStream<R> returnStream = new DataStream<R>(this, functionName);
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
		serializeToByteArray(function), degreeOfParallelism);
connectGraph(inputStream, returnStream.getId());
......@@ -315,12 +330,13 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return the data stream constructed
*/
protected <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
		SinkFunction<T> sinkFunction) {
DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), degreeOfParallelism);
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(
sinkFunction), "sink", serializeToByteArray(sinkFunction),
degreeOfParallelism);
connectGraph(inputStream, returnStream.getId());
......@@ -340,13 +356,25 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
protected <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new PrintSinkFunction<T>());
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return returnStream;
}
// TODO iterative datastream
protected void iterate() {
	// The next operator registered with the JobGraphBuilder becomes the iteration head.
	jobGraphBuilder.iterationStart = true;
}

protected <T extends Tuple> DataStream<T> closeIteration(DataStream<T> inputStream) {
	// Connect the feedback stream back to the most recently opened iteration head.
	connectGraph(inputStream, jobGraphBuilder.iterationStartPoints.pop());
	return inputStream;
}
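// How the pieces fit together: DataStream.iterate() calls iterate() above, so
// the next operator registered through the JobGraphBuilder is pushed onto
// iterationStartPoints as the iteration head; IterativeDataStream.closeWith()
// then calls closeIteration(), which pops that head and connects the feedback
// stream back to it via connectGraph().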
/**
* Internal function for assembling the underlying
* {@link eu.stratosphere.nephele.jobgraph.JobGraph} of the job. Connects
......@@ -360,7 +388,8 @@ public abstract class StreamExecutionEnvironment {
* @param <T>
* type of the input stream
*/
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
ConnectionType type = inputStream.ctypes.get(i);
......@@ -409,7 +438,8 @@ public abstract class StreamExecutionEnvironment {
* @param <T>
* type of the operator
*/
protected <T extends Tuple> void setOperatorParallelism(DataStream<T> inputStream) {
jobGraphBuilder.setParallelism(inputStream.getId(), inputStream.dop);
}
......@@ -447,7 +477,8 @@ public abstract class StreamExecutionEnvironment {
* executed.
*/
public static StreamExecutionEnvironment getExecutionEnvironment() {
return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
}
/**
......@@ -475,7 +506,8 @@ public abstract class StreamExecutionEnvironment {
* @return A local execution environment with the specified degree of
* parallelism.
*/
public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
LocalStreamEnvironment lee = new LocalStreamEnvironment();
lee.setDegreeOfParallelism(degreeOfParallelism);
return lee;
......@@ -502,9 +534,9 @@ public abstract class StreamExecutionEnvironment {
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
-public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
-		String... jarFiles) {
-	return new RemoteStreamEnvironment(host, port, jarFiles);
+public static ExecutionEnvironment createRemoteEnvironment(String host, int port,
+		String... jarFiles) {
+	return new RemoteEnvironment(host, port, jarFiles);
}
/**
......@@ -528,9 +560,10 @@ public abstract class StreamExecutionEnvironment {
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
		int degreeOfParallelism, String... jarFiles) {
	RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles);
rec.setDegreeOfParallelism(degreeOfParallelism);
return rec;
}
......@@ -540,7 +573,8 @@ public abstract class StreamExecutionEnvironment {
// packaged programs
// --------------------------------------------------------------------------------------------
protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
contextEnvironment = ctx;
}
......
......@@ -73,6 +73,7 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
try {
output.emit(streamRecord);
// TODO:Consider own flushing mechanism
output.flush();
} catch (Exception e) {
e.printStackTrace();
......
......@@ -15,12 +15,10 @@
package eu.stratosphere.streaming.api;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;
......@@ -32,9 +30,10 @@ public class PrintTest {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out)
		throws Exception {
	out.collect(new Tuple2<Integer, String>(value.f0 * value.f0, value.f1));
}
......@@ -42,16 +41,57 @@ public class PrintTest {
private static final long MEMORYSIZE = 32;
public static final class Increment extends
FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple1<Integer> value,
Collector<Tuple1<Integer>> out) throws Exception {
if (value.f0 < 20) {
out.collect(new Tuple1<Integer>(value.f0 + 1));
}
}
}
public static final class Forward extends
FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple1<Integer> value,
Collector<Tuple1<Integer>> out) throws Exception {
out.collect(value);
}
}
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
DataStream<Tuple1<Integer>> source = env.fromElements(1);
IterativeDataStream<Tuple1<Integer>> i = source.iterate();
DataStream<Tuple1<Integer>> j = source.flatMap(new Increment())
.flatMap(new Forward());
i.closeWith(j).print();
// env.fromElements(2, 3, 4).print();
// env.generateSequence(1, 10).print();
// Set<Integer> a = new HashSet<Integer>();
// a.add(-2);
// a.add(-100);
// env.fromCollection(a).print();
env.executeTest(MEMORYSIZE);
}
......