From 0ede93947d36ec176df8ee80bfe7eda5198a0578 Mon Sep 17 00:00:00 2001 From: jfeher Date: Mon, 14 Jul 2014 16:29:25 +0200 Subject: [PATCH] [streaming] test for connect with --- flink-addons/flink-streaming/pom.xml | 2 +- .../streaming/api/DataStream.java | 199 ++++++------- .../streaming/api/JobGraphBuilder.java | 275 ++++++++++-------- .../streaming/api/StreamCollector.java | 29 +- ...ctorManager.java => StreamCollector2.java} | 20 +- .../api/StreamExecutionEnvironment.java | 64 ++-- .../StreamComponentHelper.java | 5 +- .../api/streamcomponent/StreamSource.java | 3 + .../stratosphere/streaming/api/MapTest.java | 114 ++++++++ .../stratosphere/streaming/api/PrintTest.java | 2 - .../streaming/api/StreamCollector2Test.java | 4 +- .../streaming/api/StreamCollectorTest.java | 5 +- 12 files changed, 443 insertions(+), 279 deletions(-) rename flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/{StreamCollectorManager.java => StreamCollector2.java} (80%) diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml index 50fe3bd90ad..25d69b057f6 100644 --- a/flink-addons/flink-streaming/pom.xml +++ b/flink-addons/flink-streaming/pom.xml @@ -12,7 +12,7 @@ jar - 0.6-SNAPSHOT + 0.5 UTF-8 UTF-8 diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java index 6244d5788ad..c9bd56d61df 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java @@ -37,11 +37,20 @@ public class DataStream { List ctypes; List cparams; List batchSizes; + + /** + * Constructor + */ + protected DataStream() { + // TODO implement + environment = new StreamExecutionEnvironment(); + id = "source"; + initConnections(); + } /** - * Create a new DataStream in the given environment - * - * @param environment + * Constructor + * @param context */ protected DataStream(StreamExecutionEnvironment environment) { if (environment == null) { @@ -56,15 +65,16 @@ public class DataStream { } /** - * Create a new DataStream in the given environment with the given id - * - * @param environment + * Constructor + * @param context * @param id */ private DataStream(StreamExecutionEnvironment environment, String id) { this(environment); this.id = id; } + + //TODO: create copy method (or constructor) and copy datastream at every operator /** * Initialize the connections. @@ -82,42 +92,41 @@ public class DataStream { } /** - * Creates an identical DataStream. - * - * @return The DataStream copy. + * Creates an identical datastream. + * @return + * The identical datastream. */ public DataStream copy() { DataStream copiedStream = new DataStream(environment, getId()); copiedStream.type = this.type; - + copiedStream.connectIDs = new ArrayList(this.connectIDs); - + copiedStream.ctypes = new ArrayList(this.ctypes); copiedStream.cparams = new ArrayList(this.cparams); copiedStream.batchSizes = new ArrayList(this.batchSizes); return copiedStream; } - + /** - * Returns the id of the DataStream. - * - * @return ID + * Gets the id of the datastream. + * @return + * The id of the datastream. */ public String getId() { return id; } /** - * Groups a number of consecutive elements from the DataStream to increase - * network throughput. - * + * Collects a number of consecutive elements from the datastream. * @param batchSize - * The number of elements to group. - * @return The DataStream. + * The number of elements to collect. + * @return + * The collected elements. */ public DataStream batch(int batchSize) { DataStream returnStream = copy(); - + if (batchSize < 1) { throw new IllegalArgumentException("Batch size must be positive."); } @@ -127,33 +136,38 @@ public class DataStream { } return returnStream; } - + /** - * Connecting DataStream outputs with each other. The streams connected - * using this operator will be transformed simultaneously. It creates a - * joint output of the connected streams. - * + * Connecting streams to each other. * @param stream - * The DataStream to connect output with. - * @return The connected DataStream. + * The stream it connects to. + * @return + * The new already connected datastream. */ - public DataStream connectWith(DataStream stream) { + public DataStream connectWith(DataStream... streams) { DataStream returnStream = copy(); - + + for(DataStream stream:streams){ + addConnection(returnStream, stream); + } + return returnStream; + } + + public DataStream addConnection(DataStream returnStream, DataStream stream){ returnStream.connectIDs.addAll(stream.connectIDs); returnStream.ctypes.addAll(stream.ctypes); returnStream.cparams.addAll(stream.cparams); returnStream.batchSizes.addAll(stream.batchSizes); + return returnStream; } /** - * Send the output tuples of the DataStream to the next vertices partitioned - * by their hashcode. - * + * Send the elements of the stream to the following vertices according to their hashcode. * @param keyposition - * The field used to compute the hashcode. - * @return The DataStream with field partitioning set. + * The field used to compute the hashcode. + * @return + * The original datastream. */ public DataStream partitionBy(int keyposition) { DataStream returnStream = copy(); @@ -166,10 +180,9 @@ public class DataStream { } /** - * Broadcast the output tuples to every parallel instance of the next - * component. - * - * @return The DataStream with broadcast partitioning set. + * Send the elements of the stream to every following vertices of the graph. + * @return + * The datastream. */ public DataStream broadcast() { DataStream returnStream = copy(); @@ -181,106 +194,89 @@ public class DataStream { } /** - * Applies a FlatMap transformation on a DataStream. The transformation - * calls a FlatMapFunction for each element of the DataSet. Each - * FlatMapFunction call can return any number of elements including none. - * + * Sets the given flatmap function. * @param flatMapper - * The FlatMapFunction that is called for each element of the - * DataStream - * @param parallelism - * The number of threads the function runs on. - * @return The transformed DataStream. + * The object containing the flatmap function. + * @param paralelism + * The number of threads the function runs on. + * @return + * The modified datastream. */ - public DataStream flatMap(FlatMapFunction flatMapper, int parallelism) { - return environment.addFunction("flatMap", this.copy(), flatMapper, - new FlatMapInvokable(flatMapper), parallelism); + public DataStream flatMap(FlatMapFunction flatMapper, int paralelism) { + return environment.addFunction("flatMap", this.copy(), flatMapper, new FlatMapInvokable( + flatMapper), paralelism); } /** - * Applies a Map transformation on a DataStream. The transformation calls a - * MapFunction for each element of the DataStream. Each MapFunction call - * returns exactly one element. - * + * Sets the given map function. * @param mapper - * The MapFunction that is called for each element of the - * DataStream. - * @param parallelism - * The number of threads the function runs on. - * @return The transformed DataStream. + * The object containing the map function. + * @param paralelism + * The number of threads the function runs on. + * @return + * The modified datastream. */ - public DataStream map(MapFunction mapper, int parallelism) { - return environment.addFunction("map", this.copy(), mapper, new MapInvokable(mapper), - parallelism); + public DataStream map(MapFunction mapper, int paralelism) { + return environment.addFunction("map", this.copy(), mapper, new MapInvokable(mapper), paralelism); } /** - * Applies a reduce transformation on preset chunks of the DataStream. The - * transformation calls a GroupReduceFunction for each tuple batch of the - * predefined size. Each GroupReduceFunction call can return any number of - * elements including none. - * - * + * Sets the given batchreduce function. * @param reducer - * The GroupReduceFunction that is called for each tuple batch. + * The object containing the batchreduce function. * @param batchSize - * The number of tuples grouped together in the batch. - * @param parallelism - * The number of threads the function runs on. - * @return The modified datastream. + * The number of elements proceeded at the same time + * @param paralelism + * The number of threads the function runs on. + * @return + * The modified datastream. */ - public DataStream batchReduce(GroupReduceFunction reducer, - int batchSize, int paralelism) { - return environment.addFunction("batchReduce", batch(batchSize).copy(), reducer, - new BatchReduceInvokable(reducer), paralelism); + public DataStream batchReduce(GroupReduceFunction reducer, int batchSize, int paralelism) { + return environment.addFunction("batchReduce", batch(batchSize).copy(), reducer, new BatchReduceInvokable( + reducer), paralelism); } /** - * Applies a Filter transformation on a DataStream. The transformation calls - * a FilterFunction for each element of the DataStream and retains only - * those element for which the function returns true. Elements for which the - * function returns false are filtered. - * + * Sets the given filter function. * @param filter - * The FilterFunction that is called for each element of the - * DataSet. + * The object containing the filter function. * @param paralelism - * The number of threads the function runs on. - * @return The filtered DataStream. + * The number of threads the function runs on. + * @return + * The modified datastream. */ public DataStream filter(FilterFunction filter, int paralelism) { - return environment.addFunction("filter", this.copy(), filter, - new FilterInvokable(filter), paralelism); + return environment.addFunction("filter", this.copy(), filter, new FilterInvokable(filter), paralelism); } /** * Sets the given sink function. - * * @param sinkFunction - * The object containing the sink's invoke function. + * The object containing the sink's invoke function. * @param paralelism - * The number of threads the function runs on. - * @return The modified datastream. + * The number of threads the function runs on. + * @return + * The modified datastream. */ public DataStream addSink(SinkFunction sinkFunction, int paralelism) { return environment.addSink(this.copy(), sinkFunction, paralelism); } - + /** * Sets the given sink function. - * * @param sinkFunction - * The object containing the sink's invoke function. + * The object containing the sink's invoke function. * @return + * The modified datastream. */ public DataStream addSink(SinkFunction sinkFunction) { return environment.addSink(this.copy(), sinkFunction); } /** - * Prints the tuples from the DataStream. - * + * Prints the datastream. * @return + * The original stream. */ public DataStream print() { return environment.print(this.copy()); @@ -288,9 +284,8 @@ public class DataStream { /** * Set the type parameter. - * * @param type - * The type parameter. + * The type parameter. */ protected void setType(TypeInformation type) { this.type = type; @@ -298,8 +293,8 @@ public class DataStream { /** * Get the type information. - * - * @return The type of the generic parameter. + * @return + * The type of the generic parameter. */ public TypeInformation getType() { return this.type; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java index 181d8263ff9..3d00c24b35d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java @@ -37,6 +37,7 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex; import eu.stratosphere.nephele.jobgraph.JobOutputVertex; import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.pact.runtime.task.util.TaskConfig; +import eu.stratosphere.streaming.api.invokable.StreamComponent; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; @@ -51,7 +52,7 @@ import eu.stratosphere.streaming.partitioner.GlobalPartitioner; import eu.stratosphere.streaming.partitioner.ShufflePartitioner; /** - * Object for building Flink stream processing job graphs + * Object for building Stratosphere stream processing job graphs */ public class JobGraphBuilder { @@ -88,16 +89,15 @@ public class JobGraphBuilder { } /** - * Creates a new JobGraph with the given parameters + * Creates a new JobGraph with the given name with fault tolerance turned + * off * * @param jobGraphName * Name of the JobGraph - * @param faultToleranceType - * Type of fault tolerance - * @param defaultBatchSize - * Default number of records to send at one emit - * @param defaultBatchTimeoutMillis */ + public JobGraphBuilder(String jobGraphName) { + this(jobGraphName, FaultToleranceType.NONE); + } public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int defaultBatchSize, long defaultBatchTimeoutMillis) { @@ -107,118 +107,151 @@ public class JobGraphBuilder { } /** - * Adds source to the JobGraph with the given parameters + * Adds source to the JobGraph by user defined object and serialized + * operator * * @param sourceName - * Name of the component * @param InvokableObject - * User defined operator * @param operatorName - * Operator type * @param serializedFunction - * Serialized udf - * @param parallelism - * Number of parallel instances created - * @param subtasksPerInstance - * Number of parallel instances on one task manager */ public void setSource(String sourceName, UserSourceInvokable InvokableObject, String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { + Configuration config = setSource(sourceName, InvokableObject, parallelism, + subtasksPerInstance); + config.setBytes("operator", serializedFunction); + config.setString("operatorName", operatorName); + } - final JobInputVertex source = new JobInputVertex(sourceName, jobGraph); + public void setSource(String sourceName, UserSourceInvokable InvokableObject, + String operatorName, byte[] serializedFunction) { + setSource(sourceName, InvokableObject, operatorName, serializedFunction, 1, 1); + } + /** + * Adds source to the JobGraph by user defined object with the set + * parallelism + * + * @param sourceName + * Name of the source component + * @param InvokableObject + * User defined UserSourceInvokable object or other predefined + * source object + * @param parallelism + * Number of task instances of this type to run in parallel + * @param subtasksPerInstance + * Number of subtasks allocated to a machine + */ + public Configuration setSource(String sourceName, + UserSourceInvokable InvokableObject, int parallelism, + int subtasksPerInstance) { + final JobInputVertex source = new JobInputVertex(sourceName, jobGraph); source.setInputClass(StreamSource.class); - - setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction, - parallelism, subtasksPerInstance); - + Configuration config = setComponent(sourceName, InvokableObject, parallelism, + subtasksPerInstance, source); if (log.isDebugEnabled()) { log.debug("SOURCE: " + sourceName); } + return config; + } + + public void setTask(String taskName, + UserTaskInvokable TaskInvokableObject, + String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { + Configuration config = setTask(taskName, TaskInvokableObject, parallelism, + subtasksPerInstance); + config.setBytes("operator", serializedFunction); + config.setString("operatorName", operatorName); + } + + public void setTask(String taskName, + UserTaskInvokable TaskInvokableObject, + String operatorName, byte[] serializedFunction) { + setTask(taskName, TaskInvokableObject, operatorName, serializedFunction, 1, 1); } /** - * Adds task to the JobGraph with the given parameters + * Adds a task component to the JobGraph * * @param taskName - * Name of the component + * Name of the task component * @param TaskInvokableObject - * User defined operator - * @param operatorName - * Operator type - * @param serializedFunction - * Serialized udf + * User defined UserTaskInvokable object * @param parallelism - * Number of parallel instances created + * Number of task instances of this type to run in parallel * @param subtasksPerInstance - * Number of parallel instances on one task manager + * Number of subtasks allocated to a machine + * @return */ - public void setTask(String taskName, + public Configuration setTask(String taskName, UserTaskInvokable TaskInvokableObject, - String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { - + int parallelism, int subtasksPerInstance) { final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph); task.setTaskClass(StreamTask.class); - setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction, - parallelism, subtasksPerInstance); - + Configuration config = setComponent(taskName, TaskInvokableObject, parallelism, + subtasksPerInstance, task); if (log.isDebugEnabled()) { log.debug("TASK: " + taskName); } + return config; + } + + public void setSink(String sinkName, UserSinkInvokable InvokableObject, + String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { + Configuration config = setSink(sinkName, InvokableObject, parallelism, subtasksPerInstance); + config.setBytes("operator", serializedFunction); + config.setString("operatorName", operatorName); + + } + + public void setSink(String sinkName, UserSinkInvokable InvokableObject, + String operatorName, byte[] serializedFunction) { + setSink(sinkName, InvokableObject, operatorName, serializedFunction, 1, 1); + } /** - * Adds sink to the JobGraph with the given parameters + * Adds a sink component to the JobGraph with no parallelism * * @param sinkName - * Name of the component + * Name of the sink component * @param InvokableObject - * User defined operator - * @param operatorName - * Operator type - * @param serializedFunction - * Serialized udf + * User defined UserSinkInvokable object * @param parallelism - * Number of parallel instances created + * Number of task instances of this type to run in parallel * @param subtasksPerInstance - * Number of parallel instances on one task manager + * Number of subtasks allocated to a machine */ - public void setSink(String sinkName, UserSinkInvokable InvokableObject, - String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) { - + public Configuration setSink(String sinkName, + UserSinkInvokable InvokableObject, int parallelism, + int subtasksPerInstance) { final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph); sink.setOutputClass(StreamSink.class); - setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, - parallelism, subtasksPerInstance); - + Configuration config = setComponent(sinkName, InvokableObject, parallelism, + subtasksPerInstance, sink); if (log.isDebugEnabled()) { log.debug("SINK: " + sinkName); } - + return config; } /** - * Sets component parameters in the JobGraph + * Sets JobVertex configuration based on the given parameters * * @param componentName * Name of the component - * @param component - * The component vertex - * @param InvokableObject - * The user defined invokable object - * @param operatorName - * Type of the user defined operator - * @param serializedFunction - * Serialized operator + * @param InvokableClass + * Class of the user defined Invokable * @param parallelism - * Number of parallel instances created + * Number of subtasks * @param subtasksPerInstance - * Number of parallel instances on one task manager + * Number of subtasks per instance + * @param component + * AbstractJobVertex associated with the component */ - private void setComponent(String componentName, AbstractJobVertex component, - Serializable InvokableObject, String operatorName, byte[] serializedFunction, - int parallelism, int subtasksPerInstance) { - + private Configuration setComponent(String componentName, + final Class InvokableClass, int parallelism, + int subtasksPerInstance, AbstractJobVertex component) { component.setNumberOfSubtasks(parallelism); component.setNumberOfSubtasksPerInstance(subtasksPerInstance); @@ -228,27 +261,49 @@ public class JobGraphBuilder { } Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration(); - config.setClass("userfunction", InvokableObject.getClass()); + config.setClass("userfunction", InvokableClass); config.setString("componentName", componentName); config.setInteger("batchSize", batchSize); config.setLong("batchTimeout", batchTimeout); + // config.setBytes("operator", getSerializedFunction()); + config.setInteger("faultToleranceType", faultToleranceType.id); - config.setBytes("operator", serializedFunction); - config.setString("operatorName", operatorName); - addSerializedObject(InvokableObject, config); components.put(componentName, component); numberOfInstances.put(componentName, parallelism); + return config; + } + + private Configuration setComponent(String componentName, + UserSourceInvokable InvokableObject, int parallelism, + int subtasksPerInstance, AbstractJobVertex component) { + Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism, + subtasksPerInstance, component); + + addSerializedObject(InvokableObject, component); + return config; + } + + private Configuration setComponent(String componentName, + UserTaskInvokable InvokableObject, int parallelism, + int subtasksPerInstance, AbstractJobVertex component) { + Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism, + subtasksPerInstance, component); + + addSerializedObject(InvokableObject, component); + return config; + } + + private Configuration setComponent(String componentName, + UserSinkInvokable InvokableObject, int parallelism, + int subtasksPerInstance, AbstractJobVertex component) { + Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism, + subtasksPerInstance, component); + + addSerializedObject(InvokableObject, component); + return config; } - /** - * Sets the number of tuples batched together for higher throughput - * - * @param componentName - * Name of the component - * @param batchSize - * Number of tuples batched together - */ public void setBatchSize(String componentName, int batchSize) { Configuration config = components.get(componentName).getConfiguration(); config.setInteger("batchSize_" @@ -260,11 +315,12 @@ public class JobGraphBuilder { * * @param InvokableObject * Invokable object to serialize - * @param config - * JobVertex configuration to which the serialized invokable will - * be added + * @param component + * JobVertex to which the serialized invokable will be added */ - private void addSerializedObject(Serializable InvokableObject, Configuration config) { + private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) { + + Configuration config = component.getConfiguration(); ByteArrayOutputStream baos = null; ObjectOutputStream oos = null; @@ -283,31 +339,19 @@ public class JobGraphBuilder { } - /** - * Sets udf operator from one component to another, used with some sinks. - * - * @param from - * @param to - */ - public void setBytesFrom(String from, String to) { - Configuration fromConfig = components.get(from).getConfiguration(); - Configuration toConfig = components.get(to).getConfiguration(); - - toConfig.setString("operatorName", fromConfig.getString("operatorName", null)); - toConfig.setBytes("operator", fromConfig.getBytes("operator", null)); - - } - /** * Connects to JobGraph components with the given names, partitioning and * channel type * * @param upStreamComponentName - * Name of the upstream component, that will emit the tuples + * Name of the upstream component, that will emit the records * @param downStreamComponentName - * Name of the downstream component, that will receive the tuples + * Name of the downstream component, that will receive the + * records * @param PartitionerClass * Class of the partitioner + * @param channelType + * Channel Type */ private void connect(String upStreamComponentName, String downStreamComponentName, Class> PartitionerClass) { @@ -351,7 +395,7 @@ public class JobGraphBuilder { /** * Sets all components to share with the one with highest parallelism */ - private void setAutomaticInstanceSharing() { + public void setAutomaticInstanceSharing() { AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName); @@ -439,13 +483,14 @@ public class JobGraphBuilder { /** * Connects two components with the given names by global partitioning. *

- * Global partitioning: sends all emitted tuples to one output instance + * Global partitioning: sends all emitted records to one output instance * (i.e. the first one) * * @param upStreamComponentName - * Name of the upstream component, that will emit the tuples + * Name of the upstream component, that will emit the records * @param downStreamComponentName - * Name of the downstream component, that will receive the tuples + * Name of the downstream component, that will receive the + * records */ public void globalConnect(String upStreamComponentName, String downStreamComponentName) { connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class); @@ -457,13 +502,14 @@ public class JobGraphBuilder { /** * Connects two components with the given names by shuffle partitioning. *

- * Shuffle partitioning: sends the output tuples to a randomly selected + * Shuffle partitioning: sends the output records to a randomly selected * channel * * @param upStreamComponentName - * Name of the upstream component, that will emit the tuples + * Name of the upstream component, that will emit the records * @param downStreamComponentName - * Name of the downstream component, that will receive the tuples + * Name of the downstream component, that will receive the + * records */ public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) { connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class); @@ -471,13 +517,6 @@ public class JobGraphBuilder { log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName); } - /** - * Sets the number of instances for a given component, used for fault - * tolerance purposes - * - * @param upStreamComponentName - * @param numOfInstances - */ private void addOutputChannels(String upStreamComponentName, int numOfInstances) { if (numberOfOutputChannels.containsKey(upStreamComponentName)) { numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances); @@ -528,4 +567,12 @@ public class JobGraphBuilder { return jobGraph; } + public void setBytesFrom(String from, String to) { + Configuration fromConfig = components.get(from).getConfiguration(); + Configuration toConfig = components.get(to).getConfiguration(); + + toConfig.setString("operatorName", fromConfig.getString("operatorName", null)); + toConfig.setBytes("operator", fromConfig.getBytes("operator", null)); + + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java index 192c54ff1a3..5c8b5886975 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java @@ -15,6 +15,8 @@ package eu.stratosphere.streaming.api; +import java.util.List; + import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; @@ -30,16 +32,17 @@ public class StreamCollector implements Collector { protected int counter = 0; protected int channelID; private long timeOfLastRecordEmitted = System.currentTimeMillis();; - private RecordWriter output; + private List> outputs; public StreamCollector(int batchSize, long batchTimeout, int channelID, - SerializationDelegate serializationDelegate, RecordWriter output) { + SerializationDelegate serializationDelegate, + List> outputs) { this.batchSize = batchSize; this.batchTimeout = batchTimeout; this.streamRecord = new ArrayStreamRecord(batchSize); this.streamRecord.setSeralizationDelegate(serializationDelegate); this.channelID = channelID; - this.output = output; + this.outputs = outputs; } public StreamCollector(int batchSize, long batchTimeout, int channelID, @@ -50,6 +53,7 @@ public class StreamCollector implements Collector { // TODO reconsider emitting mechanism at timeout (find a place to timeout) @Override public void collect(T tuple) { + //TODO: move copy to StreamCollector2 streamRecord.setTuple(counter, tuple); counter++; @@ -73,14 +77,19 @@ public class StreamCollector implements Collector { counter = 0; streamRecord.setId(channelID); - try { - output.emit(streamRecord); - output.flush(); - } catch (Exception e) { - e.printStackTrace(); - System.out.println("emit fail"); + if (outputs == null) { + System.out.println(streamRecord); + } else { + for (RecordWriter output : outputs) { + try { + output.emit(streamRecord); + output.flush(); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("emit fail"); + } + } } - } @Override diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java similarity index 80% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java index 63e127edfb0..d733f462dd5 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java @@ -24,7 +24,7 @@ import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.util.Collector; -public class StreamCollectorManager implements Collector { +public class StreamCollector2 implements Collector { ArrayList> notPartitionedCollectors; ArrayList[]> partitionedCollectors; @@ -33,9 +33,8 @@ public class StreamCollectorManager implements Collector { int keyPostition; // TODO consider channelID - public StreamCollectorManager(List batchSizesOfNotPartitioned, - List batchSizesOfPartitioned, List parallelismOfOutput, - int keyPosition, long batchTimeout, int channelID, + public StreamCollector2(List batchSizesOfNotPartitioned, List batchSizesOfPartitioned, + List parallelismOfOutput, int keyPosition, long batchTimeout, int channelID, SerializationDelegate serializationDelegate, List> partitionedOutputs, List> notPartitionedOutputs) { @@ -48,16 +47,19 @@ public class StreamCollectorManager implements Collector { this.keyPostition = keyPosition; for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) { - notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned - .get(i), batchTimeout, channelID, serializationDelegate, notPartitionedOutputs - .get(i))); + List> output = new ArrayList>(); + output.add(notPartitionedOutputs.get(i)); + notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned.get(i), + batchTimeout, channelID, serializationDelegate, output)); } for (int i = 0; i < batchSizesOfPartitioned.size(); i++) { StreamCollector[] collectors = new StreamCollector[parallelismOfOutput.get(i)]; for (int j = 0; j < collectors.length; j++) { + List> output = new ArrayList>(); + output.add(partitionedOutputs.get(i)); collectors[j] = new StreamCollector(batchSizesOfPartitioned.get(i), - batchTimeout, channelID, serializationDelegate, partitionedOutputs.get(i)); + batchTimeout, channelID, serializationDelegate, output); } partitionedCollectors.add(collectors); } @@ -67,7 +69,7 @@ public class StreamCollectorManager implements Collector { @Override public void collect(T tuple) { T copiedTuple = StreamRecord.copyTuple(tuple); - + for (StreamCollector collector : notPartitionedCollectors) { collector.collect(copiedTuple); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java index 1673e45843d..37e73c62ffa 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java @@ -193,17 +193,17 @@ public class StreamExecutionEnvironment { * Creates a new DataStream that contains a sequence of numbers. * * @param from - * The number to start at (inclusive). + * First number in the sequence * @param to - * The number to stop at (inclusive) - * @return A DataStrean, containing all number in the [from, to] interval. + * Last element in the sequence + * @return the data stream constructed */ public DataStream> generateSequence(long from, long to) { return addSource(new SequenceSource(from, to), 1); } /** - * Source Function used to generate the number sequence + * Source Function used to generate sequence * */ private static final class SequenceSource extends SourceFunction> { @@ -230,15 +230,12 @@ public class StreamExecutionEnvironment { } /** - * Creates a new DataStream that contains the given elements. The elements - * must all be of the same type, for example, all of the String or Integer. - * The sequence of elements must not be empty. Furthermore, the elements - * must be serializable (as defined in java.io.Serializable), because the - * execution environment may ship the elements into the cluster. + * Creates a new DataStream by iterating through the given data. The + * elements are inserted into a Tuple1. * * @param data - * The collection of elements to create the DataStream from. - * @return The DataStream representing the elements. + * + * @return */ public DataStream> fromElements(X... data) { DataStream> returnStream = new DataStream>(this); @@ -250,14 +247,12 @@ public class StreamExecutionEnvironment { } /** - * Creates a DataStream from the given non-empty collection. The type of the - * DataStream is that of the elements in the collection. The elements need - * to be serializable (as defined by java.io.Serializable), because the - * framework may move the elements into the cluster if needed. + * Creates a new DataStream by iterating through the given data collection. + * The elements are inserted into a Tuple1. * * @param data - * The collection of elements to create the DataStream from. - * @return The DataStream representing the elements. + * + * @return */ public DataStream> fromCollection(Collection data) { DataStream> returnStream = new DataStream>(this); @@ -315,9 +310,10 @@ public class StreamExecutionEnvironment { return addSink(inputStream, sinkFunction, 1); } + // TODO: link to SinkFunction /** * Dummy implementation of the SinkFunction writing every tuple to the - * standard output. Used for print. + * standard output. * * @param * Input tuple type @@ -377,31 +373,27 @@ public class StreamExecutionEnvironment { } /** - * Creates a DataStream that represents the Strings produced by reading the - * given file line wise. The file will be read with the system's default - * character set. + * Read a text file from the given path and emits the lines as + * Tuple1-s * - * @param filePath - * The path of the file, as a URI (e.g., - * "file:///some/local/file" or "hdfs://host:port/file/path"). - * @return The DataStream representing the text file. + * @param path + * Input file + * @return the data stream constructed */ - public DataStream> readTextFile(String filePath) { - return addSource(new FileSourceFunction(filePath), 1); + public DataStream> readTextFile(String path) { + return addSource(new FileSourceFunction(path), 1); } /** - * Creates a DataStream that represents the Strings produced by reading the - * given file line wise multiple times(infinite). The file will be read with - * the system's default character set. + * Streams a text file from the given path by reading through it multiple + * times. * - * @param filePath - * The path of the file, as a URI (e.g., - * "file:///some/local/file" or "hdfs://host:port/file/path"). - * @return The DataStream representing the text file. + * @param path + * Input file + * @return the data stream constructed */ - public DataStream> readTextStream(String filePath) { - return addSource(new FileStreamFunction(filePath), 1); + public DataStream> readTextStream(String path) { + return addSource(new FileStreamFunction(path), 1); } /** diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index ed1a104cdac..2205939e358 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -45,7 +45,8 @@ import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.streaming.api.SinkFunction; -import eu.stratosphere.streaming.api.StreamCollectorManager; +import eu.stratosphere.streaming.api.StreamCollector; +import eu.stratosphere.streaming.api.StreamCollector2; import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable; import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; @@ -123,7 +124,7 @@ public final class StreamComponentHelper { // collector = new StreamCollector(batchSize, batchTimeout, id, // outSerializationDelegate, outputs); - collector = new StreamCollectorManager(batchsizes_s, batchsizes_f, numOfOutputs_f, + collector = new StreamCollector2(batchsizes_s, batchsizes_f, numOfOutputs_f, keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s); return collector; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java index 5227bb8b663..2e7a44b2777 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java @@ -21,10 +21,13 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.execution.Environment; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInputTask; +import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.examples.DummyIS; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java index c92e97c563e..0b56b6d1ba8 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java @@ -26,6 +26,7 @@ import org.junit.Test; import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.util.Collector; public class MapTest { @@ -39,6 +40,39 @@ public class MapTest { } } } + + public static final class MySource1 extends SourceFunction> { + + @Override + public void invoke(Collector> collector) throws Exception { + for (int i = 0; i < 5; i++) { + System.out.println("s1: "+i); + collector.collect(new Tuple1(i)); + } + } + } + + public static final class MySource2 extends SourceFunction> { + + @Override + public void invoke(Collector> collector) throws Exception { + for (int i = 5; i < 10; i++) { + System.out.println("s2: "+i); + collector.collect(new Tuple1(i)); + } + } + } + + public static final class MySource3 extends SourceFunction> { + + @Override + public void invoke(Collector> collector) throws Exception { + for (int i = 10; i < 15; i++) { + System.out.println("s3: "+i); + collector.collect(new Tuple1(i)); + } + } + } public static final class MyMap extends MapFunction, Tuple1> { @@ -48,6 +82,16 @@ public class MapTest { return new Tuple1(value.f0 * value.f0); } } + + public static final class MyJoinMap extends MapFunction, Tuple1> { + + @Override + public Tuple1 map(Tuple1 value) throws Exception { + joinSetResult.add(value.f0); + System.out.println(value.f0); + return new Tuple1(value.f0); + } + } public static final class MyFieldsMap extends MapFunction, Tuple1> { @@ -122,6 +166,14 @@ public class MapTest { graphResult++; } } + + public static final class JoinSink extends SinkFunction> { + + @Override + public void invoke(Tuple1 tuple) { + System.out.println("doing nothing"); + } + } private static Set expected = new HashSet(); private static Set result = new HashSet(); @@ -138,6 +190,9 @@ public class MapTest { private static Set fromCollectionSet = new HashSet(); private static List fromCollectionFields = new ArrayList(); private static Set fromCollectionDiffFieldsSet = new HashSet(); + private static Set singleJoinSetExpected = new HashSet(); + private static Set multipleJoinSetExpected = new HashSet(); + private static Set joinSetResult = new HashSet(); private static void fillExpectedList() { for (int i = 0; i < 10; i++) { @@ -169,6 +224,19 @@ public class MapTest { } } } + + private static void fillSingleJoinSet() { + for (int i = 0; i < 10; i++) { + singleJoinSetExpected.add(i); + } + } + + private static void fillMultipleJoinSet() { + for (int i = 0; i < 15; i++) { + multipleJoinSetExpected.add(i); + } + } + @Test public void mapTest() throws Exception { @@ -287,5 +355,51 @@ public class MapTest { // } // // } + + @Test + public void singleConnectWithTest() throws Exception { + + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + DataStream> source1 = env.addSource(new MySource1(), + 1); + + DataStream> source2 = env + .addSource(new MySource2(), 1) + .connectWith(source1) + .partitionBy(0) + .map(new MyJoinMap(), 1) + .addSink(new JoinSink()); + + env.execute(); + + fillSingleJoinSet(); + + assertEquals(singleJoinSetExpected, joinSetResult); + } + + @Test + public void multipleConnectWithTest() throws Exception { + + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + DataStream> source1 = env.addSource(new MySource1(), + 1); + + DataStream> source2 = env.addSource(new MySource2(), + 1); + DataStream> source3 = env + .addSource(new MySource3(), 1) + .connectWith(source1, source2) + .partitionBy(0) + .map(new MyJoinMap(), 1) + .addSink(new JoinSink()); + + env.execute(); + + fillMultipleJoinSet(); + + assertEquals(multipleJoinSetExpected, joinSetResult); + } } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java index d636573ed0b..6c8b78036c6 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java @@ -29,8 +29,6 @@ public class PrintTest { public static final class MyFlatMap extends FlatMapFunction, Tuple2> { - private static final long serialVersionUID = 1L; - @Override public void flatMap(Tuple2 value, Collector> out) throws Exception { diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java index 51d294a455c..bf847fc68c4 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java @@ -31,7 +31,7 @@ import eu.stratosphere.streaming.util.MockRecordWriterFactory; public class StreamCollector2Test { - StreamCollectorManager collector; + StreamCollector2 collector; @Test public void testCollect() { @@ -54,7 +54,7 @@ public class StreamCollector2Test { fOut.add(rw1); fOut.add(rw2); - collector = new StreamCollectorManager(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut); + collector = new StreamCollector2(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut); Tuple1 t = new Tuple1(); StreamCollector sc1 = new StreamCollector(1, batchTimeout, channelID, null); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java index 910980d6184..4008d6ff052 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java @@ -65,8 +65,11 @@ public class StreamCollectorTest { @Test public void recordWriter() { MockRecordWriter recWriter = MockRecordWriterFactory.create(); + + ArrayList> rwList = new ArrayList>(); + rwList.add(recWriter); - StreamCollector collector = new StreamCollector(2, 1000, 0, null, recWriter); + StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList); collector.collect(new Tuple1(3)); collector.collect(new Tuple1(4)); collector.collect(new Tuple1(5)); -- GitLab