提交 56cb7937 编写于 作者: G Gyula Fora

[streaming] FullStream window helper added + partitioner bugfix

Closes #614
上级 579f991e
...@@ -634,6 +634,7 @@ Several predefined policies are provided in the API, including delta-based, coun ...@@ -634,6 +634,7 @@ Several predefined policies are provided in the API, including delta-based, coun
* `Time.of(…)` * `Time.of(…)`
* `Count.of(…)` * `Count.of(…)`
* `Delta.of(…)` * `Delta.of(…)`
* `FullStream.window()`
For detailed description of these policies please refer to the [Javadocs](http://flink.apache.org/docs/latest/api/java/). For detailed description of these policies please refer to the [Javadocs](http://flink.apache.org/docs/latest/api/java/).
...@@ -774,6 +775,9 @@ The above call would create global windows of 1000 elements group it by the firs ...@@ -774,6 +775,9 @@ The above call would create global windows of 1000 elements group it by the firs
Notice that here we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream. Notice that here we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000 element windows. Of course the mapWindow might reduce the number of elements but the key idea is that each transformation still corresponds to the same 1000 elements in the original stream.
#### Periodic aggregations on the full stream history
Sometimes it is necessary to aggregate over all the previously seen data in the stream. For this purpose, use either `dataStream.window(FullStream.window()).every(trigger)` or, equivalently, `dataStream.every(trigger)`.
#### Global vs local discretisation #### Global vs local discretisation
By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a parallelism of 1 to be able to correctly execute the discretisation logic. By default all window discretisation calls (`dataStream.window(…)`) define global windows meaning that a global window of count 100 will contain the last 100 elements arrived at the discretisation operator in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs to have a parallelism of 1 to be able to correctly execute the discretisation logic.
......
...@@ -55,9 +55,9 @@ import org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator; ...@@ -55,9 +55,9 @@ import org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator;
import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator; import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis; import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
...@@ -73,6 +73,7 @@ import org.apache.flink.streaming.api.operators.StreamReduce; ...@@ -73,6 +73,7 @@ import org.apache.flink.streaming.api.operators.StreamReduce;
import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.FullStream;
import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper; import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
...@@ -155,7 +156,7 @@ public class DataStream<OUT> { ...@@ -155,7 +156,7 @@ public class DataStream<OUT> {
this.id = dataStream.id; this.id = dataStream.id;
this.parallelism = dataStream.parallelism; this.parallelism = dataStream.parallelism;
this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames); this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
this.partitioner = dataStream.partitioner; this.partitioner = dataStream.partitioner.copy();
this.streamGraph = dataStream.streamGraph; this.streamGraph = dataStream.streamGraph;
this.typeInfo = dataStream.typeInfo; this.typeInfo = dataStream.typeInfo;
this.mergedStreams = new ArrayList<DataStream<OUT>>(); this.mergedStreams = new ArrayList<DataStream<OUT>>();
...@@ -573,8 +574,8 @@ public class DataStream<OUT> { ...@@ -573,8 +574,8 @@ public class DataStream<OUT> {
TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(), TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
Utils.getCallLocationName(), false); Utils.getCallLocationName(), false);
return transform("Fold", outType, new StreamFold<OUT, R>(clean(folder), return transform("Fold", outType, new StreamFold<OUT, R>(clean(folder), initialValue,
initialValue, outType)); outType));
} }
/** /**
...@@ -910,11 +911,11 @@ public class DataStream<OUT> { ...@@ -910,11 +911,11 @@ public class DataStream<OUT> {
* transformation like {@link WindowedDataStream#reduceWindow}, * transformation like {@link WindowedDataStream#reduceWindow},
* {@link WindowedDataStream#mapWindow} or aggregations on preset * {@link WindowedDataStream#mapWindow} or aggregations on preset
* chunks(windows) of the data stream. To define windows a * chunks(windows) of the data stream. To define windows a
* {@link WindowingHelper} such as {@link Time}, {@link Count} and * {@link WindowingHelper} such as {@link Time}, {@link Count},
* {@link Delta} can be used.</br></br> When applied to a grouped data * {@link Delta} and {@link FullStream} can be used.</br></br> When applied
* stream, the windows (evictions) and slide sizes (triggers) will be * to a grouped data stream, the windows (evictions) and slide sizes
* computed on a per group basis. </br></br> For more advanced control over * (triggers) will be computed on a per group basis. </br></br> For more
* the trigger and eviction policies please refer to * advanced control over the trigger and eviction policies please refer to
* {@link #window(trigger, eviction)} </br> </br> For example to create a * {@link #window(trigger, eviction)} </br> </br> For example to create a
* sum every 5 seconds in a tumbling fashion:</br> * sum every 5 seconds in a tumbling fashion:</br>
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
...@@ -927,7 +928,8 @@ public class DataStream<OUT> { ...@@ -927,7 +928,8 @@ public class DataStream<OUT> {
* *
* @param policyHelper * @param policyHelper
* Any {@link WindowingHelper} such as {@link Time}, * Any {@link WindowingHelper} such as {@link Time},
* {@link Count} and {@link Delta} to define the window size. * {@link Count}, {@link Delta} and {@link FullStream} to define the
* window size.
* @return A {@link WindowedDataStream} providing further operations. * @return A {@link WindowedDataStream} providing further operations.
*/ */
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
...@@ -955,6 +957,17 @@ public class DataStream<OUT> { ...@@ -955,6 +957,17 @@ public class DataStream<OUT> {
return new WindowedDataStream<OUT>(this, trigger, eviction); return new WindowedDataStream<OUT>(this, trigger, eviction);
} }
/**
 * Creates a {@link WindowedDataStream} over the full stream history so that
 * periodic aggregates can be produced. This is shorthand for
 * {@code window(FullStream.window()).every(policyHelper)}.
 *
 * @param policyHelper
 *            The {@link WindowingHelper} passed on to
 *            {@link WindowedDataStream#every} of the full-history window.
 * @return A {@link WindowedDataStream} providing further operations.
 */
