提交 614abd29 编写于 作者: 金竹 提交者: Stephan Ewen

[FLINK-5995] [checkpoints] Fix serializer initialization for Operator State

This closes #3503
上级 486f7249
......@@ -95,7 +95,7 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
Environment env,
String operatorIdentifier) throws Exception {
return new DefaultOperatorStateBackend(env.getUserClassLoader());
return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig());
}
// ------------------------------------------------------------------------
......
......@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
......@@ -56,15 +57,20 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
private final CloseableRegistry closeStreamOnCancelRegistry;
private final JavaSerializer<Serializable> javaSerializer;
private final ClassLoader userClassloader;
private final ExecutionConfig executionConfig;
public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig) throws IOException {
this.closeStreamOnCancelRegistry = new CloseableRegistry();
this.userClassloader = Preconditions.checkNotNull(userClassLoader);
this.executionConfig = executionConfig;
this.javaSerializer = new JavaSerializer<>();
this.registeredStates = new HashMap<>();
}
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}
@Override
public Set<String> getRegisteredStateNames() {
return registeredStates.keySet();
......@@ -106,6 +112,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
Preconditions.checkNotNull(stateDescriptor);
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
String name = Preconditions.checkNotNull(stateDescriptor.getName());
TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
......
......@@ -17,6 +17,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
......@@ -28,6 +29,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.RunnableFuture;
......@@ -45,6 +47,8 @@ public class OperatorStateBackendTest {
static Environment createMockEnvironment() {
Environment env = mock(Environment.class);
ExecutionConfig config = mock(ExecutionConfig.class);
when(env.getExecutionConfig()).thenReturn(config);
when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
return env;
}
......@@ -63,6 +67,27 @@ public class OperatorStateBackendTest {
assertTrue(operatorStateBackend.getRegisteredStateNames().isEmpty());
}
@Test
public void testRegisterStatesWithoutTypeSerializer() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
ListState<File> listState = operatorStateBackend.getOperatorState(stateDescriptor);
assertNotNull(listState);
ListState<String> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
assertNotNull(listState2);
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
Iterator<String> it = listState2.get().iterator();
assertTrue(!it.hasNext());
listState2.add("kevin");
listState2.add("sunny");
it = listState2.get().iterator();
assertEquals("kevin", it.next());
assertEquals("sunny", it.next());
assertTrue(!it.hasNext());
}
@Test
public void testRegisterStates() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册