提交 b8e8bd7a 编写于 作者: S szape 提交者: mbalassi

[streaming] Added CoBatchedDataStream and CoWindowDataStream with reduce...

[streaming] Added CoBatchedDataStream and CoWindowDataStream with reduce functionality + CoBatchReduce bugfix

[streaming] RAT & Checkstyle fix
上级 a8137850
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
/**
 * A {@link CoBatchedDataStream} represents two connected data streams whose
 * elements are batched together in sliding batches. Operation
 * {@link #reduce(CoReduceFunction)} can be applied for each batch and the
 * batch is slid afterwards.
 *
 * @param <IN1>
 *            The type of the first input data stream
 * @param <IN2>
 *            The type of the second input data stream
 */
public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {

    /** Batch size configured for the first input data stream. */
    protected long batchSize1;

    /** Batch size configured for the second input data stream. */
    protected long batchSize2;

    /** Slide size configured for the first input data stream. */
    protected long slideSize1;

    /** Slide size configured for the second input data stream. */
    protected long slideSize2;

    /**
     * Creates a batched connected stream from two separate input streams.
     *
     * @param dataStream1
     *            The first input data stream
     * @param dataStream2
     *            The second input data stream
     * @param batchSize1
     *            Batch size of the first input data stream
     * @param batchSize2
     *            Batch size of the second input data stream
     * @param slideSize1
     *            Slide size of the first input data stream
     * @param slideSize2
     *            Slide size of the second input data stream
     */
    protected CoBatchedDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
            long batchSize1, long batchSize2, long slideSize1, long slideSize2) {
        super(dataStream1, dataStream2);
        this.batchSize1 = batchSize1;
        this.batchSize2 = batchSize2;
        this.slideSize1 = slideSize1;
        this.slideSize2 = slideSize2;
    }

    /**
     * Creates a batched connected stream on top of an existing
     * {@link ConnectedDataStream}.
     *
     * @param coDataStream
     *            The connected data stream to batch
     * @param batchSize1
     *            Batch size of the first input data stream
     * @param batchSize2
     *            Batch size of the second input data stream
     * @param slideSize1
     *            Slide size of the first input data stream
     * @param slideSize2
     *            Slide size of the second input data stream
     */
    protected CoBatchedDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long batchSize1,
            long batchSize2, long slideSize1, long slideSize2) {
        super(coDataStream);
        this.batchSize1 = batchSize1;
        this.batchSize2 = batchSize2;
        this.slideSize1 = slideSize1;
        this.slideSize2 = slideSize2;
    }

    /**
     * Copy constructor: takes over the batch and slide sizes of the given
     * stream.
     *
     * @param coBatchedDataStream
     *            The batched connected data stream to copy
     */
    protected CoBatchedDataStream(CoBatchedDataStream<IN1, IN2> coBatchedDataStream) {
        super(coBatchedDataStream);
        this.batchSize1 = coBatchedDataStream.batchSize1;
        this.batchSize2 = coBatchedDataStream.batchSize2;
        this.slideSize1 = coBatchedDataStream.slideSize1;
        this.slideSize2 = coBatchedDataStream.slideSize2;
    }

    /**
     * Groups the elements of the {@link CoBatchedDataStream} by the given key
     * positions to be used with grouped operators. The batch and slide sizes
     * of this stream are carried over to the returned stream.
     *
     * @param keyPosition1
     *            The position of the field on which the first input data
     *            stream will be grouped.
     * @param keyPosition2
     *            The position of the field on which the second input data
     *            stream will be grouped.
     * @return The transformed {@link CoBatchedDataStream}
     * @throws IllegalArgumentException
     *             if either key position is negative
     */
    @Override
    public CoBatchedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
        // Enforce the same precondition as the overridden
        // ConnectedDataStream#groupBy so both entry points fail identically.
        if (keyPosition1 < 0 || keyPosition2 < 0) {
            throw new IllegalArgumentException("The position of the field must be non-negative");
        }
        return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
                dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2);
    }

    /**
     * Builds the invokable used by the reduce transformation: the grouped
     * batch reducer when this stream is grouped, the plain batch reducer
     * otherwise.
     *
     * @param coReducer
     *            The user defined reduce function
     * @return The invokable configured with this stream's batch and slide
     *         sizes
     */
    @Override
    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
            CoReduceFunction<IN1, IN2, OUT> coReducer) {
        CoBatchReduceInvokable<IN1, IN2, OUT> invokable;
        if (isGrouped) {
            invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
                    batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2);
        } else {
            invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
                    batchSize2, slideSize1, slideSize2);
        }
        return invokable;
    }

    /**
     * Creates a copy of this stream through the copy constructor.
     *
     * @return The copied {@link CoBatchedDataStream}
     */
    @Override
    protected CoBatchedDataStream<IN1, IN2> copy() {
        return new CoBatchedDataStream<IN1, IN2>(this);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
/**
 * A {@link CoWindowDataStream} represents two data streams whose elements are
 * batched together into sliding windows. Operation
 * {@link #reduce(CoReduceFunction)} can be applied for each window.
 *
 * <p>
 * The window sizes and slide intervals are stored in the inherited
 * {@code batchSize1/2} and {@code slideSize1/2} fields.
 *
 * @param <IN1>
 *            The type of the first input data stream
 * @param <IN2>
 *            The type of the second input data stream
 */
public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> {

    /** Time stamp extractor for the elements of the first input data stream. */
    TimeStamp<IN1> timeStamp1;

    /** Time stamp extractor for the elements of the second input data stream. */
    TimeStamp<IN2> timeStamp2;

    /**
     * Creates a windowed connected stream from two separate input streams.
     *
     * @param dataStream1
     *            The first input data stream
     * @param dataStream2
     *            The second input data stream
     * @param windowSize1
     *            Window size of the first input data stream
     * @param windowSize2
     *            Window size of the second input data stream
     * @param slideInterval1
     *            Slide interval of the first input data stream
     * @param slideInterval2
     *            Slide interval of the second input data stream
     * @param timeStamp1
     *            Time stamp extractor for the first input data stream
     * @param timeStamp2
     *            Time stamp extractor for the second input data stream
     */
    protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
            long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
            TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
        super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2);
        this.timeStamp1 = timeStamp1;
        this.timeStamp2 = timeStamp2;
    }

    /**
     * Creates a windowed connected stream on top of an existing
     * {@link ConnectedDataStream}.
     *
     * @param coDataStream
     *            The connected data stream to window
     * @param windowSize1
     *            Window size of the first input data stream
     * @param windowSize2
     *            Window size of the second input data stream
     * @param slideInterval1
     *            Slide interval of the first input data stream
     * @param slideInterval2
     *            Slide interval of the second input data stream
     * @param timeStamp1
     *            Time stamp extractor for the first input data stream
     * @param timeStamp2
     *            Time stamp extractor for the second input data stream
     */
    protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1,
            long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
            TimeStamp<IN2> timeStamp2) {
        super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2);
        this.timeStamp1 = timeStamp1;
        this.timeStamp2 = timeStamp2;
    }

    /**
     * Copy constructor: takes over the window configuration and the time
     * stamp extractors of the given stream.
     *
     * @param coWindowDataStream
     *            The windowed connected data stream to copy
     */
    protected CoWindowDataStream(CoWindowDataStream<IN1, IN2> coWindowDataStream) {
        super(coWindowDataStream);
        this.timeStamp1 = coWindowDataStream.timeStamp1;
        this.timeStamp2 = coWindowDataStream.timeStamp2;
    }

    /**
     * Groups the elements of the {@link CoWindowDataStream} by the given key
     * positions to be used with grouped operators. The window configuration
     * and the time stamp extractors are carried over to the returned stream.
     *
     * @param keyPosition1
     *            The position of the field on which the first input data
     *            stream will be grouped.
     * @param keyPosition2
     *            The position of the field on which the second input data
     *            stream will be grouped.
     * @return The transformed {@link CoWindowDataStream}
     * @throws IllegalArgumentException
     *             if either key position is negative
     */
    @Override
    public CoWindowDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
        // Enforce the same precondition as ConnectedDataStream#groupBy so
        // every groupBy entry point fails identically on bad key positions.
        if (keyPosition1 < 0 || keyPosition2 < 0) {
            throw new IllegalArgumentException("The position of the field must be non-negative");
        }
        return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
                dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2,
                timeStamp1, timeStamp2);
    }

    /**
     * Builds the invokable used by the reduce transformation: the grouped
     * window reducer when this stream is grouped, the plain window reducer
     * otherwise.
     *
     * @param coReducer
     *            The user defined reduce function
     * @return The invokable configured with this stream's window sizes, slide
     *         intervals and time stamp extractors
     */
    @Override
    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
            CoReduceFunction<IN1, IN2, OUT> coReducer) {
        CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
        if (isGrouped) {
            invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
                    batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2, timeStamp1,
                    timeStamp2);
        } else {
            invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
                    batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);
        }
        return invokable;
    }

    /**
     * Creates a copy of this stream through the copy constructor.
     *
     * @return The copied {@link CoWindowDataStream}
     */
    @Override
    protected CoWindowDataStream<IN1, IN2> copy() {
        return new CoWindowDataStream<IN1, IN2>(this);
    }
}
......@@ -49,16 +49,16 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
* {@link DataStream}s
*
* @param <IN1>
* Type of the first DataSteam.
* Type of the first input data stream.
* @param <IN2>
* Type of the second DataStream.
* Type of the second input data stream.
*/
public class ConnectedDataStream<IN1, IN2> {
protected StreamExecutionEnvironment environment;
protected JobGraphBuilder jobGraphBuilder;
protected DataStream<IN1> input1;
protected DataStream<IN2> input2;
protected DataStream<IN1> dataStream1;
protected DataStream<IN2> dataStream2;
protected boolean isGrouped;
protected int keyPosition1;
......@@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> {
protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) {
this.jobGraphBuilder = input1.jobGraphBuilder;
this.environment = input1.environment;
this.input1 = input1.copy();
this.input2 = input2.copy();
this.dataStream1 = input1.copy();
this.dataStream2 = input2.copy();
if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
this.isGrouped = true;
......@@ -76,16 +76,28 @@ public class ConnectedDataStream<IN1, IN2> {
this.keyPosition2 = ((GroupedDataStream<IN2>) input2).keyPosition;
} else {
this.isGrouped = false;
this.keyPosition1 = 0;
this.keyPosition2 = 0;
}
}
/**
 * Copy constructor: builds a connected stream that shares the job graph
 * builder and environment of the given stream and takes over its grouping
 * state, while holding the input streams returned by its accessors.
 *
 * @param coDataStream
 *            The connected data stream to copy
 */
protected ConnectedDataStream(ConnectedDataStream<IN1, IN2> coDataStream) {
    // Grouping state is plain data and is taken over as-is.
    isGrouped = coDataStream.isGrouped;
    keyPosition1 = coDataStream.keyPosition1;
    keyPosition2 = coDataStream.keyPosition2;

    // Builder and environment are shared with the source stream.
    environment = coDataStream.environment;
    jobGraphBuilder = coDataStream.jobGraphBuilder;

    // The accessors hand out copies of the underlying input streams.
    dataStream1 = coDataStream.getFirst();
    dataStream2 = coDataStream.getSecond();
}
/**
* Returns the first {@link DataStream}.
*
* @return The first DataStream.
*/
public DataStream<IN1> getFirst() {
return input1.copy();
return dataStream1.copy();
}
/**
......@@ -94,7 +106,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The second DataStream.
*/
public DataStream<IN2> getSecond() {
return input2.copy();
return dataStream2.copy();
}
/**
......@@ -103,7 +115,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The type of the first input
*/
public TypeInformation<IN1> getInputType1() {
return input1.getOutputType();
return dataStream1.getOutputType();
}
/**
......@@ -112,7 +124,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The type of the second input
*/
public TypeInformation<IN2> getInputType2() {
return input2.getOutputType();
return dataStream2.getOutputType();
}
/**
......@@ -127,15 +139,178 @@ public class ConnectedDataStream<IN1, IN2> {
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return Returns the {@link GroupedConnectedDataStream} created.
* @return The transformed {@link ConnectedDataStream}
*/
public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
if (keyPosition1 < 0 || keyPosition2 < 0) {
throw new IllegalArgumentException("The position of the field must be non-negative");
}
return new ConnectedDataStream<IN1, IN2>(input1.groupBy(keyPosition1),
input2.groupBy(keyPosition2));
return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
dataStream2.groupBy(keyPosition2));
}
/**
 * Batch operation for connected data streams. Collects the elements of both
 * input data streams simultaneously into sliding batches, creating a new
 * {@link CoBatchedDataStream} on which the user can then apply the
 * {@link CoBatchedDataStream#reduce} transformation.
 *
 * @param batchSize1
 *            The number of elements in each batch of the first input data
 *            stream
 * @param batchSize2
 *            The number of elements in each batch of the second input data
 *            stream
 * @param slideSize1
 *            The number of elements by which the batches of the first input
 *            data stream are slid after each transformation
 * @param slideSize2
 *            The number of elements by which the batches of the second
 *            input data stream are slid after each transformation
 * @return The transformed {@link CoBatchedDataStream}
 * @throws IllegalArgumentException
 *             if any batch size or slide size is smaller than 1
 */