@SuppressWarnings("rawtypes")
public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
	// First open a window that never evicts, then attach the given helper to it.
	WindowedDataStream<OUT> fullHistory = window(FullStream.window());
	return fullHistory.every(policyHelper);
}
/** /**
* Writes a DataStream to the standard output stream (stdout).<br> * Writes a DataStream to the standard output stream (stdout).<br>
* For each element of the DataStream the result of * For each element of the DataStream the result of
...@@ -1266,8 +1279,7 @@ public class DataStream<OUT> { ...@@ -1266,8 +1279,7 @@ public class DataStream<OUT> {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(), DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
sinkOperator); sinkOperator);
streamGraph.addOperator(returnStream.getId(), sinkOperator, getType(), null, streamGraph.addOperator(returnStream.getId(), sinkOperator, getType(), null, "Stream Sink");
"Stream Sink");
this.connectGraph(this.copy(), returnStream.getId(), 0); this.connectGraph(this.copy(), returnStream.getId(), 0);
......
...@@ -34,9 +34,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; ...@@ -34,9 +34,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.RichWindowMapFunction; import org.apache.flink.streaming.api.functions.RichWindowMapFunction;
import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.windowing.GroupedActiveDiscretizer; import org.apache.flink.streaming.api.operators.windowing.GroupedActiveDiscretizer;
import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer; import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
...@@ -55,9 +55,14 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; ...@@ -55,9 +55,14 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer; import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator; import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
...@@ -65,10 +70,6 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedP ...@@ -65,10 +70,6 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedP
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer; import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
import org.apache.flink.streaming.util.keys.KeySelectorUtil; import org.apache.flink.streaming.util.keys.KeySelectorUtil;
...@@ -240,6 +241,9 @@ public class WindowedDataStream<OUT> { ...@@ -240,6 +241,9 @@ public class WindowedDataStream<OUT> {
* @return The discretised stream * @return The discretised stream
*/ */
public DataStream<StreamWindow<OUT>> getDiscretizedStream() { public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
if (getEviction() instanceof KeepAllEvictionPolicy) {
throw new RuntimeException("Cannot get discretized stream for full stream window");
}
return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>()) return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>())
.getDiscretizedStream(); .getDiscretizedStream();
} }
...@@ -347,7 +351,7 @@ public class WindowedDataStream<OUT> { ...@@ -347,7 +351,7 @@ public class WindowedDataStream<OUT> {
*/ */
public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) { public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)), return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction); getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
} }
/** /**
...@@ -372,7 +376,8 @@ public class WindowedDataStream<OUT> { ...@@ -372,7 +376,8 @@ public class WindowedDataStream<OUT> {
TypeInformation<R> outType) { TypeInformation<R> outType) {
return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction), return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
new BasicWindowBuffer<OUT>()).mapWindow(windowMapFunction, outType); getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction,
outType);
} }
private DiscretizedStream<OUT> discretize(WindowTransformation transformation, private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
...@@ -393,9 +398,7 @@ public class WindowedDataStream<OUT> { ...@@ -393,9 +398,7 @@ public class WindowedDataStream<OUT> {
.setParallelism(parallelism) .setParallelism(parallelism)
.transform(windowBuffer.getClass().getSimpleName(), .transform(windowBuffer.getClass().getSimpleName(),
new StreamWindowTypeInfo<OUT>(getType()), bufferOperator) new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
.setParallelism(parallelism), groupByKey, transformation, .setParallelism(parallelism), groupByKey, transformation, false);
WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
dataStream.getParallelism()));
} }
...@@ -497,14 +500,26 @@ public class WindowedDataStream<OUT> { ...@@ -497,14 +500,26 @@ public class WindowedDataStream<OUT> {
if (transformation == WindowTransformation.REDUCEWINDOW) { if (transformation == WindowTransformation.REDUCEWINDOW) {
if (WindowUtils.isTumblingPolicy(trigger, eviction)) { if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
if (eviction instanceof KeepAllEvictionPolicy) {
if (groupByKey == null) { if (groupByKey == null) {
return new TumblingPreReducer<OUT>( return new TumblingPreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), getType() (ReduceFunction<OUT>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig())); .createSerializer(getExecutionConfig())).noEvict();
} else { } else {
return new TumblingGroupedPreReducer<OUT>( return new TumblingGroupedPreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType() (ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
getType().createSerializer(getExecutionConfig())).noEvict();
}
} else {
if (groupByKey == null) {
return new TumblingPreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig())); .createSerializer(getExecutionConfig()));
} else {
return new TumblingGroupedPreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey,
getType().createSerializer(getExecutionConfig()));
}
} }
} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) { } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
if (groupByKey == null) { if (groupByKey == null) {
...@@ -564,8 +579,14 @@ public class WindowedDataStream<OUT> { ...@@ -564,8 +579,14 @@ public class WindowedDataStream<OUT> {
} }
} }
} }
if (eviction instanceof KeepAllEvictionPolicy) {
throw new RuntimeException(
"Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations");
} else {
return new BasicWindowBuffer<OUT>(); return new BasicWindowBuffer<OUT>();
} }
}
/** /**
* Applies an aggregation that sums every window of the data stream at the * Applies an aggregation that sums every window of the data stream at the
......
...@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; ...@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
...@@ -118,7 +119,7 @@ public class WindowUtils { ...@@ -118,7 +119,7 @@ public class WindowUtils {
} }
public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) { public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
if (eviction instanceof TumblingEvictionPolicy) { if (eviction instanceof TumblingEvictionPolicy || eviction instanceof KeepAllEvictionPolicy) {
return true; return true;
} else if (isTimeOnly(trigger, eviction)) { } else if (isTimeOnly(trigger, eviction)) {
long slide = getSlideSize(trigger); long slide = getSlideSize(trigger);
...@@ -140,7 +141,8 @@ public class WindowUtils { ...@@ -140,7 +141,8 @@ public class WindowUtils {
} }
public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) { public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy; return trigger instanceof TimeTriggerPolicy
&& (eviction instanceof TimeEvictionPolicy || eviction instanceof KeepAllEvictionPolicy);
} }
public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) { public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.helper;
import java.io.Serializable;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
/**
 * Window that represents the full stream history. Can be used only as eviction
 * policy and only with operations that support pre-aggregation, such as reduce
 * or aggregations.
 *
 * <p>
 * Instances are obtained through {@link #window()}; the constructor is
 * private.
 *
 * @param <DATA>
 *            Type parameter of the policies produced by this helper.
 */
