提交 ce8f966c 编写于 作者: A Aljoscha Krettek

[FLINK-3243] Fix Interplay of TimeCharacteristic and Time Windows

This adds dedicated WindowAssigners for processing time and event time.
timeWindow() and timeWindowAll() respect the TimeCharacteristic set
on the StreamExecutionEnvironment.

This will make the easy stuff easy, i.e. using time windows and quickly
switching the time characteristic. Users will then have the flexibility
to mix different kinds of window assigners in their job.

This also expands the translation tests to verify that the correct
window operators are instantiated.
上级 ebab4707
此差异已折叠。
......@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
......@@ -156,22 +155,20 @@ public class AllWindowedStream<T, W extends Window> {
OneInputStreamOperator<T, T> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ReduceIterableAllWindowFunction<W, T>(function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReduceIterableAllWindowFunction<W, T>(function),
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, input.getType(), operator).setParallelism(1);
......@@ -262,22 +259,20 @@ public class AllWindowedStream<T, W extends Window> {
NonKeyedWindowOperator<T, R, W> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
function,
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
function,
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, resultType, operator).setParallelism(1);
......@@ -333,22 +328,20 @@ public class AllWindowedStream<T, W extends Window> {
OneInputStreamOperator<T, R> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new ReduceApplyAllWindowFunction<>(preAggregator, function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
new ReduceApplyAllWindowFunction<>(preAggregator, function),
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, resultType, operator).setParallelism(1);
......@@ -404,8 +397,6 @@ public class AllWindowedStream<T, W extends Window> {
OneInputStreamOperator<T, R> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")";
......@@ -414,7 +405,7 @@ public class AllWindowedStream<T, W extends Window> {
new HeapWindowBuffer.Factory<T>(),
new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
......@@ -423,7 +414,7 @@ public class AllWindowedStream<T, W extends Window> {
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
new HeapWindowBuffer.Factory<T>(),
new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, resultType, operator).setParallelism(1);
......
......@@ -45,6 +45,7 @@ import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
......@@ -62,7 +63,9 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
......@@ -657,7 +660,11 @@ public class DataStream<T> {
* @param size The size of the window.
*/
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
return windowAll(TumblingTimeWindows.of(size));
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(TumblingProcessingTimeWindows.of(size));
} else {
return windowAll(TumblingTimeWindows.of(size));
}
}
/**
......@@ -677,7 +684,11 @@ public class DataStream<T> {
* @param size The size of the window.
*/
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
return windowAll(SlidingTimeWindows.of(size, slide));
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(SlidingProcessingTimeWindows.of(size, slide));
} else {
return windowAll(SlidingTimeWindows.of(size, slide));
}
}
/**
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
......@@ -36,7 +37,9 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
......@@ -170,7 +173,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @param size The size of the window.
*/
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
return window(TumblingTimeWindows.of(size));
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingTimeWindows.of(size));
}
}
/**
......@@ -185,7 +192,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @param size The size of the window.
*/
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
return window(SlidingTimeWindows.of(size, slide));
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingTimeWindows.of(size, slide));
}
}
/**
......
......@@ -32,7 +32,6 @@ import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
......@@ -42,8 +41,8 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
......@@ -260,8 +259,6 @@ public class WindowedStream<T, K, W extends Window> {
WindowOperator<K, T, Iterable<T>, R, W> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
......@@ -275,7 +272,7 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
function,
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
......@@ -289,7 +286,7 @@ public class WindowedStream<T, K, W extends Window> {
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, resultType, operator);
......@@ -346,8 +343,6 @@ public class WindowedStream<T, K, W extends Window> {
OneInputStreamOperator<T, R> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
......@@ -361,7 +356,7 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new ReduceApplyWindowFunction<>(reduceFunction, function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -376,7 +371,7 @@ public class WindowedStream<T, K, W extends Window> {
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, resultType, operator);
......@@ -434,8 +429,6 @@ public class WindowedStream<T, K, W extends Window> {
OneInputStreamOperator<T, R> operator;
boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
......@@ -450,7 +443,7 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new FoldApplyWindowFunction<>(initialValue, foldFunction, function),
trigger,
evictor).enableSetProcessingTime(setProcessingTime);
evictor);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -466,7 +459,7 @@ public class WindowedStream<T, K, W extends Window> {
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger).enableSetProcessingTime(setProcessingTime);
trigger);
}
return input.transform(opName, resultType, operator);
......@@ -679,8 +672,8 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<R> resultType,
String functionName) {
if (windowAssigner instanceof SlidingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
SlidingTimeWindows timeWindows = (SlidingTimeWindows) windowAssigner;
if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSlide();
......@@ -710,8 +703,8 @@ public class WindowedStream<T, K, W extends Window> {
windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
} else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
TumblingTimeWindows timeWindows = (TumblingTimeWindows) windowAssigner;
} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSize();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A {@link WindowAssigner} that windows elements into sliding windows based on the current
* system time of the machine the operation is running on. Windows can possibly overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
private SlidingProcessingTimeWindows(long size, long slide) {
this.size = size;
this.slide = slide;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
timestamp = System.currentTimeMillis();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
public long getSize() {
return size;
}
public long getSlide() {
return slide;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
@Override
public String toString() {
return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
}
/**
* Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
* elements to sliding time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(Time size, Time slide) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
}
......@@ -20,10 +20,8 @@ package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -80,11 +78,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return ProcessingTimeTrigger.create();
} else {
return EventTimeTrigger.create();
}
return EventTimeTrigger.create();
}
@Override
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
/**
* A {@link WindowAssigner} that windows elements into windows based on the current
* system time of the machine the operation is running on. Windows cannot overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private long size;
private TumblingProcessingTimeWindows(long size) {
this.size = size;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
long start = timestamp - (timestamp % size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
public long getSize() {
return size;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingProcessingTimeWindows(" + size + ")";
}
/**
* Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(Time size) {
return new TumblingProcessingTimeWindows(size.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
}
......@@ -20,10 +20,8 @@ package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -66,11 +64,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return ProcessingTimeTrigger.create();
} else {
return EventTimeTrigger.create();
}
return EventTimeTrigger.create();
}
@Override
......
......@@ -76,13 +76,6 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
timestampedCollector);
}
@Override
public EvictingNonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
super.enableSetProcessingTime(setProcessingTime);
return this;
}
// ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
......
......@@ -73,10 +73,9 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
this.windowStateDescriptor = windowStateDescriptor;
}
@Override
@SuppressWarnings("unchecked")
public final void processElement(StreamRecord<IN> element) throws Exception {
public void processElement(StreamRecord<IN> element) throws Exception {
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
K key = (K) getStateBackend().getCurrentKey();
......@@ -145,13 +144,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
}
@Override
public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
super.enableSetProcessingTime(setProcessingTime);
return this;
}
// ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
......
......@@ -94,13 +94,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
/**
* If this is true. The current processing time is set as the timestamp of incoming elements.
* This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
* if eviction should happen based on processing time.
*/
private boolean setProcessingTime = false;
/**
* This is used to copy the incoming element because it can be put into several window
* buffers.
......@@ -238,10 +231,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
@Override
@SuppressWarnings("unchecked")
public final void processElement(StreamRecord<IN> element) throws Exception {
if (setProcessingTime) {
element.replace(element.getValue(), System.currentTimeMillis());
}
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
for (W window: elementWindows) {
......@@ -585,17 +574,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
}
/**
* When this flag is enabled the current processing time is set as the timestamp of elements
* upon arrival. This must be used, for example, when using the
* {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
* time semantics.
*/
public NonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
this.setProcessingTime = setProcessingTime;
return this;
}
// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------
......@@ -642,11 +620,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
// Getters for testing
// ------------------------------------------------------------------------
@VisibleForTesting
public boolean isSetProcessingTime() {
return setProcessingTime;
}
@VisibleForTesting
public Trigger<? super IN, ? super W> getTrigger() {
return trigger;
......
......@@ -108,13 +108,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor;
/**
* If this is true. The current processing time is set as the timestamp of incoming elements.
* This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
* if eviction should happen based on processing time.
*/
protected boolean setProcessingTime = false;
/**
* This is used to copy the incoming element because it can be put into several window
* buffers.
......@@ -230,10 +223,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
if (setProcessingTime) {
element.replace(element.getValue(), System.currentTimeMillis());
}
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
K key = (K) getStateBackend().getCurrentKey();
......@@ -510,17 +499,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
}
/**
* When this flag is enabled the current processing time is set as the timestamp of elements
* upon arrival. This must be used, for example, when using the
* {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
* time semantics.
*/
public WindowOperator<K, IN, ACC, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
this.setProcessingTime = setProcessingTime;
return this;
}
// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------
......@@ -590,11 +568,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// Getters for testing
// ------------------------------------------------------------------------
@VisibleForTesting
public boolean isSetProcessingTime() {
return setProcessingTime;
}
@VisibleForTesting
public Trigger<? super IN, ? super W> getTrigger() {
return trigger;
......
......@@ -70,7 +70,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
......@@ -93,7 +92,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
......@@ -118,7 +116,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertTrue(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
......@@ -142,7 +139,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertTrue(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
......@@ -167,7 +163,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
......@@ -193,7 +188,6 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
......
......@@ -18,8 +18,11 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -27,7 +30,10 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
......@@ -85,6 +91,54 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
}
@Test
@SuppressWarnings("rawtypes")
public void testEventTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
}
/**
* These tests ensure that the fast aligned time windows operator is used if the
* conditions are right.
......
......@@ -75,48 +75,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
});
}
/**
* These tests ensure that the fast aligned time windows operator is used if the
* conditions are right.
*/
@Test
public void testFastTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
}
});
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
}
/**
* These tests ensure that the correct trigger is set when using event-time windows.
*/
......@@ -139,7 +97,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
......@@ -163,7 +120,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
......@@ -189,7 +145,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
......@@ -214,7 +169,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertTrue(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
......@@ -240,7 +194,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
Assert.assertFalse(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
......@@ -267,7 +220,6 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
Assert.assertFalse(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
......
......@@ -560,8 +560,7 @@ class DataStream[T](stream: JavaStream[T]) {
* @param size The size of the window.
*/
def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = {
val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
new AllWindowedStream(javaStream.timeWindowAll(size))
}
/**
......@@ -579,8 +578,8 @@ class DataStream[T](stream: JavaStream[T]) {
* @param size The size of the window.
*/
def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] = {
val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
new AllWindowedStream(javaStream.timeWindowAll(size, slide))
}
/**
......
......@@ -62,8 +62,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* @param size The size of the window.
*/
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
new WindowedStream(javaStream.timeWindow(size))
}
/**
......@@ -96,8 +95,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* @param size The size of the window.
*/
def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = {
val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
new WindowedStream(javaStream.timeWindow(size, slide))
}
/**
......
......@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
......@@ -150,7 +150,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
.windowAll(SlidingTimeWindows.of(
.windowAll(SlidingProcessingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
......@@ -165,7 +165,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
......
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDesc
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
......@@ -52,9 +52,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
.reduce(reducer)
val transform1 = window1.javaStream.getTransformation
......@@ -66,9 +64,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
.window(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.timeWindow(Time.minutes(1))
.apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() {
def apply(
key: Tuple,
......@@ -148,7 +144,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(
.window(SlidingProcessingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
......@@ -163,7 +159,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册