提交 c09ff035 编写于 作者: G Gyula Fora

[FLINK-4441] Make RocksDB backend return null on empty state + add test for all backends

Closes #2399
上级 863dc180
......@@ -386,7 +386,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
@Override
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
return new HashMap<>();
return null;
}
if (fullyAsyncBackup) {
......@@ -482,7 +482,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
@Override
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
if (keyValueStateSnapshots.size() == 0) {
if (keyValueStateSnapshots == null) {
return;
}
......
......@@ -1048,6 +1048,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
@Test
public void testEmptyStateCheckpointing() {
try {
DummyEnvironment env = new DummyEnvironment("test", 1, 0);
backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend
.snapshotPartitionedState(682375462379L, 1);
assertNull(snapshot);
backend.dispose();
// Make sure we can restore from empty state
backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
backend.injectKeyValueStateSnapshots((HashMap) snapshot);
backend.dispose();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private static class AppendingReduce implements ReduceFunction<String> {
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册