diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java index 3c9f0de75b0062c8421494f02d53c7f302e3a57c..cd557054472d0f67abe5af5b1cf8e73d6aab622a 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java @@ -23,7 +23,9 @@ import org.junit.Test; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.util.MockRecordWriterFactory; public class StreamCollector2Test { @@ -44,8 +46,11 @@ public class StreamCollector2Test { List> fOut = new ArrayList>(); - fOut.add(null); - fOut.add(null); + MockRecordWriter rw1 = MockRecordWriterFactory.create(); + MockRecordWriter rw2 = MockRecordWriterFactory.create(); + + fOut.add(rw1); + fOut.add(rw2); collector = new StreamCollector2(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut); @@ -55,15 +60,17 @@ public class StreamCollector2Test { t.f0 = 0; collector.collect(t); t.f0 = 1; - collector.collect(t); + collector.collect(t); t.f0 = 0; collector.collect(t); + + System.out.println(rw1.emittedRecords); + System.out.println(rw2.emittedRecords); + t.f0 = 1; collector.collect(t); - } - - @Test - public void testClose() { - } + + + } } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java index 580632ca355ffab340e7a1f58f27da13ffdd1c94..4008d6ff052e238858953b157d8156254de37d3d 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java @@ -28,6 +28,7 @@ import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.util.MockRecordWriterFactory; public class StreamCollectorTest { @@ -63,20 +64,19 @@ public class StreamCollectorTest { @Test public void recordWriter() { - MockRecordWriter recWriter = mock(MockRecordWriter.class); - - Mockito.when(recWriter.initList()).thenCallRealMethod(); - doCallRealMethod().when(recWriter).emit(Mockito.any(StreamRecord.class)); - - recWriter.initList(); + MockRecordWriter recWriter = MockRecordWriterFactory.create(); ArrayList> rwList = new ArrayList>(); rwList.add(recWriter); - StreamCollector collector = new StreamCollector(1, 1000, 0, null, rwList); + StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList); collector.collect(new Tuple1(3)); - + collector.collect(new Tuple1(4)); + collector.collect(new Tuple1(5)); + collector.collect(new Tuple1(6)); + assertEquals((Integer) 3, recWriter.emittedRecords.get(0).getTuple(0).getField(0)); + assertEquals((Integer) 6, recWriter.emittedRecords.get(1).getTuple(1).getField(0)); } @Test