提交 a7702c09 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Updated StreamComponentTest

上级 926403e9
......@@ -20,155 +20,88 @@ import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Level;
import org.junit.BeforeClass;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
public class StreamComponentTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
public static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
public static class MySource extends UserSourceInvokable {
public static class MySource extends SourceFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
StreamRecord record = new ArrayStreamRecord(new Tuple[] {new Tuple1<Integer>()});
String out;
public MySource() {
}
public MySource(String string) {
out = string;
}
private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
@Override
public void invoke() throws Exception {
for (int i = 0; i < 100; i++) {
record.getTuple(0).setField(0, i);
emit(record);
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
tuple.f0 = i;
collector.collect(tuple);
System.out.println("collecting " + tuple);
}
}
@Override
public String getResult() {
return out;
}
}
public static class MyTask extends UserTaskInvokable {
public static class MyTask extends MapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
String out;
public MyTask() {
}
public MyTask(String string) {
out = string;
}
@Override
public void invoke(StreamRecord record) throws Exception {
Integer i = record.getTuple(0).getField(0);
emit(new ArrayStreamRecord(new Tuple[] {new Tuple2<Integer, Integer>(i, i + 1)}));
}
@Override
public String getResult() {
return out;
public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
Integer i = value.f0;
System.out.println("mapping " + i);
return new Tuple2<Integer, Integer>(i, i + 1);
}
}
public static class MyOtherTask extends UserTaskInvokable {
// TODO test multiple tasks
// public static class MyOtherTask extends MapFunction<Tuple1<Integer>,
// Tuple2<Integer, Integer>> {
// private static final long serialVersionUID = 1L;
//
// @Override
// public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws
// Exception {
// Integer i = value.f0;
// return new Tuple2<Integer, Integer>(-i - 1, -i - 2);
// }
// }
public static class MySink extends SinkFunction<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
String out;
public MyOtherTask() {
}
public MyOtherTask(String string) {
out = string;
}
@Override
public void invoke(StreamRecord record) throws Exception {
Integer i = record.getTuple(0).getField(0);
emit(new ArrayStreamRecord(new Tuple[] {new Tuple2<Integer, Integer>(-i - 1, -i - 2)}));
}
@Override
public String getResult() {
return out;
}
}
public static class MySink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
String out;
public MySink(String out) {
this.out = out;
}
@Override
public void invoke(StreamRecord record) throws Exception {
Integer k = record.getTuple(0).getField(0);
Integer v = record.getTuple(0).getField(1);
public void invoke(Tuple2<Integer, Integer> tuple) {
Integer k = tuple.getField(0);
Integer v = tuple.getField(1);
data.put(k, v);
}
@Override
public String getResult() {
return out;
}
}
@BeforeClass
public static void runStream() {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", new MySource("source"));
graphBuilder.setTask("MyTask", new MyTask("task"), 1, 1);
graphBuilder.setTask("MyOtherTask", new MyOtherTask("otherTask"), 1, 1);
graphBuilder.setSink("MySink", new MySink("sink"));
DataStream<Tuple2<Integer, Integer>> oneTask = context.addSource(new MySource())
.map(new MyTask()).addSink(new MySink());
graphBuilder.shuffleConnect("MySource", "MyTask");
graphBuilder.shuffleConnect("MySource", "MyOtherTask");
graphBuilder.shuffleConnect("MyOtherTask", "MySink");
graphBuilder.shuffleConnect("MyTask", "MySink");
ClusterUtil.runOnMiniCluster(graphBuilder.getJobGraph());
context.execute();
}
@Test
public void test() {
assertEquals(200, data.keySet().size());
assertEquals(10, data.keySet().size());
for (Integer k : data.keySet()) {
if (k < 0) {
assertEquals((Integer) (k - 1), data.get(k));
} else {
assertEquals((Integer) (k + 1), data.get(k));
}
assertEquals((Integer) (k + 1), data.get(k));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册