提交 3f700caf 编写于 作者: S Stephan Ewen

[FLINK-5995] [checkpoints] Harden test for state descriptor passing to OperatorState

上级 614abd29
......@@ -21,21 +21,27 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
......@@ -43,54 +49,67 @@ import static org.mockito.Mockito.when;
public class OperatorStateBackendTest {
AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
static Environment createMockEnvironment() {
Environment env = mock(Environment.class);
ExecutionConfig config = mock(ExecutionConfig.class);
when(env.getExecutionConfig()).thenReturn(config);
when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
return env;
}
private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
//TODO this is temporarily casted to test already functionality that we do not yet expose through public API
return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
createMockEnvironment(),
"test-operator");
}
private final ClassLoader classLoader = getClass().getClassLoader();
@Test
public void testCreateNew() throws Exception {
OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
public void testCreateOnAbstractStateBackend() throws Exception {
// we use the memory state backend as a subclass of the AbstractStateBackend
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend();
OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
createMockEnvironment(), "test-operator");
assertNotNull(operatorStateBackend);
assertTrue(operatorStateBackend.getRegisteredStateNames().isEmpty());
}
@Test
public void testRegisterStatesWithoutTypeSerializer() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
// prepare an execution config with a non standard type registered
final Class<?> registeredType = FutureTask.class;
// validate the precondition of this test - if this condition fails, we need to pick a different
// example serializer
assertFalse(new KryoSerializer<>(File.class, new ExecutionConfig()).getKryo().getDefaultSerializer(registeredType)
instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
final ExecutionConfig cfg = new ExecutionConfig();
cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);
ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor);
assertNotNull(listState);
ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
assertNotNull(listState2);
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
// make sure that type registrations are forwarded
TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer();
assertTrue(serializer instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType)
instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
Iterator<String> it = listState2.get().iterator();
assertTrue(!it.hasNext());
assertFalse(it.hasNext());
listState2.add("kevin");
listState2.add("sunny");
it = listState2.get().iterator();
assertEquals("kevin", it.next());
assertEquals("sunny", it.next());
assertTrue(!it.hasNext());
assertFalse(it.hasNext());
}
@Test
public void testRegisterStates() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
final DefaultOperatorStateBackend operatorStateBackend =
new DefaultOperatorStateBackend(classLoader, new ExecutionConfig());
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
......@@ -173,7 +192,11 @@ public class OperatorStateBackendTest {
@Test
public void testSnapshotEmpty() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
CheckpointStreamFactory streamFactory =
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
......@@ -181,12 +204,16 @@ public class OperatorStateBackendTest {
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
Assert.assertNull(stateHandle);
assertNull(stateHandle);
}
@Test
public void testSnapshotRestore() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend)
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
......@@ -255,4 +282,14 @@ public class OperatorStateBackendTest {
}
}
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
private static Environment createMockEnvironment() {
Environment env = mock(Environment.class);
when(env.getExecutionConfig()).thenReturn(new ExecutionConfig());
when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
return env;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册