Commit 0ede9394 authored by J jfeher, committed by Stephan Ewen

[streaming] test for connect with

Parent 46dab75a
......@@ -12,7 +12,7 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
......
......@@ -37,11 +37,20 @@ public class DataStream<T extends Tuple> {
List<ConnectionType> ctypes;
List<Integer> cparams;
List<Integer> batchSizes;
/**
 * Default constructor: creates a source DataStream in a new execution
 * environment.
 */
protected DataStream() {
// TODO implement
environment = new StreamExecutionEnvironment();
id = "source";
initConnections();
}
/**
 * Creates a new DataStream in the given execution environment.
 *
 * @param environment
 *            The StreamExecutionEnvironment the DataStream belongs to
 */
protected DataStream(StreamExecutionEnvironment environment) {
if (environment == null) {
......@@ -56,15 +65,16 @@ public class DataStream<T extends Tuple> {
}
/**
 * Creates a new DataStream in the given environment with the given id.
 *
 * @param environment
 *            The StreamExecutionEnvironment the DataStream belongs to
 * @param id
 *            The id of the DataStream
 */
private DataStream(StreamExecutionEnvironment environment, String id) {
this(environment);
this.id = id;
}
//TODO: create copy method (or constructor) and copy datastream at every operator
/**
* Initialize the connections.
......@@ -82,42 +92,41 @@ public class DataStream<T extends Tuple> {
}
/**
 * Creates an identical copy of this DataStream.
 *
 * @return The copied DataStream
 */
public DataStream<T> copy() {
DataStream<T> copiedStream = new DataStream<T>(environment, getId());
copiedStream.type = this.type;
copiedStream.connectIDs = new ArrayList<String>(this.connectIDs);
copiedStream.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(this.ctypes);
copiedStream.cparams = new ArrayList<Integer>(this.cparams);
copiedStream.batchSizes = new ArrayList<Integer>(this.batchSizes);
return copiedStream;
}
/**
 * Returns the id of the DataStream.
 *
 * @return The id of the DataStream
 */
public String getId() {
return id;
}
/**
 * Groups a number of consecutive elements from the DataStream to increase
 * network throughput.
 *
 * @param batchSize
 *            The number of elements to group into one batch
 * @return The DataStream with batching set
 */
public DataStream<T> batch(int batchSize) {
DataStream<T> returnStream = copy();
if (batchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
}
......@@ -127,33 +136,38 @@ public class DataStream<T extends Tuple> {
}
return returnStream;
}
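For illustration, a minimal sketch of how batch() slots into a pipeline, reusing the MySource1 and JoinSink classes defined in MapTest further down; the batch size of 4 is an arbitrary choice:

StreamExecutionEnvironment env = new StreamExecutionEnvironment();
env.addSource(new MySource1(), 1)
		.batch(4) // emit tuples downstream in groups of four
		.addSink(new JoinSink());
env.execute();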
/**
 * Connects the outputs of DataStreams with each other. The streams
 * connected using this operator will be transformed simultaneously,
 * producing a joint output of the connected streams.
 *
 * @param streams
 *            The DataStreams to connect outputs with
 * @return The connected DataStream
 */
public DataStream<T> connectWith(DataStream<T> stream) {
public DataStream<T> connectWith(DataStream<T>... streams) {
DataStream<T> returnStream = copy();
for(DataStream<T> stream:streams){
addConnection(returnStream, stream);
}
return returnStream;
}
public DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream){
returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.ctypes.addAll(stream.ctypes);
returnStream.cparams.addAll(stream.cparams);
returnStream.batchSizes.addAll(stream.batchSizes);
return returnStream;
}
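A sketch of the new varargs connectWith, mirroring multipleConnectWithTest further down; the three sources emit disjoint ranges, and the connected stream carries their joint output:

StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> s1 = env.addSource(new MySource1(), 1);
DataStream<Tuple1<Integer>> s2 = env.addSource(new MySource2(), 1);
env.addSource(new MySource3(), 1)
		.connectWith(s1, s2) // joint output of all three sources
		.addSink(new JoinSink());
env.execute();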
/**
 * Sends the output tuples of the DataStream to the next vertices,
 * partitioned by their hashcode on the given field.
 *
 * @param keyposition
 *            The field used to compute the hashcode
 * @return The DataStream with field partitioning set
 */
public DataStream<T> partitionBy(int keyposition) {
DataStream<T> returnStream = copy();
......@@ -166,10 +180,9 @@ public class DataStream<T extends Tuple> {
}
/**
 * Broadcasts the output tuples to every parallel instance of the next
 * component.
 *
 * @return The DataStream with broadcast partitioning set
 */
public DataStream<T> broadcast() {
DataStream<T> returnStream = copy();
......@@ -181,106 +194,89 @@ public class DataStream<T extends Tuple> {
}
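To contrast the two partitioning modes, a sketch reusing MyMap and JoinSink from MapTest further down; the parallelism of 2 is arbitrary:

StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> src = env.addSource(new MySource1(), 1);
// field partitioning: tuples with equal field 0 reach the same map instance
src.partitionBy(0).map(new MyMap(), 2).addSink(new JoinSink());
// broadcast partitioning: every map instance receives every tuple
src.broadcast().map(new MyMap(), 2).addSink(new JoinSink());
env.execute();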
/**
 * Applies a FlatMap transformation on the DataStream. The transformation
 * calls a FlatMapFunction for each element of the DataStream. Each
 * FlatMapFunction call can return any number of elements, including none.
 *
 * @param flatMapper
 *            The FlatMapFunction that is called for each element of the
 *            DataStream
 * @param paralelism
 *            The number of threads the function runs on
 * @return The transformed DataStream
 */
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, int parallelism) {
return environment.addFunction("flatMap", this.copy(), flatMapper,
new FlatMapInvokable<T, R>(flatMapper), parallelism);
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, int paralelism) {
return environment.addFunction("flatMap", this.copy(), flatMapper, new FlatMapInvokable<T, R>(
flatMapper), paralelism);
}
/**
 * Applies a Map transformation on the DataStream. The transformation calls
 * a MapFunction for each element of the DataStream. Each MapFunction call
 * returns exactly one element.
 *
 * @param mapper
 *            The MapFunction that is called for each element of the
 *            DataStream
 * @param paralelism
 *            The number of threads the function runs on
 * @return The transformed DataStream
 */
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int parallelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper),
parallelism);
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int paralelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper), paralelism);
}
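A map sketch using the squaring MyMap function from MapTest further down; flatMap is called the same way, with a FlatMapFunction in place of the MapFunction:

StreamExecutionEnvironment env = new StreamExecutionEnvironment();
env.addSource(new MySource1(), 1)
		.map(new MyMap(), 4) // four parallel mapper threads
		.print();
env.execute();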
/**
 * Applies a reduce transformation on preset chunks of the DataStream. The
 * transformation calls a GroupReduceFunction for each tuple batch of the
 * predefined size. Each GroupReduceFunction call can return any number of
 * elements, including none.
 *
 * @param reducer
 *            The GroupReduceFunction that is called for each tuple batch
 * @param batchSize
 *            The number of tuples grouped together in one batch
 * @param paralelism
 *            The number of threads the function runs on
 * @return The modified DataStream
 */
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize, int paralelism) {
return environment.addFunction("batchReduce", batch(batchSize).copy(), reducer,
new BatchReduceInvokable<T, R>(reducer), paralelism);
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer, int batchSize, int paralelism) {
return environment.addFunction("batchReduce", batch(batchSize).copy(), reducer, new BatchReduceInvokable<T, R>(
reducer), paralelism);
}
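A batchReduce sketch; SumReducer is a hypothetical GroupReduceFunction written against the reduce(Iterator, Collector) contract of the 0.5 Java API:

public static final class SumReducer extends GroupReduceFunction<Tuple1<Integer>, Tuple1<Integer>> {
	@Override
	public void reduce(Iterator<Tuple1<Integer>> values, Collector<Tuple1<Integer>> out) throws Exception {
		int sum = 0;
		while (values.hasNext()) {
			sum += values.next().f0; // fold one batch into its sum
		}
		out.collect(new Tuple1<Integer>(sum));
	}
}

env.addSource(new MySource1(), 1).batchReduce(new SumReducer(), 5, 1).print();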
/**
 * Applies a Filter transformation on the DataStream. The transformation
 * calls a FilterFunction for each element of the DataStream and retains
 * only those elements for which the function returns true; elements for
 * which it returns false are filtered out.
 *
 * @param filter
 *            The FilterFunction that is called for each element of the
 *            DataStream
 * @param paralelism
 *            The number of threads the function runs on
 * @return The filtered DataStream
 */
public DataStream<T> filter(FilterFunction<T> filter, int paralelism) {
return environment.addFunction("filter", this.copy(), filter,
new FilterInvokable<T>(filter), paralelism);
return environment.addFunction("filter", this.copy(), filter, new FilterInvokable<T>(filter), paralelism);
}
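A filter sketch; EvenFilter is a hypothetical FilterFunction that keeps only even values:

public static final class EvenFilter extends FilterFunction<Tuple1<Integer>> {
	@Override
	public boolean filter(Tuple1<Integer> value) throws Exception {
		return value.f0 % 2 == 0; // retain even elements, drop odd ones
	}
}

env.addSource(new MySource1(), 1).filter(new EvenFilter(), 1).print();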
/**
 * Sets the given sink function.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function
 * @param paralelism
 *            The number of threads the function runs on
 * @return The modified DataStream
 */
public DataStream<T> addSink(SinkFunction<T> sinkFunction, int paralelism) {
return environment.addSink(this.copy(), sinkFunction, paralelism);
}
/**
 * Sets the given sink function.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function
 * @return The modified DataStream
 */
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return environment.addSink(this.copy(), sinkFunction);
}
/**
 * Prints the tuples of the DataStream to the standard output.
 *
 * @return The original DataStream
 */
public DataStream<T> print() {
return environment.print(this.copy());
......@@ -288,9 +284,8 @@ public class DataStream<T extends Tuple> {
/**
 * Sets the type parameter.
 *
 * @param type
 *            The type parameter
 */
protected void setType(TypeInformation<T> type) {
this.type = type;
......@@ -298,8 +293,8 @@ public class DataStream<T extends Tuple> {
/**
 * Gets the type information.
 *
 * @return The type of the generic parameter
 */
public TypeInformation<T> getType() {
return this.type;
......
......@@ -37,6 +37,7 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -51,7 +52,7 @@ import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
import eu.stratosphere.streaming.partitioner.ShufflePartitioner;
/**
 * Object for building Stratosphere stream processing job graphs
*/
public class JobGraphBuilder {
......@@ -88,16 +89,15 @@ public class JobGraphBuilder {
}
/**
 * Creates a new JobGraph with the given parameters. The one-argument
 * variant turns fault tolerance off.
 *
 * @param jobGraphName
 *            Name of the JobGraph
 * @param faultToleranceType
 *            Type of fault tolerance
 * @param defaultBatchSize
 *            Default number of records to send at one emit
 * @param defaultBatchTimeoutMillis
 *            Default timeout in milliseconds after which a batch is sent
 */
public JobGraphBuilder(String jobGraphName) {
this(jobGraphName, FaultToleranceType.NONE);
}
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
int defaultBatchSize, long defaultBatchTimeoutMillis) {
......@@ -107,118 +107,151 @@ public class JobGraphBuilder {
}
/**
 * Adds a source to the JobGraph from a user defined object and a
 * serialized operator.
 *
 * @param sourceName
 *            Name of the source component
 * @param InvokableObject
 *            User defined operator
 * @param operatorName
 *            Operator type
 * @param serializedFunction
 *            Serialized udf
 * @param parallelism
 *            Number of parallel instances created
 * @param subtasksPerInstance
 *            Number of parallel instances on one task manager
 */
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
Configuration config = setSource(sourceName, InvokableObject, parallelism,
subtasksPerInstance);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction) {
setSource(sourceName, InvokableObject, operatorName, serializedFunction, 1, 1);
}
/**
 * Adds a source to the JobGraph from a user defined object, with the given
 * parallelism.
 *
 * @param sourceName
 *            Name of the source component
 * @param InvokableObject
 *            User defined UserSourceInvokable object or other predefined
 *            source object
 * @param parallelism
 *            Number of task instances of this type to run in parallel
 * @param subtasksPerInstance
 *            Number of subtasks allocated to a machine
 */
public Configuration setSource(String sourceName,
UserSourceInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
Configuration config = setComponent(sourceName, InvokableObject, parallelism,
subtasksPerInstance, source);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
}
return config;
}
public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
Configuration config = setTask(taskName, TaskInvokableObject, parallelism,
subtasksPerInstance);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction) {
setTask(taskName, TaskInvokableObject, operatorName, serializedFunction, 1, 1);
}
/**
 * Adds a task component to the JobGraph.
 *
 * @param taskName
 *            Name of the task component
 * @param TaskInvokableObject
 *            User defined UserTaskInvokable object
 * @param parallelism
 *            Number of task instances of this type to run in parallel
 * @param subtasksPerInstance
 *            Number of subtasks allocated to a machine
 * @return The configuration of the component's JobVertex
 */
public void setTask(String taskName,
public Configuration setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
Configuration config = setComponent(taskName, TaskInvokableObject, parallelism,
subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
return config;
}
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
Configuration config = setSink(sinkName, InvokableObject, parallelism, subtasksPerInstance);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
}
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction) {
setSink(sinkName, InvokableObject, operatorName, serializedFunction, 1, 1);
}
/**
 * Adds a sink component to the JobGraph.
 *
 * @param sinkName
 *            Name of the sink component
 * @param InvokableObject
 *            User defined UserSinkInvokable object
 * @param parallelism
 *            Number of task instances of this type to run in parallel
 * @param subtasksPerInstance
 *            Number of subtasks allocated to a machine
 */
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
public Configuration setSink(String sinkName,
UserSinkInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
Configuration config = setComponent(sinkName, InvokableObject, parallelism,
subtasksPerInstance, sink);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
}
return config;
}
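Roughly how the API layer drives these builder methods (a sketch; MySourceInvokable, MyTaskInvokable, MySinkInvokable and the serialized udf byte arrays are hypothetical placeholders for what StreamExecutionEnvironment supplies):

JobGraphBuilder builder = new JobGraphBuilder("testGraph");
builder.setSource("source", new MySourceInvokable(), "source", serializedSource);
builder.setTask("map", new MyTaskInvokable(), "map", serializedMapper);
builder.setSink("sink", new MySinkInvokable(), "sink", serializedSink);
builder.shuffleConnect("source", "map"); // random channel per record
builder.globalConnect("map", "sink");    // everything to one sink instance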
/**
 * Sets the JobVertex configuration based on the given parameters.
 *
 * @param componentName
 *            Name of the component
 * @param InvokableClass
 *            Class of the user defined Invokable
 * @param parallelism
 *            Number of subtasks
 * @param subtasksPerInstance
 *            Number of subtasks per instance
 * @param component
 *            AbstractJobVertex associated with the component
 * @return The JobVertex configuration
 */
private void setComponent(String componentName, AbstractJobVertex component,
Serializable InvokableObject, String operatorName, byte[] serializedFunction,
int parallelism, int subtasksPerInstance) {
private Configuration setComponent(String componentName,
final Class<? extends StreamComponent> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
component.setNumberOfSubtasks(parallelism);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
......@@ -228,27 +261,49 @@ public class JobGraphBuilder {
}
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableObject.getClass());
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("batchSize", batchSize);
config.setLong("batchTimeout", batchTimeout);
// config.setBytes("operator", getSerializedFunction());
config.setInteger("faultToleranceType", faultToleranceType.id);
config.setBytes("operator", serializedFunction);
config.setString("operatorName", operatorName);
addSerializedObject(InvokableObject, config);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
return config;
}
private Configuration setComponent(String componentName,
UserSourceInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
subtasksPerInstance, component);
addSerializedObject(InvokableObject, component);
return config;
}
private Configuration setComponent(String componentName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
subtasksPerInstance, component);
addSerializedObject(InvokableObject, component);
return config;
}
private Configuration setComponent(String componentName,
UserSinkInvokable<? extends Tuple> InvokableObject, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
subtasksPerInstance, component);
addSerializedObject(InvokableObject, component);
return config;
}
/**
* Sets the number of tuples batched together for higher throughput
*
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
......@@ -260,11 +315,12 @@ public class JobGraphBuilder {
*
* @param InvokableObject
* Invokable object to serialize
 * @param component
 *            JobVertex to which the serialized invokable will be added
*/
private void addSerializedObject(Serializable InvokableObject, Configuration config) {
private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
Configuration config = component.getConfiguration();
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
......@@ -283,31 +339,19 @@ public class JobGraphBuilder {
}
/**
* Sets udf operator from one component to another, used with some sinks.
*
* @param from
* @param to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
 * Connects two JobGraph components with the given names and partitioning.
 *
 * @param upStreamComponentName
 *            Name of the upstream component that will emit the records
 * @param downStreamComponentName
 *            Name of the downstream component that will receive the
 *            records
 * @param PartitionerClass
 *            Class of the partitioner
 */
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
......@@ -351,7 +395,7 @@ public class JobGraphBuilder {
/**
 * Sets all components to share instances with the one that has the highest parallelism
*/
private void setAutomaticInstanceSharing() {
public void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
......@@ -439,13 +483,14 @@ public class JobGraphBuilder {
/**
* Connects two components with the given names by global partitioning.
* <p>
 * Global partitioning: sends all emitted records to one output instance
 * (i.e. the first one)
 *
 * @param upStreamComponentName
 *            Name of the upstream component that will emit the records
 * @param downStreamComponentName
 *            Name of the downstream component that will receive the
 *            records
*/
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
......@@ -457,13 +502,14 @@ public class JobGraphBuilder {
/**
* Connects two components with the given names by shuffle partitioning.
* <p>
 * Shuffle partitioning: sends the output records to a randomly selected
 * channel
 *
 * @param upStreamComponentName
 *            Name of the upstream component that will emit the records
 * @param downStreamComponentName
 *            Name of the downstream component that will receive the
 *            records
*/
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
......@@ -471,13 +517,6 @@ public class JobGraphBuilder {
log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
* Sets the number of instances for a given component, used for fault
* tolerance purposes
*
* @param upStreamComponentName
* @param numOfInstances
*/
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
......@@ -528,4 +567,12 @@ public class JobGraphBuilder {
return jobGraph;
}
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
}
......@@ -15,6 +15,8 @@
package eu.stratosphere.streaming.api;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
......@@ -30,16 +32,17 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
protected int counter = 0;
protected int channelID;
private long timeOfLastRecordEmitted = System.currentTimeMillis();
private RecordWriter<StreamRecord> output;
private List<RecordWriter<StreamRecord>> outputs;
public StreamCollector(int batchSize, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate, RecordWriter<StreamRecord> output) {
SerializationDelegate<Tuple> serializationDelegate,
List<RecordWriter<StreamRecord>> outputs) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.streamRecord = new ArrayStreamRecord(batchSize);
this.streamRecord.setSeralizationDelegate(serializationDelegate);
this.channelID = channelID;
this.output = output;
this.outputs = outputs;
}
public StreamCollector(int batchSize, long batchTimeout, int channelID,
......@@ -50,6 +53,7 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
// TODO reconsider emitting mechanism at timeout (find a place to timeout)
@Override
public void collect(T tuple) {
// TODO: move copy to StreamCollector2
streamRecord.setTuple(counter, tuple);
counter++;
......@@ -73,14 +77,19 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
counter = 0;
streamRecord.setId(channelID);
try {
output.emit(streamRecord);
output.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
if (outputs == null) {
System.out.println(streamRecord);
} else {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(streamRecord);
output.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
}
}
}
}
@Override
......
......@@ -24,7 +24,7 @@ import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
public class StreamCollector2<T extends Tuple> implements Collector<T> {
ArrayList<StreamCollector<Tuple>> notPartitionedCollectors;
ArrayList<StreamCollector<Tuple>[]> partitionedCollectors;
......@@ -33,9 +33,8 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
int keyPostition;
// TODO consider channelID
public StreamCollectorManager(List<Integer> batchSizesOfNotPartitioned,
List<Integer> batchSizesOfPartitioned, List<Integer> parallelismOfOutput,
int keyPosition, long batchTimeout, int channelID,
public StreamCollector2(List<Integer> batchSizesOfNotPartitioned, List<Integer> batchSizesOfPartitioned,
List<Integer> parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate,
List<RecordWriter<StreamRecord>> partitionedOutputs,
List<RecordWriter<StreamRecord>> notPartitionedOutputs) {
......@@ -48,16 +47,19 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
this.keyPostition = keyPosition;
for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned
.get(i), batchTimeout, channelID, serializationDelegate, notPartitionedOutputs
.get(i)));
List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
output.add(notPartitionedOutputs.get(i));
notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, output));
}
for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
for (int j = 0; j < collectors.length; j++) {
List<RecordWriter<StreamRecord>> output = new ArrayList<RecordWriter<StreamRecord>>();
output.add(partitionedOutputs.get(i));
collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
batchTimeout, channelID, serializationDelegate, partitionedOutputs.get(i));
batchTimeout, channelID, serializationDelegate, output);
}
partitionedCollectors.add(collectors);
}
......@@ -67,7 +69,7 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
@Override
public void collect(T tuple) {
T copiedTuple = StreamRecord.copyTuple(tuple);
for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
collector.collect(copiedTuple);
}
......
......@@ -193,17 +193,17 @@ public class StreamExecutionEnvironment {
* Creates a new DataStream that contains a sequence of numbers.
*
 * @param from
 *            The number to start at (inclusive)
 * @param to
 *            The number to stop at (inclusive)
 * @return A DataStream containing all numbers in the [from, to] interval
*/
public DataStream<Tuple1<Long>> generateSequence(long from, long to) {
return addSource(new SequenceSource(from, to), 1);
}
/**
 * Source function used to generate the number sequence
*
*/
private static final class SequenceSource extends SourceFunction<Tuple1<Long>> {
......@@ -230,15 +230,12 @@ public class StreamExecutionEnvironment {
}
/**
 * Creates a new DataStream that contains the given elements, each wrapped
 * in a Tuple1. The elements must all be of the same type, for example,
 * all String or all Integer, and they must be serializable (as defined in
 * java.io.Serializable), because the execution environment may ship them
 * into the cluster. The sequence of elements must not be empty.
 *
 * @param data
 *            The elements to create the DataStream from
 * @return The DataStream representing the elements
 */
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this);
......@@ -250,14 +247,12 @@ public class StreamExecutionEnvironment {
}
/**
 * Creates a DataStream from the given non-empty collection. The elements
 * are wrapped in a Tuple1, and they need to be serializable (as defined
 * by java.io.Serializable), because the framework may move them into the
 * cluster if needed.
 *
 * @param data
 *            The collection of elements to create the DataStream from
 * @return The DataStream representing the elements
 */
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this);
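For example, a short sketch; both factory methods wrap each element in a Tuple1:

StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> a = env.fromElements(1, 2, 3);
DataStream<Tuple1<Integer>> b = env.fromCollection(Arrays.asList(1, 2, 3));
env.generateSequence(0, 9).print(); // Tuple1<Long> values 0 through 9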
......@@ -315,9 +310,10 @@ public class StreamExecutionEnvironment {
return addSink(inputStream, sinkFunction, 1);
}
// TODO: link to SinkFunction
/**
* Dummy implementation of the SinkFunction writing every tuple to the
 * standard output. Used for print.
*
* @param <IN>
* Input tuple type
......@@ -377,31 +373,27 @@ public class StreamExecutionEnvironment {
}
/**
 * Creates a DataStream of Tuple1<String> lines produced by reading the
 * given text file line by line. The file is read with the system's
 * default character set.
 *
 * @param path
 *            The path of the file, as a URI (e.g.,
 *            "file:///some/local/file" or "hdfs://host:port/file/path")
 * @return The DataStream representing the text file
 */
public DataStream<Tuple1<String>> readTextFile(String filePath) {
return addSource(new FileSourceFunction(filePath), 1);
public DataStream<Tuple1<String>> readTextFile(String path) {
return addSource(new FileSourceFunction(path), 1);
}
/**
 * Creates a DataStream of Tuple1<String> lines produced by reading the
 * given text file line by line, repeatedly, yielding an infinite stream.
 * The file is read with the system's default character set.
 *
 * @param path
 *            The path of the file, as a URI (e.g.,
 *            "file:///some/local/file" or "hdfs://host:port/file/path")
 * @return The DataStream representing the text file
 */
public DataStream<Tuple1<String>> readTextStream(String filePath) {
return addSource(new FileStreamFunction(filePath), 1);
public DataStream<Tuple1<String>> readTextStream(String path) {
return addSource(new FileStreamFunction(path), 1);
}
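A sketch of the two file sources; the path below is a placeholder:

StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// single pass over the file
env.readTextFile("file:///tmp/input.txt").print();
// endless stream that re-reads the file over and over
env.readTextStream("file:///tmp/input.txt").print();
env.execute();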
/**
......
......@@ -45,7 +45,8 @@ import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.StreamCollector2;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
......@@ -123,7 +124,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
// collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
// outSerializationDelegate, outputs);
collector = new StreamCollectorManager<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
collector = new StreamCollector2<Tuple>(batchsizes_s, batchsizes_f, numOfOutputs_f,
keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s);
return collector;
}
......
......@@ -21,10 +21,13 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
......
......@@ -26,6 +26,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.util.Collector;
public class MapTest {
......@@ -39,6 +40,39 @@ public class MapTest {
}
}
}
public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("s1: "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MySource2 extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 5; i < 10; i++) {
System.out.println("s2: "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MySource3 extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 10; i < 15; i++) {
System.out.println("s3: "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
}
public static final class MyMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
......@@ -48,6 +82,16 @@ public class MapTest {
return new Tuple1<Integer>(value.f0 * value.f0);
}
}
public static final class MyJoinMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
joinSetResult.add(value.f0);
System.out.println(value.f0);
return new Tuple1<Integer>(value.f0);
}
}
public static final class MyFieldsMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
......@@ -122,6 +166,14 @@ public class MapTest {
graphResult++;
}
}
public static final class JoinSink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<Integer> tuple) {
System.out.println("doing nothing");
}
}
private static Set<Integer> expected = new HashSet<Integer>();
private static Set<Integer> result = new HashSet<Integer>();
......@@ -138,6 +190,9 @@ public class MapTest {
private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
private static List<Integer> fromCollectionFields = new ArrayList<Integer>();
private static Set<Integer> fromCollectionDiffFieldsSet = new HashSet<Integer>();
private static Set<Integer> singleJoinSetExpected = new HashSet<Integer>();
private static Set<Integer> multipleJoinSetExpected = new HashSet<Integer>();
private static Set<Integer> joinSetResult = new HashSet<Integer>();
private static void fillExpectedList() {
for (int i = 0; i < 10; i++) {
......@@ -169,6 +224,19 @@ public class MapTest {
}
}
}
private static void fillSingleJoinSet() {
for (int i = 0; i < 10; i++) {
singleJoinSetExpected.add(i);
}
}
private static void fillMultipleJoinSet() {
for (int i = 0; i < 15; i++) {
multipleJoinSetExpected.add(i);
}
}
@Test
public void mapTest() throws Exception {
......@@ -287,5 +355,51 @@ public class MapTest {
// }
//
// }
@Test
public void singleConnectWithTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> source1 = env.addSource(new MySource1(),
1);
DataStream<Tuple1<Integer>> source2 = env
.addSource(new MySource2(), 1)
.connectWith(source1)
.partitionBy(0)
.map(new MyJoinMap(), 1)
.addSink(new JoinSink());
env.execute();
fillSingleJoinSet();
assertEquals(singleJoinSetExpected, joinSetResult);
}
@Test
public void multipleConnectWithTest() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> source1 = env.addSource(new MySource1(),
1);
DataStream<Tuple1<Integer>> source2 = env.addSource(new MySource2(),
1);
DataStream<Tuple1<Integer>> source3 = env
.addSource(new MySource3(), 1)
.connectWith(source1, source2)
.partitionBy(0)
.map(new MyJoinMap(), 1)
.addSink(new JoinSink());
env.execute();
fillMultipleJoinSet();
assertEquals(multipleJoinSetExpected, joinSetResult);
}
}
......@@ -29,8 +29,6 @@ public class PrintTest {
public static final class MyFlatMap extends
FlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple2<Integer, String> value, Collector<Tuple2<Integer, String>> out)
throws Exception {
......
......@@ -31,7 +31,7 @@ import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollector2Test {
StreamCollectorManager<Tuple> collector;
StreamCollector2<Tuple> collector;
@Test
public void testCollect() {
......@@ -54,7 +54,7 @@ public class StreamCollector2Test {
fOut.add(rw1);
fOut.add(rw2);
collector = new StreamCollectorManager<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
Tuple1<Integer> t = new Tuple1<Integer>();
StreamCollector<Tuple> sc1 = new StreamCollector<Tuple>(1, batchTimeout, channelID, null);
......
......@@ -65,8 +65,11 @@ public class StreamCollectorTest {
@Test
public void recordWriter() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
ArrayList<RecordWriter<StreamRecord>> rwList = new ArrayList<RecordWriter<StreamRecord>>();
rwList.add(recWriter);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, recWriter);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......