diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java index 5c8b58869753ec221cb574a48aae8908c9d882d5..192c54ff1a3161839f4e9b27d5acc43aaed7c524 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java @@ -15,8 +15,6 @@ package eu.stratosphere.streaming.api; -import java.util.List; - import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; @@ -32,17 +30,16 @@ public class StreamCollector implements Collector { protected int counter = 0; protected int channelID; private long timeOfLastRecordEmitted = System.currentTimeMillis();; - private List> outputs; + private RecordWriter output; public StreamCollector(int batchSize, long batchTimeout, int channelID, - SerializationDelegate serializationDelegate, - List> outputs) { + SerializationDelegate serializationDelegate, RecordWriter output) { this.batchSize = batchSize; this.batchTimeout = batchTimeout; this.streamRecord = new ArrayStreamRecord(batchSize); this.streamRecord.setSeralizationDelegate(serializationDelegate); this.channelID = channelID; - this.outputs = outputs; + this.output = output; } public StreamCollector(int batchSize, long batchTimeout, int channelID, @@ -53,7 +50,6 @@ public class StreamCollector implements Collector { // TODO reconsider emitting mechanism at timeout (find a place to timeout) @Override public void collect(T tuple) { - //TODO: move copy to StreamCollector2 streamRecord.setTuple(counter, tuple); counter++; @@ -77,19 +73,14 @@ public class StreamCollector implements Collector { counter = 0; streamRecord.setId(channelID); - if (outputs == null) { - System.out.println(streamRecord); - } else { - for (RecordWriter output : outputs) { - try { - output.emit(streamRecord); - output.flush(); - } catch (Exception e) { - e.printStackTrace(); - System.out.println("emit fail"); - } - } + try { + output.emit(streamRecord); + output.flush(); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("emit fail"); } + } @Override diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java index 1fe77d7e00331f47ed1ab70aea985dbb3c2db182..63e127edfb0e28a43a061734a727bc4f0ce2bf71 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollectorManager.java @@ -33,8 +33,9 @@ public class StreamCollectorManager implements Collector { int keyPostition; // TODO consider channelID - public StreamCollectorManager(List batchSizesOfNotPartitioned, List batchSizesOfPartitioned, - List parallelismOfOutput, int keyPosition, long batchTimeout, int channelID, + public StreamCollectorManager(List batchSizesOfNotPartitioned, + List batchSizesOfPartitioned, List parallelismOfOutput, + int keyPosition, long batchTimeout, int channelID, SerializationDelegate serializationDelegate, List> partitionedOutputs, List> notPartitionedOutputs) { @@ -47,19 +48,16 @@ public class StreamCollectorManager implements Collector { this.keyPostition = keyPosition; for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) { - List> output = new ArrayList>(); - output.add(notPartitionedOutputs.get(i)); - notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned.get(i), - batchTimeout, channelID, serializationDelegate, output)); + notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned + .get(i), batchTimeout, channelID, serializationDelegate, notPartitionedOutputs + .get(i))); } for (int i = 0; i < batchSizesOfPartitioned.size(); i++) { StreamCollector[] collectors = new StreamCollector[parallelismOfOutput.get(i)]; for (int j = 0; j < collectors.length; j++) { - List> output = new ArrayList>(); - output.add(partitionedOutputs.get(i)); collectors[j] = new StreamCollector(batchSizesOfPartitioned.get(i), - batchTimeout, channelID, serializationDelegate, output); + batchTimeout, channelID, serializationDelegate, partitionedOutputs.get(i)); } partitionedCollectors.add(collectors); } @@ -69,7 +67,7 @@ public class StreamCollectorManager implements Collector { @Override public void collect(T tuple) { T copiedTuple = StreamRecord.copyTuple(tuple); - + for (StreamCollector collector : notPartitionedCollectors) { collector.collect(copiedTuple); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java index 2e7a44b2777816736552d8e28340f88b9314ac29..5227bb8b663010e813dceda25f17f51f65d779c9 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java @@ -21,13 +21,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.nephele.execution.Environment; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInputTask; -import eu.stratosphere.streaming.api.StreamCollector; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.examples.DummyIS; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java index 89ce7cfad28fe7a1a3ad56ce42a7d910f0ba7245..303c96249e58651242e024c5a8a28e48735f5184 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java @@ -15,19 +15,21 @@ package eu.stratosphere.streaming.api; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.junit.Test; import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.typeutils.TupleTypeInfo; import eu.stratosphere.api.java.typeutils.TypeExtractor; import eu.stratosphere.configuration.Configuration; @@ -37,6 +39,7 @@ import eu.stratosphere.nephele.jobgraph.JobOutputVertex; import eu.stratosphere.nephele.jobgraph.JobTaskVertex; import eu.stratosphere.streaming.api.MapTest.MyMap; import eu.stratosphere.streaming.api.MapTest.MySink; +import eu.stratosphere.streaming.api.PrintTest.MyFlatMap; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; @@ -47,57 +50,193 @@ public class FlatMapTest { public static final class MyFlatMap extends FlatMapFunction, Tuple1> { @Override - public void flatMap(Tuple1 value, - Collector> out) throws Exception { - out.collect(new Tuple1(value.f0*value.f0)); - + public void flatMap(Tuple1 value, Collector> out) throws Exception { + out.collect(new Tuple1(value.f0 * value.f0)); + } - + + } + + public static final class ParallelFlatMap extends + FlatMapFunction, Tuple1> { + + @Override + public void flatMap(Tuple1 value, Collector> out) throws Exception { + numberOfElements++; + + } + + } + + public static final class GenerateSequenceFlatMap extends + FlatMapFunction, Tuple1> { + + @Override + public void flatMap(Tuple1 value, Collector> out) throws Exception { + out.collect(new Tuple1(value.f0 * value.f0)); + + } + } public static final class MySink extends SinkFunction> { - + @Override public void invoke(Tuple1 tuple) { result.add(tuple.f0); - System.out.println("result " + tuple.f0); } } - public static final class MySource extends SourceFunction> { + public static final class FromElementsSink extends SinkFunction> { @Override - public void invoke(Collector> collector) - throws Exception { - for(int i=0; i<10; i++){ - collector.collect(new Tuple1(i)); - } + public void invoke(Tuple1 tuple) { + fromElementsResult.add(tuple.f0); + } + + } + + public static final class FromCollectionSink extends SinkFunction> { + + @Override + public void invoke(Tuple1 tuple) { + fromCollectionResult.add(tuple.f0); + } + + } + + public static final class GenerateSequenceSink extends SinkFunction> { + + @Override + public void invoke(Tuple1 tuple) { + generateSequenceResult.add(tuple.f0); + } + + } + + private static void fillExpectedList() { + for (int i = 0; i < 10; i++) { + expected.add(i * i); + } + } + + private static void fillFromElementsExpected() { + fromElementsExpected.add(4); + fromElementsExpected.add(25); + fromElementsExpected.add(81); + } + + private static void fillSequenceSet() { + for (int i = 0; i < 10; i++) { + sequenceExpected.add(i * i); } } - private static void fillExpectedList(){ - for(int i=0;i<10;i++){ - expected.add(i*i); + private static void fillLongSequenceSet() { + for (int i = 0; i < 10; i++) { + sequenceLongExpected.add((long)(i * i)); + } + } + + private static void fillFromCollectionSet() { + if(fromCollectionSet.isEmpty()){ + for (int i = 0; i < 10; i++) { + fromCollectionSet.add(i); + } } } private static final int PARALELISM = 1; - private static List expected = new ArrayList(); - private static List result = new ArrayList(); + private static int numberOfElements = 0; + private static Set expected = new HashSet(); + private static Set result = new HashSet(); + private static Set fromElementsExpected = new HashSet(); + private static Set fromElementsResult = new HashSet(); + private static Set fromCollectionSet = new HashSet(); + private static Set sequenceExpected = new HashSet(); + private static Set sequenceLongExpected = new HashSet(); + private static Set fromCollectionResult = new HashSet(); + private static Set generateSequenceResult = new HashSet(); @Test public void test() throws Exception { - StreamExecutionEnvironment env = new StreamExecutionEnvironment(2, 1000); - DataStream> dataStream = env.addSource(new MySource(),1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink()); - + + fillFromCollectionSet(); + + DataStream> dataStream = env.fromCollection(fromCollectionSet) + .flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink()); env.execute(); - + fillExpectedList(); - + assertTrue(expected.equals(result)); + + } + + @Test + public void parallelShuffleconnectTest() throws Exception { + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + fillFromCollectionSet(); + DataStream> source = env.fromCollection(fromCollectionSet); + DataStream> map = source.flatMap(new ParallelFlatMap(), 1).addSink( + new MySink()); + DataStream> map2 = source.flatMap(new ParallelFlatMap(), 1).addSink( + new MySink()); + + env.execute(); + + assertEquals(20, numberOfElements); + numberOfElements=0; + + + } + + @Test + public void fromElementsTest() throws Exception { + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + DataStream> map = env.fromElements(2, 5, 9).flatMap(new MyFlatMap(), 1); + DataStream> sink = map.addSink(new FromElementsSink()); + + fillFromElementsExpected(); + + env.execute(); + assertEquals(fromElementsExpected, fromElementsResult); + + } + + @Test + public void fromCollectionTest() throws Exception { + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + fillFromCollectionSet(); + + DataStream> map = env.fromCollection(fromCollectionSet).flatMap( + new MyFlatMap(), 1); + DataStream> sink = map.addSink(new FromCollectionSink()); + + fillSequenceSet(); + + env.execute(); + assertEquals(sequenceExpected, fromCollectionResult); + + } + + @Test + public void generateSequenceTest() throws Exception { + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + DataStream> map = env.generateSequence(0, 9).flatMap( + new GenerateSequenceFlatMap(), 1); + DataStream> sink = map.addSink(new GenerateSequenceSink()); + + fillLongSequenceSet(); + + env.execute(); + assertEquals(sequenceLongExpected, generateSequenceResult); } } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java index ee91152cefd293b61f0fc94288928bc8445386f5..c92e97c563e4e84c839e7c1bfba68735ff747ef3 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java @@ -18,7 +18,9 @@ package eu.stratosphere.streaming.api; import static org.junit.Assert.*; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.junit.Test; @@ -33,27 +35,6 @@ public class MapTest { @Override public void invoke(Collector> collector) throws Exception { for (int i = 0; i < 10; i++) { - System.out.println("source "+i); - collector.collect(new Tuple1(i)); - } - } - } - - public static final class MyFieldsSource extends SourceFunction> { - - @Override - public void invoke(Collector> collector) throws Exception { - for (int i = 0; i < MAXSOURCE; i++) { - collector.collect(new Tuple1(5)); - } - } - } - - public static final class MyDiffFieldsSource extends SourceFunction> { - - @Override - public void invoke(Collector> collector) throws Exception { - for (int i = 0; i < 9; i++) { collector.collect(new Tuple1(i)); } } @@ -63,7 +44,6 @@ public class MapTest { @Override public Tuple1 map(Tuple1 value) throws Exception { - System.out.println("mymap "+map); map++; return new Tuple1(value.f0 * value.f0); } @@ -76,7 +56,6 @@ public class MapTest { @Override public Tuple1 map(Tuple1 value) throws Exception { counter++; - if (counter == MAXSOURCE) allInOne = true; return new Tuple1(value.f0 * value.f0); @@ -140,13 +119,12 @@ public class MapTest { @Override public void invoke(Tuple1 tuple) { - System.out.println("sink "+graphResult); graphResult++; } } - private static List expected = new ArrayList(); - private static List result = new ArrayList(); + private static Set expected = new HashSet(); + private static Set result = new HashSet(); private static int broadcastResult = 0; private static int shuffleResult = 0; private static int fieldsResult = 0; @@ -157,19 +135,49 @@ public class MapTest { private static final int MAXSOURCE = 10; private static boolean allInOne = false; private static boolean threeInAll = true; + private static Set fromCollectionSet = new HashSet(); + private static List fromCollectionFields = new ArrayList(); + private static Set fromCollectionDiffFieldsSet = new HashSet(); private static void fillExpectedList() { for (int i = 0; i < 10; i++) { expected.add(i * i); } } + + private static void fillFromCollectionSet() { + if(fromCollectionSet.isEmpty()){ + for (int i = 0; i < 10; i++) { + fromCollectionSet.add(i); + } + } + } + + private static void fillFromCollectionFieldsSet() { + if(fromCollectionFields.isEmpty()){ + for (int i = 0; i < MAXSOURCE; i++) { + + fromCollectionFields.add(5); + } + } + } + + private static void fillFromCollectionDiffFieldsSet() { + if(fromCollectionDiffFieldsSet.isEmpty()){ + for (int i = 0; i < 9; i++) { + fromCollectionDiffFieldsSet.add(i); + } + } + } @Test public void mapTest() throws Exception { StreamExecutionEnvironment env = new StreamExecutionEnvironment(); - DataStream> dataStream = env.addSource(new MySource(), 1) + fillFromCollectionSet(); + + DataStream> dataStream = env.fromCollection(fromCollectionSet) .map(new MyMap(), PARALELISM).addSink(new MySink()); env.execute(); @@ -182,8 +190,11 @@ public class MapTest { @Test public void broadcastSinkTest() throws Exception { StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + fillFromCollectionSet(); + DataStream> dataStream = env - .addSource(new MySource(), 1) + .fromCollection(fromCollectionSet) .broadcast() .map(new MyMap(), 3) .addSink(new MyBroadcastSink()); @@ -196,8 +207,11 @@ public class MapTest { @Test public void shuffleSinkTest() throws Exception { StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + fillFromCollectionSet(); + DataStream> dataStream = env - .addSource(new MySource(), 1) + .fromCollection(fromCollectionSet) .map(new MyMap(), 3) .addSink(new MyShufflesSink()); env.execute(); @@ -222,8 +236,11 @@ public class MapTest { @Test public void fieldsMapTest() throws Exception { StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + fillFromCollectionFieldsSet(); + DataStream> dataStream = env - .addSource(new MyFieldsSource(), 1) + .fromCollection(fromCollectionFields) .partitionBy(0) .map(new MyFieldsMap(), 3) .addSink(new MyFieldsSink()); @@ -236,8 +253,11 @@ public class MapTest { @Test public void diffFieldsMapTest() throws Exception { StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + + fillFromCollectionDiffFieldsSet(); + DataStream> dataStream = env - .addSource(new MyDiffFieldsSource(), 1) + .fromCollection(fromCollectionDiffFieldsSet) .partitionBy(0) .map(new MyDiffFieldsMap(), 3) .addSink(new MyDiffFieldsSink()); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java index e898bd03febb8bd6aa26ea0718bf3ab607b4f09d..d636573ed0b213feefea3bbefa45d876bc967dfa 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/PrintTest.java @@ -53,4 +53,6 @@ public class PrintTest { env.execute(); } + + } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java index 4008d6ff052e238858953b157d8156254de37d3d..910980d6184f5a854ee0c8529442ce6db41dc722 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java @@ -65,11 +65,8 @@ public class StreamCollectorTest { @Test public void recordWriter() { MockRecordWriter recWriter = MockRecordWriterFactory.create(); - - ArrayList> rwList = new ArrayList>(); - rwList.add(recWriter); - StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList); + StreamCollector collector = new StreamCollector(2, 1000, 0, null, recWriter); collector.collect(new Tuple1(3)); collector.collect(new Tuple1(4)); collector.collect(new Tuple1(5));