diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index d2b8332491eba73838d84d76ec4e422097594d77..16217527d55df7b3d041ca40ec29b0580d40ad9b 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -18,9 +18,11 @@ package org.apache.flink.streaming.api.datastream; import java.io.Serializable; +import java.util.List; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -32,7 +34,6 @@ import org.apache.flink.streaming.api.function.co.CoFlatMapFunction; import org.apache.flink.streaming.api.function.co.CoMapFunction; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.function.co.CoWindowFunction; -import org.apache.flink.streaming.api.function.co.CrossWindowFunction; import org.apache.flink.streaming.api.function.co.RichCoMapFunction; import org.apache.flink.streaming.api.function.co.RichCoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable; @@ -47,6 +48,7 @@ import org.apache.flink.streaming.util.serialization.CombineTypeWrapper; import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper; import org.apache.flink.streaming.util.serialization.TypeWrapper; +import org.apache.flink.util.Collector; /** * The ConnectedDataStream represents a stream for two different data types. 
It @@ -550,22 +552,66 @@ public class ConnectedDataStream { return invokable; } - SingleOutputStreamOperator, ?> windowCross(long windowSize, long slideInterval) { - return windowCross(windowSize, slideInterval, new DefaultTimeStamp(), - new DefaultTimeStamp()); - } + protected SingleOutputStreamOperator addGeneralWindowCross( + CrossFunction crossFunction, long windowSize, long slideInterval, + TimeStamp timestamp1, TimeStamp timestamp2) { - SingleOutputStreamOperator, ?> windowCross(long windowSize, - long slideInterval, TimeStamp timestamp1, TimeStamp timestamp2) { + TypeWrapper in1TypeWrapper = new ObjectTypeWrapper(dataStream1.getOutputType() + .createSerializer().createInstance()); + TypeWrapper in2TypeWrapper = new ObjectTypeWrapper(dataStream2.getOutputType() + .createSerializer().createInstance()); + + FunctionTypeWrapper outTypeWrapper = new FunctionTypeWrapper(crossFunction, + CrossFunction.class, 2); - return addGeneralWindowJoin(new CrossWindowFunction(), windowSize, slideInterval, - timestamp1, timestamp2); + CrossWindowFunction crossWindowFunction = new CrossWindowFunction(crossFunction); + + return addGeneralWindowCombine(crossWindowFunction, in1TypeWrapper, in2TypeWrapper, + outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2); } + private static class CrossWindowFunction implements CoWindowFunction { + + private static final long serialVersionUID = 1L; + + private CrossFunction crossFunction; + + public CrossWindowFunction(CrossFunction crossFunction) { + this.crossFunction = crossFunction; + } + + @Override + public void coWindow(List first, List second, Collector out) + throws Exception { + for (IN1 firstValue : first) { + for (IN2 secondValue : second) { + out.collect(crossFunction.cross(firstValue, secondValue)); + } + } + } + } + protected SingleOutputStreamOperator, ?> addGeneralWindowJoin( CoWindowFunction> coWindowFunction, long windowSize, long slideInterval, TimeStamp timestamp1, TimeStamp timestamp2) { + TypeWrapper in1TypeWrapper = new ObjectTypeWrapper(dataStream1.getOutputType() + .createSerializer().createInstance()); + TypeWrapper in2TypeWrapper = new ObjectTypeWrapper(dataStream2.getOutputType() + .createSerializer().createInstance()); + + CombineTypeWrapper outTypeWrapper = new CombineTypeWrapper( + in1TypeWrapper, in2TypeWrapper); + + return addGeneralWindowCombine(coWindowFunction, in1TypeWrapper, in2TypeWrapper, + outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2); + } + + private SingleOutputStreamOperator addGeneralWindowCombine( + CoWindowFunction coWindowFunction, TypeWrapper in1TypeWrapper, + TypeWrapper in2TypeWrapper, TypeWrapper outTypeWrapper, long windowSize, + long slideInterval, TimeStamp timestamp1, TimeStamp timestamp2) { + if (windowSize < 1) { throw new IllegalArgumentException("Window size must be positive"); } @@ -573,20 +619,9 @@ public class ConnectedDataStream { throw new IllegalArgumentException("Slide interval must be positive"); } - TypeWrapper in1TypeWrapper = null; - TypeWrapper in2TypeWrapper = null; - - in1TypeWrapper = new ObjectTypeWrapper(dataStream1.getOutputType().createSerializer() - .createInstance()); - in2TypeWrapper = new ObjectTypeWrapper(dataStream2.getOutputType().createSerializer() - .createInstance()); - - CombineTypeWrapper outTypeWrapper = new CombineTypeWrapper( - in1TypeWrapper, in2TypeWrapper); - return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper, - outTypeWrapper, new CoWindowInvokable>(coWindowFunction, - windowSize, slideInterval, 
timestamp1, timestamp2)); + outTypeWrapper, new CoWindowInvokable(coWindowFunction, windowSize, + slideInterval, timestamp1, timestamp2)); } protected SingleOutputStreamOperator addCoFunction(String functionName, diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 13a7ba1306344c909c328efadc5eda1213d140c9..fd19f73323cb143b22d998fac10286edd1a3028d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -37,7 +37,6 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -58,8 +57,6 @@ import org.apache.flink.streaming.api.invokable.operator.FilterInvokable; import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable; import org.apache.flink.streaming.api.invokable.operator.MapInvokable; import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Time; @@ -495,6 +492,34 @@ public class DataStream { return new StreamProjection(this.copy(), fieldIndexes); } + /** + * Initiates a temporal Cross transformation.
+ * A Cross transformation combines the elements of two {@link DataStream}s + * into one DataStream over a specified time window. It builds all pair + * combinations of elements of both DataStreams, i.e., it builds a Cartesian + * product. + * + * This method returns a {@link StreamCrossOperator} on which the + * {@link StreamCrossOperator#onWindow} should be called to define the + * window, and then call + * {@link StreamCrossOperator.CrossWindow#with(org.apache.flink.api.common.functions.CrossFunction)} + * to define a {@link org.apache.flink.api.common.functions.CrossFunction} + * which is called for each pair of crossed elements. The CrossFunction + * returns exactly one element for each pair of input elements. + * + * @param dataStreamToCross + * The other DataStream with which this DataStream is crossed. + * @return A {@link StreamCrossOperator} to continue the definition of the + * Cross transformation. + * + * @see org.apache.flink.api.common.functions.CrossFunction + * @see DataStream + */ + public StreamCrossOperator cross(DataStream dataStreamToCross) { + return new StreamCrossOperator(this, dataStreamToCross); + } + /** + * Initiates a temporal Join transformation.
* A temporal Join transformation joins the elements of two @@ -765,51 +790,6 @@ public class DataStream { return new WindowedDataStream(this, triggers, evicters); } - /** - * Creates a cross (Cartesian product) of a data stream window. The user can - * implement their own time stamps or use the system time by default. - * - * @param windowSize - * Size of the windows that will be aligned for both streams in - * milliseconds. - * @param slideInterval - * After every function call the windows will be slid by this - * interval. - * @param dataStreamToCross - * @param windowSize - * @param slideInterval - * @return The transformed {@link DataStream}. - */ - public SingleOutputStreamOperator, ?> windowCross( - DataStream dataStreamToCross, long windowSize, long slideInterval) { - return this.windowCross(dataStreamToCross, windowSize, slideInterval, - new DefaultTimeStamp(), new DefaultTimeStamp()); - } - - /** - * Creates a cross (Cartesian product) of a data stream window. - * - * @param dataStreamToCross - * {@link DataStream} to cross with. - * @param windowSize - * Size of the windows that will be aligned for both streams in - * milliseconds. - * @param slideInterval - * After every function call the windows will be slid by this - * interval. - * @param timestamp1 - * User defined time stamps for the first input. - * @param timestamp2 - * User defined time stamps for the second input. - * @return The transformed {@link DataStream}. - */ - public SingleOutputStreamOperator, ?> windowCross( - DataStream dataStreamToCross, long windowSize, long slideInterval, - TimeStamp timestamp1, TimeStamp timestamp2) { - return this.connect(dataStreamToCross).windowCross(windowSize, slideInterval, timestamp1, - timestamp2); - } - /** * Writes a DataStream to the standard output stream (stdout). For each * element of the DataStream the result of {@link Object#toString()} is diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java new file mode 100644 index 0000000000000000000000000000000000000000..f54f2732bac2ea64afd824ba39ea3cdea77a5069 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.java.DataSet; + +public class StreamCrossOperator extends WindowDBOperator> { + + public StreamCrossOperator(DataStream input1, DataStream input2) { + super(input1, input2); + } + + @Override + protected CrossWindow createNextWindowOperator() { + return new CrossWindow(this); + } + + public static class CrossWindow { + + private StreamCrossOperator op; + + public CrossWindow(StreamCrossOperator operator) { + this.op = operator; + } + + /** + * Finalizes a temporal Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.
+ * Each CrossFunction call returns exactly one element. + * + * @param function The CrossFunction that is called for each pair of crossed elements. + * @return An CrossOperator that represents the crossed result DataSet + * + * @see CrossFunction + * @see DataSet + */ + public SingleOutputStreamOperator with(CrossFunction function) { + return createCrossOperator(function); + } + + protected SingleOutputStreamOperator createCrossOperator( + CrossFunction function) { + + return op.input1.connect(op.input2).addGeneralWindowCross(function, op.windowSize, + op.slideInterval, op.timeStamp1, op.timeStamp2); + + } + + // ---------------------------------------------------------------------------------------- + + } + +} diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java index fb76d283e3baf1f853d09912de7753b6f31f154d..ba6e75e71457dc8a6c92acb6d294b2f3aed12cb7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java @@ -22,92 +22,27 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.JoinWindowFunction; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.util.keys.FieldsKeySelector; import org.apache.flink.streaming.util.keys.PojoKeySelector; -public class StreamJoinOperator { - - private final DataStream input1; - private final DataStream input2; - - long windowSize; - long slideInterval; - - TimeStamp timeStamp1; - TimeStamp timeStamp2; +public class StreamJoinOperator extends + WindowDBOperator> { public StreamJoinOperator(DataStream input1, DataStream input2) { - if (input1 == null || input2 == null) { - throw new NullPointerException(); - } - this.input1 = input1.copy(); - this.input2 = input2.copy(); + super(input1, input2); } - /** - * Continues a temporal Join transformation.
- * Defines the window size on which the two DataStreams will be joined. - * - * @param windowSize - * The size of the window in milliseconds. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. - */ - public JoinWindow onWindow(long windowSize) { - return onWindow(windowSize, windowSize); - } - - /** - * Continues a temporal Join transformation.
- * Defines the window size on which the two DataStreams will be joined. - * - * @param windowSize - * The size of the window in milliseconds. - * @param slideInterval - * The slide size of the window. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. - */ - public JoinWindow onWindow(long windowSize, long slideInterval) { - return onWindow(windowSize, slideInterval, new DefaultTimeStamp(), - new DefaultTimeStamp()); - } - - /** - * Continues a temporal Join transformation.
- * Defines the window size on which the two DataStreams will be joined. - * - * @param windowSize - * The size of the window in milliseconds. - * @param slideInterval - * The slide size of the window. - * @param timeStamp1 - * The timestamp used to extract time from the elements of the - * first data stream. - * @param timeStamp2 - * The timestamp used to extract time from the elements of the - * second data stream. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. - */ - public JoinWindow onWindow(long windowSize, long slideInterval, TimeStamp timeStamp1, - TimeStamp timeStamp2) { - - this.windowSize = windowSize; - this.slideInterval = slideInterval; - - this.timeStamp1 = timeStamp1; - this.timeStamp2 = timeStamp2; - - return new JoinWindow(); + @Override + protected JoinWindow createNextWindowOperator() { + return new JoinWindow(this); } - public class JoinWindow { + public static class JoinWindow { - private JoinWindow() { + private StreamJoinOperator op; + private JoinWindow(StreamJoinOperator operator) { + this.op = operator; } /** @@ -123,8 +58,9 @@ public class StreamJoinOperator { * @return An incomplete Join transformation. Call * {@link JoinPredicate#equalTo} to continue the Join. */ - public JoinPredicate where(int... fields) { - return new JoinPredicate(FieldsKeySelector.getSelector(input1.getOutputType(), fields)); + public JoinPredicate where(int... fields) { + return new JoinPredicate(op, FieldsKeySelector.getSelector( + op.input1.getOutputType(), fields)); } /** @@ -139,8 +75,9 @@ public class StreamJoinOperator { * @return An incomplete Join transformation. Call * {@link JoinPredicate#equalTo} to continue the Join. */ - public JoinPredicate where(String... fields) { - return new JoinPredicate(new PojoKeySelector(input1.getOutputType(), fields)); + public JoinPredicate where(String... fields) { + return new JoinPredicate(op, new PojoKeySelector(op.input1.getOutputType(), + fields)); } /** @@ -156,8 +93,8 @@ public class StreamJoinOperator { * @return An incomplete Join transformation. Call * {@link JoinPredicate#equalTo} to continue the Join. */ - public JoinPredicate where(KeySelector keySelector) { - return new JoinPredicate(keySelector); + public JoinPredicate where(KeySelector keySelector) { + return new JoinPredicate(op, keySelector); } // ---------------------------------------------------------------------------------------- @@ -170,11 +107,13 @@ public class StreamJoinOperator { * input {@link DataStream} by calling {@link JoinPredicate#equalTo} * */ - public class JoinPredicate { + public static class JoinPredicate { + private StreamJoinOperator op; private final KeySelector keys1; - private JoinPredicate(KeySelector keys1) { + private JoinPredicate(StreamJoinOperator operator, KeySelector keys1) { + this.op = operator; this.keys1 = keys1; } @@ -196,7 +135,8 @@ public class StreamJoinOperator { * @return The joined data stream. */ public SingleOutputStreamOperator, ?> equalTo(int... fields) { - return createJoinOperator(FieldsKeySelector.getSelector(input2.getOutputType(), fields)); + return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getOutputType(), + fields)); } /** @@ -214,7 +154,7 @@ public class StreamJoinOperator { * @return The joined data stream. */ public SingleOutputStreamOperator, ?> equalTo(String... 
fields) { - return createJoinOperator(new PojoKeySelector(input2.getOutputType(), fields)); + return createJoinOperator(new PojoKeySelector(op.input2.getOutputType(), fields)); } /** @@ -244,8 +184,8 @@ public class StreamJoinOperator { JoinWindowFunction joinWindowFunction = new JoinWindowFunction(keys1, keys2); - return input1.connect(input2).addGeneralWindowJoin(joinWindowFunction, windowSize, - slideInterval, timeStamp1, timeStamp2); + return op.input1.connect(op.input2).addGeneralWindowJoin(joinWindowFunction, + op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java new file mode 100644 index 0000000000000000000000000000000000000000..4f6f0c12627d4f66e8a65a979afdc98f4adcc1d6 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; + +public abstract class WindowDBOperator { + + protected final DataStream input1; + protected final DataStream input2; + + long windowSize; + long slideInterval; + + TimeStamp timeStamp1; + TimeStamp timeStamp2; + + public WindowDBOperator(DataStream input1, DataStream input2) { + if (input1 == null || input2 == null) { + throw new NullPointerException(); + } + this.input1 = input1.copy(); + this.input2 = input2.copy(); + } + + /** + * Continues a temporal Join transformation.
+ * Defines the window size on which the two DataStreams will be joined. + * + * @param windowSize + * The size of the window in milliseconds. + * @return An incomplete Join transformation. Call {@link JoinWindow#where} + * to continue the Join. + */ + public OP onWindow(long windowSize) { + return onWindow(windowSize, windowSize); + } + + /** + * Continues a temporal Join transformation.
+ * Defines the window size on which the two DataStreams will be joined. + * + * @param windowSize + * The size of the window in milliseconds. + * @param slideInterval + * The slide size of the window. + * @return An incomplete Join transformation. Call {@link JoinWindow#where} + * to continue the Join. + */ + public OP onWindow(long windowSize, long slideInterval) { + return onWindow(windowSize, slideInterval, new DefaultTimeStamp(), + new DefaultTimeStamp()); + } + + /** + * Continues a temporal Join transformation.
+ * Defines the window size on which the two DataStreams will be joined. + * + * @param windowSize + * The size of the window in milliseconds. + * @param slideInterval + * The slide size of the window. + * @param timeStamp1 + * The timestamp used to extract time from the elements of the + * first data stream. + * @param timeStamp2 + * The timestamp used to extract time from the elements of the + * second data stream. + * @return An incomplete Join transformation. Call {@link JoinWindow#where} + * to continue the Join. + */ + public OP onWindow(long windowSize, long slideInterval, TimeStamp timeStamp1, + TimeStamp timeStamp2) { + + this.windowSize = windowSize; + this.slideInterval = slideInterval; + + this.timeStamp1 = timeStamp1; + this.timeStamp2 = timeStamp2; + + return createNextWindowOperator(); + } + + protected abstract OP createNextWindowOperator(); + +} diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index 3b719beb7ad74003c07df20d06c3f9c5654525cc..d8cdfa511cc996f83f3d32d66c50ef883b43df8a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -19,8 +19,10 @@ package org.apache.flink.streaming.api; import static org.junit.Assert.assertEquals; +import java.io.Serializable; import java.util.ArrayList; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; @@ -29,7 +31,10 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction; import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.junit.Test; -public class WindowCrossJoinTest { +public class WindowCrossJoinTest implements Serializable { + + private static final long serialVersionUID = 1L; + private static final long MEMORYSIZE = 32; private static ArrayList, Integer>> joinResults = new ArrayList, Integer>>(); @@ -93,7 +98,17 @@ public class WindowCrossJoinTest { inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) .where(0).equalTo(0).addSink(new JoinResultSink()); - inStream1.windowCross(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2()) + inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) + .with(new CrossFunction, Integer, Tuple2, Integer>>() { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple2, Integer> cross( + Tuple2 val1, Integer val2) throws Exception { + return new Tuple2, Integer>(val1, val2); + } + }) .addSink(new CrossResultSink()); env.executeTest(MEMORYSIZE);
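For reference, a minimal sketch of the new temporal cross API, modeled on the usage in WindowCrossJoinTest above. Only cross(), onWindow() and with() come from this patch; the example class name, the fromElements() sources, the print() sink and the execute() call are illustrative assumptions about the surrounding StreamExecutionEnvironment API. With no TimeStamp extractors passed to onWindow, the DefaultTimeStamp (system time) is used.

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TemporalCrossExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Two arbitrary input streams (hypothetical data).
		DataStream<Integer> first = env.fromElements(1, 2, 3, 4);
		DataStream<String> second = env.fromElements("a", "b");

		// Cross the two streams over 1000 ms windows slid by 500 ms and emit
		// one Tuple2 per element pair of the Cartesian product.
		first.cross(second)
				.onWindow(1000, 500)
				.with(new CrossFunction<Integer, String, Tuple2<Integer, String>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<Integer, String> cross(Integer v1, String v2) throws Exception {
						return new Tuple2<Integer, String>(v1, v2);
					}
				})
				.print();

		env.execute();
	}
}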
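The join side of the same builder family, as exercised in the test, stacks where()/equalTo() on top of the shared onWindow() step. A sketch under the same assumptions (fromElements(), print() and execute() are assumed environment/sink calls; the input data is made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TemporalJoinExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical keyed inputs: (name, grade) and (name, salary).
		DataStream<Tuple2<String, Integer>> grades = env.fromElements(
				new Tuple2<String, Integer>("alice", 5),
				new Tuple2<String, Integer>("bob", 3));
		DataStream<Tuple2<String, Double>> salaries = env.fromElements(
				new Tuple2<String, Double>("alice", 1000.0),
				new Tuple2<String, Double>("bob", 2000.0));

		// Join on field 0 over 1000 ms windows; with a single argument the
		// slide interval defaults to the window size.
		grades.join(salaries)
				.onWindow(1000)
				.where(0)
				.equalTo(0)
				.print(); // emits Tuple2<gradeRecord, salaryRecord> pairs

		env.execute();
	}
}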
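The new WindowDBOperator base class exists so that StreamJoinOperator and StreamCrossOperator share the onWindow() bookkeeping and differ only in what the final onWindow() call hands back. A self-contained toy sketch of that pattern (all names below are made up for illustration, not part of the patch):

// Base class: stores the window parameters and defers to a factory hook,
// mirroring WindowDBOperator#onWindow and #createNextWindowOperator.
abstract class WindowedBinaryOp<OP> {

	protected long windowSize;
	protected long slideInterval;

	public OP onWindow(long windowSize, long slideInterval) {
		this.windowSize = windowSize;
		this.slideInterval = slideInterval;
		return createNextWindowOperator();
	}

	protected abstract OP createNextWindowOperator();
}

// The object the fluent API exposes next (CrossWindow in the real patch,
// JoinWindow for joins) just carries the collected window parameters onward.
class ToyCrossWindow {
	final long size;
	final long slide;

	ToyCrossWindow(long size, long slide) {
		this.size = size;
		this.slide = slide;
	}
}

// One concrete operator: its hook decides which follow-up builder is returned.
class ToyCrossOperator extends WindowedBinaryOp<ToyCrossWindow> {

	@Override
	protected ToyCrossWindow createNextWindowOperator() {
		return new ToyCrossWindow(windowSize, slideInterval);
	}
}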
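The test drives both window pairs with user-defined MyTimestamp1/MyTimestamp2 extractors whose bodies fall outside this hunk. A sketch of such an extractor, under the assumption that the TimeStamp interface used here exposes getTimestamp(value) and getStartTime() the way DefaultTimeStamp does; if the interface differs, the overridden method names change accordingly:

import org.apache.flink.streaming.api.invokable.util.TimeStamp;

// Assumed shape: elements of one input carry their event time directly as an
// Integer value, so the extractor just widens it to a long.
public class IntegerValueTimeStamp implements TimeStamp<Integer> {

	private static final long serialVersionUID = 1L;

	@Override
	public long getTimestamp(Integer value) {
		return value.longValue();
	}

	@Override
	public long getStartTime() {
		return 0;
	}
}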