public class FullStream<DATA> implements WindowingHelper<DATA>, Serializable {

	private static final long serialVersionUID = 1L;

	private FullStream() {
	}

	/**
	 * Returns a new {@link KeepAllEvictionPolicy}.
	 *
	 * @return A fresh {@link KeepAllEvictionPolicy} instance.
	 */
	@Override
	public EvictionPolicy<DATA> toEvict() {
		return new KeepAllEvictionPolicy<DATA>();
	}

	/**
	 * Not supported: the full stream helper can only be used as an eviction.
	 *
	 * @return never returns normally
	 * @throws UnsupportedOperationException
	 *             always; use {@code .every(..)} after the window call to
	 *             define the trigger instead.
	 */
	@Override
	public TriggerPolicy<DATA> toTrigger() {
		// UnsupportedOperationException is a RuntimeException, so callers that
		// catch RuntimeException keep working; the message is unchanged.
		throw new UnsupportedOperationException(
				"Full stream policy can be only used as eviction. Use .every(..) after the window call.");
	}

	/**
	 * Returns a helper representing an eviction that keeps all previous record
	 * history.
	 *
	 * @param <R>
	 *            Type parameter of the returned helper.
	 * @return A new {@link FullStream} helper.
	 */
	public static <R> FullStream<R> window() {
		return new FullStream<R>();
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.policy;
/**
 * {@link EvictionPolicy} whose {@link #notifyEviction} always answers with
 * zero, regardless of the data point, the trigger state or the buffer size.
 * It is the policy handed out by the full stream window helper, which
 * describes it as an eviction that keeps all previous record history.
 *
 * @param <T>
 *            Type of the data points passed to the policy.
 */
public class KeepAllEvictionPolicy<T> implements EvictionPolicy<T> {

	private static final long serialVersionUID = 1L;

	/** The constant answer given to every eviction notification. */
	private static final int NO_EVICTION = 0;

	/**
	 * Ignores all of its arguments and reports zero.
	 *
	 * @param datapoint
	 *            Unused.
	 * @param triggered
	 *            Unused.
	 * @param bufferSize
	 *            Unused.
	 * @return Always {@code 0}.
	 */
	@Override
	public int notifyEviction(T datapoint, boolean triggered, int bufferSize) {
		return NO_EVICTION;
	}
}
\ No newline at end of file
...@@ -37,16 +37,23 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre ...@@ -37,16 +37,23 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
private KeySelector<T, ?> keySelector; private KeySelector<T, ?> keySelector;
private Map<Object, T> reducedValues; private Map<Object, T> reducedValues;
private Map<Object, T> keyInstancePerKey = new HashMap<Object, T>();
private TypeSerializer<T> serializer; private TypeSerializer<T> serializer;
private boolean evict = true;
public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector, public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
TypeSerializer<T> serializer) { TypeSerializer<T> serializer) {
this(reducer, keySelector, serializer, true);
}
public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
TypeSerializer<T> serializer, boolean evict) {
this.reducer = reducer; this.reducer = reducer;
this.serializer = serializer; this.serializer = serializer;
this.keySelector = keySelector; this.keySelector = keySelector;
this.reducedValues = new HashMap<Object, T>(); this.reducedValues = new HashMap<Object, T>();
this.evict = evict;
} }
public void emitWindow(Collector<StreamWindow<T>> collector) { public void emitWindow(Collector<StreamWindow<T>> collector) {
...@@ -55,11 +62,12 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre ...@@ -55,11 +62,12 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
StreamWindow<T> currentWindow = createEmptyWindow(); StreamWindow<T> currentWindow = createEmptyWindow();
currentWindow.addAll(reducedValues.values()); currentWindow.addAll(reducedValues.values());
collector.collect(currentWindow); collector.collect(currentWindow);
reducedValues.clear();
} else if (emitEmpty) { } else if (emitEmpty) {
collector.collect(createEmptyWindow()); collector.collect(createEmptyWindow());
} }
if (evict) {
reducedValues.clear();
}
} }
public void store(T element) throws Exception { public void store(T element) throws Exception {
...@@ -74,18 +82,15 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre ...@@ -74,18 +82,15 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
} }
reducedValues.put(key, reduced); reducedValues.put(key, reduced);
if (emitPerGroup && !keyInstancePerKey.containsKey(key)) {
keyInstancePerKey.put(key, element);
}
} }
@Override
public void evict(int n) { public void evict(int n) {
} }
@Override @Override
public TumblingGroupedPreReducer<T> clone() { public TumblingGroupedPreReducer<T> clone() {
return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer); return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer, evict);
} }
@Override @Override
...@@ -93,4 +98,9 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre ...@@ -93,4 +98,9 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
return reducedValues.toString(); return reducedValues.toString();
} }
/**
 * Turns off eviction for this pre-reducer: with the flag cleared,
 * {@code emitWindow} no longer clears the per-key reduced values after
 * emitting.
 *
 * @return This pre-reducer, to allow call chaining.
 */
