diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index c663315f76385bccc5f1e284951f123a113778b7..65a6c37b71c829ed4261f929584632e79c9ac7de 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -17,15 +17,10 @@ package org.apache.flink.streaming.api.datastream; -import java.util.List; - -import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -43,7 +38,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp; -import org.apache.flink.util.Collector; /** * The ConnectedDataStream represents a stream for two different data types. It @@ -545,54 +539,7 @@ public class ConnectedDataStream { return invokable; } - protected SingleOutputStreamOperator addGeneralWindowCross( - CrossFunction crossFunction, long windowSize, long slideInterval, - TimeStamp timestamp1, TimeStamp timestamp2) { - - TypeInformation outTypeInfo = TypeExtractor.createTypeInfo(CrossFunction.class, - crossFunction.getClass(), 2, null, null); - - CrossWindowFunction crossWindowFunction = new CrossWindowFunction( - clean(crossFunction)); - - return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval, - timestamp1, timestamp2); - } - - private static class CrossWindowFunction implements - CoWindowFunction { - - private static final long serialVersionUID = 1L; - - private CrossFunction crossFunction; - - public CrossWindowFunction(CrossFunction crossFunction) { - this.crossFunction = crossFunction; - } - - @Override - public void coWindow(List first, List second, Collector out) - throws Exception { - for (IN1 firstValue : first) { - for (IN2 secondValue : second) { - out.collect(crossFunction.cross(firstValue, secondValue)); - } - } - } - } - - protected SingleOutputStreamOperator, ?> addGeneralWindowJoin( - CoWindowFunction> coWindowFunction, long windowSize, - long slideInterval, TimeStamp timestamp1, TimeStamp timestamp2) { - - TypeInformation> outType = new TupleTypeInfo>( - getInputType1(), getInputType2()); - - return addGeneralWindowCombine(coWindowFunction, outType, windowSize, slideInterval, - timestamp1, timestamp2); - } - - private SingleOutputStreamOperator addGeneralWindowCombine( + protected SingleOutputStreamOperator addGeneralWindowCombine( CoWindowFunction coWindowFunction, TypeInformation outTypeInfo, long windowSize, long slideInterval, TimeStamp timestamp1, TimeStamp timestamp2) { diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java index f54f2732bac2ea64afd824ba39ea3cdea77a5069..c6cba6344bbd7fb28e5166ac1de2ec1207a8fa3c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java @@ -19,9 +19,16 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.CrossOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.function.co.CrossWindowFunction; -public class StreamCrossOperator extends WindowDBOperator> { +public class StreamCrossOperator extends + TemporalOperator> { public StreamCrossOperator(DataStream input1, DataStream input2) { super(input1, input2); @@ -40,12 +47,23 @@ public class StreamCrossOperator extends WindowDBOperator F clean(F f) { + if (op.input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) { + ClosureCleaner.clean(f, true); + } + ClosureCleaner.ensureSerializable(f); + return f; + } + /** - * Finalizes a temporal Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.
- * Each CrossFunction call returns exactly one element. + * Finalizes a temporal Cross transformation by applying a + * {@link CrossFunction} to each pair of crossed elements.
+ * Each CrossFunction call returns exactly one element. * - * @param function The CrossFunction that is called for each pair of crossed elements. - * @return An CrossOperator that represents the crossed result DataSet + * @param function + * The CrossFunction that is called for each pair of crossed + * elements. + * @return A CrossOperator that represents the crossed result DataStream * * @see CrossFunction * @see DataSet @@ -54,15 +72,29 @@ public class StreamCrossOperator extends WindowDBOperator, ?> withDefault() { + return createCrossOperator(new CrossOperator.DefaultCrossFunction()); + } + protected SingleOutputStreamOperator createCrossOperator( CrossFunction function) { - return op.input1.connect(op.input2).addGeneralWindowCross(function, op.windowSize, - op.slideInterval, op.timeStamp1, op.timeStamp2); + TypeInformation outTypeInfo = TypeExtractor.getCrossReturnTypes(function, + op.input1.getType(), op.input2.getType()); - } + CrossWindowFunction crossWindowFunction = new CrossWindowFunction( + clean(function)); - // ---------------------------------------------------------------------------------------- + return op.input1.connect(op.input2).addGeneralWindowCombine(crossWindowFunction, + outTypeInfo, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); + + } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java index 89c80ab3876b6b324fee1ffd9c954dcb2d7eda64..30515875999d2d98524e5742f435fa635f7594fc 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java @@ -18,15 +18,18 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.function.co.JoinWindowFunction; import org.apache.flink.streaming.util.keys.FieldsKeySelector; import org.apache.flink.streaming.util.keys.PojoKeySelector; public class StreamJoinOperator extends - WindowDBOperator> { + TemporalOperator> { public StreamJoinOperator(DataStream input1, DataStream input2) { super(input1, input2); @@ -51,7 +54,7 @@ public class StreamJoinOperator extends * that should be used as join keys.
* Note: Fields can only be selected as join keys on Tuple * DataStreams.
- * + * * @param fields * The indexes of the other Tuple fields of the first join * DataStreams that should be used as keys. @@ -59,8 +62,8 @@ public class StreamJoinOperator extends * {@link JoinPredicate#equalTo} to continue the Join. */ public JoinPredicate where(int... fields) { - return new JoinPredicate(op, FieldsKeySelector.getSelector( - op.input1.getType(), fields)); + return new JoinPredicate(op, FieldsKeySelector.getSelector(op.input1.getType(), + fields)); } /** @@ -68,7 +71,7 @@ public class StreamJoinOperator extends * Defines the fields of the first join {@link DataStream} that should * be used as grouping keys. Fields are the names of member fields of * the underlying type of the data stream. - * + * * @param fields * The fields of the first join DataStream that should be * used as keys. @@ -105,12 +108,13 @@ public class StreamJoinOperator extends * Intermediate step of a temporal Join transformation.
* To continue the Join transformation, select the join key of the second * input {@link DataStream} by calling {@link JoinPredicate#equalTo} - * + * */ public static class JoinPredicate { - private StreamJoinOperator op; - private final KeySelector keys1; + public StreamJoinOperator op; + public KeySelector keys1; + public KeySelector keys2; private JoinPredicate(StreamJoinOperator operator, KeySelector keys1) { this.op = operator; @@ -124,37 +128,30 @@ public class StreamJoinOperator extends * Note: Fields can only be selected as join keys on Tuple * DataStreams.
* - * The resulting operator wraps each pair of joining elements into a - * {@link Tuple2}, with the element of the first input being the first - * field of the tuple and the element of the second input being the - * second field of the tuple. - * * @param fields * The indexes of the Tuple fields of the second join * DataStream that should be used as keys. - * @return The joined data stream. + * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or + * {@link FinalizeStreamJoin#withDefault} to complete */ - public SingleOutputStreamOperator, ?> equalTo(int... fields) { - return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(), - fields)); + public FinalizeStreamJoin equalTo(int... fields) { + keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields); + return new FinalizeStreamJoin(this); } /** * Continues a temporal Join transformation and defines the fields of * the second join {@link DataStream} that should be used as join keys.
- * - * The resulting operator wraps each pair of joining elements into a - * {@link Tuple2}, with the element of the first input being the first - * field of the tuple and the element of the second input being the - * second field of the tuple. - * + * * @param fields * The fields of the second join DataStream that should be * used as keys. - * @return The joined data stream. + * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or + * {@link FinalizeStreamJoin#withDefault} to complete */ - public SingleOutputStreamOperator, ?> equalTo(String... fields) { - return createJoinOperator(new PojoKeySelector(op.input2.getType(), fields)); + public FinalizeStreamJoin equalTo(String... fields) { + this.keys2 = new PojoKeySelector(op.input2.getType(), fields); + return new FinalizeStreamJoin(this); } /** @@ -164,29 +161,75 @@ public class StreamJoinOperator extends * second DataStream and extracts a single key value on which the * DataStream is joined.
* - * The resulting operator wraps each pair of joining elements into a - * {@link Tuple2}, with the element of the first input being the first - * field of the tuple and the element of the second input being the - * second field of the tuple. - * * @param keySelector * The KeySelector function which extracts the key values * from the second DataStream on which it is joined. + * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or + * {@link FinalizeStreamJoin#withDefault} to complete + */ + public FinalizeStreamJoin equalTo(KeySelector keySelector) { + this.keys2 = keySelector; + return new FinalizeStreamJoin(this); + } + + } + + public static class FinalizeStreamJoin { + private final JoinPredicate predicate; + + private FinalizeStreamJoin(JoinPredicate predicate) { + this.predicate = predicate; + } + + /** + * Completes a stream join.

The resulting operator wraps each pair + * of joining elements into a {@link Tuple2}, with the element of the + * first input being the first field of the tuple and the element of the + * second input being the second field of the tuple. + * + * @return The joined data stream. + */ + public SingleOutputStreamOperator, ?> withDefault() { + return createJoinOperator(new DefaultJoinFunction()); + } + + /** + * Completes a stream join.

The resulting operator wraps each pair + * of joining elements using the user defined {@link JoinFunction} + * * @return The joined data stream. */ - public SingleOutputStreamOperator, ?> equalTo( - KeySelector keySelector) { - return createJoinOperator(keySelector); + public SingleOutputStreamOperator with(JoinFunction joinFunction) { + return createJoinOperator(joinFunction); } - protected SingleOutputStreamOperator, ?> createJoinOperator( - KeySelector keys2) { + private SingleOutputStreamOperator createJoinOperator( + JoinFunction joinFunction) { + + JoinWindowFunction joinWindowFunction = new JoinWindowFunction( + predicate.keys1, predicate.keys2, joinFunction); + + StreamJoinOperator op = predicate.op; + + TypeInformation outType = TypeExtractor.getJoinReturnTypes(joinFunction, + op.input1.getType(), op.input2.getType()); - JoinWindowFunction joinWindowFunction = new JoinWindowFunction(keys1, - keys2); - return op.input1.connect(op.input2).addGeneralWindowJoin(joinWindowFunction, - op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); + return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction, + outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); } } + public static final class DefaultJoinFunction implements + JoinFunction> { + + private static final long serialVersionUID = 1L; + private final Tuple2 outTuple = new Tuple2(); + + @Override + public Tuple2 join(T1 first, T2 second) throws Exception { + outTuple.f0 = first; + outTuple.f1 = second; + return outTuple; + } + } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java similarity index 90% rename from flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java rename to flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java index 4f6f0c12627d4f66e8a65a979afdc98f4adcc1d6..cd8aabd6dc6abca140707520706547e6bb522802 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java @@ -21,18 +21,18 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp; -public abstract class WindowDBOperator { +public abstract class TemporalOperator { - protected final DataStream input1; - protected final DataStream input2; + public final DataStream input1; + public final DataStream input2; - long windowSize; - long slideInterval; + public long windowSize; + public long slideInterval; - TimeStamp timeStamp1; - TimeStamp timeStamp2; + public TimeStamp timeStamp1; + public TimeStamp timeStamp2; - public WindowDBOperator(DataStream input1, DataStream input2) { + public TemporalOperator(DataStream input1, DataStream input2) { if (input1 == null || input2 == null) { throw new NullPointerException(); } @@ -97,7 +97,7 @@ public abstract class WindowDBOperator { return createNextWindowOperator(); } - + protected abstract OP createNextWindowOperator(); } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java index f61738abef56d1276a49082338457edb964560a4..9cafcd1eb0892fb67b0ddf8d960d2cb22aec6098 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java @@ -20,20 +20,25 @@ package org.apache.flink.streaming.api.function.co; import java.util.List; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.util.Collector; -public class CrossWindowFunction implements CoWindowFunction> { +public class CrossWindowFunction implements CoWindowFunction { private static final long serialVersionUID = 1L; - @Override - public void coWindow(List first, List second, Collector> out) - throws Exception { + private CrossFunction crossFunction; + + public CrossWindowFunction(CrossFunction crossFunction) { + this.crossFunction = crossFunction; + } - for (IN1 item1 : first) { - for (IN2 item2 : second) { - out.collect(new Tuple2(item1, item2)); + @Override + public void coWindow(List first, List second, Collector out) throws Exception { + for (IN1 firstValue : first) { + for (IN2 secondValue : second) { + out.collect(crossFunction.cross(firstValue, secondValue)); } } } + } \ No newline at end of file diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java index 53e26570e6913935b150f30fe6857d07279ffe5a..9f5cd5d0b82fdf4e3b7814c70bf396826ab2a51d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java @@ -20,31 +20,30 @@ package org.apache.flink.streaming.api.function.co; import java.util.List; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; -public class JoinWindowFunction implements CoWindowFunction> { +public class JoinWindowFunction implements CoWindowFunction { private static final long serialVersionUID = 1L; private KeySelector keySelector1; private KeySelector keySelector2; + private JoinFunction joinFunction; - public JoinWindowFunction() { - } - - public JoinWindowFunction(KeySelector keySelector1, KeySelector keySelector2) { + public JoinWindowFunction(KeySelector keySelector1, KeySelector keySelector2, + JoinFunction joinFunction) { this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; + this.joinFunction = joinFunction; } @Override - public void coWindow(List first, List second, Collector> out) - throws Exception { + public void coWindow(List first, List second, Collector out) throws Exception { for (IN1 item1 : first) { for (IN2 item2 : second) { if (keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) { - out.collect(new Tuple2(item1, item2)); + out.collect(joinFunction.join(item1, item2)); } } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index 37f8c0a720ecadf98bc91e581feeaef30000019d..07d40ff8fe3127d812f46e484b170c7988c765e6 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable { DataStream inStream2 = env.fromCollection(in2); inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) - .where(0).equalTo(0).addSink(new JoinResultSink()); + .where(0).equalTo(0).withDefault().addSink(new JoinResultSink()); inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) .with(new CrossFunction, Integer, Tuple2, Integer>>() { diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 93df8239fdd857d9542fef523927322e8d01d1b5..2586e3c4fa884818d4d41e6f68f4c270cdec7b4c 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -67,7 +67,8 @@ public class WindowJoin { .join(salaries) .onWindow(1000) .where(0) - .equalTo(0); + .equalTo(0) + .withDefault(); // emit result if (fileOutput) { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index 5f46c849237870fe069e0ab6bb68de58ae3cb7bb..a117412c1fff5926988dc100783dcd5b175bf65c 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * received records. * */ - def count: DataStream[java.lang.Long] = new DataStream[java.lang.Long](javaStream.count()) + def count: DataStream[Long] = new DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]] /** * Creates a new DataStream by applying the given function to every element of this DataStream. @@ -445,14 +445,14 @@ class DataStream[T](javaStream: JavaStream[T]) { * Creates a new SplitDataStream that contains only the elements satisfying the * given output selector predicate. */ - def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = { + def split(fun: T => String): SplitDataStream[T] = { if (fun == null) { throw new NullPointerException("OutputSelector must not be null.") } val selector = new OutputSelector[T] { val cleanFun = clean(fun) def select(in: T): java.lang.Iterable[String] = { - asJavaIterable(cleanFun(in).toIterable) + List(cleanFun(in)) } } split(selector)