未验证 提交 afb46450 编写于 作者: Y Yun Tang 提交者: GitHub

[FLINK-16949][test] Enhance AbstractStreamOperatorTestHarness to use customized TtlTimeProvider

This closes #11624
上级 36f8f6e3
......@@ -26,4 +26,8 @@ public class MockTtlTimeProvider implements TtlTimeProvider {
public long currentTimestamp() {
return time;
}
public void setCurrentTimestamp(long timestamp) {
this.time = timestamp;
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -85,13 +86,25 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
/** This object is the factory for everything related to state backends and checkpointing. */
private final StateBackend stateBackend;
private final TtlTimeProvider ttlTimeProvider;
public StreamTaskStateInitializerImpl(
Environment environment,
StateBackend stateBackend) {
this(environment, stateBackend, TtlTimeProvider.DEFAULT);
}
@VisibleForTesting
public StreamTaskStateInitializerImpl(
Environment environment,
StateBackend stateBackend,
TtlTimeProvider ttlTimeProvider) {
this.environment = environment;
this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
this.stateBackend = Preconditions.checkNotNull(stateBackend);
this.ttlTimeProvider = ttlTimeProvider;
}
// -----------------------------------------------------------------------------------------------------------------
......@@ -293,7 +306,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
taskInfo.getMaxNumberOfParallelSubtasks(),
keyGroupRange,
environment.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
ttlTimeProvider,
metricGroup,
stateHandles,
cancelStreamRegistryForRestore),
......
......@@ -44,6 +44,8 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
......@@ -62,7 +64,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
......@@ -104,6 +105,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
protected final TestProcessingTimeService processingTimeService;
protected final MockTtlTimeProvider ttlTimeProvider;
protected final MockStreamTask<OUT, ?> mockTask;
protected final TestTaskStateManager taskStateManager;
......@@ -118,6 +121,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
// use this as default for tests
protected StateBackend stateBackend = new MemoryStateBackend();
private CheckpointStorage checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
private final Object checkpointLock;
......@@ -241,7 +245,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
processingTimeService = new TestProcessingTimeService();
processingTimeService.setCurrentTime(0);
this.streamTaskStateInitializer = createStreamTaskStateManager(environment, stateBackend, processingTimeService);
ttlTimeProvider = new MockTtlTimeProvider();
ttlTimeProvider.setCurrentTimestamp(0);
this.streamTaskStateInitializer = createStreamTaskStateManager(environment, stateBackend, ttlTimeProvider);
BiConsumer<String, Throwable> handleAsyncException = (message, t) -> {
wasFailedExternally = true;
......@@ -264,10 +271,11 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
protected StreamTaskStateInitializer createStreamTaskStateManager(
Environment env,
StateBackend stateBackend,
ProcessingTimeService processingTimeService) {
TtlTimeProvider ttlTimeProvider) {
return new StreamTaskStateInitializerImpl(
env,
stateBackend);
stateBackend,
ttlTimeProvider);
}
public void setStateBackend(StateBackend stateBackend) {
......@@ -351,7 +359,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
public void setup(TypeSerializer<OUT> outputSerializer) {
if (!setupCalled) {
streamTaskStateInitializer =
createStreamTaskStateManager(environment, stateBackend, processingTimeService);
createStreamTaskStateManager(environment, stateBackend, ttlTimeProvider);
mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);
if (operator == null) {
......@@ -629,6 +637,10 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
processingTimeService.setCurrentTime(time);
}
public void setStateTtlProcessingTime(long timeStamp) {
ttlTimeProvider.setCurrentTimestamp(timeStamp);
}
public long getProcessingTime() {
return processingTimeService.getCurrentProcessingTime();
}
......@@ -736,5 +748,4 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
// ignore
}
}
}
......@@ -18,10 +18,19 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
......@@ -52,4 +61,34 @@ public class AbstractStreamOperatorTestHarnessTest extends TestLogger {
result.open();
result.initializeState(new OperatorSubtaskState());
}
@Test
public void testSetTtlTimeProvider() throws Exception {
AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {};
try (AbstractStreamOperatorTestHarness<Integer> result = new AbstractStreamOperatorTestHarness<>(
operator,
1,
1,
0)) {
result.config.setStateKeySerializer(IntSerializer.INSTANCE);
Time timeToLive = Time.hours(1);
result.initializeState(new OperatorSubtaskState());
result.open();
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(timeToLive).build());
KeyedStateBackend<Integer> keyedStateBackend = operator.getKeyedStateBackend();
ValueState<Integer> state = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
int expectedValue = 42;
keyedStateBackend.setCurrentKey(1);
result.setStateTtlProcessingTime(0L);
state.update(expectedValue);
Assert.assertEquals(expectedValue, (int) state.value());
result.setStateTtlProcessingTime(timeToLive.toMilliseconds() + 1);
Assert.assertNull(state.value());
}
}
}
......@@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.KeyContext;
......@@ -233,7 +234,7 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger {
protected StreamTaskStateInitializer createStreamTaskStateManager(
Environment env,
StateBackend stateBackend,
ProcessingTimeService processingTimeService) {
TtlTimeProvider ttlTimeProvider) {
return new StreamTaskStateInitializerImpl(env, stateBackend) {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册