diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 956fd059d9794390f97fc3891c66ababce32595e..191eb6f907ab36a4887bd2dc03a73454ead83546 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.ByteArrayInputStream; @@ -94,8 +95,8 @@ public abstract class StateDescriptor implements Serializabl private String queryableStateName; /** Name for queries against state created from this StateDescriptor. */ - @Nullable - private StateTtlConfiguration ttlConfig; + @Nonnull + private StateTtlConfiguration ttlConfig = StateTtlConfiguration.DISABLED; /** The default value returned by the state when no other value is bound to a key. */ @Nullable @@ -208,7 +209,8 @@ public abstract class StateDescriptor implements Serializabl * @throws IllegalStateException If queryable state name already set */ public void setQueryable(String queryableStateName) { - Preconditions.checkArgument(ttlConfig == null, + Preconditions.checkArgument( + ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled, "Queryable state is currently not supported with TTL"); if (this.queryableStateName == null) { this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name"); @@ -247,12 +249,14 @@ public abstract class StateDescriptor implements Serializabl */ public void enableTimeToLive(StateTtlConfiguration ttlConfig) { Preconditions.checkNotNull(ttlConfig); - Preconditions.checkArgument(queryableStateName == null, + Preconditions.checkArgument( + ttlConfig.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled && + queryableStateName == null, "Queryable state is currently not supported with TTL"); this.ttlConfig = ttlConfig; } - @Nullable + @Nonnull @Internal public StateTtlConfiguration getTtlConfig() { return ttlConfig; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java index 9bd8b15c02f91a526448b15c3dfbcc9d0dfc3cee..55ec29c19aaca13325672066588d2fec8973a4ab 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java @@ -21,6 +21,8 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.time.Time; import org.apache.flink.util.Preconditions; +import java.io.Serializable; + import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired; import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime; import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite; @@ -28,11 +30,19 @@ import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT /** * Configuration of state TTL logic. */ -public class StateTtlConfiguration { +public class StateTtlConfiguration implements Serializable { + + private static final long serialVersionUID = -7592693245044289793L; + + public static final StateTtlConfiguration DISABLED = + newBuilder(Time.milliseconds(Long.MAX_VALUE)).setTtlUpdateType(TtlUpdateType.Disabled).build(); + /** * This option value configures when to update last access timestamp which prolongs state TTL. */ public enum TtlUpdateType { + /** TTL is disabled. State does not expire. */ + Disabled, /** Last access timestamp is initialised when state is created and updated on every write operation. */ OnCreateAndWrite, /** The same as OnCreateAndWrite but also updated on read. */ @@ -91,6 +101,10 @@ public class StateTtlConfiguration { return timeCharacteristic; } + public boolean isEnabled() { + return ttlUpdateType != TtlUpdateType.Disabled; + } + @Override public String toString() { return "StateTtlConfiguration{" + @@ -129,6 +143,14 @@ public class StateTtlConfiguration { return this; } + public Builder updateTtlOnCreateAndWrite() { + return setTtlUpdateType(TtlUpdateType.OnCreateAndWrite); + } + + public Builder updateTtlOnReadAndWrite() { + return setTtlUpdateType(TtlUpdateType.OnReadAndWrite); + } + /** * Sets the state visibility. * @@ -139,6 +161,14 @@ public class StateTtlConfiguration { return this; } + public Builder returnExpiredIfNotCleanedUp() { + return setStateVisibility(TtlStateVisibility.ReturnExpiredIfNotCleanedUp); + } + + public Builder neverReturnExpired() { + return setStateVisibility(TtlStateVisibility.NeverReturnExpired); + } + /** * Sets the time characteristic. * @@ -149,6 +179,10 @@ public class StateTtlConfiguration { return this; } + public Builder useProcessingTime() { + return setTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime); + } + /** * Sets the ttl time. * @param ttl The ttl time. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 5909ac7c41558b2cb9f94c0f13874a3534e7af4a..e12ba900af2e6e0fdbb8c99aaac47db3eef2c29f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -54,10 +54,10 @@ public class TtlStateFactory { Preconditions.checkNotNull(stateDesc); Preconditions.checkNotNull(originalStateFactory); Preconditions.checkNotNull(timeProvider); - return stateDesc.getTtlConfig() == null ? - originalStateFactory.createInternalState(namespaceSerializer, stateDesc) : + return stateDesc.getTtlConfig().isEnabled() ? new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider) - .createState(namespaceSerializer, stateDesc); + .createState(namespaceSerializer, stateDesc) : + originalStateFactory.createInternalState(namespaceSerializer, stateDesc); } private final Map, KeyedStateFactory> stateFactories;