public TumblingGroupedPreReducer<T> noEvict() {
	evict = false;
	return this;
}
} }
...@@ -23,7 +23,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; ...@@ -23,7 +23,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
/** /**
* Non-grouped pre-reducer for tumbling eviction policy (the slide size is the same as the window size). * Non-grouped pre-reducer for tumbling eviction policy (the slide size is the
* same as the window size).
*/ */
public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator { public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
...@@ -34,9 +35,17 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega ...@@ -34,9 +35,17 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
private T reduced; private T reduced;
private TypeSerializer<T> serializer; private TypeSerializer<T> serializer;
private boolean evict = true;
public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) { public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
this(reducer, serializer, true);
}
private TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
boolean evict) {
this.reducer = reducer; this.reducer = reducer;
this.serializer = serializer; this.serializer = serializer;
this.evict = evict;
} }
public void emitWindow(Collector<StreamWindow<T>> collector) { public void emitWindow(Collector<StreamWindow<T>> collector) {
...@@ -44,10 +53,13 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega ...@@ -44,10 +53,13 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
StreamWindow<T> currentWindow = createEmptyWindow(); StreamWindow<T> currentWindow = createEmptyWindow();
currentWindow.add(reduced); currentWindow.add(reduced);
collector.collect(currentWindow); collector.collect(currentWindow);
reduced = null;
} else if (emitEmpty) { } else if (emitEmpty) {
collector.collect(createEmptyWindow()); collector.collect(createEmptyWindow());
} }
if (evict) {
reduced = null;
}
} }
public void store(T element) throws Exception { public void store(T element) throws Exception {
...@@ -63,7 +75,7 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega ...@@ -63,7 +75,7 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
@Override @Override
public TumblingPreReducer<T> clone() { public TumblingPreReducer<T> clone() {
return new TumblingPreReducer<T>(reducer, serializer); return new TumblingPreReducer<T>(reducer, serializer, evict);
} }
@Override @Override
...@@ -77,4 +89,9 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega ...@@ -77,4 +89,9 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
return this; return this;
} }
/**
 * Turns off eviction for this pre-reducer: with the flag cleared,
 * {@code emitWindow} no longer resets the reduced value after emitting.
 *
 * @return This pre-reducer, to allow call chaining.
 */
