提交 7469c17c 编写于 作者: A Aljoscha Krettek

[FLINK-3339] Make ValueState.update(null) act as ValueState.clear()

This was causing problems with TypeSerializers that choke on null
values, especially in the Scala KeyedStream.*WithState() family of
functions.
上级 6d83c9d9
......@@ -102,6 +102,10 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
@Override
public void update(V value) throws IOException {
if (value == null) {
clear();
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
try {
......
......@@ -94,6 +94,11 @@ public class FsValueState<K, N, V>
throw new RuntimeException("No key available.");
}
if (value == null) {
clear();
return;
}
if (currentNSState == null) {
currentNSState = new HashMap<>();
state.put(currentNamespace, currentNSState);
......
......@@ -69,6 +69,11 @@ public class MemValueState<K, N, V>
throw new RuntimeException("No key available.");
}
if (value == null) {
clear();
return;
}
if (currentNSState == null) {
currentNSState = new HashMap<>();
state.put(currentNamespace, currentNSState);
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import com.google.common.base.Joiner;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListState;
......@@ -31,7 +32,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.types.IntValue;
......@@ -113,12 +116,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
kv.dispose();
// restore the first snapshot and validate it
// restore the first snapshot and validate it
KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B> restored1 = snapshot1.restoreState(
backend,
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 10);
snapshot1.discardState();
@SuppressWarnings("unchecked")
ValueState<String> restored1State = (ValueState<String>) restored1;
......@@ -135,6 +140,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 10);
snapshot2.discardState();
@SuppressWarnings("unchecked")
ValueState<String> restored2State = (ValueState<String>) restored2;
......@@ -146,6 +153,70 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
assertEquals("u3", restored2State.value());
}
/**
* This test verifies that passing {@code null} to {@link ValueState#update(Object)} acts
* the same as {@link ValueState#clear()}.
*
* @throws Exception
*/
@Test
public void testValueStateNullUpdate() throws Exception {
// precondition: LongSerializer must fail on null value. this way the test would fail
// later if null values where actually stored in the state instead of acting as clear()
try {
LongSerializer.INSTANCE.serialize(null,
new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
fail("Should faill with NullPointerException");
} catch (NullPointerException e) {
// alrighty
}
backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
kvId.initializeSerializerUnlessSet(new ExecutionConfig());
ValueState<Long> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
KvState<Integer, Void, ValueState<Long>, ValueStateDescriptor<Long>, B> kv =
(KvState<Integer, Void, ValueState<Long>, ValueStateDescriptor<Long>, B>) state;
// some modifications to the state
kv.setCurrentKey(1);
// verify default value
assertEquals(42L, (long) state.value());
state.update(1L);
assertEquals(1L, (long) state.value());
kv.setCurrentKey(2);
assertEquals(42L, (long) state.value());
kv.setCurrentKey(1);
state.clear();
assertEquals(42L, (long) state.value());
state.update(17L);
assertEquals(17L, (long) state.value());
state.update(null);
assertEquals(42L, (long) state.value());
// draw a snapshot, this would fail with a NPE if update(null) would not act as clear()
KvStateSnapshot<Integer, Void, ValueState<Long>, ValueStateDescriptor<Long>, B> snapshot1 =
kv.snapshot(682375462378L, 2);
kv.dispose();
// restore the snapshot
snapshot1.restoreState(
backend,
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 10);
}
@Test
@SuppressWarnings("unchecked,rawtypes")
public void testListState() {
......@@ -202,6 +273,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 10);
snapshot1.discardState();
@SuppressWarnings("unchecked")
ListState<String> restored1State = (ListState<String>) restored1;
......@@ -218,6 +291,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 20);
snapshot2.discardState();
@SuppressWarnings("unchecked")
ListState<String> restored2State = (ListState<String>) restored2;
......@@ -299,6 +374,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 10);
snapshot1.discardState();
@SuppressWarnings("unchecked")
ReducingState<String> restored1State = (ReducingState<String>) restored1;
......@@ -315,6 +392,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
IntSerializer.INSTANCE,
this.getClass().getClassLoader(), 20);
snapshot2.discardState();
@SuppressWarnings("unchecked")
ReducingState<String> restored2State = (ReducingState<String>) restored2;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册