提交 90cfe0a7 编写于 作者: A Aljoscha Krettek

Fix test formatting and move WindowAssignerContext to WindowAssigner

上级 8803d15e
......@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import java.io.Serializable;
import java.util.Collection;
......@@ -70,4 +72,23 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
* {@code false} otherwise.
*/
public abstract boolean isEventTime();
/**
* A context provided to the {@link WindowAssigner} that allows it to query the
* current processing time.
*
* <p>This is provided to the assigner by its containing
* {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
* which, in turn, gets it from the containing
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
public abstract static class WindowAssignerContext {
/**
* Returns the current processing time, as returned by
* the {@link StreamTask#getCurrentProcessingTime()}.
*/
public abstract long getCurrentProcessingTime();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.assigners;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
/**
* A context provided to the {@link WindowAssigner} that allows it to query the
* current processing time. This is provided to the assigner by its containing
* {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
* which, in turn, gets it from the containing
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
public abstract class WindowAssignerContext {
/**
* Returns the current processing time, as returned by
* the {@link StreamTask#getCurrentProcessingTime()}.
*/
public abstract long getCurrentProcessingTime();
}
......@@ -50,7 +50,6 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
......@@ -161,7 +160,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected transient Context context = new Context(null, null);
protected transient WindowAssignerContext windowAssignerContext;
protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
// ------------------------------------------------------------------------
// State that needs to be checkpointed
......@@ -248,7 +247,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context = new Context(null, null);
windowAssignerContext = new WindowAssignerContext() {
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
public long getCurrentProcessingTime() {
return WindowOperator.this.getCurrentProcessingTime();
......
......@@ -43,7 +43,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindow
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
......@@ -958,7 +958,7 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
......@@ -966,23 +966,25 @@ public class WindowOperatorTest {
// timestamp is ignored in processing time
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testTimeProvider.setCurrentTime(5000);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 7000));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
testTimeProvider.setCurrentTime(7000);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
......@@ -1018,6 +1020,8 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// timestamp is ignored in processing time
......@@ -1025,27 +1029,36 @@ public class WindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
testTimeProvider.setCurrentTime(1000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
testTimeProvider.setCurrentTime(2000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testTimeProvider.setCurrentTime(3000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
testTimeProvider.setCurrentTime(7000);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 4999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
......@@ -1082,6 +1095,8 @@ public class WindowOperatorTest {
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
testHarness.open();
// timestamp is ignored in processing time
......@@ -1092,6 +1107,11 @@ public class WindowOperatorTest {
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));//Long.MAX_VALUE));
testTimeProvider.setCurrentTime(5000);
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
......@@ -1100,14 +1120,11 @@ public class WindowOperatorTest {
testTimeProvider.setCurrentTime(10000);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 7999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 7999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
assertEquals(expectedOutput.size(), testHarness.getOutput().size());
for (Object elem : testHarness.getOutput()) {
if (elem instanceof StreamRecord) {
......@@ -1214,7 +1231,7 @@ public class WindowOperatorTest {
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
long timestamp = Long.MAX_VALUE - 1750;
Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2", 1), timestamp, new WindowAssignerContext() {
Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2", 1), timestamp, new WindowAssigner.WindowAssignerContext() {
@Override
public long getCurrentProcessingTime() {
return operator.windowAssignerContext.getCurrentProcessingTime();
......
......@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
......@@ -43,6 +44,7 @@ import org.mockito.stubbing.Answer;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
......@@ -85,7 +87,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) {
this(operator, executionConfig, null);
this(operator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
}
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig,
......@@ -127,31 +129,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final long execTime = (Long) invocation.getArguments()[0];
final Triggerable target = (Triggerable) invocation.getArguments()[1];
if (timeServiceProvider == null) {
Thread caller = new Thread() {
@Override
public void run() {
final long delay = execTime - mockTask.getCurrentProcessingTime();
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException ignored) {
}
}
synchronized (checkpointLock) {
try {
target.trigger(execTime);
} catch (Exception ignored) {
}
}
}
};
caller.start();
} else {
timeServiceProvider.registerTimer(
timeServiceProvider.registerTimer(
execTime, new TriggerTask(checkpointLock, target, execTime));
}
return null;
}
}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
......@@ -159,9 +138,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
doAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
return timeServiceProvider == null ?
System.currentTimeMillis() :
timeServiceProvider.getCurrentProcessingTime();
return timeServiceProvider.getCurrentProcessingTime();
}
}).when(mockTask).getCurrentProcessingTime();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册