public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2, long slideSize1,
        long slideSize2) {
    // Batch sizes are checked before slide sizes, so an invalid batch size
    // is reported first when both kinds of arguments are wrong.
    final boolean batchSizesPositive = batchSize1 >= 1 && batchSize2 >= 1;
    if (!batchSizesPositive) {
        throw new IllegalArgumentException("Batch size must be positive");
    }

    final boolean slideSizesPositive = slideSize1 >= 1 && slideSize2 >= 1;
    if (!slideSizesPositive) {
        throw new IllegalArgumentException("Slide size must be positive");
    }

    CoBatchedDataStream<IN1, IN2> batched = new CoBatchedDataStream<IN1, IN2>(this,
            batchSize1, batchSize2, slideSize1, slideSize2);
    return batched;
}
/**
 * Batch operation for connected data streams without an explicit slide
 * size. Collects the elements of both input data streams simultaneously
 * into batches, creating a new {@link CoBatchedDataStream} on which the user
 * can then apply the {@link CoBatchedDataStream#reduce} transformation.
 * Each batch size is also used as the slide size of its stream.
 *
 * @param batchSize1
 *            The number of elements in each batch of the first input data
 *            stream
 * @param batchSize2
 *            The number of elements in each batch of the second input data
 *            stream
 * @return The transformed {@link CoBatchedDataStream}
 */
