Commit f96ba06e authored by Gyula Fora, committed by mbalassi

[streaming] New windowing API merge and cleanup + several minor fixes

Parent 751f1017
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
/**
* A {@link BatchedDataStream} represents a data stream whose elements are
* batched together in a sliding batch. Operations like
* {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
* are applied for each batch and the batch is slid afterwards.
*
* @param <OUT>
* The output type of the {@link BatchedDataStream}
*/
public class BatchedDataStream<OUT> {
protected DataStream<OUT> dataStream;
protected boolean isGrouped;
protected KeySelector<OUT, ?> keySelector;
protected long batchSize;
protected long slideSize;
protected BatchedDataStream(DataStream<OUT> dataStream, long batchSize, long slideSize) {
if (dataStream instanceof GroupedDataStream) {
this.isGrouped = true;
this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
} else {
this.isGrouped = false;
}
this.dataStream = dataStream.copy();
this.batchSize = batchSize;
this.slideSize = slideSize;
}
protected BatchedDataStream(BatchedDataStream<OUT> batchedDataStream) {
this.dataStream = batchedDataStream.dataStream.copy();
this.isGrouped = batchedDataStream.isGrouped;
this.keySelector = batchedDataStream.keySelector;
this.batchSize = batchedDataStream.batchSize;
this.slideSize = batchedDataStream.slideSize;
}
/**
* Groups the elements of the {@link BatchedDataStream} by the given key
* positions to be used with grouped operators.
*
* @param fields
* The position of the fields on which the
* {@link BatchedDataStream} will be grouped.
* @return The transformed {@link BatchedDataStream}
*/
public BatchedDataStream<OUT> groupBy(int... fields) {
return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
}
/**
* Groups a {@link BatchedDataStream} using field expressions. A field
* expression is either the name of a public field or a getter method with
* parentheses of the {@link BatchedDataStream}'s underlying type. A dot can
* be used to drill down into objects, as in
* {@code "field1.getInnerField2()" }.
*
* @param fields
* One or more field expressions on which the DataStream will be
* grouped.
* @return The grouped {@link BatchedDataStream}
**/
public BatchedDataStream<OUT> groupBy(String... fields) {
return new BatchedDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize);
}
/**
* Applies a reduce transformation on every sliding batch/window of the data
* stream. If the data stream is grouped then the reducer is applied on
* every group of elements sharing the same key. This type of reduce is much
* faster than reduceGroup since the reduce function can be applied
* incrementally. The user can also extend the {@link RichReduceFunction} to
* gain access to other features provided by the {@link RichFunction}
* interface.
*
* @param reducer
* The {@link ReduceFunction} that will be called for every
* element of the input values in the batch/window.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), getReduceInvokable(reducer));
}
/**
* Applies a reduceGroup transformation on preset batches/windows of the
* DataStream. The transformation calls a {@link GroupReduceFunction} for
* each batch/window. Each GroupReduceFunction call can return any number of
* elements including none. The user can also extend
* {@link RichGroupReduceFunction} to gain access to other features provided
* by the {@link RichFunction} interface.
*
* @param reducer
* The {@link GroupReduceFunction} that will be called for every
* batch/window.
* @return The transformed DataStream.
*/
public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) {
return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer));
}
/**
* Applies an aggregation that sums every sliding batch/window of the data
* stream at the given position.
*
* @param positionToSum
* The position in the data point to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
dataStream.checkFieldRange(positionToSum);
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
}
/**
* Syntactic sugar for sum(0)
*
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum() {
return sum(0);
}
/**
* Applies an aggregation that gives the sum of the pojo data stream at
* the given field expression. A field expression is either the name of a
* public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(String field) {
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
getOutputType()));
}
/**
* Applies an aggregation that gives the minimum of every sliding
* batch/window of the data stream at the given position.
*
* @param positionToMin
* The position in the data point to minimize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
dataStream.checkFieldRange(positionToMin);
return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
AggregationType.MIN));
}
/**
* Applies an aggregation that gives the minimum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same minimum value the operator returns the first element by
* default.
*
* @param positionToMinBy
* The position in the data point to minimize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
/**
* Applies an aggregation that gives the minimum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same minimum value the operator returns either the first or last
* one depending on the parameter setting.
*
* @param positionToMinBy
* The position in the data point to minimize
* @param first
* If true, then the operator returns the first element with the
* minimum value, otherwise returns the last
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
dataStream.checkFieldRange(positionToMinBy);
return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
AggregationType.MINBY, first));
}
/**
* Syntactic sugar for min(0)
*
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min() {
return min(0);
}
/**
* Applies an aggregation that gives the maximum of every sliding
* batch/window of the data stream at the given position.
*
* @param positionToMax
* The position in the data point to maximize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
dataStream.checkFieldRange(positionToMax);
return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
AggregationType.MAX));
}
/**
* Applies an aggregation that gives the maximum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same maximum value the operator returns the first by default.
*
* @param positionToMaxBy
* The position in the data point to maximize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
/**
* Applies an aggregation that gives the maximum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same maximum value the operator returns either the first or last
* one depending on the parameter setting.
*
* @param positionToMaxBy
* The position in the data point to maximize
* @param first
* If true, then the operator returns the first element with the
* maximum value, otherwise returns the last
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
dataStream.checkFieldRange(positionToMaxBy);
return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
AggregationType.MAXBY, first));
}
/**
* Syntactic sugar for max(0)
*
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max() {
return max(0);
}
/**
* Applies an aggregation that gives the minimum of the pojo data
* stream at the given field expression. A field expression is either the
* name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(String field) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MIN, false));
}
/**
* Applies an aggregation that gives the maximum of the pojo data
* stream at the given field expression. A field expression is either the
* name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(String field) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MAX, false));
}
/**
* Applies an aggregation that gives the minimum element of the pojo
* data stream by the given field expression. A field expression is either
* the name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @param first
* If true, then in case of field equality the first object will
* be returned
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MINBY, first));
}
/**
* Applies an aggregation that gives the maximum element of the pojo
* data stream by the given field expression. A field expression is either
* the name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @param first
* If true, then in case of field equality the first object will
* be returned
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MAXBY, first));
}
/**
* Gets the output type.
*
* @return The output type.
*/
public TypeInformation<OUT> getOutputType() {
return dataStream.getOutputType();
}
private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
return returnStream;
}
protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
BatchReduceInvokable<OUT> invokable;
if (isGrouped) {
invokable = new GroupedBatchReduceInvokable<OUT>(reducer, batchSize, slideSize,
keySelector);
} else {
invokable = new BatchReduceInvokable<OUT>(reducer, batchSize, slideSize);
}
return invokable;
}
protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
GroupReduceFunction<OUT, R> reducer) {
BatchGroupReduceInvokable<OUT, R> invokable;
if (isGrouped) {
invokable = new GroupedBatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
keySelector);
} else {
invokable = new BatchGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize);
}
return invokable;
}
protected BatchedDataStream<OUT> copy() {
return new BatchedDataStream<OUT>(this);
}
}
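A short usage sketch of the batching API defined above; the input stream ("source") and its tuple type are assumed for illustration:

// Per-key running sums over sliding count batches of 100 elements, slid by 10
source.batch(100, 10).groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
Tuple2<String, Integer> v2) {
return new Tuple2<String, Integer>(v1.f0, v1.f1 + v2.f1);
}
});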
@@ -663,7 +663,6 @@ public class DataStream<OUT> {
return new GroupedDataStream<OUT>(this, keySelector);
}
/**
* This allows you to set up windowing through a nice API using
* {@link WindowingHelper} such as {@link Time}, {@link Count} and
@@ -678,101 +677,6 @@ public class DataStream<OUT> {
public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
return new WindowedDataStream<OUT>(this, policyHelpers);
}
/**
* Collects the data stream elements into sliding batches creating a new
* {@link BatchedDataStream}. The user can apply transformations like
* {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
* or aggregations on the {@link BatchedDataStream}.
*
* @param batchSize
* The number of elements in each batch at each operator
* @param slideSize
* The number of elements with which the batches are slid by
* after each transformation.
* @return The transformed {@link BatchedDataStream}
*/
public BatchedDataStream<OUT> batch(long batchSize, long slideSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive");
}
if (slideSize < 1) {
throw new IllegalArgumentException("Slide size must be positive");
}
return new BatchedDataStream<OUT>(this, batchSize, slideSize);
}
/**
* Collects the data stream elements into sliding batches creating a new
* {@link BatchedDataStream}. The user can apply transformations like
* {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup}
* or aggregations on the {@link BatchedDataStream}.
*
* @param batchSize
* The number of elements in each batch at each operator
* @return The transformed {@link DataStream}
*/
public BatchedDataStream<OUT> batch(long batchSize) {
return batch(batchSize, batchSize);
}
/**
* Collects the data stream elements into sliding windows creating a new
* {@link WindowDataStream}. The user can apply transformations like
* {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
* aggregations on the {@link WindowDataStream}.
*
* @param windowSize
* The length of the window in milliseconds.
* @param slideInterval
* The number of milliseconds with which the windows are slid by
* after each transformation.
* @param timestamp
* User defined function for extracting time-stamps from each
* element
* @return The transformed {@link WindowDataStream}
*/
public WindowDataStream<OUT> window(long windowSize, long slideInterval,
TimeStamp<OUT> timestamp) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
return new WindowDataStream<OUT>(this, windowSize, slideInterval, timestamp);
}
/**
* Collects the data stream elements into sliding windows creating a new
* {@link WindowDataStream}. The user can apply transformations like
* {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
* aggregations on the {@link WindowDataStream}.
*
* @param windowSize
* The length of the window in milliseconds.
* @param slideInterval
* The number of milliseconds with which the windows are slid by
* after each transformation.
* @return The transformed {@link WindowDataStream}
*/
public WindowDataStream<OUT> window(long windowSize, long slideInterval) {
return window(windowSize, slideInterval, new DefaultTimeStamp<OUT>());
}
/**
* Collects the data stream elements into sliding windows creating a new
* {@link WindowDataStream}. The user can apply transformations like
* {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or
* aggregations on the {@link WindowDataStream}.
*
* @param windowSize
* The length of the window in milliseconds.
* @return The transformed {@link WindowDataStream}
*/
public WindowDataStream<OUT> window(long windowSize) {
return window(windowSize, windowSize);
}
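/*
 * Usage sketch (illustrative; "stream" and "MyTimeStamp" are assumed
 * names): a 10 second window slid every 2 seconds with the default
 * system-time stamps, and the same window with a user-defined
 * extractor implementing TimeStamp<MyType>:
 *
 * stream.window(10000, 2000).sum(0);
 * stream.window(10000, 2000, new MyTimeStamp()).sum(0);
 */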
/**
* Applies an aggregation that sums the data stream at the given position.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
/**
* A {@link WindowDataStream} represents a data stream whose elements are
* batched together in a sliding window. Operations like
* {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
* are applied for each window and the window is slid afterwards.
*
* @param <OUT>
* The output type of the {@link WindowDataStream}
*/
public class WindowDataStream<OUT> extends BatchedDataStream<OUT> {
TimeStamp<OUT> timeStamp;
protected WindowDataStream(DataStream<OUT> dataStream, long windowSize, long slideInterval,
TimeStamp<OUT> timeStamp) {
super(dataStream, windowSize, slideInterval);
this.timeStamp = timeStamp;
}
protected WindowDataStream(WindowDataStream<OUT> windowDataStream) {
super(windowDataStream);
this.timeStamp = windowDataStream.timeStamp;
}
public WindowDataStream<OUT> groupBy(int... keyPositions) {
return new WindowDataStream<OUT>(dataStream.groupBy(keyPositions), batchSize, slideSize,
timeStamp);
}
public WindowDataStream<OUT> groupBy(String... fields) {
return new WindowDataStream<OUT>(dataStream.groupBy(fields), batchSize, slideSize,
timeStamp);
}
protected BatchReduceInvokable<OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
WindowReduceInvokable<OUT> invokable;
if (isGrouped) {
invokable = new GroupedWindowReduceInvokable<OUT>(reducer, batchSize, slideSize,
keySelector, timeStamp);
} else {
invokable = new WindowReduceInvokable<OUT>(reducer, batchSize, slideSize, timeStamp);
}
return invokable;
}
protected <R> BatchGroupReduceInvokable<OUT, R> getGroupReduceInvokable(
GroupReduceFunction<OUT, R> reducer) {
BatchGroupReduceInvokable<OUT, R> invokable;
if (isGrouped) {
invokable = new GroupedWindowGroupReduceInvokable<OUT, R>(reducer, batchSize,
slideSize, keySelector, timeStamp);
} else {
invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, batchSize, slideSize,
timeStamp);
}
return invokable;
}
public WindowDataStream<OUT> copy() {
return new WindowDataStream<OUT>(this);
}
}
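A sketch of how the overrides above are exercised (stream and field positions assumed for illustration). Grouping before an aggregation makes getReduceInvokable(...) return the grouped variant, so every key keeps its own window state:

// Per-key maximum over 60 second windows slid every 10 seconds
source.window(60000, 10000).groupBy(0).max(1);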
@@ -17,25 +17,34 @@
package org.apache.flink.streaming.api.datastream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowingGroupInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowingReduceInvokable;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
/**
* A {@link WindowedDataStream} represents a data stream whose elements are
* batched together in a sliding batch. Operations like
* {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
* are applied for each batch and the batch is slid afterwards.
*
@@ -48,19 +57,31 @@ public class WindowedDataStream<OUT> {
protected boolean isGrouped;
protected KeySelector<OUT, ?> keySelector;
protected List<WindowingHelper<OUT>> triggerPolicies;
protected List<WindowingHelper<OUT>> evictionPolicies;
protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) {
if (dataStream instanceof GroupedDataStream) {
this.isGrouped = true;
this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
} else {
this.isGrouped = false;
}
this.dataStream = dataStream.copy();
this.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
for (WindowingHelper<OUT> helper : policyHelpers) {
this.triggerPolicies.add(helper);
}
}
protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
this.dataStream = windowedDataStream.dataStream.copy();
this.isGrouped = windowedDataStream.isGrouped;
this.keySelector = windowedDataStream.keySelector;
this.triggerPolicies = windowedDataStream.triggerPolicies;
this.evictionPolicies = windowedDataStream.evictionPolicies;
}
protected LinkedList<TriggerPolicy<OUT>> getTriggers() {
@@ -87,18 +108,43 @@ public class WindowedDataStream<OUT> {
return evictionPolicyList;
}
protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
// Add Time triggers to central triggers
for (TriggerPolicy<OUT> trigger : getTriggers()) {
if (trigger instanceof TimeTriggerPolicy) {
cTriggers.add(trigger);
}
}
return cTriggers;
}
protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
// Everything except Time triggers are distributed
for (TriggerPolicy<OUT> trigger : getTriggers()) {
if (!(trigger instanceof TimeTriggerPolicy)) {
dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
}
}
return dTriggers;
}
protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
LinkedList<CloneableEvictionPolicy<OUT>> evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
for (EvictionPolicy<OUT> evicter : getEvicters()) {
evicters.add((CloneableEvictionPolicy<OUT>) evicter);
}
return evicters;
}
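/*
 * Illustrative example of the split above: for a windowed stream built
 * with a time helper and a count helper (such as {@link Time} and
 * {@link Count}), the resulting TimeTriggerPolicy lands in
 * getCentralTriggers() and is evaluated once for all groups, while the
 * count trigger is a CloneableTriggerPolicy and lands in
 * getDistributedTriggers(), cloned separately for every key; eviction
 * policies are always distributed.
 */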
/**
* Groups the elements of the {@link WindowedDataStream} by the given
* {@link KeySelector} to be used with grouped operators.
*
* @param keySelector
* The specification of the key on which the
@@ -109,7 +155,39 @@ public class WindowedDataStream<OUT> {
WindowedDataStream<OUT> ret = this.copy();
ret.dataStream = ret.dataStream.groupBy(keySelector);
ret.isGrouped = true;
ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
return ret;
}
/**
* Groups the elements of the {@link WindowedDataStream} by the given key
* positions to be used with grouped operators.
*
* @param fields
* The position of the fields to group by.
* @return The transformed {@link WindowedDataStream}
*/
public WindowedDataStream<OUT> groupBy(int... fields) {
WindowedDataStream<OUT> ret = this.copy();
ret.dataStream = ret.dataStream.groupBy(fields);
ret.isGrouped = true;
ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
return ret;
}
/**
* Groups the elements of the {@link WindowedDataStream} by the given field
* expressions to be used with grouped operators.
*
* @param fields
The field expressions to group by.
* @return The transformed {@link WindowedDataStream}
*/
public WindowedDataStream<OUT> groupBy(String... fields) {
WindowedDataStream<OUT> ret = this.copy();
ret.dataStream = ret.dataStream.groupBy(fields);
ret.isGrouped = true;
ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
return ret;
}
@@ -126,13 +204,242 @@ public class WindowedDataStream<OUT> {
* information
* @return The single output operator
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
dataStream.outTypeWrapper, dataStream.outTypeWrapper,
getReduceInvokable(reduceFunction));
}
public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
GroupReduceFunction<OUT, R> reduceFunction) {
return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
dataStream.outTypeWrapper, new FunctionTypeWrapper<R>(reduceFunction,
GroupReduceFunction.class, 1), getReduceGroupInvokable(reduceFunction));
}
/**
* Applies an aggregation that sums every sliding batch/window of the data
* stream at the given position.
*
* @param positionToSum
* The position in the data point to sum
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
dataStream.checkFieldRange(positionToSum);
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
}
/**
* Syntactic sugar for sum(0)
*
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum() {
return sum(0);
}
/**
* Applies an aggregation that gives the sum of the pojo data stream at
* the given field expression. A field expression is either the name of a
* public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(String field) {
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
getOutputType()));
}
/**
* Applies an aggregation that gives the minimum of every sliding
* batch/window of the data stream at the given position.
*
* @param positionToMin
* The position in the data point to minimize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
dataStream.checkFieldRange(positionToMin);
return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
AggregationType.MIN));
}
/**
* Applies an aggregation that gives the minimum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same minimum value the operator returns the first element by
* default.
*
* @param positionToMinBy
* The position in the data point to minimize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}
/**
* Applies an aggregation that gives the minimum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same minimum value the operator returns either the first or last
* one depending on the parameter setting.
*
* @param positionToMinBy
* The position in the data point to minimize
* @param first
* If true, then the operator returns the first element with the
* minimum value, otherwise returns the last
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
dataStream.checkFieldRange(positionToMinBy);
return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
AggregationType.MINBY, first));
}
/**
* Syntactic sugar for min(0)
*
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min() {
return min(0);
}
/**
* Applies an aggregation that gives the maximum of every sliding
* batch/window of the data stream at the given position.
*
* @param positionToMax
* The position in the data point to maximize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
dataStream.checkFieldRange(positionToMax);
return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
AggregationType.MAX));
}
/**
* Applies an aggregation that gives the maximum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same maximum value the operator returns the first by default.
*
* @param positionToMaxBy
* The position in the data point to maximize
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}
/**
* Applies an aggregation that gives the maximum element of every sliding
* batch/window of the data stream by the given position. If more elements
* have the same maximum value the operator returns either the first or last
* one depending on the parameter setting.
*
* @param positionToMaxBy
* The position in the data point to maximize
* @param first
* If true, then the operator returns the first element with the
* maximum value, otherwise returns the last
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
dataStream.checkFieldRange(positionToMaxBy);
return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
AggregationType.MAXBY, first));
}
/**
* Syntactic sugar for max(0)
*
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max() {
return max(0);
}
/**
* Applies an aggregation that gives the minimum of the pojo data
* stream at the given field expression. A field expression is either the
* name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(String field) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MIN, false));
}
/**
* Applies an aggregation that gives the maximum of the pojo data
* stream at the given field expression. A field expression is either the
* name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(String field) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MAX, false));
}
/**
* Applies an aggregation that gives the minimum element of the pojo
* data stream by the given field expression. A field expression is either
* the name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @param first
* If true, then in case of field equality the first object will
* be returned
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MINBY, first));
}
/**
* Applies an aggregation that gives the maximum element of the pojo
* data stream by the given field expression. A field expression is either
* the name of a public field or a getter method with parentheses of the
* {@link DataStream}'s underlying type. A dot can be used to drill down into
* objects, as in {@code "field1.getInnerField2()" }.
*
* @param field
* The field expression based on which the aggregation will be
* applied.
* @param first
* If true, then in case of field equality the first object will
* be returned
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
AggregationType.MAXBY, first));
}
/**
@@ -153,11 +460,45 @@ public class WindowedDataStream<OUT> {
WindowedDataStream<OUT> ret = this.copy();
if (ret.evictionPolicies == null) {
ret.evictionPolicies = ret.triggerPolicies;
ret.triggerPolicies = new ArrayList<WindowingHelper<OUT>>();
}
for (WindowingHelper<OUT> helper : policyHelpers) {
ret.triggerPolicies.add(helper);
}
return ret;
}
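/*
 * Usage sketch (illustrative; the Count helper is part of this API but
 * its factory method is assumed here): on the first call to every(),
 * the helpers given to window() become eviction policies and the new
 * helpers become the triggers. For example, keeping the last 1000
 * elements and emitting a sum for every 100 new ones:
 *
 * stream.window(Count.of(1000)).every(Count.of(100)).sum(0);
 */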
private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
return returnStream;
}
protected <R> StreamInvokable<OUT, R> getReduceGroupInvokable(
GroupReduceFunction<OUT, R> reducer) {
StreamInvokable<OUT, R> invokable;
if (isGrouped) {
invokable = new GroupedWindowingInvokable<OUT, R>(reducer, keySelector,
getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
} else {
invokable = new WindowingGroupInvokable<OUT, R>(reducer, getTriggers(), getEvicters());
}
return invokable;
}
protected StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
StreamInvokable<OUT, OUT> invokable;
if (isGrouped) {
invokable = new GroupedWindowingInvokable<OUT, OUT>(reducer, keySelector,
getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
} else {
invokable = new WindowingReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
}
return invokable;
}
}
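A sketch of the grouped reduceGroup path above (the input stream, the Count helper factory, and the key position are assumed for illustration; Collector is org.apache.flink.util.Collector). On a grouped stream, getReduceGroupInvokable(...) returns the GroupedWindowingInvokable assembled from the policy lists:

// Emits the element count of each key's window
source.window(Count.of(10)).groupBy(0)
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Integer>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) {
int count = 0;
for (Tuple2<String, Integer> value : values) {
count++;
}
out.collect(count);
}
});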
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import java.io.Serializable;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.CircularFifoList;
public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer;
protected long slideSize;
protected long batchSize;
protected int granularity;
protected int batchPerSlide;
protected StreamBatch batch;
protected StreamBatch currentBatch;
protected long numberOfBatches;
public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
long slideSize) {
super(reduceFunction);
this.reducer = reduceFunction;
this.batchSize = batchSize;
this.slideSize = slideSize;
this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
this.batchPerSlide = (int) (slideSize / granularity);
this.numberOfBatches = batchSize / granularity;
this.batch = new StreamBatch();
}
@Override
protected void immutableInvoke() throws Exception {
if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty");
}
while (reuse != null) {
StreamBatch batch = getBatch(reuse);
batch.addToBuffer(reuse.getObject());
resetReuse();
reuse = recordIterator.next(reuse);
}
reduceLastBatch();
}
@Override
// TODO: implement mutableInvoke for reduce
protected void mutableInvoke() throws Exception {
System.out.println("Immutable setting is used");
immutableInvoke();
}
protected StreamBatch getBatch(StreamRecord<IN> next) {
return batch;
}
protected void reduce(StreamBatch batch) {
this.currentBatch = batch;
callUserFunctionAndLogException();
}
protected void reduceLastBatch() {
batch.reduceLastBatch();
}
@Override
protected void callUserFunction() throws Exception {
if (!currentBatch.circularList.isEmpty()) {
reducer.reduce(currentBatch.circularList.getIterable(), collector);
}
}
protected class StreamBatch implements Serializable {
private static final long serialVersionUID = 1L;
private long counter;
protected long minibatchCounter;
protected CircularFifoList<IN> circularList;
public StreamBatch() {
this.circularList = new CircularFifoList<IN>();
this.counter = 0;
this.minibatchCounter = 0;
}
public void addToBuffer(IN nextValue) throws Exception {
circularList.add(nextValue);
counter++;
if (miniBatchEnd()) {
circularList.newSlide();
minibatchCounter++;
if (batchEnd()) {
reduceBatch();
circularList.shiftWindow(batchPerSlide);
}
}
}
protected boolean miniBatchEnd() {
if ((counter % granularity) == 0) {
counter = 0;
return true;
} else {
return false;
}
}
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
minibatchCounter -= batchPerSlide;
return true;
}
return false;
}
public void reduceBatch() {
reduce(this);
}
public void reduceLastBatch() {
if (!miniBatchEnd()) {
reduceBatch();
}
}
@Override
public String toString() {
return circularList.toString();
}
}
}
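The GCD bookkeeping in the constructor above can be checked with a small worked example: for batchSize = 6 and slideSize = 4, granularity = gcd(6, 4) = 2, so elements are buffered in mini-batches of two; batchPerSlide = 4 / 2 = 2 and numberOfBatches = 6 / 2 = 3. A reduce therefore fires once three mini-batches (6 elements) are buffered, after which the window is shifted by two mini-batches (4 elements), exactly one slide.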
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;
public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
private static final long serialVersionUID = 1L;
protected ReduceFunction<OUT> reducer;
protected long slideSize;
protected long batchSize;
protected int granularity;
protected long batchPerSlide;
protected long numberOfBatches;
protected StreamBatch batch;
protected StreamBatch currentBatch;
protected TypeSerializer<OUT> serializer;
public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize, long slideSize) {
super(reduceFunction);
this.reducer = reduceFunction;
this.batchSize = batchSize;
this.slideSize = slideSize;
this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
this.batchPerSlide = slideSize / granularity;
this.numberOfBatches = batchSize / granularity;
}
@Override
protected void immutableInvoke() throws Exception {
if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty");
}
while (reuse != null) {
StreamBatch batch = getBatch(reuse);
batch.reduceToBuffer(reuse.getObject());
resetReuse();
reuse = recordIterator.next(reuse);
}
reduceLastBatch();
}
@Override
protected void mutableInvoke() throws Exception {
System.out.println("Immutable setting is used");
immutableInvoke();
}
protected StreamBatch getBatch(StreamRecord<OUT> next) {
return batch;
}
protected void reduce(StreamBatch batch) {
this.currentBatch = batch;
callUserFunctionAndLogException();
}
protected void reduceLastBatch() throws Exception {
batch.reduceLastBatch();
}
@Override
protected void callUserFunction() throws Exception {
Iterator<OUT> reducedIterator = currentBatch.getIterator();
OUT reduced = null;
while (reducedIterator.hasNext() && reduced == null) {
reduced = reducedIterator.next();
}
while (reducedIterator.hasNext()) {
OUT next = reducedIterator.next();
if (next != null) {
reduced = reducer.reduce(serializer.copy(reduced), serializer.copy(next));
}
}
if (reduced != null) {
collector.collect(reduced);
}
}
protected class StreamBatch implements Serializable {
private static final long serialVersionUID = 1L;
protected long counter;
protected long minibatchCounter;
protected OUT currentValue;
boolean changed;
protected NullableCircularBuffer circularBuffer;
public StreamBatch() {
this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
this.counter = 0;
this.minibatchCounter = 0;
this.changed = false;
}
public void reduceToBuffer(OUT nextValue) throws Exception {
if (currentValue != null) {
currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
} else {
currentValue = nextValue;
}
counter++;
if (miniBatchEnd()) {
addToBuffer();
if (batchEnd()) {
reduceBatch();
}
}
}
protected void addToBuffer() {
circularBuffer.add(currentValue);
changed = true;
minibatchCounter++;
currentValue = null;
}
protected boolean miniBatchEnd() {
if ((counter % granularity) == 0) {
counter = 0;
return true;
} else {
return false;
}
}
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
minibatchCounter -= batchPerSlide;
return true;
}
return false;
}
public void reduceLastBatch() throws Exception {
if (miniBatchInProgress()) {
addToBuffer();
}
if (changed && minibatchCounter >= 0) {
if (circularBuffer.isFull()) {
for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
if (!circularBuffer.isEmpty()) {
circularBuffer.remove();
}
}
}
if (!circularBuffer.isEmpty()) {
reduce(this);
}
}
}
public boolean miniBatchInProgress() {
return currentValue != null;
}
public void reduceBatch() {
reduce(this);
changed = false;
}
@SuppressWarnings("unchecked")
public Iterator<OUT> getIterator() {
return circularBuffer.iterator();
}
@Override
public String toString() {
return circularBuffer.toString();
}
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
serializer = inSerializer.getObjectSerializer();
this.batch = new StreamBatch();
}
}
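A worked sketch of the two-level reduction above, assuming an Integer sum reducer with batchSize = 4 and slideSize = 2 (granularity 2, buffer capacity 2): reduceToBuffer() pre-reduces each mini-batch of two elements incrementally, so for the inputs 1, 2, 3, 4 the circular buffer holds one partial sum per mini-batch, [3, 7]; when the batch ends, callUserFunction() folds the partials with the same reducer and emits 10.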
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
KeySelector<IN, ?> keySelector;
Map<Object, StreamWindow> streamWindows;
List<Object> cleanList;
long currentMiniBatchCount = 0;
public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction,
long windowSize, long slideInterval, KeySelector<IN, ?> keySelector,
TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval, timestamp);
this.keySelector = keySelector;
this.reducer = reduceFunction;
this.streamWindows = new HashMap<Object, StreamWindow>();
}
@Override
protected StreamBatch getBatch(StreamRecord<IN> next) {
Object key = next.getKey(keySelector);
StreamWindow window = streamWindows.get(key);
if (window == null) {
window = new GroupedStreamWindow();
for (int i = 0; i < currentMiniBatchCount; i++) {
window.circularList.newSlide();
}
streamWindows.put(key, window);
}
this.window = window;
return window;
}
@Override
protected void reduceLastBatch() {
for (StreamBatch window : streamWindows.values()) {
window.reduceLastBatch();
}
}
private void shiftGranularityAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.circularList.newSlide();
}
}
private void slideAllWindows() {
currentMiniBatchCount -= batchPerSlide;
for (StreamBatch window : streamWindows.values()) {
window.circularList.shiftWindow(batchPerSlide);
}
}
private void reduceAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.reduceBatch();
}
}
protected class GroupedStreamWindow extends StreamWindow {
private static final long serialVersionUID = 1L;
public GroupedStreamWindow() {
super();
}
@Override
public void addToBuffer(IN nextValue) throws Exception {
checkWindowEnd(timestamp.getTimestamp(nextValue));
if (currentMiniBatchCount >= 0) {
circularList.add(nextValue);
}
}
@Override
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
shiftGranularityAllWindows();
currentMiniBatchCount += 1;
if (batchEnd()) {
reduceAllWindows();
slideAllWindows();
}
}
}
@Override
public boolean batchEnd() {
if (currentMiniBatchCount == numberOfBatches) {
return true;
}
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT> {
private static final long serialVersionUID = 1L;
private KeySelector<OUT, ?> keySelector;
private Map<Object, StreamWindow> streamWindows;
private long currentMiniBatchCount = 0;
public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
long slideInterval, KeySelector<OUT, ?> keySelector, TimeStamp<OUT> timestamp) {
super(reduceFunction, windowSize, slideInterval, timestamp);
this.keySelector = keySelector;
this.streamWindows = new HashMap<Object, StreamWindow>();
}
@Override
protected StreamBatch getBatch(StreamRecord<OUT> next) {
Object key = next.getKey(keySelector);
StreamWindow window = streamWindows.get(key);
if (window == null) {
window = new GroupedStreamWindow();
window.minibatchCounter = currentMiniBatchCount;
streamWindows.put(key, window);
}
this.window = window;
return window;
}
private void addToAllBuffers() {
for (StreamBatch window : streamWindows.values()) {
window.addToBuffer();
}
}
private void reduceAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.minibatchCounter -= batchPerSlide;
window.reduceBatch();
}
}
@Override
protected void reduceLastBatch() throws Exception {
for (StreamBatch window : streamWindows.values()) {
window.reduceLastBatch();
}
}
protected class GroupedStreamWindow extends StreamWindow {
private static final long serialVersionUID = 1L;
public GroupedStreamWindow() {
super();
}
@Override
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
addToAllBuffers();
if (batchEnd()) {
reduceAllWindows();
}
}
currentMiniBatchCount = this.minibatchCounter;
}
@Override
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
return true;
}
return false;
}
}
}
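A note on the grouped time-window mechanics above: mini-batch boundaries are derived from timestamps, so when any key's window crosses one, addToAllBuffers() and reduceAllWindows() advance and fire every key's window in lockstep, and a window created for a newly seen key starts with minibatchCounter set to the shared currentMiniBatchCount so that it stays aligned with the existing windows.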
@@ -21,9 +21,10 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -69,7 +70,7 @@ import org.slf4j.LoggerFactory;
* @param <IN>
* The type of input elements handled by this operator invokable.
*/
public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
/**
* Auto-generated serial version UID
@@ -84,7 +85,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
private Map<Object, WindowingInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowingInvokable<IN, OUT>>();
private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
......@@ -117,7 +118,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
* that it forwards/distributes calls to all groups.
*
* @param userFunction
* The user defined {@link ReduceFunction}.
* The user defined function.
* @param keySelector
* A key selector to extract the key for the groups from the
* input data.
......@@ -137,7 +138,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
* If only one element is contained in a group, this element itself
* is returned as aggregated result.)
*/
public GroupedWindowingInvokable(ReduceFunction<IN> userFunction,
public GroupedWindowingInvokable(Function userFunction,
KeySelector<IN, ?> keySelector,
LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
......@@ -165,8 +166,8 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
// Continuously run
while (reuse != null) {
WindowingInvokable<IN> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
.getObject()));
WindowingInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
.getKey(reuse.getObject()));
if (groupInvokable == null) {
groupInvokable = makeNewGroup(reuse);
}
......@@ -175,7 +176,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
IN[] result = trigger.preNotifyTrigger(reuse.getObject());
for (IN in : result) {
for (WindowingInvokable<IN> group : windowingGroups.values()) {
for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
}
}
......@@ -193,7 +194,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
groupInvokable.processRealElement(reuse.getObject());
} else {
// call user function for all groups
for (WindowingInvokable<IN> group : windowingGroups.values()) {
for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
if (group == groupInvokable) {
// process real with initialized policies
group.processRealElement(reuse.getObject(), currentTriggerPolicies);
......@@ -219,7 +220,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
}
// finally trigger the buffer.
for (WindowingInvokable<IN> group : windowingGroups.values()) {
for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.emitFinalWindow(centralTriggerPolicies);
}
......@@ -239,7 +240,8 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
* {@link KeySelector#getKey(Object)}, the exception is not
* caught by this method.
*/
private WindowingInvokable<IN> makeNewGroup(StreamRecord<IN> element) throws Exception {
@SuppressWarnings("unchecked")
private WindowingInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception {
// clone the policies
LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
......@@ -250,10 +252,17 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
clonedDistributedEvictionPolicies.add(eviction.clone());
}
@SuppressWarnings("unchecked")
WindowingInvokable<IN> groupInvokable = new WindowingInvokable<IN>(
(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
clonedDistributedEvictionPolicies);
WindowingInvokable<IN, OUT> groupInvokable;
if (userFunction instanceof ReduceFunction) {
groupInvokable = (WindowingInvokable<IN, OUT>) new WindowingReduceInvokable<IN>(
(ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
clonedDistributedEvictionPolicies);
} else {
groupInvokable = new WindowingGroupInvokable<IN, OUT>(
(GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies,
clonedDistributedEvictionPolicies);
}
groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
groupInvokable.open(this.parameters);
windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
......@@ -305,7 +314,7 @@ public class GroupedWindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN
@Override
public void sendFakeElement(IN datapoint) {
for (WindowingInvokable<IN> group : windowingGroups.values()) {
for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
}
}
......
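Note how makeNewGroup above clones every distributed policy before wiring up a fresh per-key invokable: this is what keeps trigger and eviction state independent between groups. A tiny sketch of that contract, assuming CountTriggerPolicy.clone() copies the counter as the CloneableTriggerPolicy interface implies:

// Each group needs its own policy instance; a shared one would mix counters.
CloneableTriggerPolicy<Integer> prototype = new CountTriggerPolicy<Integer>(10);
TriggerPolicy<Integer> forKeyA = prototype.clone();
TriggerPolicy<Integer> forKeyB = prototype.clone(); // counts independently of forKeyA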
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private long startTime;
protected long nextRecordTime;
protected TimeStamp<IN> timestamp;
protected StreamWindow window;
public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
long slideInterval, TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval);
this.timestamp = timestamp;
this.startTime = timestamp.getStartTime();
this.window = new StreamWindow();
this.batch = this.window;
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
if (timestamp instanceof DefaultTimeStamp) {
(new TimeCheck()).start();
}
}
protected class StreamWindow extends StreamBatch {
private static final long serialVersionUID = 1L;
public StreamWindow() {
super();
}
@Override
public void addToBuffer(IN nextValue) throws Exception {
checkWindowEnd(timestamp.getTimestamp(nextValue));
if (minibatchCounter >= 0) {
circularList.add(nextValue);
}
}
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
circularList.newSlide();
minibatchCounter++;
if (batchEnd()) {
reduceBatch();
circularList.shiftWindow(batchPerSlide);
}
}
}
@Override
protected boolean miniBatchEnd() {
if (nextRecordTime < startTime + granularity) {
return false;
} else {
startTime += granularity;
return true;
}
}
}
private class TimeCheck extends Thread {
@Override
public void run() {
while (true) {
try {
Thread.sleep(slideSize);
} catch (InterruptedException e) {
// interrupted during sleep: loop around and re-check isRunning
}
if (isRunning) {
window.checkWindowEnd(System.currentTimeMillis());
} else {
break;
}
}
}
}
}
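Two details worth spelling out in the class above. First, a worked trace of checkWindowEnd: with startTime = 100 and granularity = 2 (granularity and the slide bookkeeping live in the BatchGroupReduceInvokable base class, outside this diff), a record stamped 106 makes miniBatchEnd() fire three times, advancing startTime to 102, 104 and 106 and cutting three slides before the record itself is buffered, while a record stamped 101 fires none. Second, the TimeCheck thread is only started for DefaultTimeStamp: when windows are measured in system time, an idle stream would never reach a window border through incoming records alone, so the thread forces the same check against System.currentTimeMillis() every slideSize milliseconds.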
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
private static final long serialVersionUID = 1L;
protected long startTime;
protected long nextRecordTime;
protected TimeStamp<OUT> timestamp;
protected StreamWindow window;
public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
long slideInterval, TimeStamp<OUT> timestamp) {
super(reduceFunction, windowSize, slideInterval);
this.timestamp = timestamp;
this.startTime = timestamp.getStartTime();
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.window = new StreamWindow();
this.batch = this.window;
if (timestamp instanceof DefaultTimeStamp) {
(new TimeCheck()).start();
}
}
protected class StreamWindow extends StreamBatch {
private static final long serialVersionUID = 1L;
public StreamWindow() {
super();
}
@Override
public void reduceToBuffer(OUT nextValue) throws Exception {
checkWindowEnd(timestamp.getTimestamp(nextValue));
if (currentValue != null) {
currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
} else {
currentValue = nextValue;
}
}
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
addToBuffer();
if (batchEnd()) {
reduceBatch();
}
}
}
@Override
public void reduceBatch() {
reduce(this);
}
@Override
protected boolean miniBatchEnd() {
if (nextRecordTime < startTime + granularity) {
return false;
} else {
startTime += granularity;
return true;
}
}
@Override
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
minibatchCounter -= batchPerSlide;
return true;
}
return false;
}
}
private class TimeCheck extends Thread {
@Override
public void run() {
while (true) {
try {
Thread.sleep(slideSize);
} catch (InterruptedException e) {
// interrupted during sleep: loop around and re-check isRunning
}
if (isRunning) {
window.checkWindowEnd(System.currentTimeMillis());
} else {
break;
}
}
}
}
}
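In contrast to the group-reduce variant above, which buffers raw elements, reduceToBuffer here folds every arriving element straight into currentValue: for a sum over 3, 4, 5 the window state goes 3, then 7, then 12, so at most one pre-aggregated value is held per slide. The serializer.copy calls appear to guard the running value against mutable-object reuse; that reading is an inference, not something stated in this diff.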
......@@ -17,45 +17,27 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
public class GroupedBatchGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
public class WindowingGroupInvokable<IN, OUT> extends WindowingInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
GroupReduceFunction<IN, OUT> reducer;
Map<Object, StreamBatch> streamBatches;
KeySelector<IN, ?> keySelector;
public GroupedBatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
long slideSize, KeySelector<IN, ?> keySelector) {
super(reduceFunction, batchSize, slideSize);
this.keySelector = keySelector;
this.streamBatches = new HashMap<Object, StreamBatch>();
}
@Override
protected StreamBatch getBatch(StreamRecord<IN> next) {
Object key = next.getKey(keySelector);
StreamBatch batch = streamBatches.get(key);
if(batch == null){
batch=new StreamBatch();
streamBatches.put(key, batch);
}
return batch;
public WindowingGroupInvokable(GroupReduceFunction<IN, OUT> userFunction,
LinkedList<TriggerPolicy<IN>> triggerPolicies,
LinkedList<EvictionPolicy<IN>> evictionPolicies) {
super(userFunction, triggerPolicies, evictionPolicies);
this.reducer = userFunction;
}
@Override
protected void reduceLastBatch() {
for(StreamBatch batch: streamBatches.values()){
batch.reduceLastBatch();
}
protected void callUserFunction() throws Exception {
reducer.reduce(buffer, collector);
}
}
......@@ -17,8 +17,12 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
......@@ -28,12 +32,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
/**
* Auto-generated serial version UID
......@@ -47,9 +46,8 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
private LinkedList<IN> buffer = new LinkedList<IN>();
protected LinkedList<IN> buffer = new LinkedList<IN>();
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
private ReduceFunction<IN> reducer;
/**
* This constructor creates a windowing invokable using trigger and eviction
......@@ -64,12 +62,10 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* A list of {@link EvictionPolicy}s and/or
* {@link ActiveEvictionPolicy}s
*/
public WindowingInvokable(ReduceFunction<IN> userFunction,
LinkedList<TriggerPolicy<IN>> triggerPolicies,
public WindowingInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies,
LinkedList<EvictionPolicy<IN>> evictionPolicies) {
super(userFunction);
this.reducer = userFunction;
this.triggerPolicies = triggerPolicies;
this.evictionPolicies = evictionPolicies;
......@@ -369,28 +365,4 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
}
}
@Override
protected void callUserFunction() throws Exception {
Iterator<IN> reducedIterator = buffer.iterator();
IN reduced = null;
while (reducedIterator.hasNext() && reduced == null) {
reduced = reducedIterator.next();
}
while (reducedIterator.hasNext()) {
IN next = reducedIterator.next();
if (next != null) {
reduced = reducer.reduce(reduced, next);
}
}
if (reduced != null) {
String[] tmp = new String[currentTriggerPolicies.size()];
for (int i = 0; i < tmp.length; i++) {
tmp[i] = currentTriggerPolicies.get(i).toString();
}
collector.collect(new Tuple2<IN, String[]>(reduced, tmp));
}
}
}
......@@ -17,42 +17,43 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
public class WindowingReduceInvokable<IN> extends WindowingInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
KeySelector<OUT, ?> keySelector;
Map<Object, StreamBatch> streamBatches;
public GroupedBatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize,
long slideSize, KeySelector<OUT, ?> keySelector) {
super(reduceFunction, batchSize, slideSize);
this.keySelector = keySelector;
this.streamBatches = new HashMap<Object, StreamBatch>();
ReduceFunction<IN> reducer;
public WindowingReduceInvokable(ReduceFunction<IN> userFunction,
LinkedList<TriggerPolicy<IN>> triggerPolicies,
LinkedList<EvictionPolicy<IN>> evictionPolicies) {
super(userFunction, triggerPolicies, evictionPolicies);
this.reducer = userFunction;
}
@Override
protected StreamBatch getBatch(StreamRecord<OUT> next) {
Object key = next.getKey(keySelector);
StreamBatch batch = streamBatches.get(key);
if (batch == null) {
batch = new StreamBatch();
streamBatches.put(key, batch);
protected void callUserFunction() throws Exception {
Iterator<IN> reducedIterator = buffer.iterator();
IN reduced = null;
while (reducedIterator.hasNext() && reduced == null) {
reduced = reducedIterator.next();
}
return batch;
}
@Override
protected void reduceLastBatch() throws Exception {
for (StreamBatch batch : streamBatches.values()) {
batch.reduceLastBatch();
while (reducedIterator.hasNext()) {
IN next = reducedIterator.next();
if (next != null) {
reduced = reducer.reduce(reduced, next);
}
}
if (reduced != null) {
collector.collect(reduced);
}
}
}
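Taken together with the diff above, this completes the template-method refactor: the now-abstract WindowingInvokable owns the protected buffer and all policy plumbing, while each concrete subclass supplies callUserFunction(), WindowingReduceInvokable folding the buffer pairwise with a ReduceFunction and WindowingGroupInvokable handing the whole buffer to a GroupReduceFunction in a single call.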
......@@ -23,21 +23,24 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
/**
* Represents a count based trigger or eviction policy.
* Use the {@link Count#of(int)} to get an instance.
* Represents a count based trigger or eviction policy. Use the
* {@link Count#of(int)} to get an instance.
*/
@SuppressWarnings("rawtypes")
public class Count implements WindowingHelper {
private int count;
private int deleteOnEviction = 1;
private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
/**
* Specifies on which element a trigger or an eviction should happen (based
* on the count of the elements).
*
* This constructor does exactly the same as {@link Count#of(int)}.
* This constructor does exactly the same as {@link Count#of(int)}.
*
* @param count the number of elements to count before trigger/evict
* @param count
* the number of elements to count before trigger/evict
*/
public Count(int count) {
this.count = count;
......@@ -45,12 +48,22 @@ public class Count implements WindowingHelper {
@Override
public EvictionPolicy<?> toEvict() {
return new CountEvictionPolicy(count);
return new CountEvictionPolicy(count, deleteOnEviction);
}
@Override
public TriggerPolicy<?> toTrigger() {
return new CountTriggerPolicy(count);
return new CountTriggerPolicy(count, startValue);
}
public Count withDelete(int deleteOnEviction) {
this.deleteOnEviction = deleteOnEviction;
return this;
}
public Count startingAt(int startValue) {
this.startValue = startValue;
return this;
}
/**
......
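A usage sketch for the new fluent setters, against the window/every API exercised in PrintTest later in this commit; dataStream and myGroupReduce are illustrative placeholders:

// A count-based window that evicts 5 elements at a time instead of 1 and
// starts its counter at -2 (e.g. to ignore two warm-up elements), evaluated
// every 2 elements as in PrintTest.
dataStream.window(Count.of(10).withDelete(5).startingAt(-2))
        .every(Count.of(2))
        .reduceGroup(myGroupReduce);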
......@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.windowing.helper;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
/**
......@@ -39,7 +40,9 @@ public class Time<DATA> implements WindowingHelper<DATA> {
private int timeVal;
private TimeUnit granularity;
private Extractor<Long, DATA> timeToData;
private Extractor<Long, DATA> longToDATAExtractor;
private TimeStamp<DATA> timeStamp;
private long delay;
/**
* Creates a helper representing a trigger which triggers every given
......@@ -47,50 +50,29 @@ public class Time<DATA> implements WindowingHelper<DATA> {
*
* @param timeVal
* The number of time units
* @param granularity
* @param timeUnit
* The unit of time such as minute or millisecond. Note that
* the smallest possible granularity is milliseconds. Any smaller
* time unit might cause an error at runtime due to conversion
* problems.
* @param timeToData
* This policy creates fake elements to not miss windows in case
* no element arrived within the duration of the window. This
* extractor should wrap a long into such an element of type
* DATA.
*/
public Time(int timeVal, TimeUnit granularity, Extractor<Long, DATA> timeToData) {
private Time(int timeVal, TimeUnit timeUnit) {
this.timeVal = timeVal;
this.granularity = granularity;
this.timeToData = timeToData;
}
/**
* Creates a helper representing a trigger which triggers every given
* timeVal or an eviction which evicts all elements older than timeVal.
*
* The default granularity for timeVal used in this method is seconds.
*
* @param timeVal
* The number of time units measured in seconds.
* @param timeToData
* This policy creates fake elements to not miss windows in case
* no element arrived within the duration of the window. This
* extractor should wrap a long into such an element of type
* DATA.
*/
public Time(int timeVal, Extractor<Long, DATA> timeToData) {
this(timeVal, TimeUnit.SECONDS, timeToData);
this.granularity = timeUnit;
this.longToDATAExtractor = new NullExtractor<DATA>();
this.timeStamp = new DefaultTimeStamp<DATA>();
this.delay = 0;
}
@Override
public EvictionPolicy<DATA> toEvict() {
return new TimeEvictionPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>());
return new TimeEvictionPolicy<DATA>(granularityInMillis(), timeStamp);
}
@Override
public TriggerPolicy<DATA> toTrigger() {
return new TimeTriggerPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>(),
timeToData);
return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay,
longToDATAExtractor);
}
/**
......@@ -104,22 +86,39 @@ public class Time<DATA> implements WindowingHelper<DATA> {
* the smallest possible granularity is milliseconds. Any smaller
* time unit might cause an error at runtime due to conversion
* problems.
* @param timeToData
* This policy creates fake elements to not miss windows in case
* no element arrived within the duration of the window. This
* extractor should wrap a long into such an element of type
* DATA.
* @return a helper representing a trigger which triggers every given
* timeVal or an eviction which evicts all elements older than
* timeVal.
*/
public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity,
Extractor<Long, DATA> timeToData) {
return new Time<DATA>(timeVal, granularity, timeToData);
public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity) {
return new Time<DATA>(timeVal, granularity);
}
@SuppressWarnings("unchecked")
public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp, Extractor<Long, R> extractor) {
this.timeStamp = (TimeStamp<DATA>) timeStamp;
this.longToDATAExtractor = (Extractor<Long, DATA>) extractor;
return (Time<R>) this;
}
public Time<DATA> withDelay(long delay) {
this.delay = delay;
return this;
}
private long granularityInMillis() {
return this.granularity.toMillis(this.timeVal);
}
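/** Default extractor used when no user extractor is set: fake elements are simply null. */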
public static class NullExtractor<T> implements Extractor<Long, T> {
private static final long serialVersionUID = 1L;
@Override
public T extract(Long in) {
return null;
}
}
}
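The same fluent style now configures time windows; a sketch mirroring the PrintTest usage below, where myTimeStamp and myExtractor stand in for user-supplied implementations:

// 2 ms windows measured by a user TimeStamp; the extractor wraps a long back
// into an element so fake elements can be injected for empty periods, and
// withDelay shifts the first trigger.
dataStream.window(Time.of(2, TimeUnit.MILLISECONDS)
        .withTimeStamp(myTimeStamp, myExtractor)
        .withDelay(10))
        .every(Count.of(2));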
......@@ -30,7 +30,7 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
*/
private static final long serialVersionUID = -6357200688886103968L;
private static final int DEFAULT_START_VALUE = 0;
public static final int DEFAULT_START_VALUE = 0;
private int counter;
private int max;
......
......@@ -87,10 +87,10 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
// delete and count expired tuples
int counter = 0;
long threshold = timestamp.getTimestamp(datapoint) - granularity;
while (!buffer.isEmpty()) {
if (timestamp.getTimestamp(buffer.getFirst()) < timestamp.getTimestamp(datapoint)
- granularity) {
if (timestamp.getTimestamp(buffer.getFirst()) < threshold) {
buffer.removeFirst();
counter++;
} else {
......
......@@ -41,9 +41,11 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
*/
private static final long serialVersionUID = -5122753802440196719L;
private long startTime;
private long granularity;
private TimeStamp<DATA> timestamp;
protected long startTime;
protected long granularity;
protected TimeStamp<DATA> timestamp;
protected long delay;
private Extractor<Long, DATA> longToDATAExtractor;
/**
......@@ -54,8 +56,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* example, the policy will trigger at every second point in time.
*
* @param granularity
* The granularity of the trigger. If this value is set to 2 the
* policy will trigger at every second time point
* The granularity of the trigger. If this value is set to x the
* policy will trigger at every x-th time point
* @param timestamp
* The {@link TimeStamp} to measure the time with. This can be
* either user defined of provided by the API.
......@@ -72,10 +74,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
/**
* This is mostly the same as
* {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
* to granularity and timestamp a delay can be specified for the first
* trigger. If the start time given by the timestamp is x, the delay is y,
* and the granularity is z, the first trigger will happen at x+y+z.
* {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In
* addition to granularity and timestamp a delay can be specified for the
* first trigger. If the start time given by the timestamp is x, the delay
* is y, and the granularity is z, the first trigger will happen at x+y+z.
*
* @param granularity
* The granularity of the trigger. If this value is set to 2 the
......@@ -98,21 +100,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
this.startTime = timestamp.getStartTime() + delay;
this.timestamp = timestamp;
this.granularity = granularity;
this.delay = delay;
this.longToDATAExtractor = timeWrapper;
}
@Override
public synchronized boolean notifyTrigger(DATA datapoint) {
long recordTime = timestamp.getTimestamp(datapoint);
// start time is included, but end time is excluded: >=
if (recordTime >= startTime + granularity) {
if (granularity != 0) {
startTime = recordTime - ((recordTime - startTime) % granularity);
}
return true;
} else {
return false;
}
}
/**
......@@ -126,7 +116,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
// check if there is more than one window border missed
// use > here. In case >= would fit, the regular call will do the job.
while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
fakeElements.add(longToDATAExtractor.extract(startTime += granularity));
startTime += granularity;
fakeElements.add(longToDATAExtractor.extract(startTime));
}
return (DATA[]) fakeElements.toArray();
}
......@@ -162,7 +153,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
if (System.currentTimeMillis() >= startTime + granularity) {
startTime += granularity;
callback.sendFakeElement(longToDATAExtractor.extract(startTime += granularity));
callback.sendFakeElement(longToDATAExtractor.extract(startTime));
}
}
......@@ -191,9 +182,23 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
}
}
@Override
public synchronized boolean notifyTrigger(DATA datapoint) {
long recordTime = timestamp.getTimestamp(datapoint);
// start time is included, but end time is excluded: >=
if (recordTime >= startTime + granularity) {
if (granularity != 0) {
startTime = recordTime - ((recordTime - startTime) % granularity);
}
return true;
} else {
return false;
}
}
@Override
public TimeTriggerPolicy<DATA> clone() {
return new TimeTriggerPolicy<DATA>(granularity, timestamp, 0, longToDATAExtractor);
return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay, longToDATAExtractor);
}
}
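Worked example of the delay rule above: with timestamp.getStartTime() = 100, delay = 50 and granularity = 1000, the constructor sets startTime = 150 and notifyTrigger first returns true for a record stamped at 1150 or later, matching the x + y + z description in the javadoc. The clone() fix above also means a cloned policy now keeps that delay instead of silently resetting it to 0.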
......@@ -17,13 +17,25 @@
package org.apache.flink.streaming.api;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class PrintTest{
public class PrintTest implements Serializable {
private static final long MEMORYSIZE = 32;
......@@ -44,12 +56,52 @@ public class PrintTest{
return true;
}
}
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String, Integer>>();
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
.window(Time.of(2, TimeUnit.MILLISECONDS).withTimeStamp(new TimeStamp<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value;
}
@Override
public long getStartTime() {
return 1;
}
}, new Extractor<Long, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer extract(Long in) {
return in.intValue();
}
})).every(Count.of(2)).reduceGroup(new GroupReduceFunction<Integer, String>() {
@Override
public void reduce(Iterable<Integer> values, Collector<String> out)
throws Exception {
String o = "|";
for (Integer v : values) {
o = o + v + "|";
}
out.collect(o);
}
}).print();
env.executeTest(MEMORYSIZE);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class BatchGroupReduceTest {
public static final class MySlidingBatchReduce implements GroupReduceFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
for (Integer value : values) {
out.collect(value.toString());
}
out.collect(END_OF_BATCH);
}
}
private final static String END_OF_BATCH = "end of batch";
private final static int SLIDING_BATCH_SIZE = 3;
private final static int SLIDE_SIZE = 2;
@Test
public void slidingBatchReduceTest() {
BatchGroupReduceInvokable<Integer, String> invokable = new BatchGroupReduceInvokable<Integer, String>(
new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE);
List<String> expected = Arrays.asList("1", "2", "3", END_OF_BATCH, "3", "4", "5",
END_OF_BATCH, "5", "6", "7", END_OF_BATCH);
List<String> actual = MockInvokable.createAndExecute(invokable,
Arrays.asList(1, 2, 3, 4, 5, 6, 7));
assertEquals(expected, actual);
}
public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
Double sum = 0.;
Double count = 0.;
for (Double value : values) {
sum += value;
count++;
}
if (count > 0) {
out.collect(Double.valueOf(sum / count));
}
}
}
private static final int BATCH_SIZE = 5;
@Test
public void nonSlidingBatchReduceTest() {
List<Double> inputs = new ArrayList<Double>();
for (Double i = 1.; i <= 100; i++) {
inputs.add(i);
}
BatchGroupReduceInvokable<Double, Double> invokable = new BatchGroupReduceInvokable<Double, Double>(
new MyBatchReduce(), BATCH_SIZE, BATCH_SIZE);
List<Double> avgs = MockInvokable.createAndExecute(invokable, inputs);
for (int i = 0; i < avgs.size(); i++) {
assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;
public class BatchReduceTest {
@Test
public void BatchReduceInvokableTest() {
List<Integer> inputs = new ArrayList<Integer>();
for (Integer i = 1; i <= 10; i++) {
inputs.add(i);
}
BatchReduceInvokable<Integer> invokable = new BatchReduceInvokable<Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}, 3, 2);
List<Integer> expected = new ArrayList<Integer>();
expected.add(6);
expected.add(12);
expected.add(18);
expected.add(24);
expected.add(19);
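// the trailing 19 = 9 + 10: the input ends before the final batch [9, 10, ...] can fill up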
assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
List<Integer> inputs2 = new ArrayList<Integer>();
inputs2.add(1);
inputs2.add(2);
inputs2.add(-1);
inputs2.add(-3);
inputs2.add(-4);
BatchReduceInvokable<Integer> invokable2 = new BatchReduceInvokable<Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
if (value1 <= value2) {
return value1;
} else {
return value2;
}
}
}, 2, 3);
List<Integer> expected2 = new ArrayList<Integer>();
expected2.add(1);
expected2.add(-4);
assertEquals(expected2, MockInvokable.createAndExecute(invokable2, inputs2));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.streaming.util.keys.ObjectKeySelector;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class GroupedBatchGroupReduceTest {
public static final class MySlidingBatchReduce1 implements GroupReduceFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
for (Integer value : values) {
out.collect(value.toString());
}
out.collect(END_OF_GROUP);
}
}
public static final class MySlidingBatchReduce2 extends
RichGroupReduceFunction<Tuple2<Integer, String>, String> {
private static final long serialVersionUID = 1L;
String openString;
@Override
public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<String> out)
throws Exception {
out.collect(openString);
for (Tuple2<Integer, String> value : values) {
out.collect(value.f0.toString());
}
out.collect(END_OF_GROUP);
}
@Override
public void open(Configuration c) {
openString = "open";
}
}
private final static String END_OF_GROUP = "end of group";
@SuppressWarnings("unchecked")
@Test
public void slidingBatchGroupReduceTest() {
@SuppressWarnings("rawtypes")
GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>(
new MySlidingBatchReduce1(), 2, 2, new ObjectKeySelector());
List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "3", "3", END_OF_GROUP, "2",
END_OF_GROUP);
List<String> actual = MockInvokable.createAndExecute(invokable1,
Arrays.asList(1, 1, 2, 3, 3));
assertEquals(expected, actual);
GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
new MySlidingBatchReduce2(), 2, 2, new TupleKeySelector<Tuple2<Integer, String>>(1));
expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", "3", END_OF_GROUP,
"open", "4", END_OF_GROUP);
actual = MockInvokable.createAndExecute(invokable2, Arrays.asList(
new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "a"),
new Tuple2<Integer, String>(3, "b"), new Tuple2<Integer, String>(3, "b"),
new Tuple2<Integer, String>(4, "a")));
assertEquals(expected, actual);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.streaming.util.keys.ObjectKeySelector;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class GroupedBatchReduceTest {
@Test
public void BatchReduceInvokableTest() {
List<Integer> inputs = new ArrayList<Integer>();
inputs.add(1);
inputs.add(1);
inputs.add(5);
inputs.add(5);
inputs.add(5);
inputs.add(1);
inputs.add(1);
inputs.add(5);
inputs.add(1);
inputs.add(5);
List<Integer> expected = new ArrayList<Integer>();
expected.add(15);
expected.add(3);
expected.add(3);
expected.add(15);
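// per key with batch size 3 and slide 2: key 5 sums to 15 twice, key 1 to 3 twice;
// the order is non-deterministic, hence the HashSet comparison below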
GroupedBatchReduceInvokable<Integer> invokable = new GroupedBatchReduceInvokable<Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}, 3, 2, new ObjectKeySelector<Integer>());
List<Integer> actual = MockInvokable.createAndExecute(invokable, inputs);
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
assertEquals(expected.size(), actual.size());
List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
inputs2.add(new Tuple2<Integer, String>(1, "a"));
inputs2.add(new Tuple2<Integer, String>(0, "b"));
inputs2.add(new Tuple2<Integer, String>(2, "a"));
inputs2.add(new Tuple2<Integer, String>(-1, "a"));
inputs2.add(new Tuple2<Integer, String>(-2, "a"));
inputs2.add(new Tuple2<Integer, String>(10, "a"));
inputs2.add(new Tuple2<Integer, String>(2, "b"));
inputs2.add(new Tuple2<Integer, String>(1, "a"));
List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
expected2.add(new Tuple2<Integer, String>(-1, "a"));
expected2.add(new Tuple2<Integer, String>(-2, "a"));
expected2.add(new Tuple2<Integer, String>(0, "b"));
GroupedBatchReduceInvokable<Tuple2<Integer, String>> invokable2 = new GroupedBatchReduceInvokable<Tuple2<Integer, String>>(
new ReduceFunction<Tuple2<Integer, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
Tuple2<Integer, String> value2) throws Exception {
if (value1.f0 <= value2.f0) {
return value1;
} else {
return value2;
}
}
}, 3, 3, new TupleKeySelector<Tuple2<Integer, String>>(1));
List<Tuple2<Integer, String>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
new HashSet<Tuple2<Integer, String>>(actual2));
assertEquals(expected2.size(), actual2.size());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class GroupedWindowGroupReduceInvokableTest {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void windowReduceTest() {
List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
inputs2.add(new Tuple2<String, Integer>("a", 1));
inputs2.add(new Tuple2<String, Integer>("a", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 5));
inputs2.add(new Tuple2<String, Integer>("a", 7));
inputs2.add(new Tuple2<String, Integer>("b", 9));
inputs2.add(new Tuple2<String, Integer>("b", 10));
// windows (size 2, slide 3): times 1-2, 4-5, 7-8, 10
List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
expected2.add(new Tuple2<String, Integer>("a", 3));
expected2.add(new Tuple2<String, Integer>("b", 4));
expected2.add(new Tuple2<String, Integer>("b", 5));
expected2.add(new Tuple2<String, Integer>("a", 7));
expected2.add(new Tuple2<String, Integer>("b", 10));
GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>> invokable2 = new GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>>(
new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>("", 0);
// first pass intentionally consumes the iterable without reading it;
// the summing pass below relies on the window buffer being re-iterable
for (@SuppressWarnings("unused")
Tuple2<String, Integer> value : values) {
}
for (Tuple2<String, Integer> value : values) {
outTuple.f0 = value.f0;
outTuple.f1 += value.f1;
}
out.collect(outTuple);
}
}, 2, 3, new TupleKeySelector(0),
new TimeStamp<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<String, Integer> value) {
return value.f1;
}
@Override
public long getStartTime() {
return 1;
}
});
List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
new HashSet<Tuple2<String, Integer>>(actual2));
assertEquals(expected2.size(), actual2.size());
}
}
......@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
......@@ -30,12 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.MockInvokable;
......@@ -78,7 +78,7 @@ public class GroupedWindowingInvokableTest {
LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
......@@ -95,11 +95,11 @@ public class GroupedWindowingInvokableTest {
}
}, triggers, evictions, centralTriggers);
List<Tuple2<Integer, String[]>> result = MockInvokable.createAndExecute(invokable, inputs);
List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
List<Integer> actual = new LinkedList<Integer>();
for (Tuple2<Integer, String[]> current : result) {
actual.add(current.f0);
for (Integer current : result) {
actual.add(current);
}
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
......@@ -137,7 +137,7 @@ public class GroupedWindowingInvokableTest {
LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
GroupedWindowingInvokable<Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>>(
GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
new ReduceFunction<Tuple2<Integer, String>>() {
private static final long serialVersionUID = 1L;
......@@ -153,12 +153,11 @@ public class GroupedWindowingInvokableTest {
}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
centralTriggers);
List<Tuple2<Tuple2<Integer, String>, String[]>> result = MockInvokable.createAndExecute(
invokable2, inputs2);
List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
for (Tuple2<Tuple2<Integer, String>, String[]> current : result) {
actual2.add(current.f0);
for (Tuple2<Integer, String> current : result) {
actual2.add(current);
}
assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
......@@ -266,14 +265,13 @@ public class GroupedWindowingInvokableTest {
LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
GroupedWindowingInvokable<Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>>(
GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
distributedTriggers, evictions, triggers);
ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
for (Tuple2<Tuple2<Integer, String>, String[]> t : MockInvokable.createAndExecute(
invokable, inputs)) {
result.add(t.f0);
for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
......@@ -330,7 +328,7 @@ public class GroupedWindowingInvokableTest {
}
};
GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
myReduceFunction, new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
......@@ -341,8 +339,8 @@ public class GroupedWindowingInvokableTest {
}, distributedTriggers, evictions, triggers);
ArrayList<Integer> result = new ArrayList<Integer>();
for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t.f0);
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
......@@ -350,8 +348,8 @@ public class GroupedWindowingInvokableTest {
}
/**
* Test for combination of centralized trigger and
* distributed trigger at the same time
* Test for combination of centralized trigger and distributed trigger at
* the same time
*/
@Test
public void testGroupedWindowingInvokableCentralAndDistrTrigger() {
......@@ -406,7 +404,7 @@ public class GroupedWindowingInvokableTest {
}
};
GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
myReduceFunction, new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
......@@ -417,8 +415,8 @@ public class GroupedWindowingInvokableTest {
}, distributedTriggers, evictions, triggers);
ArrayList<Integer> result = new ArrayList<Integer>();
for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t.f0);
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;
public class WindowGroupReduceInvokableTest {
public static final class MySlidingWindowReduce implements GroupReduceFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
for (Integer value : values) {
out.collect(value.toString());
}
out.collect(EOW);
}
}
public static final class MyTimestamp implements TimeStamp<Integer> {
private static final long serialVersionUID = 1L;
private Iterator<Long> timestamps;
private long start;
public MyTimestamp(List<Long> timestamps) {
this.timestamps = timestamps.iterator();
this.start = timestamps.get(0);
}
@Override
public long getTimestamp(Integer value) {
long ts = timestamps.next();
return ts;
}
@Override
public long getStartTime() {
return start;
}
}
private final static String EOW = "|";
private static List<WindowGroupReduceInvokable<Integer, String>> invokables = new ArrayList<WindowGroupReduceInvokable<Integer, String>>();
private static List<List<String>> expectedResults = new ArrayList<List<String>>();
@Before
public void before() {
long windowSize = 3;
long slideSize = 2;
List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L,
110L);
expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7",
EOW, "7", "8", "9", EOW, "9", "10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));
windowSize = 10;
slideSize = 5;
timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", "6", EOW, "3",
"4", "5", "6", EOW, "7", EOW, "7",
"8", "9", EOW, "8", "9", "10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));
windowSize = 10;
slideSize = 4;
timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
expectedResults.add(Arrays.asList("1", "2","3" ,EOW, "3", "4", "5","6", EOW, "3", "4", "5", "6",
EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8","9",
"10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));
}
@Test
public void slidingBatchReduceTest() {
List<List<String>> actualResults = new ArrayList<List<String>>();
for (WindowGroupReduceInvokable<Integer, String> invokable : invokables) {
List<String> result = MockInvokable.createAndExecute(invokable,
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
actualResults.add(result);
}
Iterator<List<String>> actualResult = actualResults.iterator();
for (List<String> expectedResult : expectedResults) {
assertEquals(expectedResult, actualResult.next());
}
}
}
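For orientation, the first expected result above can be reproduced by hand: with window size 3, slide size 2 and start time 101, the windows cover the timestamp ranges [101,104), [103,106), [105,108), [107,110) and [109,112), which yields exactly the batches 1-2-3, 3-4-5, 5-6-7, 7-8-9 and 9-10 seen in expectedResults. A minimal plain-Java sketch of that arithmetic (the half-open window rule is inferred from the expected output, not taken from the invokable's source):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SlidingWindowModel {
    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L,
                109L, 110L);
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        long windowSize = 3;
        long slideSize = 2;
        // assumed rule: window i covers [start + i * slide, start + i * slide + size)
        for (long start = 101; start <= 109; start += slideSize) {
            List<Integer> window = new ArrayList<Integer>();
            for (int i = 0; i < values.size(); i++) {
                long ts = timestamps.get(i);
                if (ts >= start && ts < start + windowSize) {
                    window.add(values.get(i));
                }
            }
            System.out.println(window); // [1,2,3] [3,4,5] [5,6,7] [7,8,9] [9,10]
        }
    }
}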
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
public class WindowReduceInvokableTest {
@Test
public void windowReduceTest() {
List<Integer> inputs = new ArrayList<Integer>();
inputs.add(1);
inputs.add(2);
inputs.add(2);
inputs.add(3);
inputs.add(4);
inputs.add(5);
inputs.add(10);
inputs.add(11);
inputs.add(11);
// window timestamp ranges: [1,4], [3,6], [5,8], [7,10], [9,12]
// corresponding per-window sums: 12, 12, 5, 10, 32
List<Integer> expected = new ArrayList<Integer>();
expected.add(12);
expected.add(12);
expected.add(5);
expected.add(10);
expected.add(32);
WindowReduceInvokable<Integer> invokable = new WindowReduceInvokable<Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}, 4, 2, new TimeStamp<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value;
}
@Override
public long getStartTime() {
return 1;
}
});
assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
inputs2.add(new Tuple2<String, Integer>("a", 1));
inputs2.add(new Tuple2<String, Integer>("a", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 5));
inputs2.add(new Tuple2<String, Integer>("a", 7));
inputs2.add(new Tuple2<String, Integer>("b", 9));
inputs2.add(new Tuple2<String, Integer>("b", 10));
List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
expected2.add(new Tuple2<String, Integer>("a", 3));
expected2.add(new Tuple2<String, Integer>("b", 4));
expected2.add(new Tuple2<String, Integer>("b", 5));
expected2.add(new Tuple2<String, Integer>("a", 7));
expected2.add(new Tuple2<String, Integer>("b", 10));
@SuppressWarnings({ "unchecked", "rawtypes" })
GroupedWindowReduceInvokable<Tuple2<String, Integer>> invokable2 = new GroupedWindowReduceInvokable<Tuple2<String, Integer>>(
new ReduceFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
}
}, 2, 3, new TupleKeySelector(0), new TimeStamp<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<String, Integer> value) {
return value.f1;
}
@Override
public long getStartTime() {
return 1;
}
});
List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
new HashSet<Tuple2<String, Integer>>(actual2));
assertEquals(expected2.size(), actual2.size());
}
}
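The grouped expectations can be traced the same way. With window size 2, slide size 3 and start time 1, the windows (again read as half-open intervals) are [1,3), [4,6), [7,9) and [10,12); the reduce sums field f1 per key inside each window, and (b,9) falls into the gap between [7,9) and [10,12), so it never reaches the output. A hedged model of that per-key bookkeeping:

import java.util.LinkedHashMap;
import java.util.Map;

public class GroupedWindowModel {
    public static void main(String[] args) {
        // (key, value) pairs from the test; the value doubles as the timestamp,
        // mirroring the TimeStamp implementation above.
        String[] keys = { "a", "a", "b", "b", "b", "a", "b", "b" };
        int[] vals = { 1, 2, 2, 2, 5, 7, 9, 10 };
        long size = 2;
        long slide = 3;
        for (long start = 1; start <= 10; start += slide) {
            Map<String, Integer> sums = new LinkedHashMap<String, Integer>();
            for (int i = 0; i < vals.length; i++) {
                if (vals[i] >= start && vals[i] < start + size) {
                    Integer s = sums.get(keys[i]);
                    sums.put(keys[i], s == null ? vals[i] : s + vals[i]);
                }
            }
            System.out.println("[" + start + "," + (start + size) + "): " + sums);
        }
        // [1,3): {a=3, b=4}   [4,6): {b=5}   [7,9): {a=7}   [10,12): {b=10}
    }
}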
@@ -17,21 +17,20 @@
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;
@@ -92,6 +91,7 @@ public class WindowingInvokableTest {
// trigger after 4, then every 2)
triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L,
new Extractor<Long, Integer>() {
private static final long serialVersionUID = 1L;
@Override
@@ -103,12 +103,12 @@ public class WindowingInvokableTest {
// Always delete all elements older than 4
evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
triggers, evictions);
WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
myReduceFunction, triggers, evictions);
ArrayList<Integer> result = new ArrayList<Integer>();
for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t.f0);
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(expected, result);
@@ -147,8 +147,8 @@ public class WindowingInvokableTest {
// time after the 3rd element
evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
triggers, evictions);
WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
myReduceFunction, triggers, evictions);
List<Integer> expected = new ArrayList<Integer>();
expected.add(6);
@@ -157,8 +157,8 @@ public class WindowingInvokableTest {
expected.add(24);
expected.add(19);
List<Integer> result = new ArrayList<Integer>();
for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t.f0);
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(expected, result);
@@ -200,16 +200,16 @@ public class WindowingInvokableTest {
// time after on the 5th element
evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
WindowingInvokable<Integer> invokable2 = new WindowingInvokable<Integer>(myReduceFunction,
triggers, evictions);
WindowingInvokable<Integer, Integer> invokable2 = new WindowingReduceInvokable<Integer>(
myReduceFunction, triggers, evictions);
List<Integer> expected2 = new ArrayList<Integer>();
expected2.add(1);
expected2.add(-4);
result = new ArrayList<Integer>();
for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable2, inputs2)) {
result.add(t.f0);
for (Integer t : MockInvokable.createAndExecute(invokable2, inputs2)) {
result.add(t);
}
assertEquals(expected2, result);
@@ -223,8 +223,8 @@ public class WindowingInvokableTest {
triggers.add(new CountTriggerPolicy<Integer>(3));
LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
evictions.add(new CountEvictionPolicy<Integer>(2,2));
evictions.add(new CountEvictionPolicy<Integer>(3,3));
evictions.add(new CountEvictionPolicy<Integer>(2, 2));
evictions.add(new CountEvictionPolicy<Integer>(3, 3));
List<Integer> inputs = new ArrayList<Integer>();
for (Integer i = 1; i <= 10; i++) {
@@ -258,12 +258,12 @@ public class WindowingInvokableTest {
}
};
WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
triggers, evictions);
WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
myReduceFunction, triggers, evictions);
ArrayList<Integer> result = new ArrayList<Integer>();
for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t.f0);
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(expected, result);
......
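The changes in this test reflect the API cleanup itself: WindowingInvokable gains a separate output type parameter, the reduce variant moves into WindowingReduceInvokable, and the operator now emits the plain reduce result instead of a Tuple2 of result and trigger names. Conceptually the invokable buffers elements, emits the reduction of the buffer whenever a trigger policy fires, and lets the eviction policies bound the buffer. A deliberately simplified single-trigger, single-eviction model of that loop (the trigger/eviction ordering is an assumption here, and active time policies are ignored):

import java.util.ArrayList;
import java.util.List;

public class TriggerEvictionModel {
    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<Integer>();
        final int triggerEvery = 3;  // count trigger: fire on every 3rd element
        final int maxBufferSize = 5; // count eviction: keep at most 5 elements
        int sinceTrigger = 0;
        for (int element = 1; element <= 10; element++) {
            if (buffer.size() == maxBufferSize) {
                buffer.remove(0); // evict the oldest element
            }
            buffer.add(element);
            if (++sinceTrigger == triggerEvery) {
                sinceTrigger = 0;
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                System.out.println("window " + buffer + " -> sum " + sum);
            }
        }
    }
}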
@@ -17,12 +17,15 @@
package org.apache.flink.streaming.api.windowing.policy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.junit.Test;
import static org.junit.Assert.*;
public class TimeTriggerPolicyTest {
@Test
@@ -49,15 +52,8 @@ public class TimeTriggerPolicyTest {
// test different granularity
for (long granularity = 0; granularity < 31; granularity++) {
// create policy
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp,
new Extractor<Long, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer extract(Long in) {
return in.intValue();
}
});
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
timeStamp, new Time.NullExtractor<Integer>());
// remember window border
// Remark: This might NOT work in case the timeStamp uses
@@ -88,7 +84,7 @@ public class TimeTriggerPolicyTest {
@Test
public void timeTriggerPreNotifyTest() {
// create some test data
Integer[] times = { 1, 3, 20, 26};
Integer[] times = { 1, 3, 20, 26 };
// create a timestamp
@SuppressWarnings("serial")
@@ -107,8 +103,9 @@ public class TimeTriggerPolicyTest {
};
// create policy
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp,
new Extractor<Long, Integer>() {
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
timeStamp, new Extractor<Long, Integer>() {
private static final long serialVersionUID = 1L;
@Override
@@ -117,16 +114,16 @@ public class TimeTriggerPolicyTest {
}
});
//expected result
Integer[][] result={{},{},{5,10,15},{25}};
//call policy
for (int i=0;i<times.length;i++){
arrayEqualityCheck(result[i],policy.preNotifyTrigger(times[i]));
// expected result
Integer[][] result = { {}, {}, { 5, 10, 15 }, { 25 } };
// call policy
for (int i = 0; i < times.length; i++) {
arrayEqualityCheck(result[i], policy.preNotifyTrigger(times[i]));
policy.notifyTrigger(times[i]);
}
}
private void arrayEqualityCheck(Object[] array1, Object[] array2) {
assertEquals("The result arrays must have the same length", array1.length, array2.length);
for (int i = 0; i < array1.length; i++) {
......
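The preNotify expectations decode as follows (my reading, with window borders at multiples of the granularity 5 counted from a start time of 0): elements 1 and 3 cross no border; element 20 arrives after the unserved borders 5, 10 and 15, which preNotifyTrigger reports as missed windows, while border 20 itself is consumed by the subsequent notifyTrigger call; element 26 then reports the missed border 25. A small sketch replaying that bookkeeping:

import java.util.ArrayList;
import java.util.List;

public class PreNotifyModel {
    public static void main(String[] args) {
        long granularity = 5;
        long nextBorder = granularity; // borders at 5, 10, 15, ... (start time 0 assumed)
        int[] times = { 1, 3, 20, 26 };
        for (int ts : times) {
            // preNotifyTrigger: report every border strictly before the new timestamp
            List<Long> missed = new ArrayList<Long>();
            while (nextBorder < ts) {
                missed.add(nextBorder);
                nextBorder += granularity;
            }
            // notifyTrigger: a border equal to the timestamp triggers normally
            boolean triggers = ts == nextBorder;
            if (triggers) {
                nextBorder += granularity;
            }
            System.out.println(ts + " -> missed " + missed + ", triggers " + triggers);
        }
        // 1 -> [], 3 -> [], 20 -> [5, 10, 15] plus a normal trigger, 26 -> [25]
    }
}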
@@ -17,11 +17,14 @@
package org.apache.flink.streaming.examples.ml;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.util.Collector;
/**
@@ -58,7 +61,8 @@ public class IncrementalLearningSkeleton {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build new model on every second of new data
DataStream<Double[]> model = env.addSource(new TrainingDataSource()).window(5000)
DataStream<Double[]> model = env.addSource(new TrainingDataSource())
.window(Time.of(5000, TimeUnit.MILLISECONDS))
.reduceGroup(new PartialModelBuilder());
// use partial model for prediction
......
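The change above replaces the raw-millisecond window(5000) call with the new Time helper, which makes the unit explicit. A usage sketch in the same style (the five-second window and the direct reduceGroup call mirror the skeleton; the inline source and the summing reducer are illustrative only):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.util.Collector;

public class TimeHelperSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStream<Long> sums = env.addSource(new SourceFunction<Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void invoke(Collector<Long> out) throws Exception {
                for (long i = 0; i < 10000; i++) {
                    out.collect(i);
                    Thread.sleep(10); // slow the source down for local runs
                }
            }
        })
                // was .window(5000) before this commit; Time.of names the unit
                .window(Time.of(5, TimeUnit.SECONDS))
                .reduceGroup(new GroupReduceFunction<Long, Long>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public void reduce(Iterable<Long> values, Collector<Long> out)
                            throws Exception {
                        long sum = 0;
                        for (long v : values) {
                            sum += v;
                        }
                        out.collect(sum);
                    }
                });
        sums.print();
        env.execute();
    }
}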
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.util.Collector;
/**
* A minimal example as an introduction to policy-based windowing
*/
public class BasicExample {
private static final int PARALLELISM = 1;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
// This reduce function does a String concat.
ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
/**
* Auto-generated version ID
*/
private static final long serialVersionUID = 1L;
@Override
public String reduce(String value1, String value2) throws Exception {
return value1 + "|" + value2;
}
};
DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource())
.window(Count.of(5)).every(Count.of(2)).reduce(reduceFunction);
stream.print();
env.execute();
}
public static class BasicSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
String str = "streaming";
@Override
public void invoke(Collector<String> out) throws Exception {
// emit continuously
while (true) {
out.collect(str);
}
}
}
}
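Read together with the commented SIMPLE-EXAMPLE in SlidingExample further down, window(Count.of(5)) keeps the newest five elements in the buffer and every(Count.of(2)) emits a window after every second element, so once the buffer is full consecutive windows overlap by three elements. A plain-Java model of that policy pair (inferred from the examples, not from the operator itself):

import java.util.ArrayList;
import java.util.List;

public class CountWindowModel {
    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<Integer>();
        final int windowSize = 5; // window(Count.of(5))
        final int slide = 2;      // every(Count.of(2))
        int sinceEmit = 0;
        for (int element = 1; element <= 10; element++) {
            buffer.add(element);
            if (buffer.size() > windowSize) {
                buffer.remove(0); // keep only the newest five elements
            }
            if (++sinceEmit == slide) {
                sinceEmit = 0;
                System.out.println(buffer);
                // [1, 2], [1, 2, 3, 4], [2..6], [4..8], [6..10]
            }
        }
    }
}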
@@ -17,35 +17,27 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.util.Collector;
import java.util.LinkedList;
/**
* This example uses count-based tumbling windowing with multiple trigger
* policies at the same time.
*/
public class MultiplePoliciesExample {
private static final int PARALLELISM = 1;
private static final int PARALLELISM = 2;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
LinkedList<TriggerPolicy<String>> policies = new LinkedList<TriggerPolicy<String>>();
policies.add(new CountTriggerPolicy<String>(5));
policies.add(new CountTriggerPolicy<String>(8));
// This reduce function does a String concat.
ReduceFunction<String> reducer = new ReduceFunction<String>() {
GroupReduceFunction<String, String> reducer = new GroupReduceFunction<String, String>() {
/**
* Auto generates version ID
@@ -53,14 +45,21 @@ public class MultiplePoliciesExample {
private static final long serialVersionUID = 1L;
@Override
public String reduce(String value1, String value2) throws Exception {
return value1 + "|" + value2;
public void reduce(Iterable<String> values, Collector<String> out) throws Exception {
String output = "|";
for (String v : values) {
output = output + v + "|";
}
out.collect(output);
}
};
DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource()).window(
policies, reducer);
DataStream<String> stream = env.addSource(new BasicSource())
.groupBy(0)
.window(Count.of(2))
.every(Count.of(3), Count.of(5))
.reduceGroup(reducer);
stream.print();
@@ -70,13 +69,16 @@ public class MultiplePoliciesExample {
public static class BasicSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
String str = new String("streaming");
String str1 = new String("streaming");
String str2 = new String("flink");
@Override
public void invoke(Collector<String> out) throws Exception {
// continuous emit
while (true) {
out.collect(str);
out.collect(str1);
out.collect(str2);
}
}
}
......
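The rewritten example passes two Count helpers to every(), i.e. two trigger policies at once. Presumably each counter fires on its own period and a window is emitted whenever at least one of them fires, so with periods 3 and 5 the emissions fall after elements 3, 5, 6, 9, 10, 12, 15 and so on. A small model of that interleaving (the either-one-fires rule is my assumption, not taken from the operator):

import java.util.ArrayList;
import java.util.List;

public class MultiTriggerModel {
    public static void main(String[] args) {
        int[] periods = { 3, 5 }; // every(Count.of(3), Count.of(5))
        int[] counters = { 0, 0 };
        List<Integer> emitPoints = new ArrayList<Integer>();
        for (int element = 1; element <= 15; element++) {
            boolean fired = false;
            for (int p = 0; p < periods.length; p++) {
                if (++counters[p] == periods[p]) {
                    counters[p] = 0;
                    fired = true;
                }
            }
            if (fired) {
                emitPoints.add(element);
            }
        }
        System.out.println(emitPoints); // [3, 5, 6, 9, 10, 12, 15]
    }
}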
@@ -18,14 +18,10 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.util.Collector;
/**
@@ -45,19 +41,18 @@ public class SlidingExample {
* SIMPLE-EXAMPLE: Use this to always keep the newest 10 elements in the
* buffer. Resulting windows will have an overlap of 5 elements.
*/
// TriggerPolicy<String> triggerPolicy=new
// CountTriggerPolicy<String>(5);
// EvictionPolicy<String> evictionPolicy=new
// CountEvictionPolicy<String>(10);
// DataStream<String> stream = env.addSource(new CountingSource())
// .window(Count.of(10))
// .every(Count.of(5))
// .reduce(reduceFunction);
/*
* ADVANCED-EXAMPLE: Use this to have the last element of the last
* window as the first element of the next window, while the window size
* is always 5
*/
TriggerPolicy<String> triggerPolicy = new CountTriggerPolicy<String>(4, -1);
EvictionPolicy<String> evictionPolicy = new CountEvictionPolicy<String>(5, 4);
// This reduce function does a String concat.
ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
@@ -73,8 +68,10 @@ public class SlidingExample {
};
DataStream<Tuple2<String, String[]>> stream = env.addSource(new CountingSource()).window(
triggerPolicy, evictionPolicy, reduceFunction);
DataStream<String> stream = env.addSource(new CountingSource())
.window(Count.of(5).withDelete(4))
.every(Count.of(4).startingAt(-1))
.reduce(reduceFunction);
stream.print();
......
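Taken together, the old and new versions of this example spell out the mapping between the raw policies and the helper API: new CountTriggerPolicy<String>(4, -1) becomes Count.of(4).startingAt(-1), and new CountEvictionPolicy<String>(5, 4) becomes Count.of(5).withDelete(4). A sketch constructing the raw pair directly (the helper-to-policy translation happens inside window()/every(); the reading of the constructor arguments as count, start value and delete count is mine):

import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;

public class CountHelperMapping {
    public static void main(String[] args) {
        // trigger every 4 elements, with the counter starting at -1 so the
        // last element of one window reappears as the first of the next
        TriggerPolicy<String> trigger = new CountTriggerPolicy<String>(4, -1);
        // keep a buffer of 5, deleting 4 elements whenever eviction fires
        EvictionPolicy<String> eviction = new CountEvictionPolicy<String>(5, 4);
        System.out.println("constructed " + trigger + " and " + eviction);
    }
}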
@@ -17,27 +17,26 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.util.Collector;
/**
* This example shows the functionality of time based windows. It utilizes the
* {@link ActiveTriggerPolicy} implementation in the {@link TimeTriggerPolicy}.
* {@link ActiveTriggerPolicy} implementation in the
* {@link ActiveTimeTriggerPolicy}.
*/
public class TimeWindowingExample {
private static final int PARALLELISM = 1;
private static final int PARALLELISM = 2;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
@@ -46,38 +45,21 @@ public class TimeWindowingExample {
// Prevent output from being blocked
env.setBufferTimeout(100);
// Trigger every 1000ms
TriggerPolicy<Integer> triggerPolicy = new TimeTriggerPolicy<Integer>(1000L,
new DefaultTimeStamp<Integer>(), new Extractor<Long, Integer>() {
DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
.groupBy(new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer extract(Long in) {
return in.intValue();
public Integer getKey(Integer value) throws Exception {
if (value < 3) {
return 0;
} else {
return 1;
}
}
});
// Always keep the newest 100 elements in the buffer
EvictionPolicy<Integer> evictionPolicy = new CountEvictionPolicy<Integer>(100);
// This reduce function does a String concat.
ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() {
/**
* default version ID
*/
private static final long serialVersionUID = 1L;
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
};
DataStream<Tuple2<Integer, String[]>> stream = env.addSource(new CountingSourceWithSleep()).window(triggerPolicy, evictionPolicy, reduceFunction);
}).window(Count.of(100)).every(Time.of(1000, TimeUnit.MILLISECONDS)).sum();
stream.print();
@@ -97,6 +79,7 @@ public class TimeWindowingExample {
@Override
public void invoke(Collector<Integer> collector) throws Exception {
Random rnd = new Random();
// continuous emit
while (true) {
if (counter > 9999) {
@@ -105,7 +88,7 @@ public class TimeWindowingExample {
System.out.println("Source continouse with emitting now!");
counter = 0;
}
collector.collect(counter);
collector.collect(rnd.nextInt(9) + 1);
// Wait 0.001 sec. before the next emit. Otherwise the source is
// too fast for local tests and you might always see
......
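The new pipeline mixes a count-based eviction with a time-based trigger: window(Count.of(100)) keeps the newest 100 elements per group and every(Time.of(1000, TimeUnit.MILLISECONDS)) emits an aggregate once a second, matching the comments in the old code ("Trigger every 1000ms", "Always keep the newest 100 elements in the buffer"). A plain-Java model of that combination over synthetic timestamps (semantics inferred from those comments, not from the operator):

import java.util.ArrayList;
import java.util.List;

public class TimeTriggerCountEvictionModel {
    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<Integer>();
        final int maxBufferSize = 100; // window(Count.of(100))
        long nextTrigger = 1000;       // every(Time.of(1000, MILLISECONDS))
        // synthetic arrivals every 15 ms; timestamps are modeled, not real time
        for (long ts = 0; ts <= 3000; ts += 15) {
            int value = (int) (ts % 9) + 1; // stand-in for the random source
            buffer.add(value);
            if (buffer.size() > maxBufferSize) {
                buffer.remove(0);
            }
            if (ts >= nextTrigger) {
                nextTrigger += 1000;
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                System.out.println("t=" + ts + "ms: sum of newest "
                        + buffer.size() + " elements = " + sum);
            }
        }
    }
}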