FlatMapTest.java 3.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

G
ghermann 已提交
16 17
package eu.stratosphere.streaming.api;

18 19
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
G
gyfora 已提交
20

G
ghermann 已提交
21 22
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
J
jfeher 已提交
23 24
import java.util.ArrayList;
import java.util.List;
G
ghermann 已提交
25

G
ghermann 已提交
26 27 28
import org.junit.Test;

import eu.stratosphere.api.java.functions.FlatMapFunction;
G
ghermann 已提交
29
import eu.stratosphere.api.java.tuple.Tuple;
G
ghermann 已提交
30
import eu.stratosphere.api.java.tuple.Tuple1;
G
gyfora 已提交
31
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
G
ghermann 已提交
32 33 34
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
G
gyfora 已提交
35 36
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
G
ghermann 已提交
37
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
J
jfeher 已提交
38 39
import eu.stratosphere.streaming.api.MapTest.MyMap;
import eu.stratosphere.streaming.api.MapTest.MySink;
G
gyfora 已提交
40 41
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
G
gyfora 已提交
42
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
G
ghermann 已提交
43 44
import eu.stratosphere.util.Collector;

45
public class FlatMapTest {
G
ghermann 已提交
46

J
jfeher 已提交
47 48
	public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {

G
ghermann 已提交
49
		@Override
50 51 52 53
		public void flatMap(Tuple1<Integer> value,
				Collector<Tuple1<Integer>> out) throws Exception {
			out.collect(new Tuple1<Integer>(value.f0*value.f0));
			
J
jfeher 已提交
54
		}
55
		
G
ghermann 已提交
56
	}
G
ghermann 已提交
57

J
jfeher 已提交
58
	public static final class MySink extends SinkFunction<Tuple1<Integer>> {
59
		
G
gyfora 已提交
60
		@Override
J
jfeher 已提交
61 62
		public void invoke(Tuple1<Integer> tuple) {
			result.add(tuple.f0);
63
			System.out.println("result " + tuple.f0);
G
gyfora 已提交
64 65 66 67
		}

	}

68
	public static final class MySource extends SourceFunction<Tuple1<Integer>> {
J
jfeher 已提交
69 70

		@Override
71 72 73 74 75
		public void invoke(Collector<Tuple1<Integer>> collector)
				throws Exception {
			for(int i=0; i<10; i++){
				collector.collect(new Tuple1<Integer>(i));
			}
76 77
		}
	}
J
jfeher 已提交
78
	
79 80 81
	private static void fillExpectedList(){
		for(int i=0;i<10;i++){
			expected.add(i*i);
J
jfeher 已提交
82 83
		}
	}
84

J
jfeher 已提交
85
	// Second argument passed to flatMap(...) in test(); presumably the operator's degree of parallelism --
	// TODO confirm against DataStream.flatMap. (Identifier spelling is kept as-is because test() refers to it.)
	private static final int PARALELISM = 1;
86 87
	/** Values the sink is expected to receive; populated by {@code fillExpectedList()}. */
	private static final List<Integer> expected = new ArrayList<Integer>();

	/** Values actually received by the sink; appended to by {@code MySink.invoke}. */
	private static final List<Integer> result = new ArrayList<Integer>();
88

G
ghermann 已提交
89
	@Test
G
gyfora 已提交
90
	public void test() throws Exception {
J
jfeher 已提交
91
		
92 93
		StreamExecutionEnvironment env = new StreamExecutionEnvironment(2, 1000);
		DataStream<Tuple1<Integer>> dataStream = env.addSource(new MySource(),1).flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
J
jfeher 已提交
94 95


96
		env.execute();
J
jfeher 已提交
97
		
98
		fillExpectedList();
99
		
100
		assertTrue(expected.equals(result));
J
jfeher 已提交
101
		
G
ghermann 已提交
102 103
	}
}