From b8e8bd7a021d4388edaa1e9fc40d18fe1019c065 Mon Sep 17 00:00:00 2001
From: szape
Date: Thu, 9 Oct 2014 11:50:01 +0200
Subject: [PATCH] [streaming] Added CoBatchedDataStream and CoWindowDataStream
 with reduce functionality + CoBatchReduce bugfix

[streaming] RAT & Checkstyle fix
---
 .../api/datastream/CoBatchedDataStream.java   | 104 +++++++
 .../api/datastream/CoWindowDataStream.java    |  86 ++++++
 .../api/datastream/ConnectedDataStream.java   | 255 +++++++++++++++---
 .../operator/co/CoBatchReduceInvokable.java   |   8 +-
 4 files changed, 415 insertions(+), 38 deletions(-)
 create mode 100644 flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
 create mode 100644 flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java

diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
new file mode 100644
index 00000000000..e145a005898
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+
+/**
+ * A {@link CoBatchedDataStream} represents two connected data streams whose
+ * elements are batched together into sliding batches. The
+ * {@link #reduce(CoReduceFunction)} operation can be applied to each batch,
+ * and the batch is slid afterwards.
+ *
+ * @param <IN1>
+ *            The type of the first input data stream
+ * @param <IN2>
+ *            The type of the second input data stream
+ */
+public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
+
+    protected long batchSize1;
+    protected long batchSize2;
+    protected long slideSize1;
+    protected long slideSize2;
+
+    protected CoBatchedDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
+            long batchSize1, long batchSize2, long slideSize1, long slideSize2) {
+        super(dataStream1, dataStream2);
+        this.batchSize1 = batchSize1;
+        this.batchSize2 = batchSize2;
+        this.slideSize1 = slideSize1;
+        this.slideSize2 = slideSize2;
+    }
+
+    protected CoBatchedDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long batchSize1,
+            long batchSize2, long slideSize1, long slideSize2) {
+        super(coDataStream);
+        this.batchSize1 = batchSize1;
+        this.batchSize2 = batchSize2;
+        this.slideSize1 = slideSize1;
+        this.slideSize2 = slideSize2;
+    }
+
+    protected CoBatchedDataStream(CoBatchedDataStream<IN1, IN2> coBatchedDataStream) {
+        super(coBatchedDataStream);
+        this.batchSize1 = coBatchedDataStream.batchSize1;
+        this.batchSize2 = coBatchedDataStream.batchSize2;
+        this.slideSize1 = coBatchedDataStream.slideSize1;
+        this.slideSize2 = coBatchedDataStream.slideSize2;
+    }
+
+    /**
+     * Groups the elements of the {@link CoBatchedDataStream} by the given key
+     * positions to be used with grouped operators.
+     *
+     * @param keyPosition1
+     *            The position of the field on which the first input data
+     *            stream will be grouped.
+     * @param keyPosition2
+     *            The position of the field on which the second input data
+     *            stream will be grouped.
+     * @return The transformed {@link CoBatchedDataStream}
+     */
+    public CoBatchedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
+        return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
+                dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2);
+    }
+
+    @Override
+    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
+            CoReduceFunction<IN1, IN2, OUT> coReducer) {
+        CoBatchReduceInvokable<IN1, IN2, OUT> invokable;
+        if (isGrouped) {
+            invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+                    batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2);
+        } else {
+            invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+                    batchSize2, slideSize1, slideSize2);
+        }
+        return invokable;
+    }
+
+    protected CoBatchedDataStream<IN1, IN2> copy() {
+        return new CoBatchedDataStream<IN1, IN2>(this);
+    }
+
+}
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
new file mode 100644
index 00000000000..c092e225f29
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * A {@link CoWindowDataStream} represents two data streams whose elements are
+ * batched together into sliding windows. Operation
+ * {@link #reduce(CoReduceFunction)} can be applied for each window.
+ *
+ * @param <IN1>
+ *            The type of the first input data stream
+ * @param <IN2>
+ *            The type of the second input data stream
+ */
+public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> {
+    TimeStamp<IN1> timeStamp1;
+    TimeStamp<IN2> timeStamp2;
+
+    protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
+            long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
+            TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+        super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2);
+        this.timeStamp1 = timeStamp1;
+        this.timeStamp2 = timeStamp2;
+    }
+
+    protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1,
+            long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
+            TimeStamp<IN2> timeStamp2) {
+        super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2);
+        this.timeStamp1 = timeStamp1;
+        this.timeStamp2 = timeStamp2;
+    }
+
+    protected CoWindowDataStream(CoWindowDataStream<IN1, IN2> coWindowDataStream) {
+        super(coWindowDataStream);
+        this.timeStamp1 = coWindowDataStream.timeStamp1;
+        this.timeStamp2 = coWindowDataStream.timeStamp2;
+    }
+
+    public CoWindowDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
+        return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
+                dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2,
+                timeStamp1, timeStamp2);
+    }
+
+    @Override
+    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
+            CoReduceFunction<IN1, IN2, OUT> coReducer) {
+        CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
+        if (isGrouped) {
+            invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+                    batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2, timeStamp1,
+                    timeStamp2);
+        } else {
+            invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(coReducer, batchSize1,
+                    batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);
+        }
+        return invokable;
+    }
+
+    protected CoWindowDataStream<IN1, IN2> copy() {
+        return new CoWindowDataStream<IN1, IN2>(this);
+    }
+}
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index bb121fd9ee6..a2ab3637d9f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -49,16 +49,16 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
  * {@link DataStream}s
  *
  * @param <IN1>
- *            Type of the first DataSteam.
+ *            Type of the first input data stream.
  * @param <IN2>
- *            Type of the second DataStream.
+ *            Type of the second input data stream.
  */
 public class ConnectedDataStream<IN1, IN2> {
 
     protected StreamExecutionEnvironment environment;
     protected JobGraphBuilder jobGraphBuilder;
-    protected DataStream<IN1> input1;
-    protected DataStream<IN2> input2;
+    protected DataStream<IN1> dataStream1;
+    protected DataStream<IN2> dataStream2;
 
     protected boolean isGrouped;
     protected int keyPosition1;
@@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> {
     protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) {
         this.jobGraphBuilder = input1.jobGraphBuilder;
         this.environment = input1.environment;
-        this.input1 = input1.copy();
-        this.input2 = input2.copy();
+        this.dataStream1 = input1.copy();
+        this.dataStream2 = input2.copy();
 
         if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
             this.isGrouped = true;
@@ -76,16 +76,28 @@ public class ConnectedDataStream<IN1, IN2> {
             this.keyPosition2 = ((GroupedDataStream<IN2>) input2).keyPosition;
         } else {
             this.isGrouped = false;
+            this.keyPosition1 = 0;
+            this.keyPosition2 = 0;
         }
     }
 
+    protected ConnectedDataStream(ConnectedDataStream<IN1, IN2> coDataStream) {
+        this.jobGraphBuilder = coDataStream.jobGraphBuilder;
+        this.environment = coDataStream.environment;
+        this.dataStream1 = coDataStream.getFirst();
+        this.dataStream2 = coDataStream.getSecond();
+        this.isGrouped = coDataStream.isGrouped;
+        this.keyPosition1 = coDataStream.keyPosition1;
+        this.keyPosition2 = coDataStream.keyPosition2;
+    }
+
     /**
      * Returns the first {@link DataStream}.
      *
      * @return The first DataStream.
      */
     public DataStream<IN1> getFirst() {
-        return input1.copy();
+        return dataStream1.copy();
     }
 
     /**
@@ -94,7 +106,7 @@ public class ConnectedDataStream<IN1, IN2> {
      * @return The second DataStream.
      */
     public DataStream<IN2> getSecond() {
-        return input2.copy();
+        return dataStream2.copy();
     }
 
     /**
@@ -103,7 +115,7 @@ public class ConnectedDataStream<IN1, IN2> {
      * @return The type of the first input
      */
     public TypeInformation<IN1> getInputType1() {
-        return input1.getOutputType();
+        return dataStream1.getOutputType();
     }
 
     /**
@@ -112,7 +124,7 @@ public class ConnectedDataStream<IN1, IN2> {
      * @return The type of the second input
      */
     public TypeInformation<IN2> getInputType2() {
-        return input2.getOutputType();
+        return dataStream2.getOutputType();
     }
 
     /**
@@ -127,15 +139,178 @@ public class ConnectedDataStream<IN1, IN2> {
      * @param keyPosition2
      *            The field used to compute the hashcode of the elements in the
      *            second input stream.
-     * @return Returns the {@link GroupedConnectedDataStream} created.
+     * @return The transformed {@link ConnectedDataStream}
      */
     public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
         if (keyPosition1 < 0 || keyPosition2 < 0) {
             throw new IllegalArgumentException("The position of the field must be non-negative");
         }
-        return new ConnectedDataStream<IN1, IN2>(input1.groupBy(keyPosition1),
-                input2.groupBy(keyPosition2));
+        return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
+                dataStream2.groupBy(keyPosition2));
+    }
+
+    /**
+     * Batch operation for the connected data stream. Collects each input data
+     * stream's elements simultaneously into sliding batches, creating a new
+     * {@link CoBatchedDataStream}. Then the user can apply the
+     * {@link CoBatchedDataStream#reduce} transformation on the
+     * {@link CoBatchedDataStream}.
+     *
+     * @param batchSize1
+     *            The number of elements in each batch of the first input data
+     *            stream
+     * @param batchSize2
+     *            The number of elements in each batch of the second input data
+     *            stream
+     * @param slideSize1
+     *            The number of elements by which the batches of the first
+     *            input data stream are slid after each transformation.
+     * @param slideSize2
+     *            The number of elements by which the batches of the second
+     *            input data stream are slid after each transformation.
+     * @return The transformed {@link CoBatchedDataStream}
+     */
+    public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2, long slideSize1,
+            long slideSize2) {
+        if (batchSize1 < 1 || batchSize2 < 1) {
+            throw new IllegalArgumentException("Batch size must be positive");
+        }
+        if (slideSize1 < 1 || slideSize2 < 1) {
+            throw new IllegalArgumentException("Slide size must be positive");
+        }
+        return new CoBatchedDataStream<IN1, IN2>(this, batchSize1, batchSize2, slideSize1,
+                slideSize2);
+    }
+
+    /**
+     * Batch operation for the connected data stream. Collects each input data
+     * stream's elements simultaneously into batches, creating a new
+     * {@link CoBatchedDataStream}. Then the user can apply the
+     * {@link CoBatchedDataStream#reduce} transformation on the
+     * {@link CoBatchedDataStream}.
+     *
+     * @param batchSize1
+     *            The number of elements in each batch of the first input data
+     *            stream
+     * @param batchSize2
+     *            The number of elements in each batch of the second input data
+     *            stream
+     * @return The transformed {@link CoBatchedDataStream}
+     */
+    public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2) {
+        return batch(batchSize1, batchSize2, batchSize1, batchSize2);
+    }
+
+    /**
+     * Window operation for the connected data stream. Collects each input data
+     * stream's elements simultaneously into sliding windows, creating a new
+     * {@link CoWindowDataStream}. Then the user can apply the
+     * {@link CoWindowDataStream#reduce} transformation on the
+     * {@link CoWindowDataStream}. The user can supply a custom
+     * {@link TimeStamp} implementation or use the system time by default.
+     *
+     * @param windowSize1
+     *            The length of the window of the first input data stream
+     * @param windowSize2
+     *            The length of the window of the second input data stream
+     * @param slideInterval1
+     *            The number of milliseconds by which the windows of the first
+     *            input data stream are slid after each transformation
+     * @param slideInterval2
+     *            The number of milliseconds by which the windows of the second
+     *            input data stream are slid after each transformation
+     * @param timeStamp1
+     *            User defined function for extracting time stamps from each
+     *            element of the first input data stream
+     * @param timeStamp2
+     *            User defined function for extracting time stamps from each
+     *            element of the second input data stream
+     * @return The transformed {@link CoWindowDataStream}
+     */
+    public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
+            long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
+            TimeStamp<IN2> timeStamp2) {
+        if (windowSize1 < 1 || windowSize2 < 1) {
+            throw new IllegalArgumentException("Window size must be positive");
+        }
+        if (slideInterval1 < 1 || slideInterval2 < 1) {
+            throw new IllegalArgumentException("Slide interval must be positive");
+        }
+        return new CoWindowDataStream<IN1, IN2>(this, windowSize1, windowSize2, slideInterval1,
+                slideInterval2, timeStamp1, timeStamp2);
+    }
+
+    /**
+     * Window operation for the connected data stream. Collects each input data
+     * stream's elements simultaneously into sliding windows, creating a new
+     * {@link CoWindowDataStream}. Then the user can apply the
+     * {@link CoWindowDataStream#reduce} transformation on the
+     * {@link CoWindowDataStream}.
+     *
+     * @param windowSize1
+     *            The length of the window of the first input data stream in
+     *            milliseconds.
+     * @param windowSize2
+     *            The length of the window of the second input data stream in
+     *            milliseconds.
+     * @param slideInterval1
+     *            The number of milliseconds by which the windows of the first
+     *            input data stream are slid after each transformation
+     * @param slideInterval2
+     *            The number of milliseconds by which the windows of the second
+     *            input data stream are slid after each transformation
+     * @return The transformed {@link CoWindowDataStream}
+     */
+    public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
+            long slideInterval1, long slideInterval2) {
+        return window(windowSize1, windowSize2, slideInterval1, slideInterval2,
+                new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+    }
+
+    /**
+     * Window operation for the connected data stream. Collects each input data
+     * stream's elements simultaneously into windows, creating a new
+     * {@link CoWindowDataStream}. Then the user can apply the
+     * {@link CoWindowDataStream#reduce} transformation on the
+     * {@link CoWindowDataStream}. The user can supply a custom
+     * {@link TimeStamp} implementation or use the system time by default.
+     *
+     * @param windowSize1
+     *            The length of the window of the first input data stream
+     * @param windowSize2
+     *            The length of the window of the second input data stream
+     * @param timeStamp1
+     *            User defined function for extracting time stamps from each
+     *            element of the first input data stream
+     * @param timeStamp2
+     *            User defined function for extracting time stamps from each
+     *            element of the second input data stream
+     * @return The transformed {@link CoWindowDataStream}
+     */
+    public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
+            TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+        return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2);
+    }
+
+    /**
+     * Window operation for the connected data stream. Collects each input data
+     * stream's elements simultaneously into windows, creating a new
+     * {@link CoWindowDataStream}. Then the user can apply the
+     * {@link CoWindowDataStream#reduce} transformation on the
+     * {@link CoWindowDataStream}.
+     *
+     * @param windowSize1
+     *            The length of the window of the first input data stream in
+     *            milliseconds
+     * @param windowSize2
+     *            The length of the window of the second input data stream in
+     *            milliseconds
+     * @return The transformed {@link CoWindowDataStream}
+     */
+    public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
+        return window(windowSize1, windowSize2, windowSize1, windowSize2,
+                new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
     }
 
     /**
@@ -150,7 +325,7 @@ public class ConnectedDataStream<IN1, IN2> {
      * @param coMapper
      *            The CoMapFunction used to jointly transform the two input
      *            DataStreams
-     * @return The transformed DataStream
+     * @return The transformed {@link DataStream}
      */
     public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
         FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
@@ -177,7 +352,7 @@ public class ConnectedDataStream<IN1, IN2> {
      * @param coFlatMapper
      *            The CoFlatMapFunction used to jointly transform the two input
      *            DataStreams
-     * @return The transformed DataStream
+     * @return The transformed {@link DataStream}
     */
     public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
             CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
@@ -194,19 +369,19 @@ public class ConnectedDataStream<IN1, IN2> {
 
     /**
      * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-     * the outputs to a common type. The transformation calls
-     * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
-     * each element of the first input and {@link CoReduceFunction#reduce2} and
-     * {@link CoReduceFunction#map2} for each element of the second input. This
-     * type of reduce is much faster than reduceGroup since the reduce function
-     * can be applied incrementally. The user can also extend the
-     * {@link RichCoReduceFunction} to gain access to other features provided by
-     * the {@link RichFuntion} interface.
+     * the outputs to a common type. If the {@link ConnectedDataStream} is
+     * batched or windowed, the reduce transformation is applied to every
+     * sliding batch/window of the data stream. If the connected data stream is
+     * grouped, the reducer is applied to every group of elements sharing the
+     * same key. This type of reduce is much faster than reduceGroup, since the
+     * reduce function can be applied incrementally. The user can also extend
+     * the {@link RichCoReduceFunction} to gain access to other features
+     * provided by the {@link RichFunction} interface.
      *
      * @param coReducer
      *            The {@link CoReduceFunction} that will be called for every
      *            element of the inputs.
-     * @return The transformed DataStream.
+     * @return The transformed {@link DataStream}.
      */
     public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(
             CoReduceFunction<IN1, IN2, OUT> coReducer) {
@@ -216,14 +391,8 @@ public class ConnectedDataStream<IN1, IN2> {
                 CoReduceFunction.class, 1);
         FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
                 CoReduceFunction.class, 2);
-        if (this.isGrouped) {
-            return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
-                    outTypeWrapper, new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer,
-                            keyPosition1, keyPosition2));
-        } else {
-            return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
-                    outTypeWrapper, new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
-        }
+        return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
+                getReduceInvokable(coReducer));
     }
 
     /**
@@ -296,6 +465,18 @@ public class ConnectedDataStream<IN1, IN2> {
                 slideInterval, timestamp1, timestamp2));
     }
 
+    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
+            CoReduceFunction<IN1, IN2, OUT> coReducer) {
+        CoReduceInvokable<IN1, IN2, OUT> invokable;
+        if (isGrouped) {
+            invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1,
+                    keyPosition2);
+        } else {
+            invokable = new CoReduceInvokable<IN1, IN2, OUT>(coReducer);
+        }
+        return invokable;
+    }
+
     protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
             final Function function, TypeWrapper<IN1> in1TypeWrapper,
             TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
@@ -306,7 +487,7 @@ public class ConnectedDataStream<IN1, IN2> {
                 environment, functionName, outTypeWrapper);
 
         try {
-            input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
+            dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
                     in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
                     SerializationUtils.serialize((Serializable) function),
                     environment.getDegreeOfParallelism());
@@ -314,12 +495,16 @@ public class ConnectedDataStream<IN1, IN2> {
             throw new RuntimeException("Cannot serialize user defined function");
         }
 
-        input1.connectGraph(input1, returnStream.getId(), 1);
-        input1.connectGraph(input2, returnStream.getId(), 2);
+        dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
+        dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
 
         // TODO consider iteration
 
         return returnStream;
     }
 
+    protected ConnectedDataStream<IN1, IN2> copy() {
+        return new ConnectedDataStream<IN1, IN2>(this);
+    }
+
 }
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
index b9adc69daa2..edf5a8f4c90 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
@@ -236,9 +236,11 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
         if (streamBatch.minibatchCounter >= 0) {
-            for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
-                if (!streamBatch.circularBuffer.isEmpty()) {
-                    streamBatch.circularBuffer.remove();
+            if (streamBatch.circularBuffer.isFull()) {
+                for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
+                    if (!streamBatch.circularBuffer.isEmpty()) {
+                        streamBatch.circularBuffer.remove();
+                    }
                 }
             }
             if (!streamBatch.circularBuffer.isEmpty()) {
-- 
GitLab
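
Usage note (reviewer sketch, not part of the patch): the snippet below illustrates how the batching/windowing reduce API added above is meant to be chained together. It is a minimal sketch under a few assumptions -- that two DataStreams are combined into a ConnectedDataStream via DataStream#connect as elsewhere in this streaming API, that CoReduceFunction exposes reduce1/reduce2/map1/map2 as referenced in the Javadoc above, and that the sources, element types and batch sizes are made-up placeholders.

    // Hypothetical usage sketch; env, the two sources and the sizes are illustrative,
    // and only the batch()/window() discretization plus their reduce() come from this patch.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> names = env.fromElements("a", "b", "c");
    DataStream<Integer> values = env.fromElements(1, 2, 3);

    DataStream<String> batchReduced = names.connect(values)
            // sliding batches: 100/50 elements per batch, slid by 10/5 elements
            .batch(100, 50, 10, 5)
            .reduce(new CoReduceFunction<String, Integer, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String reduce1(String value1, String value2) {
                    // incrementally combine two elements of the first input's batch
                    return value1 + "|" + value2;
                }

                @Override
                public Integer reduce2(Integer value1, Integer value2) {
                    // incrementally combine two elements of the second input's batch
                    return value1 + value2;
                }

                @Override
                public String map1(String value) {
                    // map the reduced value of the first input to the common output type
                    return value;
                }

                @Override
                public String map2(Integer value) {
                    // map the reduced value of the second input to the common output type
                    return value.toString();
                }
            });

A time-based variant only swaps the discretization call, e.g. .window(5000, 5000, 1000, 1000) for five-second windows on both inputs slid by one second, or one of the window(...) overloads taking TimeStamp implementations when the elements carry their own time stamps; grouped per-key reduction is obtained by calling groupBy(int, int) on the batched/windowed stream before reduce().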