提交 0a0ec777 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] package restructure

上级 5f8fa5ed
......@@ -25,7 +25,7 @@ import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
/**
......
......@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.addons.performance;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.examples.wordcount.WordCountCounter;
public class WordCountPerformanceLocal {
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.addons.rabbitmq;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
......
......@@ -25,7 +25,7 @@ import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
/**
......
......@@ -25,7 +25,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.util.Collector;
......
......@@ -24,6 +24,11 @@ import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment.ConnectionType;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.invokable.BatchReduceInvokable;
import eu.stratosphere.streaming.api.invokable.FilterInvokable;
import eu.stratosphere.streaming.api.invokable.FlatMapInvokable;
import eu.stratosphere.streaming.api.invokable.MapInvokable;
import eu.stratosphere.types.TypeInformation;
public class DataStream<T extends Tuple> {
......@@ -193,6 +198,29 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
/**
* Applies a Map transformation on a DataStream. The transformation calls a
* MapFunction for each element of the DataStream. Each MapFunction call
* returns exactly one element.
*
* @param mapper
* The MapFunction that is called for each element of the
* DataStream.
* @param parallelism
* The number of threads the function runs on.
* @param <R>
* output type
* @return The transformed DataStream.
*/
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int parallelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper),
parallelism);
}
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return map(mapper, 1);
}
/**
* Applies a FlatMap transformation on a DataStream. The transformation
* calls a FlatMapFunction for each element of the DataSet. Each
......@@ -218,26 +246,25 @@ public class DataStream<T extends Tuple> {
}
/**
* Applies a Map transformation on a DataStream. The transformation calls a
* MapFunction for each element of the DataStream. Each MapFunction call
* returns exactly one element.
* Applies a Filter transformation on a DataStream. The transformation calls
* a FilterFunction for each element of the DataStream and retains only
* those element for which the function returns true. Elements for which the
* function returns false are filtered.
*
* @param mapper
* The MapFunction that is called for each element of the
* DataStream.
* @param filter
* The FilterFunction that is called for each element of the
* DataSet.
* @param parallelism
* The number of threads the function runs on.
* @param <R>
* output type
* @return The transformed DataStream.
* @return The filtered DataStream.
*/
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper, int parallelism) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper),
parallelism);
public DataStream<T> filter(FilterFunction<T> filter, int parallelism) {
return environment.addFunction("filter", this.copy(), filter,
new FilterInvokable<T>(filter), parallelism);
}
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return map(mapper, 1);
public DataStream<T> filter(FilterFunction<T> filter) {
return filter(filter, 1);
}
/**
......@@ -268,28 +295,6 @@ public class DataStream<T extends Tuple> {
return batchReduce(reducer, batchSize, 1);
}
/**
* Applies a Filter transformation on a DataStream. The transformation calls
* a FilterFunction for each element of the DataStream and retains only
* those element for which the function returns true. Elements for which the
* function returns false are filtered.
*
* @param filter
* The FilterFunction that is called for each element of the
* DataSet.
* @param parallelism
* The number of threads the function runs on.
* @return The filtered DataStream.
*/
public DataStream<T> filter(FilterFunction<T> filter, int parallelism) {
return environment.addFunction("filter", this.copy(), filter,
new FilterInvokable<T>(filter), parallelism);
}
public DataStream<T> filter(FilterFunction<T> filter) {
return filter(filter, 1);
}
/**
* Sets the given sink function.
*
......
......@@ -224,20 +224,6 @@ public class JobGraphBuilder {
numberOfInstances.put(componentName, parallelism);
}
/**
* Sets the number of tuples batched together for higher throughput
*
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
}
/**
* Adds serialized invokable object to the JobVertex configuration
*
......@@ -248,105 +234,36 @@ public class JobGraphBuilder {
* be added
*/
private void addSerializedObject(Serializable InvokableObject, Configuration config) {
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(InvokableObject);
config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) {
e.printStackTrace();
System.out.println("Serialization error " + InvokableObject.getClass());
}
}
/**
* Sets udf operator from one component to another, used with some sinks.
*
* @param from
* from
* @param to
* to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* @param PartitionerClass
* Class of the partitioner
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
}
/**
* Sets instance sharing between the given components
* Sets the number of tuples batched together for higher throughput
*
* @param component1
* Share will be called on this component
* @param component2
* Share will be called to this component
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
* @param componentName
* Name of the component
* @param batchSize
* Number of tuples batched together
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
}
/**
......@@ -384,28 +301,28 @@ public class JobGraphBuilder {
*/
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName,
int keyPosition) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
FieldsPartitioner.class);
config.setInteger(
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
config.setInteger("numOfOutputs_"
+ (upStreamComponent.getNumberOfForwardConnections() - 1),
numberOfInstances.get(downStreamComponentName));
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
......@@ -419,7 +336,7 @@ public class JobGraphBuilder {
}
log.info("Fieldsconnected " + upStreamComponentName + " to " + downStreamComponentName
+ " on " + keyPosition);
}
/**
......@@ -437,7 +354,7 @@ public class JobGraphBuilder {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
addOutputChannels(upStreamComponentName, 1);
log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -457,6 +374,89 @@ public class JobGraphBuilder {
log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
* @param PartitionerClass
* Class of the partitioner
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
}
/**
* Sets udf operator from one component to another, used with some sinks.
*
* @param from
* from
* @param to
* to
*/
public void setBytesFrom(String from, String to) {
Configuration fromConfig = components.get(from).getConfiguration();
Configuration toConfig = components.get(to).getConfiguration();
toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
}
/**
* Sets instance sharing between the given components
*
* @param component1
* Share will be called on this component
* @param component2
* Share will be called to this component
*/
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
/**
* Sets all components to share with the one with highest parallelism
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
}
/**
* Sets the number of instances for a given component, used for fault
* tolerance purposes
......
......@@ -18,16 +18,23 @@ package eu.stratosphere.streaming.api;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.RemoteEnvironment;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.FileSourceFunction;
import eu.stratosphere.streaming.api.function.FileStreamFunction;
import eu.stratosphere.streaming.api.function.FromElementsFunction;
import eu.stratosphere.streaming.api.function.PrintSinkFunction;
import eu.stratosphere.streaming.api.function.GenSequenceFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.streaming.api.invokable.SinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.util.Collector;
//TODO:add link to ExecutionEnvironment
/**
......@@ -36,12 +43,23 @@ import eu.stratosphere.util.Collector;
*
*/
public abstract class StreamExecutionEnvironment {
protected JobGraphBuilder jobGraphBuilder;
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = -1;
protected JobGraphBuilder jobGraphBuilder;
/**
* Partitioning strategy on the stream.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
}
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
/**
* Constructor for creating StreamExecutionEnvironment
*/
......@@ -49,6 +67,17 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE);
}
public int getDegreeOfParallelism() {
return degreeOfParallelism;
}
public void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1)
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
this.degreeOfParallelism = degreeOfParallelism;
}
public void setDefaultBatchSize(int batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
......@@ -65,86 +94,128 @@ public abstract class StreamExecutionEnvironment {
}
}
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
/**
* Partitioning strategy on the stream.
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the system's default
* character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
public DataStream<Tuple1<String>> readTextFile(String filePath) {
return addSource(new FileSourceFunction(filePath), 1);
}
public int getDegreeOfParallelism() {
return degreeOfParallelism;
public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
return addSource(new FileSourceFunction(filePath), parallelism);
}
public void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1)
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
this.degreeOfParallelism = degreeOfParallelism;
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times(infinite). The file will be read with
* the system's default character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
public DataStream<Tuple1<String>> readTextStream(String filePath) {
return addSource(new FileStreamFunction(filePath), 1);
}
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalDop);
public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
return addSource(new FileStreamFunction(filePath), parallelism);
}
public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
LocalStreamEnvironment lee = new LocalStreamEnvironment();
lee.setDegreeOfParallelism(degreeOfParallelism);
return lee;
/**
* Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all of the String or Integer.
* The sequence of elements must not be empty. Furthermore, the elements
* must be serializable (as defined in java.io.Serializable), because the
* execution environment may ship the elements into the cluster.
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsFunction<X>(data),
"elements", serializeToByteArray(data[0]), 1);
return returnStream.copy();
}
/**
* Sets the batch size of the data stream in which the tuple are
* transmitted.
* Creates a DataStream from the given non-empty collection. The type of the
* DataStream is that of the elements in the collection. The elements need
* to be serializable (as defined by java.io.Serializable), because the
* framework may move the elements into the cluster if needed.
*
* @param inputStream
* input data stream
* @param <T>
* type of the input stream
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
jobGraphBuilder.setBatchSize(inputStream.connectIDs.get(i),
inputStream.batchSizes.get(i));
}
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsFunction<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1);
return returnStream.copy();
}
// TODO: Link to JobGraph & JobGraphBuilder
/**
* Internal function for assembling the underlying JobGraph of the job.
* Creates a new DataStream that contains a sequence of numbers.
*
* @param inputStream
* input data stream
* @param outputID
* ID of the output
* @param <T>
* type of the input stream
* @param from
* The number to start at (inclusive).
* @param to
* The number to stop at (inclusive)
* @return A DataStrean, containing all number in the [from, to] interval.
*/
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
ConnectionType type = inputStream.ctypes.get(i);
String input = inputStream.connectIDs.get(i);
int param = inputStream.cparams.get(i);
public DataStream<Tuple1<Long>> generateSequence(long from, long to) {
return addSource(new GenSequenceFunction(from, to), 1);
}
switch (type) {
case SHUFFLE:
jobGraphBuilder.shuffleConnect(input, outputID);
break;
case BROADCAST:
jobGraphBuilder.broadcastConnect(input, outputID);
break;
case FIELD:
jobGraphBuilder.fieldsConnect(input, outputID, param);
break;
}
// TODO: Link to DataStream
/**
* Ads a data source thus opening a data stream.
*
* @param sourceFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param <T>
* type of the returned stream
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
}
this.setBatchSize(inputStream);
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism);
return returnStream.copy();
}
// --------------------------------------------------------------------------------------------
// Data stream operators and sinks
// --------------------------------------------------------------------------------------------
// TODO: link to JobGraph, JobVertex
/**
* Internal function for passing the user defined functions to the JobGraph
......@@ -204,62 +275,6 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
/**
* Creates a new DataStream that contains a sequence of numbers.
*
* @param from
* The number to start at (inclusive).
* @param to
* The number to stop at (inclusive)
* @return A DataStrean, containing all number in the [from, to] interval.
*/
public DataStream<Tuple1<Long>> generateSequence(long from, long to) {
return addSource(new SequenceSource(from, to), 1);
}
/**
* Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all of the String or Integer.
* The sequence of elements must not be empty. Furthermore, the elements
* must be serializable (as defined in java.io.Serializable), because the
* execution environment may ship the elements into the cluster.
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data[0]), 1);
return returnStream.copy();
}
/**
* Creates a DataStream from the given non-empty collection. The type of the
* DataStream is that of the elements in the collection. The elements need
* to be serializable (as defined by java.io.Serializable), because the
* framework may move the elements into the cluster if needed.
*
* @param data
* The collection of elements to create the DataStream from.
* @param <X>
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1);
return returnStream.copy();
}
/**
* Ads a sink to the data stream closing it. To parallelism is defaulted to
* 1.
......@@ -277,23 +292,6 @@ public abstract class StreamExecutionEnvironment {
return addSink(inputStream, sinkFunction, 1);
}
/**
* Dummy implementation of the SinkFunction writing every tuple to the
* standard output. Used for print.
*
* @param <IN>
* Input tuple type
*/
private static final class DummySink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(IN tuple) {
System.out.println(tuple);
}
}
/**
* Prints the tuples of the data stream to the standard output.
*
......@@ -305,78 +303,63 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
public <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
DataStream<T> returnStream = addSink(inputStream, new DummySink<T>());
DataStream<T> returnStream = addSink(inputStream, new PrintSinkFunction<T>());
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
return returnStream;
}
// TODO: Link to JobGraph & JobGraphBuilder
/**
* Executes the JobGraph.
**/
public abstract void execute();
public void executeCluster() {
ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), "10.1.3.150", 6123);
}
// TODO: Link to DataStream
/**
* Ads a data source thus opening a data stream.
* Internal function for assembling the underlying JobGraph of the job.
*
* @param sourceFunction
* the user defined function
* @param parallelism
* number of parallel instances of the function
* @param inputStream
* input data stream
* @param outputID
* ID of the output
* @param <T>
* type of the returned stream
* @return the data stream constructed
* type of the input stream
*/
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism);
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
ConnectionType type = inputStream.ctypes.get(i);
String input = inputStream.connectIDs.get(i);
int param = inputStream.cparams.get(i);
return returnStream.copy();
}
switch (type) {
case SHUFFLE:
jobGraphBuilder.shuffleConnect(input, outputID);
break;
case BROADCAST:
jobGraphBuilder.broadcastConnect(input, outputID);
break;
case FIELD:
jobGraphBuilder.fieldsConnect(input, outputID, param);
break;
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the system's default
* character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
*/
public DataStream<Tuple1<String>> readTextFile(String filePath) {
return addSource(new FileSourceFunction(filePath), 1);
}
}
this.setBatchSize(inputStream);
public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
return addSource(new FileSourceFunction(filePath), parallelism);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times(infinite). The file will be read with
* the system's default character set.
* Sets the batch size of the data stream in which the tuple are
* transmitted.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
* @param inputStream
* input data stream
* @param <T>
* type of the input stream
*/
public DataStream<Tuple1<String>> readTextStream(String filePath) {
return addSource(new FileStreamFunction(filePath), 1);
}
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
return addSource(new FileStreamFunction(filePath), parallelism);
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
jobGraphBuilder.setBatchSize(inputStream.connectIDs.get(i),
inputStream.batchSizes.get(i));
}
}
/**
......@@ -399,6 +382,39 @@ public abstract class StreamExecutionEnvironment {
return baos.toByteArray();
}
// --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
// --------------------------------------------------------------------------------------------
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalDop);
}
public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
LocalStreamEnvironment lee = new LocalStreamEnvironment();
lee.setDegreeOfParallelism(degreeOfParallelism);
return lee;
}
public static ExecutionEnvironment createRemoteEnvironment(String host, int port,
String... jarFiles) {
return new RemoteEnvironment(host, port, jarFiles);
}
public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
int degreeOfParallelism, String... jarFiles) {
RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles);
rec.setDegreeOfParallelism(degreeOfParallelism);
return rec;
}
/**
* Executes the JobGraph.
**/
public abstract void execute();
// TODO: Add link to JobGraphBuilder
/**
* Getter of the JobGraphBuilder of the streaming job.
......
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.io.BufferedReader;
import java.io.FileReader;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.io.BufferedReader;
import java.io.FileReader;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.util.Arrays;
import java.util.Collection;
......@@ -21,17 +21,17 @@ import java.util.Collection;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
public class FromElementsSource<T> extends SourceFunction<Tuple1<T>> {
public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
Tuple1<T> outTuple = new Tuple1<T>();
public FromElementsSource(T... elements) {
public FromElementsFunction(T... elements) {
this.iterable = (Iterable<T>) Arrays.asList(elements);
}
public FromElementsSource(Collection<T> elements) {
public FromElementsFunction(Collection<T> elements) {
this.iterable = (Iterable<T>) elements;
}
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
......@@ -22,7 +22,7 @@ import eu.stratosphere.util.Collector;
* Source Function used to generate the number sequence
*
*/
public class SequenceSource extends SourceFunction<Tuple1<Long>> {
public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
private static final long serialVersionUID = 1L;
......@@ -30,7 +30,7 @@ public class SequenceSource extends SourceFunction<Tuple1<Long>> {
long to;
Tuple1<Long> outTuple = new Tuple1<Long>();
public SequenceSource(long from, long to) {
public GenSequenceFunction(long from, long to) {
this.from = from;
this.to = to;
}
......
package eu.stratosphere.streaming.api.function;
import eu.stratosphere.api.java.tuple.Tuple;
/**
* Dummy implementation of the SinkFunction writing every tuple to the standard
* output. Used for print.
*
* @param <IN>
* Input tuple type
*/
public class PrintSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(IN tuple) {
System.out.println(tuple);
}
}
\ No newline at end of file
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import java.io.Serializable;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.function;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......
......@@ -12,13 +12,12 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import java.util.Iterator;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......@@ -32,6 +31,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends U
@Override
public void invoke(StreamRecord record, Collector<OUT> collector) throws Exception {
@SuppressWarnings("unchecked")
Iterator<IN> iterator = (Iterator<IN>) record.getBatchIterable().iterator();
reducer.reduce(iterator, collector);
}
......
......@@ -12,24 +12,27 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
FilterFunction<IN> filterFunction;
public FilterInvokable(FilterFunction<IN> filterFunction) {
this.filterFunction = filterFunction;
}
@Override
public void invoke(StreamRecord record, Collector<IN> collector) throws Exception {
for (int i = 0; i < record.getBatchSize(); i++) {
@SuppressWarnings("unchecked")
IN tuple = (IN) record.getTuple(i);
if (filterFunction.filter(tuple)) {
collector.collect(tuple);
......
......@@ -13,11 +13,10 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -13,11 +13,10 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -13,10 +13,10 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -21,6 +21,9 @@ import eu.stratosphere.util.Collector;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
StreamComponentInvokable {
private static final long serialVersionUID = 1L;
public abstract void invoke(StreamRecord record, Collector<OUT> collector)
throws Exception;
}
......@@ -40,12 +40,12 @@ import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.invokable.StreamComponentInvokable;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamCollectorManager;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.InputSplit;
public class DummyIS implements InputSplit {
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void read(DataInput in) throws IOException {
}
@Override
public int getSplitNumber() {
return 0;
}
}
\ No newline at end of file
......@@ -13,13 +13,11 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamrecord;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class StreamCollector<T extends Tuple> implements Collector<T> {
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamrecord;
import java.util.ArrayList;
import java.util.List;
......@@ -21,7 +21,6 @@ import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
......@@ -54,6 +53,7 @@ public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
}
for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
@SuppressWarnings("unchecked")
StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
for (int j = 0; j < collectors.length; j++) {
collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
......
......@@ -150,6 +150,7 @@ public abstract class StreamRecord implements IOReadableWritable, Serializable {
* type of the tuple
* @return Copy of the tuple
*/
@SuppressWarnings("unchecked")
public static <T extends Tuple> T copyTuple(T tuple) {
// TODO: implement deep copy for arrays
int numofFields = tuple.getArity();
......
......@@ -24,6 +24,8 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class BatchReduceTest {
......
......@@ -22,6 +22,8 @@ import org.mockito.cglib.core.Local;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class BatchTest {
......
......@@ -25,6 +25,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.util.Collector;
public class FlatMapTest {
......
......@@ -27,6 +27,8 @@ import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class MapTest {
......
......@@ -26,6 +26,7 @@ import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamCollectorManager;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
......
......@@ -28,9 +28,9 @@ import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.LocalStreamEnvironment;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SinkFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class StreamComponentTest {
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamrecord;
import static org.junit.Assert.assertEquals;
......@@ -21,6 +21,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamCollector;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollectorTest {
......
......@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.examples.basictopology;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class BasicTopology {
......
......@@ -21,8 +21,8 @@ import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class CellInfoLocal {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class CollaborativeFilteringSink extends SinkFunction<Tuple4<Integer, Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class CollaborativeFilteringSource extends SourceFunction<Tuple4<Integer, Integer, Integer, Long>> {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class KMeansSink extends SinkFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.iterative.kmeans;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class KMeansSource extends SourceFunction<Tuple2<String, Long>> {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class PageRankSink extends SinkFunction<Tuple3<Integer, Float, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class PageRankSource extends SourceFunction<Tuple3<Integer, Integer, Long>> {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class SSSPSink extends SinkFunction<Tuple3<Integer, Integer, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class SSSPSource extends SourceFunction<Tuple3<Integer, Integer, Long>> {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.join;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class JoinSink extends SinkFunction<Tuple3<String, Integer, Integer>> {
private static final long serialVersionUID = 1L;
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class JoinSourceOne extends SourceFunction<Tuple3<String, String, Integer>> {
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class JoinSourceTwo extends SourceFunction<Tuple3<String, String, Integer>> {
......
......@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.examples.ml;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class IncrementalLearningSkeleton {
......
......@@ -24,8 +24,8 @@ import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class IncrementalOLS {
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, Integer, Long>> {
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, Integer, Long>> {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> {
......
......@@ -16,7 +16,7 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.function.SinkFunction;
public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L;
......
......@@ -19,7 +19,7 @@ import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.function.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册