提交 9922d10a 编写于 作者: A Aljoscha Krettek

[FLINK-3536] Make clearer distinction between event time and processing time

This brings it more in line with *ProcessingTimeWindows and makes it
clear what type of window assigner it is.

The old name, i.e. SlidingTimeWindows and TumblingTimeWindows is still
available but deprecated.
上级 0ac2b1a7
......@@ -293,7 +293,7 @@ keyedStream.maxBy("key");
key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
See <a href="windows.html">windows</a> for a complete description of windows.
{% highlight java %}
dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
{% endhighlight %}
</p>
</td>
......@@ -307,7 +307,7 @@ dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 s
<p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
gathered in one task for the windowAll operator.</p>
{% highlight java %}
dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
{% endhighlight %}
</td>
</tr>
......@@ -410,7 +410,7 @@ dataStream.union(otherStream1, otherStream2, ...);
{% highlight java %}
dataStream.join(otherStream)
.where(0).equalTo(1)
.window(TumblingTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
{% endhighlight %}
</td>
......@@ -422,7 +422,7 @@ dataStream.join(otherStream)
{% highlight java %}
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
{% endhighlight %}
</td>
......@@ -669,7 +669,7 @@ keyedStream.maxBy("key")
key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
See <a href="windows.html">windows</a> for a description of windows.
{% highlight scala %}
dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
{% endhighlight %}
</p>
</td>
......@@ -683,7 +683,7 @@ dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))) // Last 5 se
<p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
gathered in one task for the windowAll operator.</p>
{% highlight scala %}
dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
{% endhighlight %}
</td>
</tr>
......@@ -759,7 +759,7 @@ dataStream.union(otherStream1, otherStream2, ...)
{% highlight scala %}
dataStream.join(otherStream)
.where(0).equalTo(1)
.window(TumblingTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
{% endhighlight %}
</td>
......@@ -771,7 +771,7 @@ dataStream.join(otherStream)
{% highlight scala %}
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingTimeWindows.of(Time.seconds(3)))
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
{% endhighlight %}
</td>
......
......@@ -62,7 +62,7 @@ windowing you would use window assigners such as `SlidingProcessingTimeWindows`
`TumblingProcessingTimeWindows`.
In order to work with event time semantics, i.e. if you want to use window assigners such as
`TumblingTimeWindows` or `SlidingTimeWindows`, you need to follow these steps:
`TumblingEventTimeWindows` or `SlidingEventTimeWindows`, you need to follow these steps:
- Set `enableTimestamps()`, as well the interval for watermark emission
(`setAutoWatermarkInterval(long milliseconds)`) in `ExecutionConfig`.
......
......@@ -191,7 +191,7 @@ window, and every time execution is triggered, 10 elements are retained in the w
<div data-lang="java" markdown="1">
{% highlight java %}
keyedStream
.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(10));
{% endhighlight %}
......@@ -200,7 +200,7 @@ keyedStream
<div data-lang="scala" markdown="1">
{% highlight scala %}
keyedStream
.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(10))
{% endhighlight %}
......@@ -214,7 +214,7 @@ The `WindowAssigner` defines how incoming elements are assigned to windows. A wi
that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
to some notion of time described above within these values are part of the window).
For example, the `SlidingTimeWindows`
For example, the `SlidingEventTimeWindows`
assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
time starts from 0 and is measured in milliseconds. Then, we have 6 windows
that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
......@@ -256,7 +256,7 @@ stream.window(GlobalWindows.create());
watermark with value higher than its end-value is received.
</p>
{% highlight java %}
stream.window(TumblingTimeWindows.of(Time.seconds(1)));
stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
{% endhighlight %}
</td>
</tr>
......@@ -270,7 +270,7 @@ stream.window(TumblingTimeWindows.of(Time.seconds(1)));
watermark with value higher than its end-value is received.
</p>
{% highlight java %}
stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
{% endhighlight %}
</td>
</tr>
......@@ -338,7 +338,7 @@ stream.window(GlobalWindows.create)
watermark with value higher than its end-value is received.
</p>
{% highlight scala %}
stream.window(TumblingTimeWindows.of(Time.seconds(1)))
stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
{% endhighlight %}
</td>
</tr>
......@@ -352,7 +352,7 @@ stream.window(TumblingTimeWindows.of(Time.seconds(1)))
watermark with value higher than its end-value is received.
</p>
{% highlight scala %}
stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
{% endhighlight %}
</td>
</tr>
......@@ -743,8 +743,7 @@ stream.countWindow(1000)
<td>
{% highlight java %}
stream.window(GlobalWindows.create())
.trigger(CountTrigger.of(1000)
.evictor(CountEvictor.of(1000)))
.trigger(PurgingTrigger.of(CountTrigger.of(size)))
{% endhighlight %}
</td>
</tr>
......@@ -772,7 +771,7 @@ stream.timeWindow(Time.seconds(5))
</td>
<td>
{% highlight java %}
stream.window(TumblingTimeWindows.of((Time.seconds(5)))
stream.window(TumblingEventTimeWindows.of((Time.seconds(5)))
.trigger(EventTimeTrigger.create())
{% endhighlight %}
</td>
......@@ -786,7 +785,7 @@ stream.timeWindow(Time.seconds(5), Time.seconds(1))
</td>
<td>
{% highlight java %}
stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
.trigger(EventTimeTrigger.create())
{% endhighlight %}
</td>
......@@ -800,7 +799,7 @@ stream.timeWindow(Time.seconds(5))
</td>
<td>
{% highlight java %}
stream.window(TumblingTimeWindows.of((Time.seconds(5)))
stream.window(TumblingProcessingTimeWindows.of((Time.seconds(5)))
.trigger(ProcessingTimeTrigger.create())
{% endhighlight %}
</td>
......@@ -814,7 +813,7 @@ stream.timeWindow(Time.seconds(5), Time.seconds(1))
</td>
<td>
{% highlight java %}
stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create())
{% endhighlight %}
</td>
......@@ -834,7 +833,7 @@ same:
<div data-lang="java" markdown="1">
{% highlight java %}
nonKeyedStream
.windowAll(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(10));
{% endhighlight %}
......@@ -843,7 +842,7 @@ nonKeyedStream
<div data-lang="scala" markdown="1">
{% highlight scala %}
nonKeyedStream
.windowAll(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(10))
{% endhighlight %}
......@@ -992,4 +991,4 @@ nonKeyedStream.countWindowAll(1000, 100)
</table>
</div>
</div>
\ No newline at end of file
</div>
......@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Random;
......@@ -84,7 +84,7 @@ public class WindowJoin {
.join(salaries)
.where(new NameKeySelector())
.equalTo(new NameKeySelector())
.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
.apply(new MyJoinFunction());
// emit result
......
......@@ -21,7 +21,7 @@ package org.apache.flink.streaming.scala.examples.join
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import scala.Stream._
......@@ -56,7 +56,7 @@ object WindowJoin {
val joined = grades.join(salaries)
.where(_.name)
.equalTo(_.name)
.window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
.apply { (g, s) => Person(g.name, g.grade, s.salary) }
if (params.has("output")) {
......
......@@ -65,7 +65,7 @@ import static java.util.Objects.requireNonNull;
* DataStream<T> result = one.coGroup(two)
* .where(new MyFirstKeySelector())
* .equalTo(new MyFirstKeySelector())
* .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .apply(new MyCoGroupFunction());
* } </pre>
*/
......
......@@ -66,9 +66,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
......@@ -594,7 +594,7 @@ public class DataStream<T> {
* Windows this {@code DataStream} into tumbling time windows.
*
* <p>
* This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
* This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
* {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
* set using
*
......@@ -611,7 +611,7 @@ public class DataStream<T> {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(TumblingProcessingTimeWindows.of(size));
} else {
return windowAll(TumblingTimeWindows.of(size));
return windowAll(TumblingEventTimeWindows.of(size));
}
}
......@@ -619,7 +619,7 @@ public class DataStream<T> {
* Windows this {@code DataStream} into sliding time windows.
*
* <p>
* This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
* This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
* {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
* set using
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
......@@ -635,7 +635,7 @@ public class DataStream<T> {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return windowAll(SlidingProcessingTimeWindows.of(size, slide));
} else {
return windowAll(SlidingTimeWindows.of(size, slide));
return windowAll(SlidingEventTimeWindows.of(size, slide));
}
}
......
......@@ -57,7 +57,7 @@ import static java.util.Objects.requireNonNull;
* DataStream<T> result = one.join(two)
* .where(new MyFirstKeySelector())
* .equalTo(new MyFirstKeySelector())
* .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .apply(new MyJoinFunction());
* } </pre>
*/
......
......@@ -38,9 +38,9 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
......@@ -165,7 +165,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* Windows this {@code KeyedStream} into tumbling time windows.
*
* <p>
* This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
* This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
* {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
* set using
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
......@@ -176,7 +176,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingTimeWindows.of(size));
return window(TumblingEventTimeWindows.of(size));
}
}
......@@ -184,7 +184,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* Windows this {@code KeyedStream} into sliding time windows.
*
* <p>
* This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
* This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
* {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
* set using
* {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
......@@ -195,7 +195,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingTimeWindows.of(size, slide));
return window(SlidingEventTimeWindows.of(size, slide));
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
* elements. Windows can possibly overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
* } </pre>
*/
@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
protected SlidingEventTimeWindows(long size, long slide) {
this.size = size;
this.slide = slide;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public long getSize() {
return size;
}
public long getSlide() {
return slide;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
}
/**
* Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to sliding time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
}
......@@ -39,7 +39,7 @@ import java.util.List;
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
......@@ -86,7 +86,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
}
/**
* Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
* Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
* elements to sliding time windows based on the element timestamp.
*
* @param size The size of the generated windows.
......
......@@ -19,94 +19,35 @@
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
* elements. Windows can possibly overlap.
*
* <p>
* For example, in order to window into windows of 1 minute, every 10 seconds:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(SlidingTimeWindows.of(Time.minutes(1), Time.seconds(10)));
* } </pre>
* @deprecated Please use {@link SlidingEventTimeWindows}.
*/
@PublicEvolving
public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@Deprecated
public class SlidingTimeWindows extends SlidingEventTimeWindows {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
private SlidingTimeWindows(long size, long slide) {
this.size = size;
this.slide = slide;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public long getSize() {
return size;
}
public long getSlide() {
return slide;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "SlidingTimeWindows(" + size + ", " + slide + ")";
super(size, slide);
}
/**
* Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
* elements to sliding time windows based on the element timestamp.
*
* @deprecated Please use {@link SlidingEventTimeWindows#of(Time, Time)}.
*
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
@Deprecated()
public static SlidingTimeWindows of(Time size, Time slide) {
return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
/**
* A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
* elements. Windows cannot overlap.
*
* <p>
* For example, in order to window into windows of 1 minute:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
* } </pre>
*/
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private long size;
protected TumblingEventTimeWindows(long size) {
this.size = size;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public long getSize() {
return size;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingEventTimeWindows(" + size + ")";
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
}
......@@ -38,7 +38,7 @@ import java.util.Collections;
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
* keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
* } </pre>
*/
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
......@@ -72,7 +72,7 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
}
/**
* Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
* Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
......
......@@ -19,80 +19,34 @@
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
/**
* A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
* elements. Windows cannot overlap.
*
* <p>
* For example, in order to window into windows of 1 minute:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(TumblingTimeWindows.of(Time.minutes(1)));
* } </pre>
* @deprecated Please use {@link TumblingEventTimeWindows}.
*/
@PublicEvolving
public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@Deprecated
public class TumblingTimeWindows extends TumblingEventTimeWindows {
private static final long serialVersionUID = 1L;
private long size;
private TumblingTimeWindows(long size) {
this.size = size;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public long getSize() {
return size;
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingTimeWindows(" + size + ")";
super(size);
}
/**
* Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @deprecated Please use {@link TumblingEventTimeWindows#of(Time)}.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
@Deprecated()
public static TumblingTimeWindows of(Time size) {
return new TumblingTimeWindows(size.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
}
......@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
......@@ -65,7 +65,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
......@@ -73,11 +73,11 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
......@@ -95,7 +95,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
}
......@@ -110,7 +110,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduce(reducer);
......@@ -119,11 +119,11 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
......@@ -142,7 +142,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
}
......@@ -157,7 +157,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduce(reducer);
......@@ -166,12 +166,12 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
......@@ -191,7 +191,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
}
......@@ -210,7 +210,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyFolder folder = new DummyFolder();
DataStream<Integer> window1 = source
.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.fold(0, folder);
OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation();
......@@ -218,11 +218,11 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof FoldingWindowBuffer.Factory);
DataStream<Integer> window2 = source
.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.evictor(CountEvictor.of(13))
.fold(0, folder);
......@@ -231,7 +231,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
}
......
......@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
......@@ -109,7 +109,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
source1.coGroup(source2)
.where(new Tuple2KeyExtractor())
.equalTo(new Tuple2KeyExtractor())
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> first,
......@@ -207,7 +207,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
source1.join(source2)
.where(new Tuple3KeyExtractor())
.equalTo(new Tuple3KeyExtractor())
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
@Override
public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
......@@ -284,7 +284,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
source1.join(source1)
.where(new Tuple3KeyExtractor())
.equalTo(new Tuple3KeyExtractor())
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
@Override
public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
......
......@@ -26,12 +26,11 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
......@@ -45,7 +44,6 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -79,7 +77,7 @@ public class NonKeyedWindowOperatorTest {
final int WINDOW_SLIDE = 1;
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
windowBufferFactory,
new ReduceIterableAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
......@@ -155,7 +153,7 @@ public class NonKeyedWindowOperatorTest {
final int WINDOW_SIZE = 3;
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
windowBufferFactory,
new ReduceIterableAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
......
......@@ -30,8 +30,8 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -111,7 +111,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
DataStream<Tuple2<String, Integer>> window2 = source
......@@ -134,7 +134,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
}
......
......@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
......@@ -85,7 +85,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
source1
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
......@@ -149,7 +149,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
source1
.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
......
......@@ -31,8 +31,8 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
......@@ -137,7 +137,7 @@ public class WindowOperatorTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
......@@ -173,7 +173,7 @@ public class WindowOperatorTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
......@@ -268,7 +268,7 @@ public class WindowOperatorTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
......@@ -303,7 +303,7 @@ public class WindowOperatorTest {
inputType.createSerializer(new ExecutionConfig()));
WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
......
......@@ -30,8 +30,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
......@@ -65,7 +65,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
......@@ -90,7 +90,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
......@@ -98,12 +98,12 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
......@@ -120,7 +120,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
}
......@@ -136,7 +136,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduce(reducer);
......@@ -145,12 +145,12 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
......@@ -169,7 +169,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
}
......@@ -185,7 +185,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduce(reducer);
......@@ -194,13 +194,13 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
......@@ -220,7 +220,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
}
......
......@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.NoOpSink;
......@@ -560,7 +560,7 @@ public class TimestampITCase {
source1
.keyBy(0)
.window(TumblingTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
......@@ -591,7 +591,7 @@ public class TimestampITCase {
source1
.keyBy(0)
.window(TumblingTimeWindows.of(Time.seconds(5)))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
......
......@@ -51,7 +51,7 @@ import scala.collection.JavaConverters._
* val result = one.coGroup(two)
* .where(new MyFirstKeySelector())
* .equalTo(new MyFirstKeySelector())
* .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .apply(new MyCoGroupFunction())
* } }}}
*/
......
......@@ -572,7 +572,7 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Windows this DataStream into tumbling time windows.
*
* This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
* This is a shortcut for either `.window(TumblingEventTimeWindows.of(size))` or
* `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
* set using
* [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
......@@ -590,7 +590,7 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Windows this DataStream into sliding time windows.
*
* This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or
* This is a shortcut for either `.window(SlidingEventTimeWindows.of(size, slide))` or
* `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic
* set using
* [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
......
......@@ -49,7 +49,7 @@ import org.apache.flink.util.Collector
* val result = one.join(two)
* .where {t => ... }
* .equal {t => ... }
* .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
* .apply(new MyJoinFunction())
* } }}}
*/
......
......@@ -52,7 +52,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
/**
* Windows this [[KeyedStream]] into tumbling time windows.
*
* This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
* This is a shortcut for either `.window(TumblingEventTimeWindows.of(size))` or
* `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
* set using
* [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
......@@ -85,7 +85,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
/**
* Windows this [[KeyedStream]] into sliding time windows.
*
* This is a shortcut for either `.window(SlidingTimeWindows.of(size))` or
* This is a shortcut for either `.window(SlidingEventTimeWindows.of(size))` or
* `.window(SlidingProcessingTimeWindows.of(size))` depending on the time characteristic
* set using
* [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
......
......@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
......@@ -59,7 +59,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
.windowAll(SlidingTimeWindows.of(
.windowAll(SlidingEventTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer)
......@@ -73,7 +73,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
.windowAll(SlidingTimeWindows.of(
.windowAll(SlidingEventTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
......@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
.windowAll(SlidingTimeWindows.of(
.windowAll(SlidingEventTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
......@@ -114,13 +114,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]]
assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
assertTrue(
winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]])
val window2 = source
.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
def apply(
......@@ -137,7 +137,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]]
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
}
......@@ -170,7 +170,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(1000))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
......@@ -189,7 +189,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
}
......@@ -203,7 +203,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(
.window(SlidingEventTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
......@@ -223,14 +223,14 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
assertTrue(
winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
val window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
def apply(
......@@ -248,7 +248,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(
winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
}
......
......@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
......@@ -84,7 +84,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
source1.coGroup(source2)
.where(_._1)
.equalTo(_._1)
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) =>
"F:" + first.mkString("") + " S:" + second.mkString("")
}
......@@ -156,7 +156,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
source1.join(source2)
.where(_._1)
.equalTo(_._1)
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply( (l, r) => l.toString + ":" + r.toString)
.addSink(new SinkFunction[String]() {
def invoke(value: String) {
......@@ -219,7 +219,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
source1.join(source1)
.where(_._1)
.equalTo(_._1)
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply( (l, r) => l.toString + ":" + r.toString)
.addSink(new SinkFunction[String]() {
def invoke(value: String) {
......
......@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermark
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test
......@@ -69,7 +69,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
source1
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
.addSink(new SinkFunction[(String, Int)]() {
def invoke(value: (String, Int)) {
......@@ -116,7 +116,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
source1
.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
.addSink(new SinkFunction[(String, Int)]() {
def invoke(value: (String, Int)) {
......
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDesc
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
......@@ -91,7 +91,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(
.window(SlidingEventTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
......@@ -105,14 +105,14 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
assertTrue(
winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
val window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
def apply(
......@@ -130,7 +130,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
}
......@@ -165,7 +165,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(1000))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
......@@ -185,7 +185,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
}
......@@ -199,7 +199,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
.window(SlidingTimeWindows.of(
.window(SlidingEventTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
......@@ -219,14 +219,14 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]]
assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
assertTrue(
winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
val window2 = source
.keyBy(0)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
def apply(
......@@ -244,7 +244,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]]
assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
assertTrue(
winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册