FlatMapTest.java 6.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

G
ghermann 已提交
16 17
package eu.stratosphere.streaming.api;

G
gyfora 已提交
18
import static org.junit.Assert.assertTrue;
M
Márton Balassi 已提交
19
import static org.junit.Assert.fail;
G
gyfora 已提交
20

G
ghermann 已提交
21 22 23
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

G
ghermann 已提交
24 25 26
import org.junit.Test;

import eu.stratosphere.api.java.functions.FlatMapFunction;
G
ghermann 已提交
27
import eu.stratosphere.api.java.tuple.Tuple;
G
ghermann 已提交
28
import eu.stratosphere.api.java.tuple.Tuple1;
G
gyfora 已提交
29
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
G
ghermann 已提交
30 31 32
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
G
gyfora 已提交
33 34
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
G
ghermann 已提交
35
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
G
gyfora 已提交
36 37
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
G
gyfora 已提交
38
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
G
ghermann 已提交
39 40
import eu.stratosphere.util.Collector;

41
public class FlatMapTest {
G
ghermann 已提交
42 43 44 45

	public static final class MyFlatMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
		@Override
		public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
G
gyfora 已提交
46
			out.collect(value);
47
			System.out.println("flatMap");
G
ghermann 已提交
48 49
		}
	}
G
ghermann 已提交
50

G
gyfora 已提交
51 52 53 54 55 56 57 58 59
	public static final class MySink extends SinkFunction<Tuple1<String>> {

		@Override
		public void invoke(Tuple1<String> tuple) {
			System.out.println(tuple);
		}

	}

60 61 62 63 64 65 66 67 68 69
	public static final class MySource extends SourceFunction<Tuple1<String>> {

		@Override
		public void invoke(Collector<Tuple1<String>> collector) {
			for (int i = 0; i < 5; i++) {
				collector.collect(new Tuple1<String>("hi"));
			}
		}
	}

G
ghermann 已提交
70
	@Test
G
gyfora 已提交
71
	public void test() throws Exception {
G
ghermann 已提交
72

G
gyfora 已提交
73
		try {
Y
Yingjun Wu 已提交
74
			StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000);
G
gyfora 已提交
75 76
			fail();
		} catch (IllegalArgumentException e) {
Y
Yingjun Wu 已提交
77 78 79 80 81
			try {
				StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0);
				fail();
			} catch (IllegalArgumentException e2) {	
			}
G
gyfora 已提交
82 83
		}
		
Y
Yingjun Wu 已提交
84
		StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000);
85
		DataStream<Tuple1<String>> dataStream0 = context.addSource(new MySource());
G
ghermann 已提交
86

87
		DataStream<Tuple1<String>> dataStream1 = context.addDummySource().connectWith(dataStream0)
G
gyfora 已提交
88
				.partitionBy(0).flatMap(new MyFlatMap()).broadcast().addSink(new MySink());
G
ghermann 已提交
89

90
		context.execute();
G
gyfora 已提交
91

92
		JobGraphBuilder jgb = context.jobGB();
G
ghermann 已提交
93 94 95 96 97 98 99 100 101

		for (AbstractJobVertex c : jgb.components.values()) {
			if (c instanceof JobTaskVertex) {
				Configuration config = c.getConfiguration();
				System.out.println(config.getString("componentName", "default"));
				byte[] bytes = config.getBytes("operator", null);

				ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));

G
gyfora 已提交
102 103
				FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();

Y
Yingjun Wu 已提交
104
				StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
G
gyfora 已提交
105 106 107 108 109 110 111 112 113 114 115 116
				Tuple t = new Tuple1<String>("asd");

				f.flatMap(t, s);

				System.out.println(f.getClass().getGenericSuperclass());
				TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
						FlatMapFunction.class, f.getClass(), 0, null, null);

				System.out.println(ts);

				byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
				in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
G
gyfora 已提交
117
				UserTaskInvokable userFunction = (UserTaskInvokable) in.readObject();
G
gyfora 已提交
118 119 120 121 122 123 124 125 126 127 128 129
				System.out.println(userFunction.getClass());
				assertTrue(true);
				System.out.println("----------------");
			}

			if (c instanceof JobOutputVertex) {
				Configuration config = c.getConfiguration();
				System.out.println(config.getString("componentName", "default"));
				byte[] bytes = config.getBytes("operator", null);

				ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));

G
gyfora 已提交
130
				SinkFunction<Tuple> f = (SinkFunction<Tuple>) in.readObject();
G
ghermann 已提交
131 132

				System.out.println(f.getClass().getGenericSuperclass());
G
gyfora 已提交
133
				TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
G
gyfora 已提交
134
						SinkFunction.class, f.getClass(), 0, null, null);
G
gyfora 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158

				System.out.println(ts);

				byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
				in = new ObjectInputStream(new ByteArrayInputStream(userFunctionSerialized));
				UserSinkInvokable userFunction = (UserSinkInvokable) in.readObject();
				System.out.println(userFunction.getClass());
				assertTrue(true);
				System.out.println("----------------");
			}

			if (c instanceof JobInputVertex) {
				Configuration config = c.getConfiguration();
				System.out.println(config.getString("componentName", "default"));
				byte[] bytes = config.getBytes("operator", null);

				ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));

				UserSourceInvokable<Tuple> f = (UserSourceInvokable<Tuple>) in.readObject();

				System.out.println(f.getClass().getGenericSuperclass());
				TupleTypeInfo<Tuple> ts = (TupleTypeInfo) TypeExtractor.createTypeInfo(
						UserSourceInvokable.class, f.getClass(), 0, null, null);

G
ghermann 已提交
159 160 161 162
				System.out.println(ts);
				System.out.println("----------------");
			}
		}
G
ghermann 已提交
163 164
	}
}