提交 1c87d8bc 编写于 作者: G Gyula Fora

[streaming] Temporal join and cross rework for consistence and extended features

上级 f165c353
......@@ -17,15 +17,10 @@
package org.apache.flink.streaming.api.datastream;
import java.util.List;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -43,7 +38,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.util.Collector;
/**
* The ConnectedDataStream represents a stream for two different data types. It
......@@ -545,54 +539,7 @@ public class ConnectedDataStream<IN1, IN2> {
return invokable;
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCross(
CrossFunction<IN1, IN2, OUT> crossFunction, long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CrossFunction.class,
crossFunction.getClass(), 2, null, null);
CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(
clean(crossFunction));
return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval,
timestamp1, timestamp2);
}
private static class CrossWindowFunction<IN1, IN2, OUT> implements
CoWindowFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private CrossFunction<IN1, IN2, OUT> crossFunction;
public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
this.crossFunction = crossFunction;
}
@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
throws Exception {
for (IN1 firstValue : first) {
for (IN2 secondValue : second) {
out.collect(crossFunction.cross(firstValue, secondValue));
}
}
}
}
protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
TypeInformation<Tuple2<IN1, IN2>> outType = new TupleTypeInfo<Tuple2<IN1, IN2>>(
getInputType1(), getInputType2());
return addGeneralWindowCombine(coWindowFunction, outType, windowSize, slideInterval,
timestamp1, timestamp2);
}
private <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
......
......@@ -19,9 +19,16 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
public class StreamCrossOperator<I1, I2> extends
TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
super(input1, input2);
......@@ -40,12 +47,23 @@ public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, Stream
this.op = operator;
}
public <F> F clean(F f) {
if (op.input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
}
ClosureCleaner.ensureSerializable(f);
return f;
}
/**
* Finalizes a temporal Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.<br/>
* Each CrossFunction call returns exactly one element.
* Finalizes a temporal Cross transformation by applying a
* {@link CrossFunction} to each pair of crossed elements.<br/>
* Each CrossFunction call returns exactly one element.
*
* @param function The CrossFunction that is called for each pair of crossed elements.
* @return An CrossOperator that represents the crossed result DataSet
* @param function
* The CrossFunction that is called for each pair of crossed
* elements.
* @return A CrossOperator that represents the crossed result DataStream
*
* @see CrossFunction
* @see DataSet
......@@ -54,15 +72,29 @@ public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, Stream
return createCrossOperator(function);
}
/**
* Finalizes a temporal Cross transformation by emitting all pairs in a
* new Tuple2.
*
* @return A CrossOperator that represents the crossed result DataStream
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() {
return createCrossOperator(new CrossOperator.DefaultCrossFunction<I1, I2>());
}
protected <R> SingleOutputStreamOperator<R, ?> createCrossOperator(
CrossFunction<I1, I2, R> function) {
return op.input1.connect(op.input2).addGeneralWindowCross(function, op.windowSize,
op.slideInterval, op.timeStamp1, op.timeStamp2);
TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
op.input1.getType(), op.input2.getType());
}
CrossWindowFunction<I1, I2, R> crossWindowFunction = new CrossWindowFunction<I1, I2, R>(
clean(function));
// ----------------------------------------------------------------------------------------
return op.input1.connect(op.input2).addGeneralWindowCombine(crossWindowFunction,
outTypeInfo, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
}
}
......
......@@ -18,15 +18,18 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
import org.apache.flink.streaming.util.keys.PojoKeySelector;
public class StreamJoinOperator<I1, I2> extends
WindowDBOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
super(input1, input2);
......@@ -51,7 +54,7 @@ public class StreamJoinOperator<I1, I2> extends
* that should be used as join keys.<br/>
* <b>Note: Fields can only be selected as join keys on Tuple
* DataStreams.</b><br/>
*
*
* @param fields
* The indexes of the other Tuple fields of the first join
* DataStreams that should be used as keys.
......@@ -59,8 +62,8 @@ public class StreamJoinOperator<I1, I2> extends
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public JoinPredicate<I1, I2> where(int... fields) {
return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(
op.input1.getType(), fields));
return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(op.input1.getType(),
fields));
}
/**
......@@ -68,7 +71,7 @@ public class StreamJoinOperator<I1, I2> extends
* Defines the fields of the first join {@link DataStream} that should
* be used as grouping keys. Fields are the names of member fields of
* the underlying type of the data stream.
*
*
* @param fields
* The fields of the first join DataStream that should be
* used as keys.
......@@ -105,12 +108,13 @@ public class StreamJoinOperator<I1, I2> extends
* Intermediate step of a temporal Join transformation. <br/>
* To continue the Join transformation, select the join key of the second
* input {@link DataStream} by calling {@link JoinPredicate#equalTo}
*
*
*/
public static class JoinPredicate<I1, I2> {
private StreamJoinOperator<I1, I2> op;
private final KeySelector<I1, ?> keys1;
public StreamJoinOperator<I1, I2> op;
public KeySelector<I1, ?> keys1;
public KeySelector<I2, ?> keys2;
private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
this.op = operator;
......@@ -124,37 +128,30 @@ public class StreamJoinOperator<I1, I2> extends
* <b>Note: Fields can only be selected as join keys on Tuple
* DataStreams.</b><br/>
*
* The resulting operator wraps each pair of joining elements into a
* {@link Tuple2}, with the element of the first input being the first
* field of the tuple and the element of the second input being the
* second field of the tuple.
*
* @param fields
* The indexes of the Tuple fields of the second join
* DataStream that should be used as keys.
* @return The joined data stream.
* @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
* {@link FinalizeStreamJoin#withDefault} to complete
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(int... fields) {
return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(),
fields));
public FinalizeStreamJoin<I1, I2> equalTo(int... fields) {
keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields);
return new FinalizeStreamJoin<I1, I2>(this);
}
/**
* Continues a temporal Join transformation and defines the fields of
* the second join {@link DataStream} that should be used as join keys.<br/>
*
* The resulting operator wraps each pair of joining elements into a
* {@link Tuple2}, with the element of the first input being the first
* field of the tuple and the element of the second input being the
* second field of the tuple.
*
*
* @param fields
* The fields of the second join DataStream that should be
* used as keys.
* @return The joined data stream.
* @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
* {@link FinalizeStreamJoin#withDefault} to complete
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(String... fields) {
return createJoinOperator(new PojoKeySelector<I2>(op.input2.getType(), fields));
public FinalizeStreamJoin<I1, I2> equalTo(String... fields) {
this.keys2 = new PojoKeySelector<I2>(op.input2.getType(), fields);
return new FinalizeStreamJoin<I1, I2>(this);
}
/**
......@@ -164,29 +161,75 @@ public class StreamJoinOperator<I1, I2> extends
* second DataStream and extracts a single key value on which the
* DataStream is joined. </br>
*
* The resulting operator wraps each pair of joining elements into a
* {@link Tuple2}, with the element of the first input being the first
* field of the tuple and the element of the second input being the
* second field of the tuple.
*
* @param keySelector
* The KeySelector function which extracts the key values
* from the second DataStream on which it is joined.
* @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
* {@link FinalizeStreamJoin#withDefault} to complete
*/
public <K> FinalizeStreamJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
this.keys2 = keySelector;
return new FinalizeStreamJoin<I1, I2>(this);
}
}
public static class FinalizeStreamJoin<I1, I2> {
private final JoinPredicate<I1, I2> predicate;
private FinalizeStreamJoin(JoinPredicate<I1, I2> predicate) {
this.predicate = predicate;
}
/**
* Completes a stream join. </p> The resulting operator wraps each pair
* of joining elements into a {@link Tuple2}, with the element of the
* first input being the first field of the tuple and the element of the
* second input being the second field of the tuple.
*
* @return The joined data stream.
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() {
return createJoinOperator(new DefaultJoinFunction<I1, I2>());
}
/**
* Completes a stream join. </p> The resulting operator wraps each pair
* of joining elements using the user defined {@link JoinFunction}
*
* @return The joined data stream.
*/
public <K> SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(
KeySelector<I2, K> keySelector) {
return createJoinOperator(keySelector);
public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
return createJoinOperator(joinFunction);
}
protected SingleOutputStreamOperator<Tuple2<I1, I2>, ?> createJoinOperator(
KeySelector<I2, ?> keys2) {
private <OUT> SingleOutputStreamOperator<OUT, ?> createJoinOperator(
JoinFunction<I1, I2, OUT> joinFunction) {
JoinWindowFunction<I1, I2, OUT> joinWindowFunction = new JoinWindowFunction<I1, I2, OUT>(
predicate.keys1, predicate.keys2, joinFunction);
StreamJoinOperator<I1, I2> op = predicate.op;
TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
op.input1.getType(), op.input2.getType());
JoinWindowFunction<I1, I2> joinWindowFunction = new JoinWindowFunction<I1, I2>(keys1,
keys2);
return op.input1.connect(op.input2).addGeneralWindowJoin(joinWindowFunction,
op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction,
outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
}
}
public static final class DefaultJoinFunction<T1, T2> implements
JoinFunction<T1, T2, Tuple2<T1, T2>> {
private static final long serialVersionUID = 1L;
private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
@Override
public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception {
outTuple.f0 = first;
outTuple.f1 = second;
return outTuple;
}
}
}
......@@ -21,18 +21,18 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
public abstract class WindowDBOperator<I1, I2, OP> {
public abstract class TemporalOperator<I1, I2, OP> {
protected final DataStream<I1> input1;
protected final DataStream<I2> input2;
public final DataStream<I1> input1;
public final DataStream<I2> input2;
long windowSize;
long slideInterval;
public long windowSize;
public long slideInterval;
TimeStamp<I1> timeStamp1;
TimeStamp<I2> timeStamp2;
public TimeStamp<I1> timeStamp1;
public TimeStamp<I2> timeStamp2;
public WindowDBOperator(DataStream<I1> input1, DataStream<I2> input2) {
public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
if (input1 == null || input2 == null) {
throw new NullPointerException();
}
......@@ -97,7 +97,7 @@ public abstract class WindowDBOperator<I1, I2, OP> {
return createNextWindowOperator();
}
protected abstract OP createNextWindowOperator();
}
......@@ -20,20 +20,25 @@ package org.apache.flink.streaming.api.function.co;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.util.Collector;
public class CrossWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out)
throws Exception {
private CrossFunction<IN1, IN2, OUT> crossFunction;
public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
this.crossFunction = crossFunction;
}
for (IN1 item1 : first) {
for (IN2 item2 : second) {
out.collect(new Tuple2<IN1, IN2>(item1, item2));
@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
for (IN1 firstValue : first) {
for (IN2 secondValue : second) {
out.collect(crossFunction.cross(firstValue, secondValue));
}
}
}
}
\ No newline at end of file
......@@ -20,31 +20,30 @@ package org.apache.flink.streaming.api.function.co;
import java.util.List;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> {
public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private KeySelector<IN1, ?> keySelector1;
private KeySelector<IN2, ?> keySelector2;
private JoinFunction<IN1, IN2, OUT> joinFunction;
public JoinWindowFunction() {
}
public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
JoinFunction<IN1, IN2, OUT> joinFunction) {
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
this.joinFunction = joinFunction;
}
@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<Tuple2<IN1, IN2>> out)
throws Exception {
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
for (IN1 item1 : first) {
for (IN2 item2 : second) {
if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) {
out.collect(new Tuple2<IN1, IN2>(item1, item2));
out.collect(joinFunction.join(item1, item2));
}
}
}
......
......@@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable {
DataStream<Integer> inStream2 = env.fromCollection(in2);
inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.where(0).equalTo(0).addSink(new JoinResultSink());
.where(0).equalTo(0).withDefault().addSink(new JoinResultSink());
inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.with(new CrossFunction<Tuple2<Integer,String>, Integer, Tuple2<Tuple2<Integer,String>, Integer>>() {
......
......@@ -67,7 +67,8 @@ public class WindowJoin {
.join(salaries)
.onWindow(1000)
.where(0)
.equalTo(0);
.equalTo(0)
.withDefault();
// emit result
if (fileOutput) {
......
......@@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* received records.
*
*/
def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count())
def count: DataStream[Long] = new DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]]
/**
* Creates a new DataStream by applying the given function to every element of this DataStream.
......@@ -445,14 +445,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
* Creates a new SplitDataStream that contains only the elements satisfying the
* given output selector predicate.
*/
def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
def split(fun: T => String): SplitDataStream[T] = {
if (fun == null) {
throw new NullPointerException("OutputSelector must not be null.")
}
val selector = new OutputSelector[T] {
val cleanFun = clean(fun)
def select(in: T): java.lang.Iterable[String] = {
asJavaIterable(cleanFun(in).toIterable)
List(cleanFun(in))
}
}
split(selector)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册