提交 c6358024 编写于 作者: G Gabor Gevay 提交者: Gyula Fora

[FLINK-1797] Add jumping pre-reducer for Count and Time windows

Closes #549
上级 1a3ae03b
......@@ -65,6 +65,10 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedP
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
......@@ -525,6 +529,33 @@ public class WindowedDataStream<OUT> {
WindowUtils.getTimeStampWrapper(trigger));
}
} else if(WindowUtils.isJumpingCountPolicy(trigger, eviction)){
if(groupByKey == null){
return new JumpingCountPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(), getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
} else {
return new JumpingCountGroupedPreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(),
groupByKey,
getType().createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
}
} else if(WindowUtils.isJumpingTimePolicy(trigger, eviction)){
if(groupByKey == null) {
return new JumpingTimePreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
getType().createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger),
WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
} else {
return new JumpingTimeGroupedPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
groupByKey,
getType().createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger),
WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
}
}
}
return new BasicWindowBuffer<OUT>();
......
......@@ -45,7 +45,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
} else {
// first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
// then for new
setParallelism(GlobalConfiguration.getInteger(
......
......@@ -39,7 +39,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
} else {
// first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
// then for new
setParallelism(GlobalConfiguration.getInteger(
......
......@@ -162,4 +162,30 @@ public class WindowUtils {
}
}
public static boolean isJumpingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
if (isCountOnly(trigger, eviction)) {
long slide = getSlideSize(trigger);
long window = getWindowSize(eviction);
return slide > window
&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
.getStart()
&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
} else {
return false;
}
}
public static boolean isJumpingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
if (isTimeOnly(trigger, eviction)) {
long slide = getSlideSize(trigger);
long window = getWindowSize(eviction);
return slide > window
&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
} else {
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;
public class JumpingCountGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
private static final long serialVersionUID = 1L;
private final long countToSkip; // How many elements should be jumped over
private long skipped = 0; // How many elements have we skipped since the last emitWindow
public JumpingCountGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
TypeSerializer<T> serializer, long countToSkip) {
super(reducer, keySelector, serializer);
this.countToSkip = countToSkip;
}
@Override
public void emitWindow(Collector<StreamWindow<T>> collector) {
super.emitWindow(collector);
skipped = 0;
}
@Override
public void store(T element) throws Exception {
if(skipped == countToSkip){
super.store(element);
} else {
skipped++;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;
/**
* Non-grouped pre-reducer for jumping time eviction policy
* (the policies are based on count, and the slide size is larger than the window size).
*/
public class JumpingCountPreReducer<T> extends TumblingPreReducer<T> {
private static final long serialVersionUID = 1L;
private final long countToSkip; // How many elements should be jumped over
private long skipped = 0; // How many elements have we skipped since the last emitWindow
public JumpingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer, long countToSkip){
super(reducer, serializer);
this.countToSkip = countToSkip;
}
@Override
public void emitWindow(Collector<StreamWindow<T>> collector) {
super.emitWindow(collector);
skipped = 0;
}
@Override
public void store(T element) throws Exception {
if(skipped == countToSkip){
super.store(element);
} else {
skipped++;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.util.Collector;
public class JumpingTimeGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
private static final long serialVersionUID = 1L;
private TimestampWrapper<T> timestampWrapper;
protected long windowStartTime;
private long slideSize;
public JumpingTimeGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
TypeSerializer<T> serializer,
long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
super(reducer, keySelector, serializer);
this.timestampWrapper = timestampWrapper;
this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize;
this.slideSize = slideSize;
}
@Override
public void emitWindow(Collector<StreamWindow<T>> collector) {
super.emitWindow(collector);
windowStartTime += slideSize;
}
public void store(T element) throws Exception {
if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
super.store(element);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.util.Collector;
/**
* Non-grouped pre-reducer for jumping time eviction policy
* (the policies are based on time, and the slide size is larger than the window size).
*/
public class JumpingTimePreReducer<T> extends TumblingPreReducer<T> {
private static final long serialVersionUID = 1L;
private TimestampWrapper<T> timestampWrapper;
protected long windowStartTime;
private long slideSize;
public JumpingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
super(reducer, serializer);
this.timestampWrapper = timestampWrapper;
this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize;
this.slideSize = slideSize;
}
@Override
public void emitWindow(Collector<StreamWindow<T>> collector) {
super.emitWindow(collector);
windowStartTime += slideSize;
}
public void store(T element) throws Exception {
if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
super.store(element);
}
}
}
......@@ -21,9 +21,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
/**
* Non-grouped pre-reducer for tumbling eviction policy.
*/
public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
private static final long serialVersionUID = 1L;
......
......@@ -20,9 +20,6 @@ package org.apache.flink.streaming.api.windowing.windowbuffer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
/**
* Non-grouped pre-reducer for tumbling eviction policy.
*/
public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
private static final long serialVersionUID = 1L;
......
......@@ -28,6 +28,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.StreamWindow;
/*
* Grouped pre-reducer for sliding eviction policy
* (the slide size is smaller than the window size).
*/
public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
private static final long serialVersionUID = 1L;
......
......@@ -24,6 +24,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;
/*
* Non-grouped pre-reducer for sliding eviction policy
* (the slide size is smaller than the window size).
*/
public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
private static final long serialVersionUID = 1L;
......
......@@ -23,7 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
/**
* Non-grouped pre-reducer for tumbling eviction policy.
* Non-grouped pre-reducer for sliding time eviction policy.
*/
public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
......
......@@ -22,7 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
/**
* Non-grouped pre-reducer for tumbling eviction policy.
* Non-grouped pre-reducer for sliding time eviction policy
* (the policies are based on time, and the slide size is smaller than the window size).
*/
public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
......
......@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;
/**
* Non-grouped pre-reducer for tumbling eviction policy.
* Non-grouped pre-reducer for tumbling eviction policy (the slide size is the same as the window size).
*/
public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
......
......@@ -430,7 +430,8 @@ public class AggregationFunctionTest {
}
public static class MyPojo implements Serializable {
private static final long serialVersionUID = 1L;
public int f0;
public int f1;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
public class JumpingCountGroupedPreReducerTest {
TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor
.getForObject(new Tuple2<Integer, Integer>(1, 1));
TypeSerializer<Tuple2<Integer, Integer>> serializer = type.createSerializer(null);
KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, type), type, null);
Reducer reducer = new Reducer();
@SuppressWarnings("unchecked")
@Test
public void testEmitWindow() throws Exception {
List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
inputs.add(new Tuple2<Integer, Integer>(1, 1));
inputs.add(new Tuple2<Integer, Integer>(0, 0));
inputs.add(new Tuple2<Integer, Integer>(1, -1));
inputs.add(new Tuple2<Integer, Integer>(1, -2));
inputs.add(new Tuple2<Integer, Integer>(100, -200));
TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
reducer, key, serializer, 1);
wb.store(serializer.copy(inputs.get(4)));
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.emitWindow(collector);
assertEquals(1, collected.size());
assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
wb.store(serializer.copy(inputs.get(4)));
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
// Nothing should happen here
wb.evict(3);
wb.store(serializer.copy(inputs.get(3)));
wb.emitWindow(collector);
assertEquals(2, collected.size());
assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, -2),
new Tuple2<Integer, Integer>(0, 0)), collected.get(1));
// Test whether function is mutating inputs or not
assertEquals(2, reducer.allInputs.size());
assertEquals(reducer.allInputs.get(0), inputs.get(2));
assertEquals(reducer.allInputs.get(1), inputs.get(3));
}
@SuppressWarnings("unchecked")
@Test
public void testEmitWindow2() throws Exception {
List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
inputs.add(new Tuple2<Integer, Integer>(1, 1));
inputs.add(new Tuple2<Integer, Integer>(0, 0));
inputs.add(new Tuple2<Integer, Integer>(1, -1));
inputs.add(new Tuple2<Integer, Integer>(1, -2));
inputs.add(new Tuple2<Integer, Integer>(100, -200));
TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
reducer, key, serializer, 1).sequentialID();
wb.store(serializer.copy(inputs.get(4)));
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.emitWindow(collector);
assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
wb.store(serializer.copy(inputs.get(4)));
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
wb.emitWindow(collector);
assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
}
private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
assertEquals(new HashSet<T>(first), new HashSet<T>(second));
}
@SuppressWarnings("serial")
private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
@Override
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
Tuple2<Integer, Integer> value2) throws Exception {
allInputs.add(value2);
value1.f0 = value1.f0 + value2.f0;
value1.f1 = value1.f1 + value2.f1;
return value1;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
import org.junit.Test;
public class JumpingCountPreReducerTest {
TypeSerializer<Tuple2<Integer, Integer>> serializer = TypeExtractor.getForObject(
new Tuple2<Integer, Integer>(1, 1)).createSerializer(null);
Reducer reducer = new Reducer();
@SuppressWarnings("unchecked")
@Test
public void testEmitWindow() throws Exception {
List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
inputs.add(new Tuple2<Integer, Integer>(1, 1));
inputs.add(new Tuple2<Integer, Integer>(2, 0));
inputs.add(new Tuple2<Integer, Integer>(3, -1));
inputs.add(new Tuple2<Integer, Integer>(4, -2));
inputs.add(new Tuple2<Integer, Integer>(5, -3));
TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>(
reducer, serializer, 2);
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
wb.store(serializer.copy(inputs.get(3)));
wb.store(serializer.copy(inputs.get(4)));
wb.emitWindow(collector);
assertEquals(1, collected.size());
assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(12, -6)),
collected.get(0));
wb.store(serializer.copy(inputs.get(0)));
wb.store(serializer.copy(inputs.get(1)));
wb.store(serializer.copy(inputs.get(2)));
// Nothing should happen here
wb.evict(3);
wb.store(serializer.copy(inputs.get(3)));
wb.emitWindow(collector);
assertEquals(2, collected.size());
assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(7, -3)),
collected.get(1));
// Test whether function is mutating inputs or not
assertEquals(3, reducer.allInputs.size());
assertEquals(reducer.allInputs.get(0), inputs.get(3));
assertEquals(reducer.allInputs.get(1), inputs.get(4));
assertEquals(reducer.allInputs.get(2), inputs.get(3));
}
@SuppressWarnings("serial")
private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
@Override
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
Tuple2<Integer, Integer> value2) throws Exception {
allInputs.add(value2);
value1.f0 = value1.f0 + value2.f0;
value1.f1 = value1.f1 + value2.f1;
return value1;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
import static org.junit.Assert.assertEquals;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
import org.junit.Test;
public class JumpingTimePreReducerTest {
TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
ReduceFunction<Integer> reducer = new SumReducer();
@Test
public void testEmitWindow() throws Exception {
TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
List<StreamWindow<Integer>> collected = collector.getCollected();
WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(
reducer, serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value;
}
}, 1));
wb.store(1);
wb.store(2);
wb.store(3);
wb.evict(1);
wb.emitWindow(collector);
assertEquals(1, collected.size());
assertEquals(StreamWindow.fromElements(5),
collected.get(0));
wb.store(4);
wb.store(5);
// Nothing should happen here
wb.evict(2);
wb.store(6);
wb.emitWindow(collector);
wb.evict(2);
wb.emitWindow(collector);
wb.store(12);
wb.emitWindow(collector);
assertEquals(3, collected.size());
assertEquals(StreamWindow.fromElements(11),
collected.get(1));
assertEquals(StreamWindow.fromElements(12),
collected.get(2));
}
private static class SumReducer implements ReduceFunction<Integer> {
private static final long serialVersionUID = 1L;
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册