Commit 64baa00b authored by ghermann, committed by mbalassi

[streaming] Window cross API rework

Parent d6dc4349
......@@ -18,9 +18,11 @@
package org.apache.flink.streaming.api.datastream;
import java.io.Serializable;
import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -32,7 +34,6 @@ import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
......@@ -47,6 +48,7 @@ import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;
import org.apache.flink.util.Collector;
/**
* The ConnectedDataStream represents a stream for two different data types. It
......@@ -550,22 +552,66 @@ public class ConnectedDataStream<IN1, IN2> {
return invokable;
}
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowCross(long windowSize, long slideInterval) {
return windowCross(windowSize, slideInterval, new DefaultTimeStamp<IN1>(),
new DefaultTimeStamp<IN2>());
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCross(
CrossFunction<IN1, IN2, OUT> crossFunction, long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> windowCross(long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
.createSerializer().createInstance());
TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
.createSerializer().createInstance());
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(crossFunction,
CrossFunction.class, 2);
return addGeneralWindowJoin(new CrossWindowFunction<IN1, IN2>(), windowSize, slideInterval,
timestamp1, timestamp2);
CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(crossFunction);
return addGeneralWindowCombine(crossWindowFunction, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
}
private static class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private CrossFunction<IN1, IN2, OUT> crossFunction;
public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
this.crossFunction = crossFunction;
}
@Override
public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
throws Exception {
for (IN1 firstValue : first) {
for (IN2 secondValue : second) {
out.collect(crossFunction.cross(firstValue, secondValue));
}
}
}
}
protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
.createSerializer().createInstance());
TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
.createSerializer().createInstance());
CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
in1TypeWrapper, in2TypeWrapper);
return addGeneralWindowCombine(coWindowFunction, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
}
private <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeWrapper<IN1> in1TypeWrapper,
TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper, long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
......@@ -573,20 +619,9 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must be positive");
}
TypeWrapper<IN1> in1TypeWrapper = null;
TypeWrapper<IN2> in2TypeWrapper = null;
in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType().createSerializer()
.createInstance());
in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType().createSerializer()
.createInstance());
CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
in1TypeWrapper, in2TypeWrapper);
return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoWindowInvokable<IN1, IN2, Tuple2<IN1, IN2>>(coWindowFunction,
windowSize, slideInterval, timestamp1, timestamp2));
outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
slideInterval, timestamp1, timestamp2));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
......
......@@ -37,7 +37,6 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -58,8 +57,6 @@ import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
......@@ -495,6 +492,34 @@ public class DataStream<OUT> {
return new StreamProjection<OUT>(this.copy(), fieldIndexes);
}
/**
* Initiates a temporal Cross transformation.<br/>
* A Cross transformation combines the elements of two {@link DataStream}s
* into one DataStream over a specified time window. It builds all pair
* combinations of elements of both DataStreams, i.e., it builds a Cartesian
* product.
*
* <p>
* This method returns a {@link StreamCrossOperator} on which
* {@link StreamCrossOperator#onWindow} should be called to define the
* window, followed by
* {@link StreamCrossOperator.CrossWindow#with(org.apache.flink.api.common.functions.CrossFunction)}
* to define a {@link org.apache.flink.api.common.functions.CrossFunction}
* that is called for each pair of crossed elements. The CrossFunction
* returns exactly one element for each pair of input elements.
*
* @param dataStreamToCross
* The other DataStream with which this DataStream is crossed.
* @return A {@link StreamCrossOperator} to continue the definition of the
Cross transformation.
*
* @see org.apache.flink.api.common.functions.CrossFunction
* @see DataStream
*/
public <IN2> StreamCrossOperator<OUT, IN2> cross(DataStream<IN2> dataStreamToCross) {
return new StreamCrossOperator<OUT, IN2>(this, dataStreamToCross);
}
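For readers of this diff, a minimal usage sketch of the reworked cross API follows. Only the cross/onWindow/with chain comes from this commit (see the Javadoc above and the test change at the bottom of the diff); the environment setup, fromElements sources and print sink are assumed helpers of the surrounding streaming API, not part of this change. Imports match those shown at the top of this diff.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Two hypothetical input streams of different element types.
DataStream<Integer> firstInput = env.fromElements(1, 2, 3);
DataStream<String> secondInput = env.fromElements("a", "b");

firstInput.cross(secondInput)
        // 1000 ms windows slid every 500 ms, using system time stamps by default
        .onWindow(1000, 500)
        .with(new CrossFunction<Integer, String, Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> cross(Integer val1, String val2) throws Exception {
                // exactly one output element per pair of the Cartesian product
                return new Tuple2<Integer, String>(val1, val2);
            }
        })
        .print();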
/**
* Initiates a temporal Join transformation. <br/>
* A temporal Join transformation joins the elements of two
......@@ -765,51 +790,6 @@ public class DataStream<OUT> {
return new WindowedDataStream<OUT>(this, triggers, evicters);
}
/**
* Creates a cross (Cartesian product) of a data stream window. The user can
* implement their own time stamps or use the system time by default.
*
* @param windowSize
* Size of the windows that will be aligned for both streams in
* milliseconds.
* @param slideInterval
* After every function call the windows will be slid by this
* interval.
* @param dataStreamToCross
* @param windowSize
* @param slideInterval
* @return The transformed {@link DataStream}.
*/
public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowCross(
DataStream<IN2> dataStreamToCross, long windowSize, long slideInterval) {
return this.windowCross(dataStreamToCross, windowSize, slideInterval,
new DefaultTimeStamp<OUT>(), new DefaultTimeStamp<IN2>());
}
/**
* Creates a cross (Cartesian product) of a data stream window.
*
* @param dataStreamToCross
* {@link DataStream} to cross with.
* @param windowSize
* Size of the windows that will be aligned for both streams in
* milliseconds.
* @param slideInterval
* After every function call the windows will be slid by this
* interval.
* @param timestamp1
* User defined time stamps for the first input.
* @param timestamp2
* User defined time stamps for the second input.
* @return The transformed {@link DataStream}.
*/
public <IN2> SingleOutputStreamOperator<Tuple2<OUT, IN2>, ?> windowCross(
DataStream<IN2> dataStreamToCross, long windowSize, long slideInterval,
TimeStamp<OUT> timestamp1, TimeStamp<IN2> timestamp2) {
return this.connect(dataStreamToCross).windowCross(windowSize, slideInterval, timestamp1,
timestamp2);
}
/**
* Writes a DataStream to the standard output stream (stdout). For each
* element of the DataStream the result of {@link Object#toString()} is
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.DataSet;
public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
super(input1, input2);
}
@Override
protected CrossWindow<I1, I2> createNextWindowOperator() {
return new CrossWindow<I1, I2>(this);
}
public static class CrossWindow<I1, I2> {
private StreamCrossOperator<I1, I2> op;
public CrossWindow(StreamCrossOperator<I1, I2> operator) {
this.op = operator;
}
/**
* Finalizes a temporal Cross transformation by applying a {@link CrossFunction} to each pair of crossed elements.<br/>
* Each CrossFunction call returns exactly one element.
*
* @param function The CrossFunction that is called for each pair of crossed elements.
* @return A {@link SingleOutputStreamOperator} that represents the crossed result DataStream
*
* @see CrossFunction
* @see DataStream
*/
public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
return createCrossOperator(function);
}
protected <R> SingleOutputStreamOperator<R, ?> createCrossOperator(
CrossFunction<I1, I2, R> function) {
return op.input1.connect(op.input2).addGeneralWindowCross(function, op.windowSize,
op.slideInterval, op.timeStamp1, op.timeStamp2);
}
// ----------------------------------------------------------------------------------------
}
}
......@@ -22,92 +22,27 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
import org.apache.flink.streaming.util.keys.PojoKeySelector;
public class StreamJoinOperator<I1, I2> {
private final DataStream<I1> input1;
private final DataStream<I2> input2;
long windowSize;
long slideInterval;
TimeStamp<I1> timeStamp1;
TimeStamp<I2> timeStamp2;
public class StreamJoinOperator<I1, I2> extends
WindowDBOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
if (input1 == null || input2 == null) {
throw new NullPointerException();
}
this.input1 = input1.copy();
this.input2 = input2.copy();
super(input1, input2);
}
/**
* Continues a temporal Join transformation.<br/>
* Defines the window size on which the two DataStreams will be joined.
*
* @param windowSize
* The size of the window in milliseconds.
* @return An incomplete Join transformation. Call {@link JoinWindow#where}
* to continue the Join.
*/
public JoinWindow onWindow(long windowSize) {
return onWindow(windowSize, windowSize);
}
/**
* Continues a temporal Join transformation.<br/>
* Defines the window size on which the two DataStreams will be joined.
*
* @param windowSize
* The size of the window in milliseconds.
* @param slideInterval
* The slide size of the window.
* @return An incomplete Join transformation. Call {@link JoinWindow#where}
* to continue the Join.
*/
public JoinWindow onWindow(long windowSize, long slideInterval) {
return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(),
new DefaultTimeStamp<I2>());
}
/**
* Continues a temporal Join transformation.<br/>
* Defines the window size on which the two DataStreams will be joined.
*
* @param windowSize
* The size of the window in milliseconds.
* @param slideInterval
* The slide size of the window.
* @param timeStamp1
* The timestamp used to extract time from the elements of the
* first data stream.
* @param timeStamp2
* The timestamp used to extract time from the elements of the
* second data stream.
* @return An incomplete Join transformation. Call {@link JoinWindow#where}
* to continue the Join.
*/
public JoinWindow onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1,
TimeStamp<I2> timeStamp2) {
this.windowSize = windowSize;
this.slideInterval = slideInterval;
this.timeStamp1 = timeStamp1;
this.timeStamp2 = timeStamp2;
return new JoinWindow();
@Override
protected JoinWindow<I1, I2> createNextWindowOperator() {
return new JoinWindow<I1, I2>(this);
}
public class JoinWindow {
public static class JoinWindow<I1, I2> {
private JoinWindow() {
private StreamJoinOperator<I1, I2> op;
private JoinWindow(StreamJoinOperator<I1, I2> operator) {
this.op = operator;
}
/**
......@@ -123,8 +58,9 @@ public class StreamJoinOperator<I1, I2> {
* @return An incomplete Join transformation. Call
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public JoinPredicate where(int... fields) {
return new JoinPredicate(FieldsKeySelector.getSelector(input1.getOutputType(), fields));
public JoinPredicate<I1, I2> where(int... fields) {
return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(
op.input1.getOutputType(), fields));
}
/**
......@@ -139,8 +75,9 @@ public class StreamJoinOperator<I1, I2> {
* @return An incomplete Join transformation. Call
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public JoinPredicate where(String... fields) {
return new JoinPredicate(new PojoKeySelector<I1>(input1.getOutputType(), fields));
public JoinPredicate<I1, I2> where(String... fields) {
return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getOutputType(),
fields));
}
/**
......@@ -156,8 +93,8 @@ public class StreamJoinOperator<I1, I2> {
* @return An incomplete Join transformation. Call
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public <K> JoinPredicate where(KeySelector<I1, K> keySelector) {
return new JoinPredicate(keySelector);
public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
return new JoinPredicate<I1, I2>(op, keySelector);
}
// ----------------------------------------------------------------------------------------
......@@ -170,11 +107,13 @@ public class StreamJoinOperator<I1, I2> {
* input {@link DataStream} by calling {@link JoinPredicate#equalTo}
*
*/
public class JoinPredicate {
public static class JoinPredicate<I1, I2> {
private StreamJoinOperator<I1, I2> op;
private final KeySelector<I1, ?> keys1;
private JoinPredicate(KeySelector<I1, ?> keys1) {
private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
this.op = operator;
this.keys1 = keys1;
}
......@@ -196,7 +135,8 @@ public class StreamJoinOperator<I1, I2> {
* @return The joined data stream.
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(int... fields) {
return createJoinOperator(FieldsKeySelector.getSelector(input2.getOutputType(), fields));
return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getOutputType(),
fields));
}
/**
......@@ -214,7 +154,7 @@ public class StreamJoinOperator<I1, I2> {
* @return The joined data stream.
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(String... fields) {
return createJoinOperator(new PojoKeySelector<I2>(input2.getOutputType(), fields));
return createJoinOperator(new PojoKeySelector<I2>(op.input2.getOutputType(), fields));
}
/**
......@@ -244,8 +184,8 @@ public class StreamJoinOperator<I1, I2> {
JoinWindowFunction<I1, I2> joinWindowFunction = new JoinWindowFunction<I1, I2>(keys1,
keys2);
return input1.connect(input2).addGeneralWindowJoin(joinWindowFunction, windowSize,
slideInterval, timeStamp1, timeStamp2);
return op.input1.connect(op.input2).addGeneralWindowJoin(joinWindowFunction,
op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
}
}
......
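For comparison, a sketch of the join chain that the restructured builder classes above support. It mirrors the call sequence exercised in WindowCrossJoinTest further down; the element types are illustrative, and env/fromElements/print are the same assumed helpers as in the earlier cross sketch.

DataStream<Tuple2<Integer, String>> orders = env.fromElements(
        new Tuple2<Integer, String>(1, "a"),
        new Tuple2<Integer, String>(2, "b"));
DataStream<Tuple2<Integer, Integer>> prices = env.fromElements(
        new Tuple2<Integer, Integer>(1, 10),
        new Tuple2<Integer, Integer>(2, 20));

// onWindow comes from the shared WindowDBOperator base class (next file in this diff);
// where/equalTo select the join keys on the first and second input respectively.
orders.join(prices)
        .onWindow(1000, 1000)
        .where(0)
        .equalTo(0)
        .print(); // emits Tuple2<firstElement, secondElement> pairs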
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
public abstract class WindowDBOperator<I1, I2, OP> {
protected final DataStream<I1> input1;
protected final DataStream<I2> input2;
long windowSize;
long slideInterval;
TimeStamp<I1> timeStamp1;
TimeStamp<I2> timeStamp2;
public WindowDBOperator(DataStream<I1> input1, DataStream<I2> input2) {
if (input1 == null || input2 == null) {
throw new NullPointerException();
}
this.input1 = input1.copy();
this.input2 = input2.copy();
}
/**
* Continues a temporal Join or Cross transformation.<br/>
* Defines the window size on which the two DataStreams will be combined.
*
* @param windowSize
* The size of the window in milliseconds.
* @return An incomplete transformation. Call the next builder method (for
* example where for a Join, or with for a Cross) to continue.
*/
public OP onWindow(long windowSize) {
return onWindow(windowSize, windowSize);
}
/**
* Continues a temporal Join or Cross transformation.<br/>
* Defines the window size and the slide interval on which the two
* DataStreams will be combined.
*
* @param windowSize
* The size of the window in milliseconds.
* @param slideInterval
* The slide size of the window.
* @return An incomplete transformation. Call the next builder method (for
* example where for a Join, or with for a Cross) to continue.
*/
public OP onWindow(long windowSize, long slideInterval) {
return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(),
new DefaultTimeStamp<I2>());
}
/**
* Continues a temporal Join or Cross transformation.<br/>
* Defines the window size, the slide interval and the time stamp extractors
* on which the two DataStreams will be combined.
*
* @param windowSize
* The size of the window in milliseconds.
* @param slideInterval
* The slide size of the window.
* @param timeStamp1
* The timestamp used to extract time from the elements of the
* first data stream.
* @param timeStamp2
* The timestamp used to extract time from the elements of the
* second data stream.
* @return An incomplete transformation. Call the next builder method (for
* example where for a Join, or with for a Cross) to continue.
*/
public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1,
TimeStamp<I2> timeStamp2) {
this.windowSize = windowSize;
this.slideInterval = slideInterval;
this.timeStamp1 = timeStamp1;
this.timeStamp2 = timeStamp2;
return createNextWindowOperator();
}
protected abstract OP createNextWindowOperator();
}
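A short illustration of how both operators reuse the window definition of WindowDBOperator. Here stream1 and stream2 stand for arbitrary DataStreams of matching element types, and MyTimestamp1/MyTimestamp2 are the user-defined TimeStamp implementations from the test below; none of these names are introduced by this commit.

stream1.cross(stream2).onWindow(1000);        // tumbling window: slide interval == window size
stream1.cross(stream2).onWindow(1000, 200);   // sliding window, default (system time) stamps
stream1.join(stream2).onWindow(1000, 200,
        new MyTimestamp1(), new MyTimestamp2()); // user-defined time stamps per input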
......@@ -19,8 +19,10 @@ package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
import java.io.Serializable;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
......@@ -29,7 +31,10 @@ import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.junit.Test;
public class WindowCrossJoinTest {
public class WindowCrossJoinTest implements Serializable {
private static final long serialVersionUID = 1L;
private static final long MEMORYSIZE = 32;
private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
......@@ -93,7 +98,17 @@ public class WindowCrossJoinTest {
inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.where(0).equalTo(0).addSink(new JoinResultSink());
inStream1.windowCross(inStream2, 1000, 1000, new MyTimestamp1(), new MyTimestamp2())
inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.with(new CrossFunction<Tuple2<Integer,String>, Integer, Tuple2<Tuple2<Integer,String>, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Tuple2<Integer, String>, Integer> cross(
Tuple2<Integer, String> val1, Integer val2) throws Exception {
return new Tuple2<Tuple2<Integer,String>, Integer>(val1, val2);
}
})
.addSink(new CrossResultSink());
env.executeTest(MEMORYSIZE);
......