提交 25d52e4d 编写于 作者: A Aljoscha Krettek 提交者: Tzu-Li (Gordon) Tai

[hotfix] Fix various small issues in WindowOperatorContractTest

上级 3c4b1565
......@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
......@@ -36,7 +37,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
......@@ -155,7 +156,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
@SuppressWarnings("unchecked")
static Iterable<Integer> intIterable(Integer... values) {
return (Iterable<Integer>) argThat(containsInAnyOrder(values));
return (Iterable<Integer>) argThat(contains(values));
}
static TimeWindow anyTimeWindow() {
......@@ -247,55 +248,55 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
}
private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
}
private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
}
private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
}
private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
}
private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
}
private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
}
private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
}
private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
}
private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception {
when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
}
/**
* Verify that there is no late-date side output if the {@code WindowAssigner} does
* Verify that there is no late-data side output if the {@code WindowAssigner} does
* not assign any windows.
*/
@Test
......@@ -346,7 +347,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext());
assertThat(testHarness.getSideOutput(lateOutputTag),
containsInAnyOrder(isStreamRecord(0, 5L)));
contains(isStreamRecord(0, 5L)));
// we should also see side output if the WindowAssigner assigns no windows
when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
......@@ -358,7 +359,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), anyAssignerContext());
assertThat(testHarness.getSideOutput(lateOutputTag),
containsInAnyOrder(isStreamRecord(0, 5L), isStreamRecord(0, 10L)));
contains(isStreamRecord(0, 5L), isStreamRecord(0, 10L)));
}
......@@ -520,7 +521,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
}
@Test
......@@ -534,7 +535,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -573,7 +574,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
}
@Test
......@@ -1067,9 +1068,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
/**
* Verify that we neither invoke the trigger nor the window function if a timer
* for an empty merging window.
* for an empty merging window fires.
*/
public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
......@@ -1133,7 +1134,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we neither invoke the trigger nor the window function if a timer
* fires for a merging window that was already garbage collected.
*/
public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
......@@ -1166,7 +1167,6 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
testHarness.processElement(new StreamRecord<>(0, 0L));
assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set
......@@ -1311,7 +1311,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
}
public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
......@@ -1372,8 +1372,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback());
verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback());
verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback());
verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback());
......@@ -1392,7 +1393,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
/**
* Verify that windows are merged eagerly, if possible.
*/
public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception {
// in this test we only have one state window and windows are eagerly
// merged into the first window
......@@ -1456,8 +1457,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
shouldMergeWindows(
mockAssigner,
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)),
new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))),
new TimeWindow(0, 4));
// don't register a timer or update state in onElement, this checks
......@@ -1491,7 +1492,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we only keep one of the underlying state windows. This test also verifies that
* GC timers are correctly deleted when merging windows.
*/
public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
......@@ -1562,8 +1563,8 @@ public abstract class WindowOperatorContractTest extends TestLogger {
shouldMergeWindows(
mockAssigner,
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)),
new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
new TimeWindow(0, 4));
testHarness.processElement(new StreamRecord<>(0, 0L));
......@@ -1618,7 +1619,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor());
}
public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception {
MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
......@@ -1663,7 +1664,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor());
}
public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
......@@ -1767,7 +1768,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testGarbageCollectionTimer(new ProcessingTimeAdaptor());
}
public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -1812,7 +1813,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor());
}
public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -1868,7 +1869,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor());
}
public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -1938,7 +1939,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we correctly clean up even when a purging trigger has purged
* window state.
*/
public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -2009,7 +2010,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
* Verify that we correctly clean up even when a purging trigger has purged
* window state.
*/
public void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
private void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -2075,7 +2076,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testMergingWindowSetClearedAtGarbageCollection(new ProcessingTimeAdaptor());
}
public void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
......@@ -2120,12 +2121,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
doAnswer(new Answer<TriggerResult>() {
@Override
public TriggerResult answer(InvocationOnMock invocation) throws Exception {
return TriggerResult.FIRE;
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
shouldFireOnElement(mockTrigger);
// 20 is just at the limit, window.maxTime() is 1 and allowed lateness is 20
testHarness.processWatermark(new Watermark(20));
......@@ -2159,12 +2155,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.getOutput().size());
assertEquals(0, testHarness.numKeyedStateEntries());
doAnswer(new Answer<TriggerResult>() {
@Override
public TriggerResult answer(InvocationOnMock invocation) throws Exception {
return TriggerResult.FIRE;
}
}).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext());
shouldFireOnElement(mockTrigger);
// window.maxTime() == 1 plus 20L of allowed lateness
testHarness.processWatermark(new Watermark(21));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册