public TumblingPreReducer<T> noEvict() {
	evict = false;
	return this;
}
} }
...@@ -31,9 +31,11 @@ public class DistributePartitioner<T> extends StreamPartitioner<T> { ...@@ -31,9 +31,11 @@ public class DistributePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private int[] returnArray = new int[] {-1}; private int[] returnArray = new int[] {-1};
private boolean forward;
public DistributePartitioner(boolean forward) { public DistributePartitioner(boolean forward) {
super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE); super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
this.forward = forward;
} }
@Override @Override
...@@ -43,4 +45,8 @@ public class DistributePartitioner<T> extends StreamPartitioner<T> { ...@@ -43,4 +45,8 @@ public class DistributePartitioner<T> extends StreamPartitioner<T> {
return this.returnArray; return this.returnArray;
} }
/**
 * Creates a new {@code DistributePartitioner} configured with the same {@code forward}
 * flag as this one.
 *
 * <p>Because {@code returnArray} is an instance field with its own initializer, the new
 * partitioner gets a separate array rather than sharing this instance's array.
 *
 * <p>NOTE(review): this appears to override {@code StreamPartitioner.copy()}; consider
 * marking it with {@code @Override}.
 *
 * @return a new partitioner with the same forward/distribute setting
 */
public StreamPartitioner<T> copy() {
return new DistributePartitioner<T>(forward);
}
} }
...@@ -41,4 +41,8 @@ public abstract class StreamPartitioner<T> implements ...@@ -41,4 +41,8 @@ public abstract class StreamPartitioner<T> implements
public PartitioningStrategy getStrategy() { public PartitioningStrategy getStrategy() {
return strategy; return strategy;
} }
/**
 * Returns the partitioner to use as a copy of this one.
 *
 * <p>This default implementation does not create a new object; it returns this very
 * instance. Subclasses may override it to return a new instance instead.
 *
 * @return this partitioner
 */
