DataStream.java 3.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

G
ghermann 已提交
16 17
package eu.stratosphere.api.datastream;

18 19
import java.util.ArrayList;
import java.util.List;
G
ghermann 已提交
20 21
import java.util.Random;

22
import eu.stratosphere.api.datastream.StreamExecutionEnvironment.ConnectionType;
G
ghermann 已提交
23 24 25 26 27 28 29 30 31 32 33
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.types.TypeInformation;

public class DataStream<T extends Tuple> {

	private final StreamExecutionEnvironment context;
	private TypeInformation<T> type;
	private final Random random = new Random();
	private final String id;
34 35 36
	List<String> connectIDs;
	ConnectionType ctype = ConnectionType.SHUFFLE;
	int cparam = 0;
G
gyfora 已提交
37

G
ghermann 已提交
38 39 40 41
	protected DataStream() {
		// TODO implement
		context = new StreamExecutionEnvironment();
		id = "source";
42 43
		connectIDs = new ArrayList<String>();
		connectIDs.add(getId());
G
ghermann 已提交
44
	}
G
gyfora 已提交
45 46

	protected DataStream(StreamExecutionEnvironment context) {
G
ghermann 已提交
47 48 49 50
		if (context == null) {
			throw new NullPointerException("context is null");
		}

51
		// TODO add name based on component number an preferable sequential id
G
ghermann 已提交
52 53
		this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
		this.context = context;
54 55
		connectIDs = new ArrayList<String>();
		connectIDs.add(getId());
G
ghermann 已提交
56
	}
G
gyfora 已提交
57

G
ghermann 已提交
58 59 60 61
	public String getId() {
		return id;
	}

62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
	public DataStream<T> connectWith(DataStream<T> stream) {
		connectIDs.add(stream.getId());
		return this;
	}

	public DataStream<T> partitionBy(int keyposition) {
		ctype = ConnectionType.FIELD;
		cparam = keyposition;
		return this;
	}

	public DataStream<T> broadcast() {
		ctype = ConnectionType.BROADCAST;
		return this;
	}

G
ghermann 已提交
78
	public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
G
gyfora 已提交
79
		return context.addFlatMapFunction(this, flatMapper);
G
ghermann 已提交
80
	}
G
gyfora 已提交
81

82 83 84
	public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
		return context.addMapFunction(this, mapper);
	}
85

G
gyfora 已提交
86 87
	public <R extends Tuple> DataStream<R> addDummySink() {
		return context.addDummySink(this);
G
gyfora 已提交
88 89
	}

G
ghermann 已提交
90 91 92
	protected void setType(TypeInformation<T> type) {
		this.type = type;
	}
G
gyfora 已提交
93

G
ghermann 已提交
94 95 96 97
	public TypeInformation<T> getType() {
		return this.type;
	}
}