未验证 提交 eaef2d7f 编写于 作者: S Stephan Ewen 提交者: Aljoscha Krettek

[FLINK-17659] Add common watermark strategies and WatermarkStrategies helper

上级 792e0cc7
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
/**
* A watermark generator that assumes monotonically ascending timestamps within the
* stream split and periodically generates watermarks based on that assumption.
*
* <p>The current watermark is always one after the latest (highest) timestamp,
* because we assume that more records with the same timestamp may still follow.
*
* <p>The watermarks are generated periodically and tightly follow the latest
* timestamp in the data. The delay introduced by this strategy is mainly the periodic
* interval in which the watermarks are generated, which can be configured via
* {@link org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
*/
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/**
* Creates a new watermark generator with for ascending timestamps.
*/
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A WatermarkGenerator for situations where records are out of order, but you can place an upper
* bound on how far the events are out of order. An out-of-order bound B means that once an
* event with timestamp T was encountered, no events older than {@code T - B} will follow any more.
*
* <p>The watermarks are generated periodically. The delay introduced by this watermark strategy
* is the periodic interval length, plus the out-of-orderness bound.
*/
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
/**
* A timestamp assigner that assigns timestamps based on the machine's wall clock.
* If this assigner is used after a stream source, it realizes "ingestion time" semantics.
*
* @param <T> The type of the elements that get timestamps assigned.
*/
@Public
public final class IngestionTimeAssigner<T> implements TimestampAssigner<T> {
private static final long serialVersionUID = 1L;
private long maxTimestamp;
@Override
public long extractTimestamp(T element, long recordTimestamp) {
// make sure timestamps are monotonously increasing, even when the system clock re-syncs
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* WatermarkStrategies is a simply way to build a {@link WatermarkStrategy} by configuring
* common strategies.
*/
public final class WatermarkStrategies {
/** The base strategy for watermark generation. Starting point, is always set. */
private final WatermarkStrategy<?> baseStrategy;
/** Optional idle timeout for watermarks. */
@Nullable
private Duration idleTimeout;
private WatermarkStrategies(WatermarkStrategy<?> baseStrategy) {
this.baseStrategy = baseStrategy;
}
// ------------------------------------------------------------------------
// builder methods
// ------------------------------------------------------------------------
/**
* Add an idle timeout to the watermark strategy.
* If no records flow in a partition of a stream for that amount of time, then that partition
* is considered "idle" and will not hold back the progress of watermarks in downstream operators.
*
* <p>Idleness can be important if some partitions have little data and might not have events during
* some periods. Without idleness, these streams can stall the overall event time progress of the
* application.
*/
public WatermarkStrategies withIdleness(Duration idleTimeout) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero");
this.idleTimeout = idleTimeout;
return this;
}
/**
* Build the watermark strategy.
*/
public <T> WatermarkStrategy<T> build() {
@SuppressWarnings("unchecked")
WatermarkStrategy<T> strategy = (WatermarkStrategy<T>) this.baseStrategy;
if (idleTimeout != null) {
strategy = new WithIdlenessStrategy<>(strategy, idleTimeout);
}
return strategy;
}
// ------------------------------------------------------------------------
// builder entry points
// ------------------------------------------------------------------------
/**
* Starts building a watermark strategy for situations with monotonously ascending
* timestamps.
*
* <p>The watermarks are generated periodically and tightly follow the latest
* timestamp in the data. The delay introduced by this strategy is mainly the periodic
* interval in which the watermarks are generated.
*
* @see AscendingTimestampsWatermarks
*/
public static WatermarkStrategies forMonotonousTimestamps() {
return new WatermarkStrategies(AscendingTimestampsWatermarks::new);
}
/**
* Starts building a watermark strategy for situations where records are out of order, but
* you can place an upper bound on how far the events are out of order.
* An out-of-order bound B means that once the an event with timestamp T was encountered, no
* events older than {@code T - B} will follow any more.
*
* <p>The watermarks are generated periodically. The delay introduced by this watermark strategy
* is the periodic interval length, plus the out of orderness bound.
*
* @see BoundedOutOfOrdernessWatermarks
*/
public static WatermarkStrategies forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return new WatermarkStrategies(() -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness));
}
/**
* Starts building a watermark strategy based on an existing {@code WatermarkStrategy}.
*/
public static WatermarkStrategies forStrategy(WatermarkStrategy<?> strategy) {
return new WatermarkStrategies(strategy);
}
/**
* Starts building a watermark strategy based on an existing {@code WatermarkGenerator}.
*/
public static <X extends WatermarkGenerator<?> & Serializable> WatermarkStrategies forGenerator(X generator) {
@SuppressWarnings("unchecked")
final WatermarkGenerator<Object> gen = (WatermarkGenerator<Object>) generator;
return new WatermarkStrategies(new FromSerializedGeneratorStrategy<>(gen));
}
// ------------------------------------------------------------------------
private static final class FromSerializedGeneratorStrategy<T> implements WatermarkStrategy<T> {
private static final long serialVersionUID = 1L;
private final WatermarkGenerator<T> generator;
private FromSerializedGeneratorStrategy(WatermarkGenerator<T> generator) {
this.generator = generator;
}
@Override
public WatermarkGenerator<T> createWatermarkGenerator() {
try {
byte[] serialized = InstantiationUtil.serializeObject(generator);
return InstantiationUtil.deserializeObject(serialized, generator.getClass().getClassLoader());
}
catch (Exception e) {
throw new FlinkRuntimeException("Cannot clone watermark generator via serialization");
}
}
}
private static final class WithIdlenessStrategy<T> implements WatermarkStrategy<T> {
private static final long serialVersionUID = 1L;
private final WatermarkStrategy<T> baseStrategy;
private final Duration idlenessTimeout;
private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
this.baseStrategy = baseStrategy;
this.idlenessTimeout = idlenessTimeout;
}
@Override
public WatermarkGenerator<T> createWatermarkGenerator() {
return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(), idlenessTimeout);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.io.Serializable;
/**
* The WatermarkStrategy defines how to generate {@link Watermark}s in the stream sources.
* The WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator} that
* generates the watermarks.
*
* <p>This interface is {@link Serializable} because watermark strategies may be shipped
* to workers during distributed execution.
*/
@Public
public interface WatermarkStrategy<T> extends Serializable {
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
WatermarkGenerator<T> createWatermarkGenerator();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A WatermarkGenerator that adds idleness detection to another WatermarkGenerator.
* If no events come within a certain time (timeout duration) then this generator marks
* the stream as idle, until the next watermark is generated.
*/
@Public
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
private final WatermarkGenerator<T> watermarks;
private final IdlenessTimer idlenessTimer;
/**
* Creates a new WatermarksWithIdleness generator to the given generator idleness
* detection with the given timeout.
*
* @param watermarks The original watermark generator.
* @param idleTimeout The timeout for the idleness detection.
*/
public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
this(watermarks, idleTimeout, SystemClock.getInstance());
}
@VisibleForTesting
WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero");
this.watermarks = checkNotNull(watermarks, "watermarks");
this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
}
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
watermarks.onEvent(event, eventTimestamp, output);
idlenessTimer.activity();
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
if (idlenessTimer.checkIfIdle()) {
output.markIdle();
}
else {
watermarks.onPeriodicEmit(output);
}
}
// ------------------------------------------------------------------------
@VisibleForTesting
static final class IdlenessTimer {
/** The clock used to measure elapsed time. */
private final Clock clock;
/** Counter to detect change. No problem if it overflows. */
private long counter;
/** The value of the counter at the last activity check. */
private long lastCounter;
/** The first time (relative to {@link Clock#relativeTimeNanos()}) when the activity
* check found that no activity happened since the last check.
* Special value: 0 = no timer. */
private long startOfInactivityNanos;
/** The duration before the output is marked as idle. */
private final long maxIdleTimeNanos;
IdlenessTimer(Clock clock, Duration idleTimeout) {
this.clock = clock;
long idleNanos;
try {
idleNanos = idleTimeout.toNanos();
} catch (ArithmeticException ignored) {
// long integer overflow
idleNanos = Long.MAX_VALUE;
}
this.maxIdleTimeNanos = idleNanos;
}
public void activity() {
counter++;
}
public boolean checkIfIdle() {
if (counter != lastCounter) {
// activity since the last check. we reset the timer
lastCounter = counter;
startOfInactivityNanos = 0L;
return false;
}
else // timer started but has not yet reached idle timeout
if (startOfInactivityNanos == 0L) {
// first time that we see no activity since the last periodic probe
// begin the timer
startOfInactivityNanos = clock.relativeTimeNanos();
return false;
}
else {
return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;
}
}
}
}
......@@ -30,7 +30,15 @@ import java.util.concurrent.atomic.AtomicLong;
@PublicEvolving
public final class ManualClock extends Clock {
private AtomicLong currentTime = new AtomicLong(0L);
private final AtomicLong currentTime;
public ManualClock() {
this(0);
}
public ManualClock(long startTime) {
this.currentTime = new AtomicLong(startTime);
}
@Override
public long absoluteTimeMillis() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Tests for the {@link AscendingTimestampsWatermarks} class.
*/
public class AscendingTimestampsWatermarksTest {
@Test
public void testWatermarkBeforeRecords() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final AscendingTimestampsWatermarks<Object> watermarks = new AscendingTimestampsWatermarks<>();
watermarks.onPeriodicEmit(output);
assertEquals(Long.MIN_VALUE, output.lastWatermark().getTimestamp());
}
@Test
public void testWatermarkAfterEvent() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final AscendingTimestampsWatermarks<Object> watermarks = new AscendingTimestampsWatermarks<>();
watermarks.onEvent(new Object(), 1337L, output);
watermarks.onPeriodicEmit(output);
assertEquals(1336L, output.lastWatermark().getTimestamp());
}
@Test
public void testWatermarkAfterEventWithLowerTimestamp() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final AscendingTimestampsWatermarks<Object> watermarks = new AscendingTimestampsWatermarks<>();
watermarks.onEvent(new Object(), 12345L, output);
watermarks.onEvent(new Object(), 12340L, output);
watermarks.onPeriodicEmit(output);
assertEquals(12344L, output.lastWatermark().getTimestamp());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.junit.Test;
import java.time.Duration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Tests for the {@link AscendingTimestampsWatermarks} class.
*/
public class BoundedOutOfOrdernessWatermarksTest {
@Test
public void testWatermarkBeforeRecords() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final BoundedOutOfOrdernessWatermarks<Object> watermarks =
new BoundedOutOfOrdernessWatermarks<>(Duration.ofMillis(10));
watermarks.onPeriodicEmit(output);
assertNotNull(output.lastWatermark());
assertEquals(Long.MIN_VALUE, output.lastWatermark().getTimestamp());
}
@Test
public void testWatermarkAfterEvent() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final BoundedOutOfOrdernessWatermarks<Object> watermarks =
new BoundedOutOfOrdernessWatermarks<>(Duration.ofMillis(10));
watermarks.onEvent(new Object(), 1337L, output);
watermarks.onPeriodicEmit(output);
assertEquals(1326L, output.lastWatermark().getTimestamp());
}
@Test
public void testWatermarkAfterNonMonotonousEvents() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final BoundedOutOfOrdernessWatermarks<Object> watermarks =
new BoundedOutOfOrdernessWatermarks<>(Duration.ofMillis(10));
watermarks.onEvent(new Object(), 12345L, output);
watermarks.onEvent(new Object(), 12300L, output);
watermarks.onEvent(new Object(), 12340L, output);
watermarks.onEvent(new Object(), 12280L, output);
watermarks.onPeriodicEmit(output);
assertEquals(12334L, output.lastWatermark().getTimestamp());
}
@Test
public void testRepeatedProbe() {
final TestingWatermarkOutput output = new TestingWatermarkOutput();
final BoundedOutOfOrdernessWatermarks<Object> watermarks =
new BoundedOutOfOrdernessWatermarks<>(Duration.ofMillis(10));
watermarks.onEvent(new Object(), 723456L, new TestingWatermarkOutput());
watermarks.onPeriodicEmit(new TestingWatermarkOutput());
watermarks.onPeriodicEmit(output);
assertEquals(723445L, output.lastWatermark().getTimestamp());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
/**
* A testing implementation of {@link WatermarkOutput}.
*/
final class TestingWatermarkOutput implements WatermarkOutput {
private Watermark lastWatermark;
private boolean isIdle;
@Override
public void emitWatermark(Watermark watermark) {
lastWatermark = watermark;
isIdle = false;
}
@Override
public void markIdle() {
isIdle = true;
}
public Watermark lastWatermark() {
return lastWatermark;
}
public boolean isIdle() {
return isIdle;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.eventtime;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness.IdlenessTimer;
import org.apache.flink.util.clock.ManualClock;
import org.junit.Test;
import java.time.Duration;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test for the {@link WatermarksWithIdleness} class.
*/
public class WatermarksWithIdlenessTest {
@Test(expected = IllegalArgumentException.class)
public void testZeroTimeout() {
new WatermarksWithIdleness<>(new AscendingTimestampsWatermarks<>(), Duration.ZERO);
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeTimeout() {
new WatermarksWithIdleness<>(new AscendingTimestampsWatermarks<>(), Duration.ofMillis(-1L));
}
@Test
public void testInitiallyActive() {
final ManualClock clock = new ManualClock(System.nanoTime());
final IdlenessTimer timer = new IdlenessTimer(clock, Duration.ofMillis(10));
assertFalse(timer.checkIfIdle());
}
@Test
public void testIdleWithoutEvents() {
final ManualClock clock = new ManualClock(System.nanoTime());
final IdlenessTimer timer = new IdlenessTimer(clock, Duration.ofMillis(10));
timer.checkIfIdle(); // start timer
clock.advanceTime(11, MILLISECONDS);
assertTrue(timer.checkIfIdle());
}
@Test
public void testRepeatedIdleChecks() {
final ManualClock clock = new ManualClock(System.nanoTime());
final IdlenessTimer timer = createTimerAndMakeIdle(clock, Duration.ofMillis(122));
assertTrue(timer.checkIfIdle());
clock.advanceTime(100, MILLISECONDS);
assertTrue(timer.checkIfIdle());
}
@Test
public void testActiveAfterIdleness() {
final ManualClock clock = new ManualClock(System.nanoTime());
final IdlenessTimer timer = createTimerAndMakeIdle(clock, Duration.ofMillis(10));
timer.activity();
assertFalse(timer.checkIfIdle());
}
@Test
public void testIdleActiveIdle() {
final ManualClock clock = new ManualClock(System.nanoTime());
final IdlenessTimer timer = createTimerAndMakeIdle(clock, Duration.ofMillis(122));
// active again
timer.activity();
assertFalse(timer.checkIfIdle());
// idle again
timer.checkIfIdle(); // start timer
clock.advanceTime(Duration.ofMillis(123));
assertTrue(timer.checkIfIdle());
}
private static IdlenessTimer createTimerAndMakeIdle(ManualClock clock, Duration idleTimeout) {
final IdlenessTimer timer = new IdlenessTimer(clock, idleTimeout);
timer.checkIfIdle(); // start timer
clock.advanceTime(Duration.ofMillis(idleTimeout.toMillis() + 1));
assertTrue(timer.checkIfIdle()); // rigger timer
return timer;
}
}
......@@ -42,10 +42,10 @@ public interface TimestampAssigner<T> extends org.apache.flink.api.common.eventt
* {@code Long.MIN_VALUE}.
*
* @param element The element that the timestamp will be assigned to.
* @param previousElementTimestamp The previous internal timestamp of the element,
* @param recordTimestamp The previous internal timestamp of the element,
* or a negative value, if no timestamp has been assigned yet.
* @return The new timestamp.
*/
@Override
long extractTimestamp(T element, long previousElementTimestamp);
long extractTimestamp(T element, long recordTimestamp);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册