FlatMapTest.java 6.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

G
ghermann 已提交
16 17
package eu.stratosphere.streaming.api;

G
gyfora 已提交
18 19
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
G
gyfora 已提交
20

G
gyfora 已提交
21 22
import java.util.HashSet;
import java.util.Set;
G
ghermann 已提交
23

G
ghermann 已提交
24 25 26 27 28 29
import org.junit.Test;

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;

30
public class FlatMapTest {
G
ghermann 已提交
31

J
jfeher 已提交
32 33
	public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {

G
gyfora 已提交
34 35
		private static final long serialVersionUID = 1L;

G
ghermann 已提交
36
		@Override
G
gyfora 已提交
37 38 39
		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
			out.collect(new Tuple1<Integer>(value.f0 * value.f0));

J
jfeher 已提交
40
		}
G
gyfora 已提交
41 42 43 44 45

	}

	public static final class ParallelFlatMap extends
			FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
G
gyfora 已提交
46
		private static final long serialVersionUID = 1L;
G
gyfora 已提交
47 48 49 50 51 52 53 54 55 56 57

		@Override
		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
			numberOfElements++;

		}

	}

	public static final class GenerateSequenceFlatMap extends
			FlatMapFunction<Tuple1<Long>, Tuple1<Long>> {
G
gyfora 已提交
58
		private static final long serialVersionUID = 1L;
G
gyfora 已提交
59 60 61 62 63 64 65

		@Override
		public void flatMap(Tuple1<Long> value, Collector<Tuple1<Long>> out) throws Exception {
			out.collect(new Tuple1<Long>(value.f0 * value.f0));

		}

G
ghermann 已提交
66
	}
G
ghermann 已提交
67

J
jfeher 已提交
68
	public static final class MySink extends SinkFunction<Tuple1<Integer>> {
G
gyfora 已提交
69
		private static final long serialVersionUID = 1L;
G
gyfora 已提交
70

G
gyfora 已提交
71
		@Override
J
jfeher 已提交
72 73
		public void invoke(Tuple1<Integer> tuple) {
			result.add(tuple.f0);
G
gyfora 已提交
74 75 76 77
		}

	}

G
gyfora 已提交
78
	public static final class FromElementsSink extends SinkFunction<Tuple1<Integer>> {
G
gyfora 已提交
79
		private static final long serialVersionUID = 1L;
J
jfeher 已提交
80 81

		@Override
G
gyfora 已提交
82 83 84 85 86 87 88
		public void invoke(Tuple1<Integer> tuple) {
			fromElementsResult.add(tuple.f0);
		}

	}

	public static final class FromCollectionSink extends SinkFunction<Tuple1<Integer>> {
G
gyfora 已提交
89
		private static final long serialVersionUID = 1L;
G
gyfora 已提交
90 91 92 93 94 95 96 97 98

		@Override
		public void invoke(Tuple1<Integer> tuple) {
			fromCollectionResult.add(tuple.f0);
		}

	}

	public static final class GenerateSequenceSink extends SinkFunction<Tuple1<Long>> {
G
gyfora 已提交
99
		private static final long serialVersionUID = 1L;
G
gyfora 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122

		@Override
		public void invoke(Tuple1<Long> tuple) {
			generateSequenceResult.add(tuple.f0);
		}

	}

	/**
	 * Populates the {@code expected} set (despite the method name, it is a set, not a list) with the
	 * squares of 0 through 9.
	 */
	private static void fillExpectedList() {
		int base = 0;
		while (base < 10) {
			expected.add(base * base);
			base++;
		}
	}

	/**
	 * Populates {@code fromElementsExpected} with 4, 25 and 81, i.e. the squares of the elements
	 * 2, 5 and 9 that test() feeds through {@code MyFlatMap}.
	 */
	private static void fillFromElementsExpected() {
		final int[] inputs = { 2, 5, 9 };
		for (int input : inputs) {
			fromElementsExpected.add(input * input);
		}
	}

	private static void fillSequenceSet() {
		for (int i = 0; i < 10; i++) {
			sequenceExpected.add(i * i);
123 124
		}
	}
G
gyfora 已提交
125