public StreamPartitioner<T> copy() {
return this;
}
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.windowing; package org.apache.flink.streaming.api.operators.windowing;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -25,6 +26,7 @@ import java.util.Collections; ...@@ -25,6 +26,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
...@@ -34,6 +36,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; ...@@ -34,6 +36,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.FullStream;
import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment;
...@@ -77,6 +80,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -77,6 +80,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial")
@Test @Test
public void test() throws Exception { public void test() throws Exception {
...@@ -108,28 +112,49 @@ public class WindowIntegrationTest implements Serializable { ...@@ -108,28 +112,49 @@ public class WindowIntegrationTest implements Serializable {
DataStream<Integer> source = env.fromCollection(inputs); DataStream<Integer> source = env.fromCollection(inputs);
source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream() source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new CentralSink1()); .addSink(new TestSink1());
source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap()) source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
.flatten().addSink(new CentralSink2()); .flatten().addSink(new TestSink2());
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream() source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink1()); .addSink(new TestSink4());
source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2)) source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2()); .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream() source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
.addSink(new CentralSink3()); .addSink(new TestSink3());
source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream() source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
.addSink(new DistributedSink3()); .addSink(new TestSink6());
source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten() source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4()); .addSink(new TestSink7());
source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0) source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
.getDiscretizedStream().addSink(new DistributedSink5()); .getDiscretizedStream().addSink(new TestSink8());
try {
source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
fail();
} catch (Exception e) {
}
try {
source.window(FullStream.window()).getDiscretizedStream();
fail();
} catch (Exception e) {
}
try {
source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
fail();
} catch (Exception e) {
}
source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
.getDiscretizedStream().addSink(new TestSink12());
DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() { DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -164,11 +189,17 @@ public class WindowIntegrationTest implements Serializable { ...@@ -164,11 +189,17 @@ public class WindowIntegrationTest implements Serializable {
} }
}); });
source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream() source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
.addSink(new DistributedSink6());
source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream() source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink7()); .addSink(new TestSink10());
source.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value;
}
}).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
env.execute(); env.execute();
...@@ -180,7 +211,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -180,7 +211,7 @@ public class WindowIntegrationTest implements Serializable {
expected1.add(StreamWindow.fromElements(10)); expected1.add(StreamWindow.fromElements(10));
expected1.add(StreamWindow.fromElements(32)); expected1.add(StreamWindow.fromElements(32));
validateOutput(expected1, CentralSink1.windows); validateOutput(expected1, TestSink1.windows);
// Tumbling Time of 4 grouped by mod 2 // Tumbling Time of 4 grouped by mod 2
List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
...@@ -190,7 +221,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -190,7 +221,7 @@ public class WindowIntegrationTest implements Serializable {
expected2.add(StreamWindow.fromElements(10)); expected2.add(StreamWindow.fromElements(10));
expected2.add(StreamWindow.fromElements(11, 11)); expected2.add(StreamWindow.fromElements(11, 11));
validateOutput(expected2, CentralSink2.windows); validateOutput(expected2, TestSink2.windows);
// groupby mod 2 sum ( Tumbling Time of 4) // groupby mod 2 sum ( Tumbling Time of 4)
List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
...@@ -200,7 +231,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -200,7 +231,7 @@ public class WindowIntegrationTest implements Serializable {
expected3.add(StreamWindow.fromElements(8)); expected3.add(StreamWindow.fromElements(8));
expected3.add(StreamWindow.fromElements(10)); expected3.add(StreamWindow.fromElements(10));
validateOutput(expected3, DistributedSink1.windows); validateOutput(expected3, TestSink4.windows);
// groupby mod3 Tumbling Count of 2 grouped by mod 2 // groupby mod3 Tumbling Count of 2 grouped by mod 2
List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
...@@ -212,7 +243,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -212,7 +243,7 @@ public class WindowIntegrationTest implements Serializable {
expected4.add(StreamWindow.fromElements(11)); expected4.add(StreamWindow.fromElements(11));
expected4.add(StreamWindow.fromElements(3)); expected4.add(StreamWindow.fromElements(3));
validateOutput(expected4, DistributedSink2.windows); validateOutput(expected4, TestSink5.windows);
// min ( Time of 2 slide 3 ) // min ( Time of 2 slide 3 )
List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
...@@ -220,7 +251,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -220,7 +251,7 @@ public class WindowIntegrationTest implements Serializable {
expected5.add(StreamWindow.fromElements(4)); expected5.add(StreamWindow.fromElements(4));
expected5.add(StreamWindow.fromElements(10)); expected5.add(StreamWindow.fromElements(10));
validateOutput(expected5, CentralSink3.windows); validateOutput(expected5, TestSink3.windows);
// groupby mod 2 max ( Tumbling Time of 4) // groupby mod 2 max ( Tumbling Time of 4)
List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
...@@ -230,25 +261,25 @@ public class WindowIntegrationTest implements Serializable { ...@@ -230,25 +261,25 @@ public class WindowIntegrationTest implements Serializable {
expected6.add(StreamWindow.fromElements(4)); expected6.add(StreamWindow.fromElements(4));
expected6.add(StreamWindow.fromElements(10)); expected6.add(StreamWindow.fromElements(10));
validateOutput(expected6, DistributedSink3.windows); validateOutput(expected6, TestSink6.windows);
List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5)); expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
expected7.add(StreamWindow.fromElements(10)); expected7.add(StreamWindow.fromElements(10));
expected7.add(StreamWindow.fromElements(10, 11, 11)); expected7.add(StreamWindow.fromElements(10, 11, 11));
validateOutput(expected7, DistributedSink4.windows); validateOutput(expected7, TestSink7.windows);
List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
expected8.add(StreamWindow.fromElements(4, 8)); expected8.add(StreamWindow.fromElements(4, 8));
expected8.add(StreamWindow.fromElements(4, 5)); expected8.add(StreamWindow.fromElements(4, 5));
expected8.add(StreamWindow.fromElements(10, 22)); expected8.add(StreamWindow.fromElements(10, 22));
for (List<Integer> sw : DistributedSink5.windows) { for (List<Integer> sw : TestSink8.windows) {
Collections.sort(sw); Collections.sort(sw);
} }
validateOutput(expected8, DistributedSink5.windows); validateOutput(expected8, TestSink8.windows);
List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
expected9.add(StreamWindow.fromElements(6)); expected9.add(StreamWindow.fromElements(6));
...@@ -257,17 +288,50 @@ public class WindowIntegrationTest implements Serializable { ...@@ -257,17 +288,50 @@ public class WindowIntegrationTest implements Serializable {
expected9.add(StreamWindow.fromElements(30)); expected9.add(StreamWindow.fromElements(30));
expected9.add(StreamWindow.fromElements(38)); expected9.add(StreamWindow.fromElements(38));
validateOutput(expected9, DistributedSink6.windows); validateOutput(expected9, TestSink9.windows);
List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>(); List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
expected10.add(StreamWindow.fromElements(6, 9)); expected10.add(StreamWindow.fromElements(6, 9));
expected10.add(StreamWindow.fromElements(16, 24)); expected10.add(StreamWindow.fromElements(16, 24));
for (List<Integer> sw : DistributedSink7.windows) { for (List<Integer> sw : TestSink10.windows) {
Collections.sort(sw); Collections.sort(sw);
} }
validateOutput(expected10, DistributedSink7.windows); validateOutput(expected10, TestSink10.windows);
List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
expected11.add(StreamWindow.fromElements(8));
expected11.add(StreamWindow.fromElements(38));
expected11.add(StreamWindow.fromElements(49));
for (List<Integer> sw : TestSink11.windows) {
Collections.sort(sw);
}
validateOutput(expected11, TestSink11.windows);
List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
expected12.add(StreamWindow.fromElements(4, 4));
expected12.add(StreamWindow.fromElements(18, 20));
expected12.add(StreamWindow.fromElements(18, 31));
for (List<Integer> sw : TestSink12.windows) {
Collections.sort(sw);
}
validateOutput(expected12, TestSink12.windows);
List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
expected13.add(StreamWindow.fromElements(17));
expected13.add(StreamWindow.fromElements(27));
expected13.add(StreamWindow.fromElements(49));
for (List<Integer> sw : TestSink13.windows) {
Collections.sort(sw);
}
validateOutput(expected13, TestSink13.windows);
} }
...@@ -276,7 +340,46 @@ public class WindowIntegrationTest implements Serializable { ...@@ -276,7 +340,46 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class CentralSink1 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@Override
public void invoke(StreamWindow<Integer> value) throws Exception {
windows.add(value);
}
}
/**
 * Test sink that appends every window it receives to a static list, so the test can
 * compare the collected windows against the expected output.
 */
@SuppressWarnings("serial")
private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
// Static, so the collected windows are reachable as TestSink2.windows without a sink
// instance; wrapped in a synchronized list.
// NOTE(review): presumably synchronized because the sink may be invoked from several threads - confirm.
public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>());
/** Records the received window in the static {@code windows} list. */
@Override
public void invoke(StreamWindow<Integer> value) throws Exception {
windows.add(value);
}
}
/**
 * Test sink that appends every window it receives to a static list, so the test can
 * compare the collected windows against the expected output.
 */
@SuppressWarnings("serial")
private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
// Static, so the collected windows are reachable as TestSink3.windows without a sink
// instance; wrapped in a synchronized list.
// NOTE(review): presumably synchronized because the sink may be invoked from several threads - confirm.
public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>());
/** Records the received window in the static {@code windows} list. */
@Override
public void invoke(StreamWindow<Integer> value) throws Exception {
windows.add(value);
}
}
@SuppressWarnings("serial")
private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -289,7 +392,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -289,7 +392,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class CentralSink2 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -302,7 +405,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -302,7 +405,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class CentralSink3 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -315,7 +418,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -315,7 +418,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink1 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -328,7 +431,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -328,7 +431,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink2 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -341,7 +444,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -341,7 +444,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink3 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -354,7 +457,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -354,7 +457,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink4 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -367,7 +470,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -367,7 +470,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink5 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -380,7 +483,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -380,7 +483,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink6 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
...@@ -393,7 +496,7 @@ public class WindowIntegrationTest implements Serializable { ...@@ -393,7 +496,7 @@ public class WindowIntegrationTest implements Serializable {
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class DistributedSink7 implements SinkFunction<StreamWindow<Integer>> { private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>()); .synchronizedList(new ArrayList<StreamWindow<Integer>>());
......
...@@ -66,6 +66,7 @@ public class TumblingGroupedPreReducerTest { ...@@ -66,6 +66,7 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(0))); wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(1)));
wb.emitWindow(collector); wb.emitWindow(collector);
wb.evict(2);
assertEquals(1, collected.size()); assertEquals(1, collected.size());
...@@ -76,12 +77,10 @@ public class TumblingGroupedPreReducerTest { ...@@ -76,12 +77,10 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2))); wb.store(serializer.copy(inputs.get(2)));
// Nothing should happen here
wb.evict(3);
wb.store(serializer.copy(inputs.get(3))); wb.store(serializer.copy(inputs.get(3)));
wb.emitWindow(collector); wb.emitWindow(collector);
wb.evict(4);
assertEquals(2, collected.size()); assertEquals(2, collected.size());
...@@ -114,6 +113,7 @@ public class TumblingGroupedPreReducerTest { ...@@ -114,6 +113,7 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(0))); wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(1)));
wb.emitWindow(collector); wb.emitWindow(collector);
wb.evict(2);
assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0)); assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
...@@ -121,6 +121,7 @@ public class TumblingGroupedPreReducerTest { ...@@ -121,6 +121,7 @@ public class TumblingGroupedPreReducerTest {
wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2))); wb.store(serializer.copy(inputs.get(2)));
wb.emitWindow(collector); wb.emitWindow(collector);
wb.evict(3);
assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1)); assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
......
...@@ -59,6 +59,7 @@ public class TumblingPreReducerTest { ...@@ -59,6 +59,7 @@ public class TumblingPreReducerTest {
wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(1)));
wb.emitWindow(collector); wb.emitWindow(collector);
wb.evict(2);
assertEquals(1, collected.size()); assertEquals(1, collected.size());
assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)), assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
...@@ -68,12 +69,10 @@ public class TumblingPreReducerTest { ...@@ -68,12 +69,10 @@ public class TumblingPreReducerTest {
wb.store(serializer.copy(inputs.get(1))); wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2))); wb.store(serializer.copy(inputs.get(2)));
// Nothing should happen here
wb.evict(3);
wb.store(serializer.copy(inputs.get(3))); wb.store(serializer.copy(inputs.get(3)));
wb.emitWindow(collector); wb.emitWindow(collector);
wb.evict(4);
assertEquals(2, collected.size()); assertEquals(2, collected.size());
assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(10, -2)), assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(10, -2)),
......
...@@ -531,6 +531,13 @@ class DataStream[T](javaStream: JavaStream[T]) { ...@@ -531,6 +531,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
def window(trigger: TriggerPolicy[T], eviction: EvictionPolicy[T]): def window(trigger: TriggerPolicy[T], eviction: EvictionPolicy[T]):
WindowedDataStream[T] = javaStream.window(trigger, eviction) WindowedDataStream[T] = javaStream.window(trigger, eviction)
/**
 * Create a WindowedDataStream based on the full stream history to perform periodic
 * aggregations.
 *
 * This simply delegates to the wrapped Java stream's `every` method. According to the
 * streaming guide it is equivalent to `window(FullStream.window()).every(trigger)`.
 *
 * @param windowingHelper helper passed through to the Java API
 *                        (presumably the trigger that decides how often a result is
 *                        emitted - confirm against the Java `every` method)
 * @return the resulting WindowedDataStream
 */
def every(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] =
javaStream.every(windowingHelper)
/** /**
* *
* Operator used for directing tuples to specific named outputs using an * Operator used for directing tuples to specific named outputs using an
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册