BasicTopology.java 2.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/
15 16
package eu.stratosphere.streaming.examples.basictopology;

17
import eu.stratosphere.api.java.functions.MapFunction;
18
import eu.stratosphere.api.java.tuple.Tuple1;
19 20 21 22
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
23 24 25

public class BasicTopology {

26
	public static class BasicSource extends SourceFunction<Tuple1<String>> {
27

28
		private static final long serialVersionUID = 1L;
29
		Tuple1<String> tuple =  new Tuple1<String>("streaming");
30 31

		@Override
32 33
		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
			// emit continuously a tuple
34
			while (true) {
35
				collector.collect(tuple);
36 37 38 39
			}
		}
	}

40
	public static class BasicMap extends MapFunction<Tuple1<String>, Tuple1<String>> {
41
		private static final long serialVersionUID = 1L;
42

43
		// map to the same tuple
44
		@Override
45 46
		public Tuple1<String> map(Tuple1<String> value) throws Exception {
			return value;
47 48 49 50 51
		}

	}

	public static void main(String[] args) {
52 53 54 55 56
		StreamExecutionEnvironment context = new StreamExecutionEnvironment();
		
		DataStream<Tuple1<String>> stream = context.addSource(new BasicSource()).map(new BasicMap()).addDummySink();
		
		context.execute();
57 58
	}
}