BasicTopology.java 2.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/
15 16
package eu.stratosphere.streaming.examples.basictopology;

17
import eu.stratosphere.api.java.functions.MapFunction;
18
import eu.stratosphere.api.java.tuple.Tuple1;
19
import eu.stratosphere.streaming.api.DataStream;
20
import eu.stratosphere.streaming.api.SourceFunction;
21 22
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
23 24 25

public class BasicTopology {

26
	public static class BasicSource extends SourceFunction<Tuple1<String>> {
27

28
		private static final long serialVersionUID = 1L;
J
jfeher 已提交
29
		Tuple1<String> tuple = new Tuple1<String>("streaming");
30 31

		@Override
32 33
		public void invoke(Collector<Tuple1<String>> out) throws Exception {
			//  continuously emit a tuple
34
			while (true) {
35
				out.collect(tuple);
36 37 38 39
			}
		}
	}

40
	public static class BasicMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
41
		private static final long serialVersionUID = 1L;
42

43
		// map to the same tuple
44
		@Override
45 46
		public Tuple1<String> map(Tuple1<String> value) throws Exception {
			return value;
47 48 49 50
		}

	}

51 52 53
	private static final int PARALELISM = 1;
	private static final int SOURCE_PARALELISM = 1;

54
	public static void main(String[] args) {
55
		StreamExecutionEnvironment env = new StreamExecutionEnvironment();
J
jfeher 已提交
56 57 58

		DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM)
				.map(new BasicMap(), PARALELISM);
59
		
J
jfeher 已提交
60 61
		stream.print();

M
Márton Balassi 已提交
62
		env.execute();
63 64
	}
}