提交 c804ab5f 编写于 作者: E Eszes Dávid 提交者: Stephan Ewen

[streaming] Map and FlatMapTest implemented

上级 6452ca06
......@@ -53,7 +53,7 @@ public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
@Override
public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
long progress = (Long) value.getField(windowFieldId);
long progress = value.getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
......
......@@ -20,6 +20,8 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
......@@ -33,6 +35,8 @@ import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.MapTest.MyMap;
import eu.stratosphere.streaming.api.MapTest.MySink;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -40,129 +44,61 @@ import eu.stratosphere.util.Collector;
public class FlatMapTest {
public static final class MyFlatMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
out.collect(value);
System.out.println("flatMap");
public void flatMap(Tuple1<Integer> value,
Collector<Tuple1<Integer>> out) throws Exception {
out.collect(new Tuple1<Integer>(value.f0*value.f0));
}
}
public static final class MySink extends SinkFunction<Tuple1<String>> {
int c=0;
public static final class MySink extends SinkFunction<Tuple1<Integer>> {
@Override
public void invoke(Tuple1<String> tuple) {
System.out.println(tuple);
c++;
System.out.println(c);
public void invoke(Tuple1<Integer> tuple) {
result.add(tuple.f0);
System.out.println("result " + tuple.f0);
}
}
public static final class MySource extends SourceFunction<Tuple1<String>> {
public static final class MySource extends SourceFunction<Tuple1<Integer>> {
@Override
public void invoke(Collector<Tuple1<String>> collector) {
for (int i = 0; i < 5; i++) {
collector.collect(new Tuple1<String>("hi"));
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<10; i++){
collector.collect(new Tuple1<Integer>(i));
}
}
}
private static final int PARALELISM = 2;
private static void fillExpectedList(){
for(int i=0;i<10;i++){
expected.add(i*i);
System.out.println("expected " + i*i);
}
}
private static final int PARALELISM = 1;
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
@Test
public void test() throws Exception {
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000);
fail();
} catch (IllegalArgumentException e) {
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0);
fail();
} catch (IllegalArgumentException e2) {
}
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000);
DataStream<Tuple1<String>> dataStream0 = context.addSource(new MySource(),1);
DataStream<Tuple1<Integer>> dataStream0 = context.addSource(new MySource(),1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
DataStream<Tuple1<String>> dataStream1 = context.addDummySource().connectWith(dataStream0)
.partitionBy(0).flatMap(new MyFlatMap(), PARALELISM).broadcast().addSink(new MySink());
context.execute();
JobGraphBuilder jgb = context.jobGB();
for (AbstractJobVertex c : jgb.components.values()) {
if (c instanceof JobTaskVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
Tuple t = new Tuple1<String>("asd");
f.flatMap(t, s);
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
FlatMapFunction.class, f.getClass(), 0, null, null);
fillExpectedList();
System.out.println(ts);
assertTrue(expected.equals(result));
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
}
if (c instanceof JobOutputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
SinkFunction.class, f.getClass(), 0, null, null);
System.out.println(ts);
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserSinkInvokable userFunction = (UserSinkInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
}
if (c instanceof JobInputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, f.getClass(), 0, null, null);
System.out.println(ts);
System.out.println("----------------");
}
}
}
}
......@@ -17,121 +17,72 @@ package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
import eu.stratosphere.util.Collector;
public class MapTest {
public static final class MyMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
public static final class MySource extends SourceFunction<Tuple1<Integer>> {
@Override
public Tuple1<String> map(Tuple1<String> value) throws Exception {
System.out.println("map");
return value;
public void invoke(Collector<Tuple1<Integer>> collector)
throws Exception {
for(int i=0; i<10; i++){
collector.collect(new Tuple1<Integer>(i));
}
}
private static final int PARALELISM = 1;
@Test
public void test() throws Exception {
Tuple1<String> tup = new Tuple1<String>("asd");
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> dataStream = context.addDummySource().map(new MyMap(), PARALELISM)
.addDummySink();
context.execute();
JobGraphBuilder jgb = context.jobGB();
for (AbstractJobVertex c : jgb.components.values()) {
if (c instanceof JobTaskVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
MapFunction<Tuple, Tuple> f = (MapFunction<Tuple, Tuple>) in.readObject();
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
Tuple t = new Tuple1<String>("asd");
s.collect(f.map(t));
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
MapFunction.class, f.getClass(), 0, null, null);
System.out.println(ts);
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
}
if (c instanceof JobOutputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
public static final class MyMap extends MapFunction<Tuple1<Integer>,Tuple1<Integer>> {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
// TODO Auto-generated method stub
return new Tuple1<Integer>(value.f0*value.f0);
}
}
SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
public static final class MySink extends SinkFunction<Tuple1<Integer>> {
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
SinkFunction.class, f.getClass(), 0, null, null);
@Override
public void invoke(Tuple1<Integer> tuple) {
result.add(tuple.f0);
System.out.println("result " + tuple.f0);
}
}
System.out.println(ts);
private static List<Integer> expected = new ArrayList<Integer>();
private static List<Integer> result = new ArrayList<Integer>();
private static final int PARALELISM = 1;
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
UserSinkInvokable userFunction = (UserSinkInvokable) in.readObject();
System.out.println(userFunction.getClass());
assertTrue(true);
System.out.println("----------------");
private static void fillExpectedList(){
for(int i=0;i<10;i++){
expected.add(i*i);
System.out.println("expected " + i*i);
}
}
if (c instanceof JobInputVertex) {
Configuration config = c.getConfiguration();
System.out.println(config.getString("componentName", "default"));
byte[] bytes = config.getBytes("operator", null);
@Test
public void test() throws Exception {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();
DataStream<Tuple1<Integer>> dataStream = context
.addSource(new MySource(), 1)
.map(new MyMap(), PARALELISM)
.addSink(new MySink());
System.out.println(f.getClass().getGenericSuperclass());
TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
UserSourceInvokable.class, f.getClass(), 0, null, null);
context.execute();
System.out.println(ts);
System.out.println("----------------");
}
}
fillExpectedList();
assertTrue(expected.equals(result));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册