提交 4f94e2fc 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Updated StreamCollector2Test

上级 a1d3d8f5
......@@ -23,7 +23,9 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollector2Test {
......@@ -44,8 +46,11 @@ public class StreamCollector2Test {
List<RecordWriter<StreamRecord>> fOut = new ArrayList<RecordWriter<StreamRecord>>();
fOut.add(null);
fOut.add(null);
MockRecordWriter rw1 = MockRecordWriterFactory.create();
MockRecordWriter rw2 = MockRecordWriterFactory.create();
fOut.add(rw1);
fOut.add(rw2);
collector = new StreamCollector2<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
......@@ -55,15 +60,17 @@ public class StreamCollector2Test {
t.f0 = 0;
collector.collect(t);
t.f0 = 1;
collector.collect(t);
collector.collect(t);
t.f0 = 0;
collector.collect(t);
System.out.println(rw1.emittedRecords);
System.out.println(rw2.emittedRecords);
t.f0 = 1;
collector.collect(t);
}
@Test
public void testClose() {
}
}
}
......@@ -28,6 +28,7 @@ import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollectorTest {
......@@ -63,20 +64,19 @@ public class StreamCollectorTest {
@Test
public void recordWriter() {
MockRecordWriter recWriter = mock(MockRecordWriter.class);
Mockito.when(recWriter.initList()).thenCallRealMethod();
doCallRealMethod().when(recWriter).emit(Mockito.any(StreamRecord.class));
recWriter.initList();
MockRecordWriter recWriter = MockRecordWriterFactory.create();
ArrayList<RecordWriter<StreamRecord>> rwList = new ArrayList<RecordWriter<StreamRecord>>();
rwList.add(recWriter);
StreamCollector collector = new StreamCollector(1, 1000, 0, null, rwList);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, rwList);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
collector.collect(new Tuple1<Integer>(6));
assertEquals((Integer) 3, recWriter.emittedRecords.get(0).getTuple(0).getField(0));
assertEquals((Integer) 6, recWriter.emittedRecords.get(1).getTuple(1).getField(0));
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册