diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index 50fe3bd90adb66e7e50dca9a873d9c2c3a0e6654..25d69b057f6bb6e1edbedfb91c483e1ce77b1bf2 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -12,7 +12,7 @@
 	jar
 
-	0.6-SNAPSHOT
+	0.5
 	UTF-8
 	UTF-8
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index 181d8263ff922141f4a1da59b3f4ff1c6810411b..3d00c24b35dc5b37cb66ba9f653d601a27aeacda 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -37,6 +37,7 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
 import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+import eu.stratosphere.streaming.api.invokable.StreamComponent;
 import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
 import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
 import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
@@ -51,7 +52,7 @@ import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
 import eu.stratosphere.streaming.partitioner.ShufflePartitioner;
 
 /**
- * Object for building Flink stream processing job graphs
+ * Object for building Stratosphere stream processing job graphs
  */
 public class JobGraphBuilder {
@@ -88,16 +89,15 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Creates a new JobGraph with the given parameters
+	 * Creates a new JobGraph with the given name, with fault tolerance turned
+	 * off
 	 *
 	 * @param jobGraphName
 	 *            Name of the JobGraph
-	 * @param faultToleranceType
-	 *            Type of fault tolerance
-	 * @param defaultBatchSize
-	 *            Default number of records to send at one emit
-	 * @param defaultBatchTimeoutMillis
 	 */
+	public JobGraphBuilder(String jobGraphName) {
+		this(jobGraphName, FaultToleranceType.NONE);
+	}
 
 	public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
 			int defaultBatchSize, long defaultBatchTimeoutMillis) {
@@ -107,118 +107,151 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Adds source to the JobGraph with the given parameters
+	 * Adds a source to the JobGraph from a user-defined object and a
+	 * serialized operator
 	 *
 	 * @param sourceName
-	 *            Name of the component
 	 * @param InvokableObject
-	 *            User defined operator
 	 * @param operatorName
-	 *            Operator type
 	 * @param serializedFunction
-	 *            Serialized udf
-	 * @param parallelism
-	 *            Number of parallel instances created
-	 * @param subtasksPerInstance
-	 *            Number of parallel instances on one task manager
 	 */
 	public void setSource(String sourceName, UserSourceInvokable InvokableObject,
 			String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
+		Configuration config = setSource(sourceName, InvokableObject, parallelism,
+				subtasksPerInstance);
+		config.setBytes("operator", serializedFunction);
+		config.setString("operatorName", operatorName);
+	}
 
-		final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
+	public void setSource(String sourceName, UserSourceInvokable InvokableObject,
+			String operatorName, byte[] serializedFunction) {
+		setSource(sourceName, InvokableObject, operatorName, serializedFunction, 1, 1);
+	}
 
+	/**
+	 * Adds a source to the JobGraph from a user-defined object with the given
+	 * parallelism
+	 *
+	 * @param sourceName
+	 *            Name of the source component
+	 * @param InvokableObject
+	 *            User defined UserSourceInvokable object or other predefined
+	 *            source object
+	 * @param parallelism
+	 *            Number of task instances of this type to run in parallel
+	 * @param subtasksPerInstance
+	 *            Number of subtasks allocated to a machine
+	 */
+	public Configuration setSource(String sourceName,
+			UserSourceInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance) {
+		final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
 		source.setInputClass(StreamSource.class);
-
-		setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
-				parallelism, subtasksPerInstance);
-
+		Configuration config = setComponent(sourceName, InvokableObject, parallelism,
+				subtasksPerInstance, source);
 		if (log.isDebugEnabled()) {
 			log.debug("SOURCE: " + sourceName);
 		}
+		return config;
+	}
+
+	public void setTask(String taskName,
+			UserTaskInvokable TaskInvokableObject,
+			String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
+		Configuration config = setTask(taskName, TaskInvokableObject, parallelism,
+				subtasksPerInstance);
+		config.setBytes("operator", serializedFunction);
+		config.setString("operatorName", operatorName);
+	}
+
+	public void setTask(String taskName,
+			UserTaskInvokable TaskInvokableObject,
+			String operatorName, byte[] serializedFunction) {
+		setTask(taskName, TaskInvokableObject, operatorName, serializedFunction, 1, 1);
 	}
 
 	/**
-	 * Adds task to the JobGraph with the given parameters
+	 * Adds a task component to the JobGraph
 	 *
 	 * @param taskName
-	 *            Name of the component
+	 *            Name of the task component
 	 * @param TaskInvokableObject
-	 *            User defined operator
-	 * @param operatorName
-	 *            Operator type
-	 * @param serializedFunction
-	 *            Serialized udf
+	 *            User defined UserTaskInvokable object
 	 * @param parallelism
-	 *            Number of parallel instances created
+	 *            Number of task instances of this type to run in parallel
 	 * @param subtasksPerInstance
-	 *            Number of parallel instances on one task manager
+	 *            Number of subtasks allocated to a machine
+	 * @return The configuration of the component
 	 */
-	public void setTask(String taskName,
+	public Configuration setTask(String taskName,
 			UserTaskInvokable TaskInvokableObject,
-			String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
-
+			int parallelism, int subtasksPerInstance) {
 		final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
 		task.setTaskClass(StreamTask.class);
-		setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
-				parallelism, subtasksPerInstance);
-
+		Configuration config = setComponent(taskName, TaskInvokableObject, parallelism,
+				subtasksPerInstance, task);
 		if (log.isDebugEnabled()) {
 			log.debug("TASK: " + taskName);
 		}
+		return config;
+	}
+
+	public void setSink(String sinkName, UserSinkInvokable InvokableObject,
+			String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
+		Configuration config = setSink(sinkName, InvokableObject, parallelism, subtasksPerInstance);
+		config.setBytes("operator", serializedFunction);
+		config.setString("operatorName", operatorName);
+
+	}
+
+	public void setSink(String sinkName, UserSinkInvokable InvokableObject,
+			String operatorName, byte[] serializedFunction) {
+		setSink(sinkName, InvokableObject, operatorName, serializedFunction, 1, 1);
+	}
 
 	/**
-	 * Adds sink to the JobGraph with the given parameters
+	 * Adds a sink component to the JobGraph with the given parallelism
 	 *
 	 * @param sinkName
-	 *            Name of the component
+	 *            Name of the sink component
 	 * @param InvokableObject
-	 *            User defined operator
-	 * @param operatorName
-	 *            Operator type
-	 * @param serializedFunction
-	 *            Serialized udf
+	 *            User defined UserSinkInvokable object
 	 * @param parallelism
-	 *            Number of parallel instances created
+	 *            Number of task instances of this type to run in parallel
 	 * @param subtasksPerInstance
-	 *            Number of parallel instances on one task manager
+	 *            Number of subtasks allocated to a machine
 	 */
-	public void setSink(String sinkName, UserSinkInvokable InvokableObject,
-			String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
-
+	public Configuration setSink(String sinkName,
+			UserSinkInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance) {
 		final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
 		sink.setOutputClass(StreamSink.class);
-		setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
-				parallelism, subtasksPerInstance);
-
+		Configuration config = setComponent(sinkName, InvokableObject, parallelism,
+				subtasksPerInstance, sink);
 		if (log.isDebugEnabled()) {
 			log.debug("SINK: " + sinkName);
 		}
-
+		return config;
 	}
 
 	/**
-	 * Sets component parameters in the JobGraph
+	 * Sets JobVertex configuration based on the given parameters
 	 *
 	 * @param componentName
 	 *            Name of the component
-	 * @param component
-	 *            The component vertex
-	 * @param InvokableObject
-	 *            The user defined invokable object
-	 * @param operatorName
-	 *            Type of the user defined operator
-	 * @param serializedFunction
-	 *            Serialized operator
+	 * @param InvokableClass
+	 *            Class of the user defined Invokable
 	 * @param parallelism
-	 *            Number of parallel instances created
+	 *            Number of subtasks
 	 * @param subtasksPerInstance
-	 *            Number of parallel instances on one task manager
+	 *            Number of subtasks per instance
+	 * @param component
+	 *            AbstractJobVertex associated with the component
 	 */
-	private void setComponent(String componentName, AbstractJobVertex component,
-			Serializable InvokableObject, String operatorName, byte[] serializedFunction,
-			int parallelism, int subtasksPerInstance) {
-
+	private Configuration setComponent(String componentName,
+			final Class<? extends StreamComponent> InvokableClass, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
 		component.setNumberOfSubtasks(parallelism);
 		component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
@@ -228,27 +261,49 @@ public class JobGraphBuilder {
 		}
 
 		Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
-		config.setClass("userfunction", InvokableObject.getClass());
+		config.setClass("userfunction", InvokableClass);
 		config.setString("componentName", componentName);
 		config.setInteger("batchSize", batchSize);
 		config.setLong("batchTimeout", batchTimeout);
+		// config.setBytes("operator", getSerializedFunction());
+
 		config.setInteger("faultToleranceType", faultToleranceType.id);
 
-		config.setBytes("operator", serializedFunction);
-		config.setString("operatorName", operatorName);
-		addSerializedObject(InvokableObject, config);
 
 		components.put(componentName, component);
 		numberOfInstances.put(componentName, parallelism);
+		return config;
+	}
+
+	private Configuration setComponent(String componentName,
+			UserSourceInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
+		Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
+				subtasksPerInstance, component);
+
+		addSerializedObject(InvokableObject, component);
+		return config;
+	}
+
+	private Configuration setComponent(String componentName,
+			UserTaskInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
+		Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
+				subtasksPerInstance, component);
+
+		addSerializedObject(InvokableObject, component);
+		return config;
+	}
+
+	private Configuration setComponent(String componentName,
+			UserSinkInvokable InvokableObject, int parallelism,
+			int subtasksPerInstance, AbstractJobVertex component) {
+		Configuration config = setComponent(componentName, InvokableObject.getClass(), parallelism,
+				subtasksPerInstance, component);
+
+		addSerializedObject(InvokableObject, component);
+		return config;
 	}
 
-	/**
-	 * Sets the number of tuples batched together for higher throughput
-	 *
-	 * @param componentName
-	 *            Name of the component
-	 * @param batchSize
-	 *            Number of tuples batched together
-	 */
 	public void setBatchSize(String componentName, int batchSize) {
 		Configuration config = components.get(componentName).getConfiguration();
 		config.setInteger("batchSize_"
@@ -260,11 +315,12 @@ public class JobGraphBuilder {
 	 *
 	 * @param InvokableObject
 	 *            Invokable object to serialize
-	 * @param config
-	 *            JobVertex configuration to which the serialized invokable will
-	 *            be added
+	 * @param component
+	 *            JobVertex to which the serialized invokable will be added
 	 */
-	private void addSerializedObject(Serializable InvokableObject, Configuration config) {
+	private void addSerializedObject(Serializable InvokableObject, AbstractJobVertex component) {
+
+		Configuration config = component.getConfiguration();
 
 		ByteArrayOutputStream baos = null;
 		ObjectOutputStream oos = null;
@@ -283,31 +339,19 @@ public class JobGraphBuilder {
 
 	}
 
-	/**
-	 * Sets udf operator from one component to another, used with some sinks.
-	 *
-	 * @param from
-	 * @param to
-	 */
-	public void setBytesFrom(String from, String to) {
-		Configuration fromConfig = components.get(from).getConfiguration();
-		Configuration toConfig = components.get(to).getConfiguration();
-
-		toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
-		toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
-
-	}
-
 	/**
 	 * Connects two JobGraph components with the given names, partitioning and
 	 * channel type
 	 *
 	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
+	 *            Name of the upstream component, that will emit the records
 	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
+	 *            Name of the downstream component, that will receive the
+	 *            records
 	 * @param PartitionerClass
 	 *            Class of the partitioner
+	 * @param channelType
+	 *            Channel Type
 	 */
 	private void connect(String upStreamComponentName, String downStreamComponentName,
 			Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
@@ -351,7 +395,7 @@ public class JobGraphBuilder {
 	/**
 	 * Sets all components to share instances with the one with the highest parallelism
 	 */
-	private void setAutomaticInstanceSharing() {
+	public void setAutomaticInstanceSharing() {
 
 		AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
@@ -439,13 +483,14 @@ public class JobGraphBuilder {
 	/**
 	 * Connects two components with the given names by global partitioning.
 	 * <p>
-	 * Global partitioning: sends all emitted tuples to one output instance
+	 * Global partitioning: sends all emitted records to one output instance
 	 * (i.e. the first one)
 	 *
 	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
+	 *            Name of the upstream component, that will emit the records
 	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
+	 *            Name of the downstream component, that will receive the
+	 *            records
 	 */
 	public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
 		connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
@@ -457,13 +502,14 @@ public class JobGraphBuilder {
 	/**
 	 * Connects two components with the given names by shuffle partitioning.
 	 * <p>
-	 * Shuffle partitioning: sends the output tuples to a randomly selected
+	 * Shuffle partitioning: sends the output records to a randomly selected
 	 * channel
 	 *
 	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
+	 *            Name of the upstream component, that will emit the records
 	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
+	 *            Name of the downstream component, that will receive the
+	 *            records
 	 */
 	public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
 		connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
@@ -471,13 +517,6 @@ public class JobGraphBuilder {
 		log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
 	}
 
-	/**
-	 * Sets the number of instances for a given component, used for fault
-	 * tolerance purposes
-	 *
-	 * @param upStreamComponentName
-	 * @param numOfInstances
-	 */
 	private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
 		if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
 			numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
@@ -528,4 +567,12 @@ public class JobGraphBuilder {
 		return jobGraph;
 	}
 
+	public void setBytesFrom(String from, String to) {
+		Configuration fromConfig = components.get(from).getConfiguration();
+		Configuration toConfig = components.get(to).getConfiguration();
+
+		toConfig.setString("operatorName", fromConfig.getString("operatorName", null));
+		toConfig.setBytes("operator", fromConfig.getBytes("operator", null));
+
+	}
 }
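For orientation, a minimal sketch of how the reworked builder API above chains together. The component names, the MySource/MyTask/MySink invokables, the operator name strings and the serializedUdf byte array are hypothetical placeholders; the one-argument constructor, the short setSource/setTask/setSink overloads and the connect methods are the ones defined in this patch.

    JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); // fault tolerance defaults to NONE

    // the four-argument overloads default to parallelism = 1, subtasksPerInstance = 1
    graphBuilder.setSource("exampleSource", new MySource(), "flatMap", serializedUdf);
    graphBuilder.setTask("exampleTask", new MyTask(), "flatMap", serializedUdf, 4, 2);
    graphBuilder.setSink("exampleSink", new MySink(), "sink", serializedUdf);

    graphBuilder.shuffleConnect("exampleSource", "exampleTask");
    graphBuilder.globalConnect("exampleTask", "exampleSink");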
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
index 89ce7cfad28fe7a1a3ad56ce42a7d910f0ba7245..303c96249e58651242e024c5a8a28e48735f5184 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
@@ -15,19 +15,21 @@
 package eu.stratosphere.streaming.api;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.junit.Test;
 
 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.tuple.Tuple;
 import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
 import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
 import eu.stratosphere.api.java.typeutils.TypeExtractor;
 import eu.stratosphere.configuration.Configuration;
@@ -37,6 +39,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
 import eu.stratosphere.streaming.api.MapTest.MyMap;
 import eu.stratosphere.streaming.api.MapTest.MySink;
+import eu.stratosphere.streaming.api.PrintTest.MyFlatMap;
 import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
 import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
 import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
@@ -47,57 +50,193 @@ public class FlatMapTest {
 
 	public static final class MyFlatMap extends
 			FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
 
 		@Override
-		public void flatMap(Tuple1<Integer> value,
-				Collector<Tuple1<Integer>> out) throws Exception {
-			out.collect(new Tuple1<Integer>(value.f0*value.f0));
-
+		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+			out.collect(new Tuple1<Integer>(value.f0 * value.f0));
 		}
-
+
+	}
+
+	public static final class ParallelFlatMap extends
+			FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+
+		@Override
+		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+			numberOfElements++;
+
+		}
+
+	}
+
+	public static final class GenerateSequenceFlatMap extends
+			FlatMapFunction<Tuple1<Long>, Tuple1<Long>> {
+
+		@Override
+		public void flatMap(Tuple1<Long> value, Collector<Tuple1<Long>> out) throws Exception {
+			out.collect(new Tuple1<Long>(value.f0 * value.f0));
+
+		}
+
 	}
 
 	public static final class MySink extends SinkFunction<Tuple1<Integer>> {
-
+
 		@Override
 		public void invoke(Tuple1<Integer> tuple) {
 			result.add(tuple.f0);
-			System.out.println("result " + tuple.f0);
 		}
 
 	}
 
-	public static final class MySource extends SourceFunction<Tuple1<Integer>> {
+	public static final class FromElementsSink extends SinkFunction<Tuple1<Integer>> {
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector)
-				throws Exception {
-			for(int i=0; i<10; i++){
-				collector.collect(new Tuple1<Integer>(i));
-			}
+		public void invoke(Tuple1<Integer> tuple) {
+			fromElementsResult.add(tuple.f0);
+		}
+
+	}
+
+	public static final class FromCollectionSink extends SinkFunction<Tuple1<Integer>> {
+
+		@Override
+		public void invoke(Tuple1<Integer> tuple) {
+			fromCollectionResult.add(tuple.f0);
+		}
+
+	}
+
+	public static final class GenerateSequenceSink extends SinkFunction<Tuple1<Long>> {
+
+		@Override
+		public void invoke(Tuple1<Long> tuple) {
+			generateSequenceResult.add(tuple.f0);
+		}
+
+	}
+
+	private static void fillExpectedList() {
+		for (int i = 0; i < 10; i++) {
+			expected.add(i * i);
+		}
+	}
+
+	private static void fillFromElementsExpected() {
+		fromElementsExpected.add(4);
+		fromElementsExpected.add(25);
+		fromElementsExpected.add(81);
+	}
+
+	private static void fillSequenceSet() {
+		for (int i = 0; i < 10; i++) {
+			sequenceExpected.add(i * i);
 		}
 	}
 
-	private static void fillExpectedList(){
-		for(int i=0;i<10;i++){
-			expected.add(i*i);
+	private static void fillLongSequenceSet() {
+		for (int i = 0; i < 10; i++) {
+			sequenceLongExpected.add((long) (i * i));
+		}
+	}
+
+	private static void fillFromCollectionSet() {
+		if (fromCollectionSet.isEmpty()) {
+			for (int i = 0; i < 10; i++) {
+				fromCollectionSet.add(i);
+			}
 		}
 	}
 
 	private static final int PARALELISM = 1;
-	private static List<Integer> expected = new ArrayList<Integer>();
-	private static List<Integer> result = new ArrayList<Integer>();
+	private static int numberOfElements = 0;
+	private static Set<Integer> expected = new HashSet<Integer>();
+	private static Set<Integer> result = new HashSet<Integer>();
+	private static Set<Integer> fromElementsExpected = new HashSet<Integer>();
+	private static Set<Integer> fromElementsResult = new HashSet<Integer>();
+	private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
+	private static Set<Integer> sequenceExpected = new HashSet<Integer>();
+	private static Set<Long> sequenceLongExpected = new HashSet<Long>();
+	private static Set<Integer> fromCollectionResult = new HashSet<Integer>();
+	private static Set<Long> generateSequenceResult = new HashSet<Long>();
 
 	@Test
 	public void test() throws Exception {
-
 		StreamExecutionEnvironment env = new StreamExecutionEnvironment(2, 1000);
-		DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(), 1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
-
+
+		fillFromCollectionSet();
+
+		DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
+				.flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
 		env.execute();
-
+
 		fillExpectedList();
-
+
 		assertTrue(expected.equals(result));
+
+	}
+
+	@Test
+	public void parallelShuffleconnectTest() throws Exception {
+		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		fillFromCollectionSet();
+
+		DataStream<Tuple1<Integer>> source = env.fromCollection(fromCollectionSet);
+		DataStream<Tuple1<Integer>> map = source.flatMap(new ParallelFlatMap(), 1).addSink(
+				new MySink());
+		DataStream<Tuple1<Integer>> map2 = source.flatMap(new ParallelFlatMap(), 1).addSink(
+				new MySink());
+
+		env.execute();
+
+		assertEquals(20, numberOfElements);
+		numberOfElements = 0;
+
+	}
+
+	@Test
+	public void fromElementsTest() throws Exception {
+		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+		DataStream<Tuple1<Integer>> map = env.fromElements(2, 5, 9).flatMap(new MyFlatMap(), 1);
+		DataStream<Tuple1<Integer>> sink = map.addSink(new FromElementsSink());
+
+		fillFromElementsExpected();
+
+		env.execute();
+		assertEquals(fromElementsExpected, fromElementsResult);
+
+	}
+
+	@Test
+	public void fromCollectionTest() throws Exception {
+		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		fillFromCollectionSet();
+
+		DataStream<Tuple1<Integer>> map = env.fromCollection(fromCollectionSet).flatMap(
+				new MyFlatMap(), 1);
+		DataStream<Tuple1<Integer>> sink = map.addSink(new FromCollectionSink());
+
+		fillSequenceSet();
+
+		env.execute();
+		assertEquals(sequenceExpected, fromCollectionResult);
+
+	}
+
+	@Test
+	public void generateSequenceTest() throws Exception {
+		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		DataStream<Tuple1<Long>> map = env.generateSequence(0, 9).flatMap(
+				new GenerateSequenceFlatMap(), 1);
+		DataStream<Tuple1<Long>> sink = map.addSink(new GenerateSequenceSink());
+
+		fillLongSequenceSet();
+
+		env.execute();
+		assertEquals(sequenceLongExpected, generateSequenceResult);
 	}
 }
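The new FlatMapTest cases read their input from the environment's built-in sources instead of a hand-rolled SourceFunction. A condensed sketch of the three shortcuts they exercise, using only the classes defined in the test above; the DataStream element types are inferred from the test's declarations, so treat them as assumptions rather than the documented API contract.

    StreamExecutionEnvironment env = new StreamExecutionEnvironment();

    // fromElements: a DataStream<Tuple1<Integer>> built from the given values
    env.fromElements(2, 5, 9).flatMap(new MyFlatMap(), 1).addSink(new FromElementsSink());

    // fromCollection: wraps an existing java.util.Collection the same way
    env.fromCollection(fromCollectionSet).flatMap(new MyFlatMap(), 1).addSink(new FromCollectionSink());

    // generateSequence: emits the range 0..9 as Tuple1<Long> records
    env.generateSequence(0, 9).flatMap(new GenerateSequenceFlatMap(), 1).addSink(new GenerateSequenceSink());

    env.execute();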
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java
index ee91152cefd293b61f0fc94288928bc8445386f5..c92e97c563e4e84c839e7c1bfba68735ff747ef3 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java
@@ -18,7 +18,9 @@ package eu.stratosphere.streaming.api;
 import static org.junit.Assert.*;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.junit.Test;
 
@@ -33,27 +35,6 @@ public class MapTest {
 		@Override
 		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
 			for (int i = 0; i < 10; i++) {
-				System.out.println("source "+i);
 				collector.collect(new Tuple1<Integer>(i));
 			}
 		}
 	}
 
-	public static final class MyFieldsSource extends SourceFunction<Tuple1<Integer>> {
-
-		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
-			for (int i = 0; i < MAXSOURCE; i++) {
-				collector.collect(new Tuple1<Integer>(5));
-			}
-		}
-	}
-
-	public static final class MyDiffFieldsSource extends SourceFunction<Tuple1<Integer>> {
-
-		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
-			for (int i = 0; i < 9; i++) {
-				collector.collect(new Tuple1<Integer>(i));
-			}
-		}
-	}
@@ -63,7 +44,6 @@ public class MapTest {
 
 		@Override
 		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
-			System.out.println("mymap "+map);
 			map++;
 			return new Tuple1<Integer>(value.f0 * value.f0);
 		}
@@ -76,7 +56,6 @@ public class MapTest {
 		@Override
 		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
 			counter++;
-
 			if (counter == MAXSOURCE)
 				allInOne = true;
 			return new Tuple1<Integer>(value.f0 * value.f0);
@@ -140,13 +119,12 @@ public class MapTest {
 
 		@Override
 		public void invoke(Tuple1<Integer> tuple) {
-			System.out.println("sink "+graphResult);
 			graphResult++;
 		}
 	}
 
-	private static List<Integer> expected = new ArrayList<Integer>();
-	private static List<Integer> result = new ArrayList<Integer>();
+	private static Set<Integer> expected = new HashSet<Integer>();
+	private static Set<Integer> result = new HashSet<Integer>();
 	private static int broadcastResult = 0;
 	private static int shuffleResult = 0;
 	private static int fieldsResult = 0;
@@ -157,19 +135,49 @@ public class MapTest {
 	private static final int MAXSOURCE = 10;
 	private static boolean allInOne = false;
 	private static boolean threeInAll = true;
+	private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
+	private static List<Integer> fromCollectionFields = new ArrayList<Integer>();
+	private static Set<Integer> fromCollectionDiffFieldsSet = new HashSet<Integer>();
 
 	private static void fillExpectedList() {
 		for (int i = 0; i < 10; i++) {
 			expected.add(i * i);
 		}
 	}
+
+	private static void fillFromCollectionSet() {
+		if (fromCollectionSet.isEmpty()) {
+			for (int i = 0; i < 10; i++) {
+				fromCollectionSet.add(i);
+			}
+		}
+	}
+
+	private static void fillFromCollectionFieldsSet() {
+		if (fromCollectionFields.isEmpty()) {
+			for (int i = 0; i < MAXSOURCE; i++) {
+				fromCollectionFields.add(5);
+			}
+		}
+	}
+
+	private static void fillFromCollectionDiffFieldsSet() {
+		if (fromCollectionDiffFieldsSet.isEmpty()) {
+			for (int i = 0; i < 9; i++) {
+				fromCollectionDiffFieldsSet.add(i);
+			}
+		}
+	}
 
 	@Test
 	public void mapTest() throws Exception {
 		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
 
-		DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(), 1)
+		fillFromCollectionSet();
+
+		DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
 				.map(new MyMap(), PARALELISM).addSink(new MySink());
 
 		env.execute();
@@ -182,8 +190,11 @@ public class MapTest {
 	@Test
 	public void broadcastSinkTest() throws Exception {
 		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		fillFromCollectionSet();
+
 		DataStream<Tuple1<Integer>> dataStream = env
-				.addSource(new MySource(), 1)
+				.fromCollection(fromCollectionSet)
 				.broadcast()
 				.map(new MyMap(), 3)
 				.addSink(new MyBroadcastSink());
@@ -196,8 +207,11 @@ public class MapTest {
 	@Test
 	public void shuffleSinkTest() throws Exception {
 		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		fillFromCollectionSet();
+
 		DataStream<Tuple1<Integer>> dataStream = env
-				.addSource(new MySource(), 1)
+				.fromCollection(fromCollectionSet)
 				.map(new MyMap(), 3)
 				.addSink(new MyShufflesSink());
 		env.execute();
@@ -222,8 +236,11 @@ public class MapTest {
 	@Test
 	public void fieldsMapTest() throws Exception {
 		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		fillFromCollectionFieldsSet();
+
 		DataStream<Tuple1<Integer>> dataStream = env
-				.addSource(new MyFieldsSource(), 1)
+				.fromCollection(fromCollectionFields)
 				.partitionBy(0)
 				.map(new MyFieldsMap(), 3)
 				.addSink(new MyFieldsSink());
@@ -236,8 +253,11 @@ public class MapTest {
 	@Test
 	public void diffFieldsMapTest() throws Exception {
 		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+
+		fillFromCollectionDiffFieldsSet();
+
 		DataStream<Tuple1<Integer>> dataStream = env
-				.addSource(new MyDiffFieldsSource(), 1)
+				.fromCollection(fromCollectionDiffFieldsSet)
 				.partitionBy(0)
 				.map(new MyDiffFieldsMap(), 3)
 				.addSink(new MyDiffFieldsSink());
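One detail in the MapTest changes above: fieldsMapTest pushes ten copies of the value 5 through partitionBy(0), so its input is kept in a List, since a Set would collapse the duplicates to a single element. The pipeline, restated from the test (partitionBy(0) routes records with equal field 0 to the same map subtask, which is what the allInOne flag asserts):

    fillFromCollectionFieldsSet(); // List<Integer> holding ten 5s, so duplicates survive

    DataStream<Tuple1<Integer>> dataStream = env
            .fromCollection(fromCollectionFields)
            .partitionBy(0)
            .map(new MyFieldsMap(), 3)
            .addSink(new MyFieldsSink());
    env.execute();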
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java
index e898bd03febb8bd6aa26ea0718bf3ab607b4f09d..6c8b78036c690a17b3f3dc86fa0cfd3fbe1c9d34 100755
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java
@@ -29,8 +29,6 @@ public class PrintTest {
 	public static final class MyFlatMap extends
 			FlatMapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
 
-		private static final long serialVersionUID = 1L;
-
 		@Override
 		public void flatMap(Tuple2<Integer, String> value,
 				Collector<Tuple2<Integer, String>> out) throws Exception {
@@ -53,4 +51,6 @@ public class PrintTest {
 		env.execute();
 	}
 
+
+
 }