提交 dc7d81c9 编写于 作者: Y yanghua 提交者: Stefan Richter

[FLINK-9511] Implement state TTL configuration

This closes #6277.
上级 5c43d2b8
......@@ -16,16 +16,20 @@
* limitations under the License.
*/
package org.apache.flink.runtime.state.ttl;
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;
/**
* Configuration of state TTL logic.
* TODO: builder
*/
public class TtlConfig {
public class StateTtlConfiguration {
/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
......@@ -61,21 +65,17 @@ public class TtlConfig {
private final TtlTimeCharacteristic timeCharacteristic;
private final Time ttl;
public TtlConfig(
private StateTtlConfiguration(
TtlUpdateType ttlUpdateType,
TtlStateVisibility stateVisibility,
TtlTimeCharacteristic timeCharacteristic,
Time ttl) {
Preconditions.checkNotNull(ttlUpdateType);
Preconditions.checkNotNull(stateVisibility);
Preconditions.checkNotNull(timeCharacteristic);
Preconditions.checkNotNull(ttl);
this.ttlUpdateType = Preconditions.checkNotNull(ttlUpdateType);
this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
this.ttl = Preconditions.checkNotNull(ttl);
Preconditions.checkArgument(ttl.toMilliseconds() > 0,
"TTL is expected to be positive");
this.ttlUpdateType = ttlUpdateType;
this.stateVisibility = stateVisibility;
this.timeCharacteristic = timeCharacteristic;
this.ttl = ttl;
}
public TtlUpdateType getTtlUpdateType() {
......@@ -93,4 +93,82 @@ public class TtlConfig {
public TtlTimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
@Override
public String toString() {
return "StateTtlConfiguration{" +
"ttlUpdateType=" + ttlUpdateType +
", stateVisibility=" + stateVisibility +
", timeCharacteristic=" + timeCharacteristic +
", ttl=" + ttl +
'}';
}
public static Builder newBuilder(Time ttl) {
return new Builder(ttl);
}
/**
* Builder for the {@link StateTtlConfiguration}.
*/
public static class Builder {
private TtlUpdateType ttlUpdateType = OnCreateAndWrite;
private TtlStateVisibility stateVisibility = NeverReturnExpired;
private TtlTimeCharacteristic timeCharacteristic = ProcessingTime;
private Time ttl;
public Builder(Time ttl) {
this.ttl = ttl;
}
/**
* Sets the ttl update type.
*
* @param ttlUpdateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
*/
public Builder setTtlUpdateType(TtlUpdateType ttlUpdateType) {
this.ttlUpdateType = ttlUpdateType;
return this;
}
/**
* Sets the state visibility.
*
* @param stateVisibility The state visibility configures whether expired user value can be returned or not.
*/
public Builder setStateVisibility(TtlStateVisibility stateVisibility) {
this.stateVisibility = stateVisibility;
return this;
}
/**
* Sets the time characteristic.
*
* @param timeCharacteristic The time characteristic configures time scale to use for ttl.
*/
public Builder setTimeCharacteristic(TtlTimeCharacteristic timeCharacteristic) {
this.timeCharacteristic = timeCharacteristic;
return this;
}
/**
* Sets the ttl time.
* @param ttl The ttl time.
*/
public Builder setTtl(Time ttl) {
this.ttl = ttl;
return this;
}
public StateTtlConfiguration build() {
return new StateTtlConfiguration(
ttlUpdateType,
stateVisibility,
timeCharacteristic,
ttl
);
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
......@@ -34,7 +35,7 @@ abstract class AbstractTtlDecorator<T> {
/** Wrapped original state handler. */
final T original;
final TtlConfig config;
final StateTtlConfiguration config;
final TtlTimeProvider timeProvider;
......@@ -49,18 +50,18 @@ abstract class AbstractTtlDecorator<T> {
AbstractTtlDecorator(
T original,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider) {
Preconditions.checkNotNull(original);
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(timeProvider);
Preconditions.checkArgument(config.getTtlUpdateType() != TtlConfig.TtlUpdateType.Disabled,
Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled,
"State does not need to be wrapped with TTL if it is configured as disabled.");
this.original = original;
this.config = config;
this.timeProvider = timeProvider;
this.updateTsOnRead = config.getTtlUpdateType() == TtlConfig.TtlUpdateType.OnReadAndWrite;
this.returnExpired = config.getStateVisibility() == TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
this.updateTsOnRead = config.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.OnReadAndWrite;
this.returnExpired = config.getStateVisibility() == StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
this.ttl = config.getTtl().toMilliseconds();
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;
......@@ -38,7 +39,7 @@ abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N,
implements InternalKvState<K, N, SV> {
private final TypeSerializer<SV> valueSerializer;
AbstractTtlState(S original, TtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
AbstractTtlState(S original, StateTtlConfiguration config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
super(original, config, timeProvider);
this.valueSerializer = valueSerializer;
}
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
......@@ -37,7 +38,7 @@ class TtlAggregateFunction<IN, ACC, OUT>
ThrowingRunnable<Exception> stateClear;
ThrowingConsumer<TtlValue<ACC>, Exception> updater;
TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, TtlConfig config, TtlTimeProvider timeProvider) {
TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, StateTtlConfiguration config, TtlTimeProvider timeProvider) {
super(aggFunction, config, timeProvider);
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
......@@ -39,7 +40,7 @@ class TtlAggregatingState<K, N, IN, ACC, OUT>
TtlAggregatingState(
InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<ACC> valueSerializer,
TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.StateTtlConfiguration;
/**
* This class wraps folding function with TTL logic.
......@@ -35,7 +36,7 @@ class TtlFoldFunction<T, ACC>
private final ACC defaultAccumulator;
TtlFoldFunction(
FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
FoldFunction<T, ACC> original, StateTtlConfiguration config, TtlTimeProvider timeProvider, ACC defaultAccumulator) {
super(original, config, timeProvider);
this.defaultAccumulator = defaultAccumulator;
}
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
......@@ -36,7 +37,7 @@ class TtlFoldingState<K, N, T, ACC>
implements InternalFoldingState<K, N, T, ACC> {
TtlFoldingState(
InternalFoldingState<K, N, T, TtlValue<ACC>> originalState,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<ACC> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
......@@ -42,7 +43,7 @@ class TtlListState<K, N, T> extends
implements InternalListState<K, N, T> {
TtlListState(
InternalListState<K, N, TtlValue<T>> originalState,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<List<T>> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
......@@ -43,7 +44,7 @@ class TtlMapState<K, N, UK, UV>
implements InternalMapState<K, N, UK, UV> {
TtlMapState(
InternalMapState<K, N, UK, TtlValue<UV>> original,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<Map<UK, UV>> valueSerializer) {
super(original, config, timeProvider, valueSerializer);
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfiguration;
/**
* This class wraps reducing function with TTL logic.
......@@ -31,7 +32,7 @@ class TtlReduceFunction<T>
TtlReduceFunction(
ReduceFunction<T> originalReduceFunction,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider) {
super(originalReduceFunction, config, timeProvider);
}
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalReducingState;
......@@ -35,7 +36,7 @@ class TtlReducingState<K, N, T>
implements InternalReducingState<K, N, T> {
TtlReducingState(
InternalReducingState<K, N, TtlValue<T>> originalState,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<T> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
......
......@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -48,14 +49,14 @@ public class TtlStateFactory {
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateFactory originalStateFactory,
TtlConfig ttlConfig,
StateTtlConfiguration ttlConfig,
TtlTimeProvider timeProvider) throws Exception {
Preconditions.checkNotNull(namespaceSerializer);
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(originalStateFactory);
Preconditions.checkNotNull(ttlConfig);
Preconditions.checkNotNull(timeProvider);
return ttlConfig.getTtlUpdateType() == TtlConfig.TtlUpdateType.Disabled ?
return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ?
originalStateFactory.createState(namespaceSerializer, stateDesc) :
new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
.createState(namespaceSerializer, stateDesc);
......@@ -64,10 +65,10 @@ public class TtlStateFactory {
private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> stateFactories;
private final KeyedStateFactory originalStateFactory;
private final TtlConfig ttlConfig;
private final StateTtlConfiguration ttlConfig;
private final TtlTimeProvider timeProvider;
private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
private TtlStateFactory(KeyedStateFactory originalStateFactory, StateTtlConfiguration ttlConfig, TtlTimeProvider timeProvider) {
this.originalStateFactory = originalStateFactory;
this.ttlConfig = ttlConfig;
this.timeProvider = timeProvider;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
......@@ -35,7 +36,7 @@ class TtlValueState<K, N, T>
implements InternalValueState<K, N, T> {
TtlValueState(
InternalValueState<K, N, TtlValue<T>> originalState,
TtlConfig config,
StateTtlConfiguration config,
TtlTimeProvider timeProvider,
TypeSerializer<T> valueSerializer) {
super(originalState, config, timeProvider, valueSerializer);
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.ttl;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.KeyedStateFactory;
......@@ -39,7 +40,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
S ttlState;
MockTimeProvider timeProvider;
TtlConfig ttlConfig;
StateTtlConfiguration ttlConfig;
ThrowingConsumer<UV, Exception> updater;
SupplierWithException<GV, Exception> getter;
......@@ -56,20 +57,21 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
GV emptyValue = null;
void initTest() {
initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
}
private void initTest(TtlConfig.TtlUpdateType updateType, TtlConfig.TtlStateVisibility visibility) {
private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
initTest(updateType, visibility, TTL);
}
private void initTest(TtlConfig.TtlUpdateType updateType, TtlConfig.TtlStateVisibility visibility, long ttl) {
private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
timeProvider = new MockTimeProvider();
ttlConfig = new TtlConfig(
updateType,
visibility,
TtlConfig.TtlTimeCharacteristic.ProcessingTime,
Time.milliseconds(ttl));
StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
ttlConfigBuilder.setTtlUpdateType(updateType)
.setStateVisibility(visibility)
.setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
.setTtl(Time.milliseconds(ttl));
ttlConfig = ttlConfigBuilder.build();
ttlState = createState();
initTestValues();
}
......@@ -96,7 +98,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testExactExpirationOnWrite() throws Exception {
initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
timeProvider.time = 0;
updater.accept(updateEmpty);
......@@ -123,7 +125,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testRelaxedExpirationOnWrite() throws Exception {
initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
updater.accept(updateEmpty);
......@@ -135,7 +137,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testExactExpirationOnRead() throws Exception {
initTest(TtlConfig.TtlUpdateType.OnReadAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired);
initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
timeProvider.time = 0;
updater.accept(updateEmpty);
......@@ -153,7 +155,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testRelaxedExpirationOnRead() throws Exception {
initTest(TtlConfig.TtlUpdateType.OnReadAndWrite, TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
updater.accept(updateEmpty);
......@@ -168,7 +170,7 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
@Test
public void testExpirationTimestampOverflow() throws Exception {
initTest(TtlConfig.TtlUpdateType.OnCreateAndWrite, TtlConfig.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
timeProvider.time = 10;
updater.accept(updateEmpty);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册