提交 2c11291f 编写于 作者: Y Yangze Guo 提交者: azagrebin

[FLINK-14951][tests] Harden the thread safety of State TTL backend tests

上级 5c89d128
......@@ -19,12 +19,15 @@
package org.apache.flink.streaming.tests;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A stub implementation of a {@link TtlTimeProvider} which guarantees that
* processing time increases monotonically.
......@@ -54,14 +57,24 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
private static final Object lock = new Object();
@GuardedBy("lock")
static long freeze() {
static <T, E extends Throwable> T doWithFrozenTime(FunctionWithException<Long, T, E> action) throws E {
synchronized (lock) {
if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
timeIsFrozen = true;
return getCurrentTimestamp();
} else {
return lastReturnedProcessingTime;
}
final long timestampBeforeUpdate = freeze();
T result = action.apply(timestampBeforeUpdate);
final long timestampAfterUpdate = unfreezeTime();
checkState(timestampAfterUpdate == timestampBeforeUpdate,
"Timestamps before and after the update do not match.");
return result;
}
}
private static long freeze() {
if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
timeIsFrozen = true;
return getCurrentTimestamp();
} else {
return lastReturnedProcessingTime;
}
}
......@@ -87,11 +100,8 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
return lastReturnedProcessingTime;
}
@GuardedBy("lock")
static long unfreezeTime() {
synchronized (lock) {
timeIsFrozen = false;
return lastReturnedProcessingTime;
}
private static long unfreezeTime() {
timeIsFrozen = false;
return lastReturnedProcessingTime;
}
}
......@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Update state with TTL for each verifier.
......@@ -114,19 +113,13 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String
TtlStateVerifier<?, ?> verifier,
Object update) throws Exception {
final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze();
State state = states.get(verifier.getId());
Object valueBeforeUpdate = verifier.get(state);
verifier.update(state, update);
Object updatedValue = verifier.get(state);
final long timestampAfterUpdate = MonotonicTTLTimeProvider.unfreezeTime();
checkState(
timestampAfterUpdate == timestampBeforeUpdate,
"Timestamps before and after the update do not match."
);
return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, timestampAfterUpdate);
return MonotonicTTLTimeProvider.doWithFrozenTime(frozenTimestamp -> {
State state = states.get(verifier.getId());
Object valueBeforeUpdate = verifier.get(state);
verifier.update(state, update);
Object updatedValue = verifier.get(state);
return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, frozenTimestamp);
});
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册