public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2) {
    // Slide by a whole batch on each input.
    final long slideSize1 = batchSize1;
    final long slideSize2 = batchSize2;
    return batch(batchSize1, batchSize2, slideSize1, slideSize2);
}
/**
 * Window operation for connected data streams. Collects the elements of
 * both input data streams simultaneously into sliding windows, creating a
 * new {@link CoWindowDataStream} on which the user can then apply the
 * {@link CoWindowDataStream#reduce} transformation. Time stamps are
 * extracted from the elements with the supplied {@link TimeStamp}
 * implementations.
 *
 * @param windowSize1
 *            The length of the window of the first input data stream
 * @param windowSize2
 *            The length of the window of the second input data stream
 * @param slideInterval1
 *            The number of milliseconds by which the windows of the first
 *            input data stream are slid after each transformation
 * @param slideInterval2
 *            The number of milliseconds by which the windows of the second
 *            input data stream are slid after each transformation
 * @param timeStamp1
 *            User defined function for extracting time stamps from each
 *            element of the first input data stream
 * @param timeStamp2
 *            User defined function for extracting time stamps from each
 *            element of the second input data stream
 * @return The transformed {@link CoWindowDataStream}
 * @throws IllegalArgumentException
 *             if any window size or slide interval is smaller than 1
 */
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
        long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
        TimeStamp<IN2> timeStamp2) {
    // Window sizes are checked before slide intervals, so an invalid window
    // size is reported first when both kinds of arguments are wrong.
    final boolean windowSizesPositive = windowSize1 >= 1 && windowSize2 >= 1;
    if (!windowSizesPositive) {
        throw new IllegalArgumentException("Window size must be positive");
    }

    final boolean slideIntervalsPositive = slideInterval1 >= 1 && slideInterval2 >= 1;
    if (!slideIntervalsPositive) {
        throw new IllegalArgumentException("Slide interval must be positive");
    }

    CoWindowDataStream<IN1, IN2> windowed = new CoWindowDataStream<IN1, IN2>(this,
            windowSize1, windowSize2, slideInterval1, slideInterval2, timeStamp1, timeStamp2);
    return windowed;
}
/**
 * Window operation for connected data streams using
 * {@link DefaultTimeStamp} for both inputs. Collects the elements of both
 * input data streams simultaneously into sliding windows, creating a new
 * {@link CoWindowDataStream} on which the user can then apply the
 * {@link CoWindowDataStream#reduce} transformation.
 *
 * @param windowSize1
 *            The length of the window of the first input data stream in
 *            milliseconds
 * @param windowSize2
 *            The length of the window of the second input data stream in
 *            milliseconds
 * @param slideInterval1
 *            The number of milliseconds by which the windows of the first
 *            input data stream are slid after each transformation
 * @param slideInterval2
 *            The number of milliseconds by which the windows of the second
 *            input data stream are slid after each transformation
 * @return The transformed {@link CoWindowDataStream}
 */
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
        long slideInterval1, long slideInterval2) {
    // No extractors supplied: fall back to the default time stamp per input.
    final TimeStamp<IN1> defaultTimeStamp1 = new DefaultTimeStamp<IN1>();
    final TimeStamp<IN2> defaultTimeStamp2 = new DefaultTimeStamp<IN2>();
    return window(windowSize1, windowSize2, slideInterval1, slideInterval2,
            defaultTimeStamp1, defaultTimeStamp2);
}
/**
 * Window operation for connected data streams without an explicit slide
 * interval. Collects the elements of both input data streams
 * simultaneously into windows, creating a new {@link CoWindowDataStream} on
 * which the user can then apply the {@link CoWindowDataStream#reduce}
 * transformation. Each window size is also used as the slide interval of
 * its stream.
 *
 * @param windowSize1
 *            The length of the window of the first input data stream
 * @param windowSize2
 *            The length of the window of the second input data stream
 * @param timeStamp1
 *            User defined function for extracting time stamps from each
 *            element of the first input data stream
 * @param timeStamp2
 *            User defined function for extracting time stamps from each
 *            element of the second input data stream
 * @return The transformed {@link CoWindowDataStream}
 */
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
        TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
    // Slide by a whole window on each input.
    final long slideInterval1 = windowSize1;
    final long slideInterval2 = windowSize2;
    return window(windowSize1, windowSize2, slideInterval1, slideInterval2, timeStamp1,
            timeStamp2);
}
/**
 * Window operation for connected data streams with neither an explicit
 * slide interval nor user defined time stamps. Collects the elements of
 * both input data streams simultaneously into windows, creating a new
 * {@link CoWindowDataStream} on which the user can then apply the
 * {@link CoWindowDataStream#reduce} transformation. Each window size is
 * also used as the slide interval of its stream and
 * {@link DefaultTimeStamp} is used for both inputs.
 *
 * @param windowSize1
 *            The length of the window of the first input data stream in
 *            milliseconds
 * @param windowSize2
 *            The length of the window of the second input data stream in
 *            milliseconds
 * @return The transformed {@link CoWindowDataStream}
 */
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
    // Slide by a whole window on each input.
    final long slideInterval1 = windowSize1;
    final long slideInterval2 = windowSize2;

    // No extractors supplied: fall back to the default time stamp per input.
    final TimeStamp<IN1> defaultTimeStamp1 = new DefaultTimeStamp<IN1>();
    final TimeStamp<IN2> defaultTimeStamp2 = new DefaultTimeStamp<IN2>();

    return window(windowSize1, windowSize2, slideInterval1, slideInterval2,
            defaultTimeStamp1, defaultTimeStamp2);
}
/**
......@@ -150,7 +325,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @param coMapper
* The CoMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
* @return The transformed {@link DataStream}
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
......@@ -177,7 +352,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @param coFlatMapper
* The CoFlatMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
* @return The transformed {@link DataStream}
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
......@@ -194,19 +369,19 @@ public class ConnectedDataStream<IN1, IN2> {
/**
* Applies a reduce transformation on a {@link ConnectedDataStream} and maps
* the outputs to a common type. The transformation calls
* {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
* each element of the first input and {@link CoReduceFunction#reduce2} and
* {@link CoReduceFunction#map2} for each element of the second input. This
* type of reduce is much faster than reduceGroup since the reduce function
* can be applied incrementally. The user can also extend the
* {@link RichCoReduceFunction} to gain access to other features provided by
* the {@link RichFuntion} interface.
* the outputs to a common type. If the {@link ConnectedDataStream} is
* batched or windowed then the reduce transformation is applied on every
* sliding batch/window of the data stream. If the connected data stream is
* grouped then the reducer is applied on every group of elements sharing
* the same key. This type of reduce is much faster than reduceGroup since
* the reduce function can be applied incrementally. The user can also
* extend the {@link RichCoReduceFunction} to gain access to other features
* provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every
* element of the inputs.
* @return The transformed DataStream.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
......@@ -216,14 +391,8 @@ public class ConnectedDataStream<IN1, IN2> {
CoReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoReduceFunction.class, 2);
if (this.isGrouped) {
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer,
keyPosition1, keyPosition2));
} else {
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
}
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
getReduceInvokable(coReducer));
}
/**
......@@ -296,6 +465,18 @@ public class ConnectedDataStream<IN1, IN2> {
slideInterval, timestamp1, timestamp2));
}
/**
 * Builds the invokable used by the reduce transformation: the grouped
 * reducer, keyed on both key positions, when this stream is grouped, and
 * the plain reducer otherwise. Subclasses override this to supply their own
 * invokables.
 *
 * @param coReducer
 *            The user defined reduce function
 * @return The invokable wrapping the given reduce function
 */
protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
        CoReduceFunction<IN1, IN2, OUT> coReducer) {
    if (!isGrouped) {
        return new CoReduceInvokable<IN1, IN2, OUT>(coReducer);
    }
    return new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2);
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeWrapper<IN1> in1TypeWrapper,
TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
......@@ -306,7 +487,7 @@ public class ConnectedDataStream<IN1, IN2> {
environment, functionName, outTypeWrapper);
try {
input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
SerializationUtils.serialize((Serializable) function),
environment.getDegreeOfParallelism());
......@@ -314,12 +495,16 @@ public class ConnectedDataStream<IN1, IN2> {
throw new RuntimeException("Cannot serialize user defined function");
}
input1.connectGraph(input1, returnStream.getId(), 1);
input1.connectGraph(input2, returnStream.getId(), 2);
dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
// TODO consider iteration
return returnStream;
}
/**
 * Creates a copy of this connected stream through the copy constructor.
 *
 * @return The copied {@link ConnectedDataStream}
 */
protected ConnectedDataStream<IN1, IN2> copy() {
    final ConnectedDataStream<IN1, IN2> duplicate = new ConnectedDataStream<IN1, IN2>(this);
    return duplicate;
}
}
......@@ -236,9 +236,11 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
}
if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
if (!streamBatch.circularBuffer.isEmpty()) {
streamBatch.circularBuffer.remove();
if (streamBatch.circularBuffer.isFull()) {
for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
if (!streamBatch.circularBuffer.isEmpty()) {
streamBatch.circularBuffer.remove();
}
}
}
if (!streamBatch.circularBuffer.isEmpty()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册