// DataStream.java (1.5 KB) - committed by ghermann; web blame-view header condensed into this comment.
package eu.stratosphere.api.datastream;

import java.util.Random;

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.types.TypeInformation;

public class DataStream<T extends Tuple> {

	private final StreamExecutionEnvironment context;
	private TypeInformation<T> type;
	private final Random random = new Random();
	private final String id;
G
gyfora 已提交
17

G
ghermann 已提交
18 19 20 21 22
	protected DataStream() {
		// TODO implement
		context = new StreamExecutionEnvironment();
		id = "source";
	}
G
gyfora 已提交
23 24

	protected DataStream(StreamExecutionEnvironment context) {
G
ghermann 已提交
25 26 27 28 29 30 31
		if (context == null) {
			throw new NullPointerException("context is null");
		}

		this.id = Long.toHexString(random.nextLong()) + Long.toHexString(random.nextLong());
		this.context = context;
	}
G
gyfora 已提交
32

G
ghermann 已提交
33 34 35 36 37
	public String getId() {
		return id;
	}

	public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
G
gyfora 已提交
38
		return context.addFlatMapFunction(this, flatMapper);
G
ghermann 已提交
39
	}
G
gyfora 已提交
40 41 42 43 44 45 46 47 48 49 50

	public <R extends Tuple> void addSink() {
		context.addSink(this);
	}

	// public <R> DataStream<R> map(MapFunction<T, R> mapper) {
	// TypeInformation<R> returnType = TypeExtractor.getMapReturnTypes(mapper,
	// type);
	// return context.addMapFunction(this, mapper, returnType);
	// }

G
ghermann 已提交
51 52 53
	protected void setType(TypeInformation<T> type) {
		this.type = type;
	}
G
gyfora 已提交
54

G
ghermann 已提交
55 56 57 58
	public TypeInformation<T> getType() {
		return this.type;
	}
}