Commit 127470b8 authored by: Gyula Fora; committed by: mbalassi

[streaming] ConnectedDataStream API and operator cleanup + modified windowReduceGroup functionality
Parent 6492af04
......@@ -254,7 +254,7 @@ With `minBy` and `maxBy` the output of the operator is the element with the curr
### Window/Batch operators
Window and batch operators allow the user to execute a function on slices or windows of the DataStream in a sliding fashion. If the step size for the slide is not defined, the window/batch size is used as the step size by default. The user can also use user-defined timestamps for calculating time windows.
When applied to grouped data streams, the data stream is batched/windowed for different key values separately.
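For illustration only, a sliding window reduce-group on a single `DataStream` could look like the sketch below; the `windowReduceGroup(reducer, windowSize, slideInterval)` call and the `GroupReduceFunction` signature are assumed here by analogy with the connected-stream example later in this section, not quoted from this commit.

~~~java
DataStream<Integer> dataStream = ...

dataStream.windowReduceGroup(new GroupReduceFunction<Integer, String>() {
    @Override
    public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
        //Do something with the contents of the current window
    }
}, 10000, 5000);
~~~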
......@@ -326,6 +326,27 @@ dataStream1.connect(dataStream2)
})
~~~
#### windowReduceGroup on ConnectedDataStream
The windowReduceGroup operator applies a user-defined `CoGroupFunction` to time-aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
~~~java
DataStream<Integer> dataStream1 = ...
DataStream<String> dataStream2 = ...
dataStream1.connect(dataStream2)
.windowReduceGroup(new CoGroupFunction<Integer, String, String>() {
@Override
public void coGroup(Iterable<Integer> first, Iterable<String> second,
Collector<String> out) throws Exception {
//Do something here
}
}, 10000, 5000);
~~~
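To calculate windows on user-defined time instead of system time, the overload that additionally takes one `TimeStamp` implementation per input can be used. The sketch below is only an illustration under the assumption that the elements themselves can serve as monotonically increasing timestamps and that a start time of 0 is acceptable; `getTimestamp` and `getStartTime` are the methods this commit's `CoGroupInvokable` calls on the supplied `TimeStamp` objects.

~~~java
dataStream1.connect(dataStream2)
    .windowReduceGroup(new CoGroupFunction<Integer, String, String>() {
        @Override
        public void coGroup(Iterable<Integer> first, Iterable<String> second,
                Collector<String> out) throws Exception {
            //Do something here
        }
    }, 10000, 5000,
    new TimeStamp<Integer>() {
        @Override
        public long getTimestamp(Integer value) {
            //the element itself is used as a monotonically increasing timestamp
            return value;
        }

        @Override
        public long getStartTime() {
            return 0;
        }
    },
    new TimeStamp<String>() {
        @Override
        public long getTimestamp(String value) {
            return Long.valueOf(value);
        }

        @Override
        public long getStartTime() {
            return 0;
        }
    });
~~~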
#### Reduce on ConnectedDataStream
The Reduce operator for the `ConnectedDataStream` applies a simple reduce transformation on the connected data streams and then maps the reduced elements to a common output type.
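A minimal sketch is shown below; the `reduce1`/`reduce2` and `map1`/`map2` signatures are assumed from the `CoReduceFunction` contract described in this commit's Javadoc (reduce incrementally combines two values of one input, map converts the current reduced value to the output type), so consult the interface for the exact method signatures.

~~~java
DataStream<Integer> dataStream1 = ...
DataStream<String> dataStream2 = ...

dataStream1.connect(dataStream2)
    .reduce(new CoReduceFunction<Integer, String, String>() {

        @Override
        public Integer reduce1(Integer value1, Integer value2) {
            //incrementally reduce the first input
            return value1 + value2;
        }

        @Override
        public String reduce2(String value1, String value2) {
            //incrementally reduce the second input
            return value1 + value2;
        }

        @Override
        public String map1(Integer value) {
            //map the current reduced value of the first input to the output type
            return value.toString();
        }

        @Override
        public String map2(String value) {
            return value;
        }
    });
~~~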
......
......@@ -21,23 +21,23 @@ import java.io.Serializable;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
......@@ -55,17 +55,28 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
*/
public class ConnectedDataStream<IN1, IN2> {
StreamExecutionEnvironment environment;
JobGraphBuilder jobGraphBuilder;
DataStream<IN1> input1;
DataStream<IN2> input2;
protected StreamExecutionEnvironment environment;
protected JobGraphBuilder jobGraphBuilder;
protected DataStream<IN1> input1;
protected DataStream<IN2> input2;
protected ConnectedDataStream(StreamExecutionEnvironment environment,
JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2) {
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
protected boolean isGrouped;
protected int keyPosition1;
protected int keyPosition2;
protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) {
this.jobGraphBuilder = input1.jobGraphBuilder;
this.environment = input1.environment;
this.input1 = input1.copy();
this.input2 = input2.copy();
if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
this.isGrouped = true;
this.keyPosition1 = ((GroupedDataStream<IN1>) input1).keyPosition;
this.keyPosition2 = ((GroupedDataStream<IN2>) input2).keyPosition;
} else {
this.isGrouped = false;
}
}
/**
......@@ -108,7 +119,7 @@ public class ConnectedDataStream<IN1, IN2> {
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Used for
* applying functions on grouped data streams, for example
* {@link GroupedConnectedDataStream#reduce}
* {@link ConnectedDataStream#reduce}
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
......@@ -118,14 +129,13 @@ public class ConnectedDataStream<IN1, IN2> {
* second input stream.
* @return The grouped {@link ConnectedDataStream} created.
*/
public GroupedConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
if (keyPosition1 < 0 || keyPosition2 < 0) {
throw new IllegalArgumentException("The position of the field must be non-negative");
}
return new GroupedConnectedDataStream<IN1, IN2>(this.environment, this.jobGraphBuilder,
getFirst().partitionBy(keyPosition1), getSecond().partitionBy(keyPosition2),
keyPosition1, keyPosition2);
return new ConnectedDataStream<IN1, IN2>(input1.groupBy(keyPosition1),
input2.groupBy(keyPosition2));
}
/**
......@@ -205,229 +215,84 @@ public class ConnectedDataStream<IN1, IN2> {
CoReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoReduceFunction.class, 2);
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoStreamReduceInvokable<IN1, IN2, OUT>(coReducer));
}
/**
* Applies a reduceGroup transformation on the preset batches of the inputs
* of a {@link ConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each batch of the first input
* and {@link CoGroupReduceFunction#reduce2} for each batch of the second
* input. Each {@link CoGroupReduceFunction} call can return any number of
elements including none. When the reducer has run for all the values of a
batch, the batch is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param batchSize1
* The number of elements in a batch of the first input.
* @param batchSize2
* The number of elements in a batch of the second input.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2) {
return batchReduceGroup(coReducer, batchSize1, batchSize2, batchSize1, batchSize2);
}
/**
* Applies a reduceGroup transformation on the preset batches of the inputs
* of a {@link ConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each batch of the first input
* and {@link CoGroupReduceFunction#reduce2} for each batch of the second
* input. Each {@link CoGroupReduceFunction} call can return any number of
elements including none. When the reducer has run for all the values of a
batch, the batch is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param batchSize1
* The number of elements in a batch of the first input.
* @param batchSize2
* The number of elements in a batch of the second input.
* @param slideSize1
* The number of elements a batch of the first input is slid by.
* @param slideSize2
* The number of elements a batch of the second input is slid by.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2,
long slideSize1, long slideSize2) {
if (batchSize1 < 1 || batchSize2 < 1) {
throw new IllegalArgumentException("Batch size must be positive");
if (this.isGrouped) {
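// grouped connected streams are reduced per key, using the key positions
// captured from the two GroupedDataStream inputs in the constructor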
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer,
keyPosition1, keyPosition2));
} else {
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
}
if (slideSize1 < 1 || slideSize2 < 1) {
throw new IllegalArgumentException("Slide size must be positive");
}
if (batchSize1 < slideSize1 || batchSize2 < slideSize2) {
throw new IllegalArgumentException("Batch size must be at least slide size");
}
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoGroupReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoGroupReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoGroupReduceFunction.class, 2);
return addCoFunction("coBatchReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoBatchGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
batchSize1, batchSize2, slideSize1, slideSize2));
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link ConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. Each {@link CoGroupReduceFunction} call can return any number of
elements including none. When the reducer has run for all the values of a
window, the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
* Applies a CoGroup transformation on the connected DataStreams. The
* transformation calls the {@link CoGroupFunction#coGroup} method
* for time aligned windows of the two data streams. System time is used as
* default to compute windows.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2) {
return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2);
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link ConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. Each {@link CoGroupReduceFunction} call can return any number of
elements including none. When the reducer has run for all the values of a
window, the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
* @param coGroupFunction
* The {@link CoGroupFunction} that will be applied for the time
* windows.
* @param windowSize
* Size of the windows that will be aligned for both streams in
* milliseconds.
* @param slideInterval
* After every function call the windows will be slid by this
* interval.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @param slideInterval1
* The time interval a window of the first input is slid by.
* @param slideInterval2
* The time interval a window of the second input is slid by.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
long slideInterval1, long slideInterval2) {
return windowReduceGroup(coReducer, windowSize1, windowSize2, slideInterval1,
slideInterval2, new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize, long slideInterval) {
return windowReduceGroup(coGroupFunction, windowSize, slideInterval,
new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link ConnectedDataStream}, where the time is provided by
* timestamps. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. Each {@link CoGroupReduceFunction} call can return any number of
elements including none. When the reducer has run for all the values of a
window, the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
* Applies a CoGroup transformation on the connected DataStreams. The
* transformation calls the {@link CoGroupFunction#coGroup} method
* for time aligned windows of the two data streams. The user can implement
* their own time stamps or use the system time by default.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @param timestamp1
* The predefined timestamp function of the first input.
* @param timestamp2
* The predefined timestamp function of the second input.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2,
timestamp1, timestamp2);
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link ConnectedDataStream}, where the time is provided by
* timestamps. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. Each {@link CoGroupReduceFunction} call can return any number of
elements including none. When the reducer has run for all the values of a
window, the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
* @param coGroupFunction
* The {@link CoGroupFunction} that will be applied for the time
* windows.
* @param windowSize
* Size of the windows that will be aligned for both streams. If
* system time is used, it is given in milliseconds. User defined time
* stamps are assumed to be monotonically increasing.
* @param slideInterval
* After every function call the windows will be slid by this
* interval.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @param slideInterval1
* The time interval a window of the first input is slid by.
* @param slideInterval2
* The time interval a window of the second input is slid by.
* @param timestamp1
* The predefined timestamp function of the first input.
* User defined time stamps for the first input.
* @param timestamp2
* The predefined timestamp function of the second input.
* User defined time stamps for the second input.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
if (windowSize1 < 1 || windowSize2 < 1) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
if (slideInterval1 < 1 || slideInterval2 < 1) {
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
if (windowSize1 < slideInterval1 || windowSize2 < slideInterval2) {
throw new IllegalArgumentException("Window size must be at least slide interval");
}
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoGroupReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoGroupReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoGroupReduceFunction.class, 2);
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coGroupFunction,
CoGroupFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coGroupFunction,
CoGroupFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coGroupFunction,
CoGroupFunction.class, 2);
return addCoFunction("coWindowReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoWindowGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
timestamp2));
return addCoFunction("coWindowReduce", coGroupFunction, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoGroupInvokable<IN1, IN2, OUT>(coGroupFunction, windowSize,
slideInterval, timestamp1, timestamp2));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
......
......@@ -260,7 +260,7 @@ public class DataStream<OUT> {
* @return The {@link ConnectedDataStream}.
*/
public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
return new ConnectedDataStream<OUT, R>(environment, jobGraphBuilder, this, dataStream);
return new ConnectedDataStream<OUT, R>(this, dataStream);
}
/**
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
int keyPosition1;
int keyPosition2;
protected GroupedConnectedDataStream(StreamExecutionEnvironment environment,
JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2,
int keyPosition1, int keyPosition2) {
super(environment, jobGraphBuilder, input1, input2);
this.keyPosition1 = keyPosition1;
this.keyPosition2 = keyPosition2;
}
/**
* Applies a reduce transformation on a {@link GroupedConnectedDataStream},
* and maps the outputs to a common type. The transformation calls
* {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
* each element of the first input and {@link CoReduceFunction#reduce2} and
* {@link CoReduceFunction#map2} for each element of the second input. For
* both inputs, the reducer is applied on every group of elements sharing
* the same key at the respective position. This type of reduce is much
* faster than reduceGroup since the reduce function can be applied
* incrementally. The user can also extend the {@link RichCoReduceFunction}
to gain access to other features provided by the {@link RichFunction}
* interface.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every
* element of the inputs.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoReduceFunction.class, 2);
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
}
/**
* Applies a reduceGroup transformation on the preset batches of the inputs
* of a {@link GroupedConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each batch of the first input
* and {@link CoGroupReduceFunction#reduce2} for each batch of the second
* input. For both inputs, the reducer is applied on every group of elements
* of every batch sharing the same key at the respective position. Each
* {@link CoGroupReduceFunction} call can return any number of elements
including none. When the reducer has run for all the values of a batch,
the batch is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param batchSize1
* The number of elements in a batch of the first input.
* @param batchSize2
* The number of elements in a batch of the second input.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2) {
return batchReduceGroup(coReducer, batchSize1, batchSize2, batchSize1, batchSize2);
}
/**
* Applies a reduceGroup transformation on the preset batches of the inputs
* of a {@link GroupedConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each batch of the first input
* and {@link CoGroupReduceFunction#reduce2} for each batch of the second
* input. For both inputs, the reducer is applied on every group of elements
* of every batch sharing the same key at the respective position. Each
* {@link CoGroupReduceFunction} call can return any number of elements
including none. When the reducer has run for all the values of a batch,
the batch is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param batchSize1
* The number of elements in a batch of the first input.
* @param batchSize2
* The number of elements in a batch of the second input.
* @param slideSize1
* The number of elements a batch of the first input is slid by.
* @param slideSize2
* The number of elements a batch of the second input is slid by.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2,
long slideSize1, long slideSize2) {
if (batchSize1 < 1 || batchSize2 < 1) {
throw new IllegalArgumentException("Batch size must be positive");
}
if (slideSize1 < 1 || slideSize2 < 1) {
throw new IllegalArgumentException("Slide size must be positive");
}
if (batchSize1 < slideSize1 || batchSize2 < slideSize2) {
throw new IllegalArgumentException("Batch size must be at least slide size");
}
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoGroupReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoGroupReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoGroupReduceFunction.class, 2);
return addCoFunction("coBatchReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoGroupedBatchGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
batchSize1, batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2));
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link GroupedConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. For both inputs, the reducer is applied on every group of elements
* of every window sharing the same key at the respective position. Each
* {@link CoGroupReduceFunction} call can return any number of elements
including none. When the reducer has run for all the values of a window,
the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2) {
return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2);
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link GroupedConnectedDataStream}. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. For both inputs, the reducer is applied on every group of elements
* of every window sharing the same key at the respective position. Each
* {@link CoGroupReduceFunction} call can return any number of elements
including none. When the reducer has run for all the values of a window,
the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @param slideInterval1
* The time interval a window of the first input is slid by.
* @param slideInterval2
* The time interval a window of the second input is slid by.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
long slideInterval1, long slideInterval2) {
return windowReduceGroup(coReducer, windowSize1, windowSize2, slideInterval1,
slideInterval2, new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link GroupedConnectedDataStream}, where the time is
* provided by timestamps. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. For both inputs, the reducer is applied on every group of elements
* of every window sharing the same key at the respective position. Each
* {@link CoGroupReduceFunction} call can return any number of elements
including none. When the reducer has run for all the values of a window,
the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @param timestamp1
* The predefined timestamp function of the first input.
* @param timestamp2
* The predefined timestamp function of the second input.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2,
timestamp1, timestamp2);
}
/**
* Applies a reduceGroup transformation on the preset time windows of the
* inputs of a {@link GroupedConnectedDataStream}, where the time is
* provided by timestamps. The transformation calls
* {@link CoGroupReduceFunction#reduce1} for each window of the first input
* and {@link CoGroupReduceFunction#reduce2} for each window of the second
* input. For both inputs, the reducer is applied on every group of elements
* of every window sharing the same key at the respective position. Each
* {@link CoGroupReduceFunction} call can return any number of elements
including none. When the reducer has run for all the values of a window,
the window is slid forward. The user can also extend
{@link RichCoGroupReduceFunction} to gain access to other features
provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoGroupReduceFunction} that will be called for
* every batch of each input.
* @param windowSize1
* The size of the time window of the first input.
* @param windowSize2
* The size of the time window of the second input.
* @param slideInterval1
* The time interval a window of the first input is slid by.
* @param slideInterval2
* The time interval a window of the second input is slid by.
* @param timestamp1
* The predefined timestamp function of the first input.
* @param timestamp2
* The predefined timestamp function of the second input.
* @return The transformed {@link DataStream}.
*/
@Override
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
if (windowSize1 < 1 || windowSize2 < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
if (slideInterval1 < 1 || slideInterval2 < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
if (windowSize1 < slideInterval1 || windowSize2 < slideInterval2) {
throw new IllegalArgumentException("Window size must be at least slide interval");
}
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoGroupReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoGroupReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoGroupReduceFunction.class, 2);
return addCoFunction("coWindowReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoGroupedWindowGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
windowSize1, windowSize2, slideInterval1, slideInterval2, keyPosition1,
keyPosition2, timestamp1, timestamp2));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.co;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.util.Collector;
public interface CoGroupReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
void reduce1(Iterable<IN1> values, Collector<OUT> out) throws Exception;
void reduce2(Iterable<IN2> values, Collector<OUT> out) throws Exception;
}
......@@ -28,14 +28,14 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
int keyPosition;
Map<Object, StreamWindow> streamWindows;
List<Object> cleanList;
long currentMiniBatchCount = 0;
public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
long slideInterval, int keyPosition, TimeStamp<IN> timestamp) {
public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction,
long windowSize, long slideInterval, int keyPosition, TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval, timestamp);
this.keyPosition = keyPosition;
this.reducer = reduceFunction;
......@@ -48,7 +48,6 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
StreamWindow window = streamWindows.get(key);
if (window == null) {
window = new GroupedStreamWindow();
window.minibatchCounter = currentMiniBatchCount;
streamWindows.put(key, window);
}
this.window = window;
......@@ -62,26 +61,25 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
}
}
private void shiftGranularityAllWindows(){
private void shiftGranularityAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.circularList.newSlide();
window.minibatchCounter+=1;
}
}
private void slideAllWindows(){
private void slideAllWindows() {
currentMiniBatchCount -= batchPerSlide;
for (StreamBatch window : streamWindows.values()) {
window.circularList.shiftWindow(batchPerSlide);
}
}
}
private void reduceAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.minibatchCounter -= batchPerSlide;
window.reduceBatch();
}
}
protected class GroupedStreamWindow extends StreamWindow {
private static final long serialVersionUID = 1L;
......@@ -90,28 +88,35 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
super();
}
@Override
public void addToBuffer(IN nextValue) throws Exception {
checkWindowEnd(timestamp.getTimestamp(nextValue));
if (currentMiniBatchCount >= 0) {
circularList.add(nextValue);
}
}
@Override
protected synchronized void checkWindowEnd(long timeStamp) {
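// advance the mini-batch granularity of all keyed windows until the incoming
// timestamp fits; once a full window has elapsed, reduce every keyed window
// and slide them forward by batchPerSlide mini-batches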
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
shiftGranularityAllWindows();
currentMiniBatchCount += 1;
if (batchEnd()) {
reduceAllWindows();
slideAllWindows();
}
}
currentMiniBatchCount = this.minibatchCounter;
}
@Override
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
if (currentMiniBatchCount == numberOfBatches) {
return true;
}
return false;
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
public class CoBatchGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
protected long startCounter1;
protected long startCounter2;
protected long endCounter1;
protected long endCounter2;
public CoBatchGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
}
@Override
protected void handleStream1() throws Exception {
circularList1.add(reuse1);
if (windowStart1()) {
circularList1.newSlide();
}
if (windowEnd1()) {
reduce1();
circularList1.shiftWindow();
}
}
@Override
protected void handleStream2() throws Exception {
circularList2.add(reuse2);
if (windowStart2()) {
circularList2.newSlide();
}
if (windowEnd2()) {
reduce2();
circularList2.shiftWindow();
}
}
@Override
protected boolean windowStart1() throws Exception {
if (startCounter1 == slideInterval1) {
startCounter1 = 0;
return true;
}
return false;
}
@Override
protected boolean windowStart2() throws Exception {
if (startCounter2 == slideInterval2) {
startCounter2 = 0;
return true;
}
return false;
}
@Override
protected boolean windowEnd1() throws Exception {
if (endCounter1 == windowSize1) {
endCounter1 -= slideInterval1;
return true;
}
return false;
}
@Override
protected boolean windowEnd2() throws Exception {
if (endCounter2 == windowSize2) {
endCounter2 -= slideInterval2;
return true;
}
return false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
startCounter1 = 0;
startCounter2 = 0;
endCounter1 = 0;
endCounter2 = 0;
}
@Override
protected void initialize1() {
startCounter1++;
endCounter1++;
}
@Override
protected void initialize2() {
startCounter2++;
endCounter2++;
}
}
......@@ -17,45 +17,43 @@
package org.apache.flink.streaming.api.invokable.operator.co;
import java.util.Iterator;
import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.CircularFifoList;
import org.apache.flink.streaming.state.StreamIterator;
public abstract class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
public class CoGroupInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
protected CoGroupReduceFunction<IN1, IN2, OUT> coReducer;
protected StreamIterator<IN1> userIterator1;
protected StreamIterator<IN2> userIterator2;
protected Iterable<IN1> userIterable1;
protected Iterable<IN2> userIterable2;
protected long windowSize1;
protected long windowSize2;
protected long slideInterval1;
protected long slideInterval2;
protected CoGroupFunction<IN1, IN2, OUT> coGroupFunction;
protected long windowSize;
protected long slideSize;
protected CircularFifoList<StreamRecord<IN1>> circularList1;
protected CircularFifoList<StreamRecord<IN2>> circularList2;
protected long WindowStartTime1;
protected long WindowStartTime2;
protected long WindowEndTime1;
protected long WindowEndTime2;
public CoGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
super(reduceFunction);
this.coReducer = reduceFunction;
this.userIterator1 = new StreamIterator<IN1>();
this.userIterator2 = new StreamIterator<IN2>();
this.windowSize1 = windowSize1;
this.windowSize2 = windowSize2;
this.slideInterval1 = slideInterval1;
this.slideInterval2 = slideInterval2;
protected TimeStamp<IN1> timeStamp1;
protected TimeStamp<IN2> timeStamp2;
protected StreamWindow window;
protected long startTime;
protected long nextRecordTime;
public CoGroupInvokable(CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
super(coGroupFunction);
this.coGroupFunction = coGroupFunction;
this.windowSize = windowSize;
this.slideSize = slideInterval;
this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
this.timeStamp1 = timeStamp1;
this.timeStamp2 = timeStamp2;
this.startTime = timeStamp1.getStartTime();
this.window = new StreamWindow();
}
@Override
......@@ -65,75 +63,122 @@ public abstract class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<
@Override
protected void handleStream1() throws Exception {
while (windowStart1()) {
circularList1.newSlide();
}
while (windowEnd1()) {
reduce1();
circularList1.shiftWindow();
}
circularList1.add(reuse1);
window.addToBuffer1(reuse1.getObject());
}
@Override
protected void handleStream2() throws Exception {
while (windowStart2()) {
circularList2.newSlide();
}
while (windowEnd2()) {
reduce2();
circularList2.shiftWindow();
}
circularList2.add(reuse2);
window.addToBuffer2(reuse2.getObject());
}
protected void reduce1() throws Exception {
userIterator1.load(circularList1.getIterator());
callUserFunctionAndLogException1();
@Override
protected void callUserFunction() throws Exception {
if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
coGroupFunction.coGroup(window.getIterable1(), window.getIterable2(), collector);
}
}
protected void reduce2() throws Exception {
userIterator2.load(circularList2.getIterator());
callUserFunctionAndLogException2();
}
protected class StreamWindow implements Serializable {
private static final long serialVersionUID = 1L;
@Override
protected void callUserFunction1() throws Exception {
coReducer.reduce1(userIterable1, collector);
}
protected int granularity;
protected int batchPerSlide;
protected long numberOfBatches;
@Override
protected void callUserFunction2() throws Exception {
coReducer.reduce2(userIterable2, collector);
}
protected long minibatchCounter;
protected abstract boolean windowStart1() throws Exception;
protected CircularFifoList<IN1> circularList1;
protected CircularFifoList<IN2> circularList2;
protected abstract boolean windowStart2() throws Exception;
public StreamWindow() {
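// the window is split into mini-batches of gcd(windowSize, slideSize) time
// units: a full window spans numberOfBatches mini-batches and every slide
// advances it by batchPerSlide mini-batches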
this.granularity = (int) MathUtils.gcd(windowSize, slideSize);
this.batchPerSlide = (int) (slideSize / granularity);
this.numberOfBatches = windowSize / granularity;
this.circularList1 = new CircularFifoList<IN1>();
this.circularList2 = new CircularFifoList<IN2>();
this.minibatchCounter = 0;
}
protected abstract boolean windowEnd1() throws Exception;
public void addToBuffer1(IN1 nextValue) throws Exception {
checkWindowEnd(timeStamp1.getTimestamp(nextValue));
if (minibatchCounter >= 0) {
circularList1.add(nextValue);
}
}
protected abstract boolean windowEnd2() throws Exception;
public void addToBuffer2(IN2 nextValue) throws Exception {
checkWindowEnd(timeStamp2.getTimestamp(nextValue));
if (minibatchCounter >= 0) {
circularList2.add(nextValue);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
userIterable1 = new BatchIterable1();
userIterable2 = new BatchIterable2();
}
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
circularList1.newSlide();
circularList2.newSlide();
minibatchCounter++;
if (windowEnd()) {
callUserFunctionAndLogException();
circularList1.shiftWindow(batchPerSlide);
circularList2.shiftWindow(batchPerSlide);
}
}
}
protected boolean miniBatchEnd() {
if (nextRecordTime < startTime + granularity) {
return false;
} else {
startTime += granularity;
return true;
}
}
public boolean windowEnd() {
if (minibatchCounter == numberOfBatches) {
minibatchCounter -= batchPerSlide;
return true;
}
return false;
}
public void reduceLastBatch() {
if (!miniBatchEnd()) {
callUserFunctionAndLogException();
}
}
public Iterable<IN1> getIterable1() {
return circularList1.getIterable();
}
public Iterable<IN2> getIterable2() {
return circularList2.getIterable();
}
protected class BatchIterable1 implements Iterable<IN1> {
@Override
public Iterator<IN1> iterator() {
return userIterator1;
public String toString() {
return circularList1.toString();
}
}
protected class BatchIterable2 implements Iterable<IN2> {
@Override
public Iterator<IN2> iterator() {
return userIterator2;
@Override
public void close() {
if (!window.miniBatchEnd()) {
callUserFunctionAndLogException();
}
}
@Override
protected void callUserFunction1() throws Exception {
}
@Override
protected void callUserFunction2() throws Exception {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.MutableTableState;
public class CoGroupedBatchGroupReduceInvokable<IN1, IN2, OUT> extends
CoBatchGroupReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private int keyPosition1;
private int keyPosition2;
private Iterator<StreamRecord<IN1>> iterator1;
private Iterator<StreamRecord<IN2>> iterator2;
private MutableTableState<Object, List<IN1>> values1;
private MutableTableState<Object, List<IN2>> values2;
private IN1 nextValue1;
private IN2 nextValue2;
public CoGroupedBatchGroupReduceInvokable(
CoGroupReduceFunction<IN1, IN2, OUT> coReduceFunction, long batchSize1,
long batchSize2, long slideSize1, long slideSize2, int keyPosition1, int keyPosition2) {
super(coReduceFunction, batchSize1, batchSize2, slideSize1, slideSize2);
this.keyPosition1 = keyPosition1;
this.keyPosition2 = keyPosition2;
values1 = new MutableTableState<Object, List<IN1>>();
values2 = new MutableTableState<Object, List<IN2>>();
}
@Override
protected void reduce1() {
iterator1 = circularList1.getIterator();
while (iterator1.hasNext()) {
StreamRecord<IN1> nextRecord = iterator1.next();
Object key = nextRecord.getField(keyPosition1);
nextValue1 = nextRecord.getObject();
List<IN1> group = values1.get(key);
if (group != null) {
group.add(nextValue1);
} else {
group = new ArrayList<IN1>();
group.add(nextValue1);
values1.put(key, group);
}
}
for (List<IN1> group : values1.values()) {
userIterable1 = group;
callUserFunctionAndLogException1();
}
values1.clear();
}
@Override
protected void reduce2() {
iterator2 = circularList2.getIterator();
while (iterator2.hasNext()) {
StreamRecord<IN2> nextRecord = iterator2.next();
Object key = nextRecord.getField(keyPosition2);
nextValue2 = nextRecord.getObject();
List<IN2> group = values2.get(key);
if (group != null) {
group.add(nextValue2);
} else {
group = new ArrayList<IN2>();
group.add(nextValue2);
values2.put(key, group);
}
}
for (List<IN2> group : values2.values()) {
userIterable2 = group;
callUserFunctionAndLogException2();
}
values2.clear();
}
}
......@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.state.MutableTableState;
public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoStreamReduceInvokable<IN1, IN2, OUT> {
public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private int keyPosition1;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.MutableTableState;
public class CoGroupedWindowGroupReduceInvokable<IN1, IN2, OUT> extends
CoWindowGroupReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private int keyPosition1;
private int keyPosition2;
private Iterator<StreamRecord<IN1>> iterator1;
private Iterator<StreamRecord<IN2>> iterator2;
private MutableTableState<Object, List<IN1>> values1;
private MutableTableState<Object, List<IN2>> values2;
private IN1 nextValue1;
private IN2 nextValue2;
public CoGroupedWindowGroupReduceInvokable(
CoGroupReduceFunction<IN1, IN2, OUT> coReduceFunction, long windowSize1,
long windowSize2, long slideInterval1, long slideInterval2, int keyPosition1,
int keyPosition2, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
super(coReduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2,
timestamp1, timestamp2);
this.keyPosition1 = keyPosition1;
this.keyPosition2 = keyPosition2;
values1 = new MutableTableState<Object, List<IN1>>();
values2 = new MutableTableState<Object, List<IN2>>();
}
@Override
protected void reduce1() {
iterator1 = circularList1.getIterator();
while (iterator1.hasNext()) {
StreamRecord<IN1> nextRecord = iterator1.next();
Object key = nextRecord.getField(keyPosition1);
nextValue1 = nextRecord.getObject();
List<IN1> group = values1.get(key);
if (group != null) {
group.add(nextValue1);
} else {
group = new ArrayList<IN1>();
group.add(nextValue1);
values1.put(key, group);
}
}
for (List<IN1> group : values1.values()) {
userIterable1 = group;
callUserFunctionAndLogException1();
}
values1.clear();
}
@Override
protected void reduce2() {
iterator2 = circularList2.getIterator();
while (iterator2.hasNext()) {
StreamRecord<IN2> nextRecord = iterator2.next();
Object key = nextRecord.getField(keyPosition2);
nextValue2 = nextRecord.getObject();
List<IN2> group = values2.get(key);
if (group != null) {
group.add(nextValue2);
} else {
group = new ArrayList<IN2>();
group.add(nextValue2);
values2.put(key, group);
}
}
for (List<IN2> group : values2.values()) {
userIterable2 = group;
callUserFunctionAndLogException2();
}
values2.clear();
}
}
......@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
public class CoStreamReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
protected CoReduceFunction<IN1, IN2, OUT> coReducer;
......@@ -28,7 +28,7 @@ public class CoStreamReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2
protected IN1 nextValue1 = null;
protected IN2 nextValue2 = null;
public CoStreamReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
super(coReducer);
this.coReducer = coReducer;
currentValue1 = null;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
public class CoWindowGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
protected long startTime1;
protected long startTime2;
protected long endTime1;
protected long endTime2;
protected long currentTime;
protected TimeStamp<IN1> timestamp1;
protected TimeStamp<IN2> timestamp2;
public CoWindowGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
this.timestamp1 = timestamp1;
this.timestamp2 = timestamp2;
startTime1 = timestamp1.getStartTime();
startTime2 = timestamp2.getStartTime();
endTime1 = startTime1 + windowSize1;
endTime2 = startTime2 + windowSize2;
}
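// windowStart/windowEnd advance the per-input window boundaries whenever the current
// (user-defined) timestamp has passed the slide interval or the window end.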
@Override
protected boolean windowStart1() throws Exception {
if (currentTime - startTime1 >= slideInterval1) {
startTime1 += slideInterval1;
return true;
}
return false;
}
@Override
protected boolean windowStart2() throws Exception {
if (currentTime - startTime2 >= slideInterval2) {
startTime2 += slideInterval2;
return true;
}
return false;
}
@Override
protected boolean windowEnd1() throws Exception {
if (currentTime >= endTime1) {
endTime1 += slideInterval1;
return true;
}
return false;
}
@Override
protected boolean windowEnd2() throws Exception {
if (currentTime >= endTime2) {
endTime2 += slideInterval2;
return true;
}
return false;
}
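// The current time is taken from the user-supplied TimeStamp of the record being processed.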
@Override
protected void initialize1() {
currentTime = timestamp1.getTimestamp(reuse1.getObject());
}
@Override
protected void initialize2() {
currentTime = timestamp2.getTimestamp(reuse2.getObject());
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
}
......@@ -82,7 +82,9 @@ public class CircularFifoList<T> implements Serializable {
return iterable;
}
private class ListIterable implements Iterable<T> {
private class ListIterable implements Iterable<T>, Serializable {
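// The iterable view has to be Serializable as well, since CircularFifoList itself is Serializable.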
private static final long serialVersionUID = 1L;
@Override
public Iterator<T> iterator() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchGroupReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class CoBatchGroupReduceTest {
public static final class MyCoGroupReduceFunction implements
CoGroupReduceFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@Override
public void reduce1(Iterable<Integer> values, Collector<String> out) throws Exception {
String gather = "";
for (Integer value : values) {
gather += value.toString();
}
out.collect(gather);
}
@Override
public void reduce2(Iterable<String> values, Collector<String> out) throws Exception {
String gather = "";
for (String value : values) {
gather += value;
}
out.collect(gather);
}
}
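// Batch size 4 / slide 4 on the first input and batch size 2 / slide 2 on the second: non-overlapping batches.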
@Test
public void coBatchGroupReduceTest1() {
List<Integer> inputs1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<String> inputs2 = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
CoBatchGroupReduceInvokable<Integer, String, String> invokable = new CoBatchGroupReduceInvokable<Integer, String, String>(
new MyCoGroupReduceFunction(), 4L, 2L, 4L, 2L);
List<String> expected = Arrays.asList("1234", "5678", "ab", "cd", "ef", "gh");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
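// Same batch sizes, but with slide sizes 3 and 1, so consecutive batches overlap.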
@Test
public void coBatchGroupReduceTest2() {
List<Integer> inputs1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<String> inputs2 = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
CoBatchGroupReduceInvokable<Integer, String, String> invokable = new CoBatchGroupReduceInvokable<Integer, String, String>(
new MyCoGroupReduceFunction(), 4L, 2L, 3L, 1L);
List<String> expected = Arrays.asList("1234", "4567", "78910", "ab", "bc", "cd", "de",
"ef", "fg", "gh");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchGroupReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class CoGroupedBatchGroupReduceTest {
public static final class MyCoGroupReduceFunction implements
CoGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> {
private static final long serialVersionUID = 1L;
@Override
public void reduce1(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
throws Exception {
String gather = "";
for (Tuple2<Integer, Integer> value : values) {
gather += value.f1.toString();
}
out.collect(gather);
}
@Override
public void reduce2(Iterable<Tuple2<Integer, String>> values, Collector<String> out)
throws Exception {
String gather = "";
for (Tuple2<Integer, String> value : values) {
gather += value.f1;
}
out.collect(gather);
}
}
final static int KEY_POSITION1 = 0;
final static int KEY_POSITION2 = 0;
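// Both inputs are keyed on tuple field f0 (i % 3 for the integer tuples, ch % 3 for the character tuples).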
@Test
public void coGroupedBatchGroupReduceTest1() {
List<Tuple2<Integer, Integer>> inputs1 = new ArrayList<Tuple2<Integer, Integer>>();
for (Integer i = 1; i <= 10; i++) {
inputs1.add(new Tuple2<Integer, Integer>(i % 3, i));
}
List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
for (char ch = 'a'; ch <= 'h'; ch++) {
inputs2.add(new Tuple2<Integer, String>(((int) ch) % 3, ch + ""));
}
CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> invokable = new CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String>(
new MyCoGroupReduceFunction(), 5L, 4L, 5L, 4L, KEY_POSITION1, KEY_POSITION2);
List<String> expected = Arrays.asList("14", "25", "3", "69", "710", "8", "ad", "b", "c",
"eh", "f", "g");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
@Test
public void coGroupedBatchGroupReduceTest2() {
List<Tuple2<Integer, Integer>> inputs1 = new ArrayList<Tuple2<Integer, Integer>>();
for (Integer i = 1; i <= 10; i++) {
inputs1.add(new Tuple2<Integer, Integer>(i % 3, i));
}
List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
for (char ch = 'a'; ch <= 'h'; ch++) {
inputs2.add(new Tuple2<Integer, String>(((int) ch) % 3, ch + ""));
}
CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> invokable = new CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String>(
new MyCoGroupReduceFunction(), 6L, 6L, 3L, 2L, KEY_POSITION1, KEY_POSITION2);
List<String> expected = Arrays.asList("14", "25", "36", "47", "58", "69", "ad", "be", "cf",
"cf", "dg", "eh");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class CoGroupedWindowGroupReduceTest {
public static final class MyCoGroupReduceFunction1 implements
CoGroupReduceFunction<Character, Character, String> {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
@Override
public void reduce1(Iterable<Character> values, Collector<String> out) throws Exception {
Integer gather = 0;
Character ch = values.iterator().next();
for (Character value : values) {
gather++;
}
out.collect(ch + ":" + gather);
}
@SuppressWarnings("unused")
@Override
public void reduce2(Iterable<Character> values, Collector<String> out) throws Exception {
Integer gather = 0;
Character ch = values.iterator().next();
for (Character value : values) {
gather++;
}
out.collect(ch + ":" + gather);
}
}
public static final class MyCoGroupReduceFunction2 implements
CoGroupReduceFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> {
private static final long serialVersionUID = 1L;
@Override
public void reduce1(Iterable<Tuple2<String, Integer>> values, Collector<String> out)
throws Exception {
String gather = "";
for (Tuple2<String, Integer> value : values) {
gather += value.f0;
}
out.collect(gather);
}
@Override
public void reduce2(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
throws Exception {
Integer gather = 0;
for (Tuple2<Integer, Integer> value : values) {
gather += value.f0;
}
out.collect(gather.toString());
}
}
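// Test TimeStamp that replays a fixed list of timestamps, one per processed record.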
public static final class MyTimeStamp<T> implements TimeStamp<T> {
private static final long serialVersionUID = 1L;
private Iterator<Long> timestamps;
private long start;
public MyTimeStamp(List<Long> timestamps) {
this.timestamps = timestamps.iterator();
this.start = timestamps.get(0);
}
@Override
public long getTimestamp(T value) {
long ts = timestamps.next();
return ts;
}
@Override
public long getStartTime() {
return start;
}
}
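// Windows of size 5 over the user-defined timestamps (slide 3 on the first input, 5 on the second);
// the reduce functions count the occurrences of each character within a window.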
@Test
public void coGroupedWindowGroupReduceTest1() {
List<Character> inputs1 = new ArrayList<Character>();
inputs1.add('a');
inputs1.add('b');
inputs1.add('c');
inputs1.add('a');
inputs1.add('a');
inputs1.add('c');
inputs1.add('b');
inputs1.add('c');
inputs1.add('a');
inputs1.add('a');
inputs1.add('x');
List<Character> inputs2 = new ArrayList<Character>();
inputs2.add('a');
inputs2.add('d');
inputs2.add('d');
inputs2.add('e');
inputs2.add('d');
inputs2.add('e');
inputs2.add('e');
inputs2.add('a');
inputs2.add('a');
inputs2.add('x');
List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 8L, 10L, 11L);
List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L, 7L, 7L, 8L, 8L, 10L);
CoGroupedWindowGroupReduceInvokable<Character, Character, String> invokable = new CoGroupedWindowGroupReduceInvokable<Character, Character, String>(
new MyCoGroupReduceFunction1(), 5L, 5L, 3L, 5L, 0, 0, new MyTimeStamp<Character>(
timestamps1), new MyTimeStamp<Character>(timestamps2));
List<String> expected = new ArrayList<String>();
expected.add("a:3");
expected.add("b:2");
expected.add("c:3");
expected.add("c:1");
expected.add("a:2");
expected.add("a:1");
expected.add("a:2");
expected.add("d:3");
expected.add("e:3");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
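// Window sizes 2 and 4 (slide 2 on both inputs), grouped on field f1; the reduce functions
// concatenate the f0 strings of the first input and sum the f0 values of the second.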
@Test
public void coGroupedWindowGroupReduceTest2() {
List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
inputs1.add(new Tuple2<String, Integer>("a", 1));
inputs1.add(new Tuple2<String, Integer>("b", 1));
inputs1.add(new Tuple2<String, Integer>("c", 0));
inputs1.add(new Tuple2<String, Integer>("d", 0));
inputs1.add(new Tuple2<String, Integer>("e", 1));
inputs1.add(new Tuple2<String, Integer>("f", 1));
inputs1.add(new Tuple2<String, Integer>("g", 0));
inputs1.add(new Tuple2<String, Integer>("h", 0));
inputs1.add(new Tuple2<String, Integer>("i", 1));
inputs1.add(new Tuple2<String, Integer>("j", 1));
List<Tuple2<Integer, Integer>> inputs2 = new ArrayList<Tuple2<Integer, Integer>>();
inputs2.add(new Tuple2<Integer, Integer>(1, 1));
inputs2.add(new Tuple2<Integer, Integer>(2, 2));
inputs2.add(new Tuple2<Integer, Integer>(3, 1));
inputs2.add(new Tuple2<Integer, Integer>(4, 2));
inputs2.add(new Tuple2<Integer, Integer>(5, 1));
inputs2.add(new Tuple2<Integer, Integer>(6, 2));
inputs2.add(new Tuple2<Integer, Integer>(7, 1));
inputs2.add(new Tuple2<Integer, Integer>(8, 2));
inputs2.add(new Tuple2<Integer, Integer>(9, 1));
inputs2.add(new Tuple2<Integer, Integer>(10, 2));
List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 4L, 7L);
List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L, 7L, 7L, 8L, 8L, 10L);
CoGroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String>(
new MyCoGroupReduceFunction2(), 2L, 4L, 2L, 2L, 1, 1,
new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
new MyTimeStamp<Tuple2<Integer, Integer>>(timestamps2));
List<String> expected = new ArrayList<String>();
expected.add("ab");
expected.add("cd");
expected.add("ef");
expected.add("gh");
expected.add("i");
expected.add("1");
expected.add("3");
expected.add("2");
expected.add("15");
expected.add("12");
expected.add("21");
expected.add("18");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
}
......@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
......@@ -58,7 +58,7 @@ public class CoStreamReduceTest {
@Test
public void coStreamReduceTest() {
CoStreamReduceInvokable<Integer, String, Integer> coReduce = new CoStreamReduceInvokable<Integer, String, Integer>(
CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
new MyCoReduceFunction());
List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
......
......@@ -20,237 +20,170 @@ package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class CoWindowGroupReduceTest {
public class CoWindowGroupReduceTest{
public static final class MyCoGroupReduceFunction1 implements
CoGroupReduceFunction<Long, Integer, String> {
private static final long serialVersionUID = 1L;
public static final class MyCoGroup1 implements CoGroupFunction<Integer, Integer, Integer> {
@Override
public void reduce1(Iterable<Long> values, Collector<String> out) throws Exception {
Long gather = 0L;
for (Long value : values) {
gather += value;
}
out.collect(gather.toString());
}
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
@Override
public void reduce2(Iterable<Integer> values, Collector<String> out) throws Exception {
Integer gather = 0;
for (Integer value : values) {
gather++;
public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
Collector<Integer> out) throws Exception {
Integer count1 = 0;
for (Integer i : first) {
count1++;
}
out.collect(gather.toString());
Integer count2 = 0;
for (Integer i : second) {
count2++;
}
out.collect(count1);
out.collect(count2);
}
}
public static final class MyCoGroupReduceFunction2 implements
CoGroupReduceFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> {
public static final class MyCoGroup2 implements
CoGroupFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
private static final long serialVersionUID = 1L;
@Override
public void reduce1(Iterable<Tuple2<String, Integer>> values, Collector<String> out)
throws Exception {
String gather = "";
for (Tuple2<String, Integer> value : values) {
gather += value.f0;
}
out.collect(gather);
}
public void coGroup(Iterable<Tuple2<Integer, Integer>> first,
Iterable<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
@Override
public void reduce2(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
throws Exception {
Integer gather = 0;
for (Tuple2<Integer, Integer> value : values) {
gather += value.f0;
Set<Integer> firstElements = new HashSet<Integer>();
for (Tuple2<Integer, Integer> value : first) {
firstElements.add(value.f1);
}
for (Tuple2<Integer, Integer> value : second) {
if (firstElements.contains(value.f1)) {
out.collect(value.f1);
}
}
out.collect(gather.toString());
}
}
public static final class MyTimeStamp1 implements TimeStamp<Long> {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Long value) {
return value;
}
@Override
public long getStartTime() {
return 0L;
}
}
public static final class MyTimeStamp2 implements TimeStamp<Integer> {
private static final class MyTS1 implements TimeStamp<Integer> {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value.longValue();
return value;
}
@Override
public long getStartTime() {
return 0L;
}
}
public static final class MyTimeStamp3 implements TimeStamp<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<String, Integer> value) {
return value.f1.longValue();
return 1;
}
@Override
public long getStartTime() {
return 0L;
}
}
public static final class MyTimeStamp4 implements TimeStamp<Tuple2<Integer, Integer>> {
private static final class MyTS2 implements TimeStamp<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<Integer, Integer> value) {
return value.f1.longValue();
return value.f0;
}
@Override
public long getStartTime() {
return 0L;
return 1;
}
}
@Test
public void coWindowGroupReduceTest1() {
List<Long> inputs1 = new ArrayList<Long>();
inputs1.add(0L);
inputs1.add(2L);
inputs1.add(2L);
inputs1.add(3L);
inputs1.add(4L);
inputs1.add(5L);
inputs1.add(6L);
inputs1.add(6L);
inputs1.add(6L);
inputs1.add(8L);
inputs1.add(14L);
inputs1.add(15L);
inputs1.add(15L);
List<Integer> inputs2 = new ArrayList<Integer>();
inputs2.add(0);
inputs2.add(0);
inputs2.add(5);
inputs2.add(7);
inputs2.add(7);
inputs2.add(7);
inputs2.add(8);
inputs2.add(8);
inputs2.add(8);
inputs2.add(14);
inputs2.add(14);
inputs2.add(15);
inputs2.add(16);
CoWindowGroupReduceInvokable<Long, Integer, String> invokable = new CoWindowGroupReduceInvokable<Long, Integer, String>(
new MyCoGroupReduceFunction1(), 3L, 4L, 2L, 2L, new MyTimeStamp1(),
new MyTimeStamp2());
List<String> expected = new ArrayList<String>();
expected.add("4");
expected.add("11");
expected.add("27");
expected.add("26");
expected.add("8");
expected.add("0");
expected.add("14");
expected.add("2");
expected.add("1");
expected.add("4");
expected.add("6");
expected.add("3");
expected.add("0");
expected.add("3");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
}
@Test
public void coWindowGroupReduceTest2() {
List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
inputs1.add(new Tuple2<String, Integer>("I", 1));
inputs1.add(new Tuple2<String, Integer>("t", 2));
inputs1.add(new Tuple2<String, Integer>("i", 4));
inputs1.add(new Tuple2<String, Integer>("s", 5));
inputs1.add(new Tuple2<String, Integer>("a", 7));
inputs1.add(new Tuple2<String, Integer>("l", 7));
inputs1.add(new Tuple2<String, Integer>("l", 8));
inputs1.add(new Tuple2<String, Integer>("o", 10));
inputs1.add(new Tuple2<String, Integer>("k", 11));
inputs1.add(new Tuple2<String, Integer>("a", 11));
inputs1.add(new Tuple2<String, Integer>("y", 11));
inputs1.add(new Tuple2<String, Integer>("!", 11));
inputs1.add(new Tuple2<String, Integer>(" ", 12));
List<Tuple2<Integer, Integer>> inputs2 = new ArrayList<Tuple2<Integer, Integer>>();
inputs2.add(new Tuple2<Integer, Integer>(10, 1));
inputs2.add(new Tuple2<Integer, Integer>(10, 2));
inputs2.add(new Tuple2<Integer, Integer>(20, 2));
inputs2.add(new Tuple2<Integer, Integer>(30, 2));
inputs2.add(new Tuple2<Integer, Integer>(10, 3));
inputs2.add(new Tuple2<Integer, Integer>(30, 4));
inputs2.add(new Tuple2<Integer, Integer>(40, 5));
inputs2.add(new Tuple2<Integer, Integer>(30, 6));
inputs2.add(new Tuple2<Integer, Integer>(20, 7));
inputs2.add(new Tuple2<Integer, Integer>(20, 7));
inputs2.add(new Tuple2<Integer, Integer>(10, 7));
inputs2.add(new Tuple2<Integer, Integer>(10, 8));
inputs2.add(new Tuple2<Integer, Integer>(30, 9));
inputs2.add(new Tuple2<Integer, Integer>(30, 10));
CoWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> invokable = new CoWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String>(
new MyCoGroupReduceFunction2(), 3L, 3L, 3L, 2L, new MyTimeStamp3(),
new MyTimeStamp4());
List<String> expected = new ArrayList<String>();
expected.add("It");
expected.add("is");
expected.add("all");
expected.add("okay!");
expected.add("70");
expected.add("100");
expected.add("100");
expected.add("90");
List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(expected);
Collections.sort(actualList);
assertEquals(expected, actualList);
public void coWindowGroupReduceTest2() throws Exception {
CoGroupInvokable<Integer, Integer, Integer> invokable1 = new CoGroupInvokable<Integer, Integer, Integer>(
new MyCoGroup1(), 2, 1, new MyTS1(), new MyTS1());
// Window size 2, slide 1
// Windows: 1,2 | 2,3 | 3,4 | 4,5
List<Integer> input11 = new ArrayList<Integer>();
input11.add(1);
input11.add(1);
input11.add(2);
input11.add(3);
input11.add(3);
List<Integer> input12 = new ArrayList<Integer>();
input12.add(1);
input12.add(2);
input12.add(3);
input12.add(3);
input12.add(5);
// Windows (first input / second input): (1,1,2)(1,2) | (2,3,3)(2,3,3) | (3,3)(3,3) | ()(5)
// Expected output (element counts per window and input): 3,2 | 3,3 | 2,2 | 0,1
List<Integer> expected1 = new ArrayList<Integer>();
expected1.add(3);
expected1.add(2);
expected1.add(3);
expected1.add(3);
expected1.add(2);
expected1.add(2);
expected1.add(0);
expected1.add(1);
List<Integer> actual1 = MockCoInvokable.createAndExecute(invokable1, input11, input12);
assertEquals(expected1, actual1);
CoGroupInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoGroupInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
new MyCoGroup2(), 2, 3, new MyTS2(), new MyTS2());
// Window size 2, slide 3
// Windows: 1,2 | 4,5 | 7,8
List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
input21.add(new Tuple2<Integer, Integer>(1, 1));
input21.add(new Tuple2<Integer, Integer>(1, 2));
input21.add(new Tuple2<Integer, Integer>(2, 3));
input21.add(new Tuple2<Integer, Integer>(3, 4));
input21.add(new Tuple2<Integer, Integer>(3, 5));
input21.add(new Tuple2<Integer, Integer>(4, 6));
input21.add(new Tuple2<Integer, Integer>(4, 7));
input21.add(new Tuple2<Integer, Integer>(5, 8));
List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
input22.add(new Tuple2<Integer, Integer>(1, 1));
input22.add(new Tuple2<Integer, Integer>(2, 0));
input22.add(new Tuple2<Integer, Integer>(2, 2));
input22.add(new Tuple2<Integer, Integer>(3, 9));
input22.add(new Tuple2<Integer, Integer>(3, 4));
input22.add(new Tuple2<Integer, Integer>(4, 10));
input22.add(new Tuple2<Integer, Integer>(5, 8));
input22.add(new Tuple2<Integer, Integer>(5, 7));
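// MyCoGroup2 emits every f1 value of the second input that also occurs as an f1 value in the
// first input's window: 1 and 2 from the 1,2 window, 8 and 7 from the 4,5 window.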
List<Integer> expected2 = new ArrayList<Integer>();
expected2.add(1);
expected2.add(2);
expected2.add(8);
expected2.add(7);
List<Integer> actual2 = MockCoInvokable.createAndExecute(invokable2, input21, input22);
assertEquals(expected2, actual2);
}
}
......@@ -158,6 +158,7 @@ public class MockCoInvokable<IN1, IN2, OUT> {
try {
invokable.open(null);
invokable.invoke();
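// close() is now invoked after invoke(), mirroring the full operator lifecycle (open -> invoke -> close).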
invokable.close();
} catch (Exception e) {
throw new RuntimeException("Cannot invoke invokable.", e);
}
......