G
gyfora 已提交
126 127
	private static void fillLongSequenceSet() {
		for (int i = 0; i < 10; i++) {
G
gyfora 已提交
128
			sequenceLongExpected.add((long) (i * i));
G
gyfora 已提交
129 130 131 132
		}
	}

	private static void fillFromCollectionSet() {
G
gyfora 已提交
133
		if (fromCollectionSet.isEmpty()) {
G
gyfora 已提交
134 135 136
			for (int i = 0; i < 10; i++) {
				fromCollectionSet.add(i);
			}
J
jfeher 已提交
137 138
		}
	}
139

J
jfeher 已提交
140
	private static final int PARALELISM = 1;
G
gyfora 已提交
141 142 143 144 145 146 147 148 149 150
	private static int numberOfElements = 0;
	private static Set<Integer> expected = new HashSet<Integer>();
	private static Set<Integer> result = new HashSet<Integer>();
	private static Set<Integer> fromElementsExpected = new HashSet<Integer>();
	private static Set<Integer> fromElementsResult = new HashSet<Integer>();
	private static Set<Integer> fromCollectionSet = new HashSet<Integer>();
	private static Set<Integer> sequenceExpected = new HashSet<Integer>();
	private static Set<Long> sequenceLongExpected = new HashSet<Long>();
	private static Set<Integer> fromCollectionResult = new HashSet<Integer>();
	private static Set<Long> generateSequenceResult = new HashSet<Long>();
151

G
ghermann 已提交
152
	@Test
G
gyfora 已提交
153
	public void test() throws Exception {
154
		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
G
gyfora 已提交
155 156 157

		// flatmapTest

G
gyfora 已提交
158
		fillFromCollectionSet();
G
gyfora 已提交
159

G
gyfora 已提交
160 161
		DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
				.flatMap(new MyFlatMap(), PARALELISM).addSink(new MySink());
J
jfeher 已提交
162

163
		fillExpectedList();
G
gyfora 已提交
164

G
gyfora 已提交
165
		// parallelShuffleconnectTest
G
gyfora 已提交
166
		fillFromCollectionSet();
G
gyfora 已提交
167

G
gyfora 已提交
168 169 170 171 172 173
		DataStream<Tuple1<Integer>> source = env.fromCollection(fromCollectionSet);
		DataStream<Tuple1<Integer>> map = source.flatMap(new ParallelFlatMap(), 1).addSink(
				new MySink());
		DataStream<Tuple1<Integer>> map2 = source.flatMap(new ParallelFlatMap(), 1).addSink(
				new MySink());

G
gyfora 已提交
174 175 176
		// fromElementsTest
		DataStream<Tuple1<Integer>> fromElementsMap = env.fromElements(2, 5, 9).flatMap(
				new MyFlatMap(), 1);
J
jfeher 已提交
177
		DataStream<Tuple1<Integer>> sink = fromElementsMap.addSink(new FromElementsSink());
G
gyfora 已提交
178 179 180

		fillFromElementsExpected();

G
gyfora 已提交
181
		// fromCollectionTest
G
gyfora 已提交
182 183
		fillFromCollectionSet();

G
gyfora 已提交
184 185 186 187
		DataStream<Tuple1<Integer>> fromCollectionMap = env.fromCollection(fromCollectionSet)
				.flatMap(new MyFlatMap(), 1);
		DataStream<Tuple1<Integer>> fromCollectionSink = fromCollectionMap
				.addSink(new FromCollectionSink());
G
gyfora 已提交
188

J
jfeher 已提交
189
		// generateSequenceTest
G
gyfora 已提交
190 191
		fillSequenceSet();

J
jfeher 已提交
192
		DataStream<Tuple1<Long>> generateSequenceMap = env.generateSequence(0, 9).flatMap(
G
gyfora 已提交
193
				new GenerateSequenceFlatMap(), 1);
G
gyfora 已提交
194 195
		DataStream<Tuple1<Long>> generateSequenceSink = generateSequenceMap
				.addSink(new GenerateSequenceSink());
G
gyfora 已提交
196 197 198

		fillLongSequenceSet();

199
		env.execute();
G
gyfora 已提交
200

J
jfeher 已提交
201 202 203 204
		assertTrue(expected.equals(result));
		assertEquals(20, numberOfElements);
		assertEquals(fromElementsExpected, fromElementsResult);
		assertEquals(sequenceExpected, fromCollectionResult);
G
gyfora 已提交
205
		assertEquals(sequenceLongExpected, generateSequenceResult);
G
ghermann 已提交
206 207
	}
}