提交 044fe5fd 编写于 作者: U Ufuk Celebi

[FLINK-2986] Fix typo in KvState interface snapshot method

上级 061e9297
...@@ -52,7 +52,7 @@ Kafka exploits this ability. ...@@ -52,7 +52,7 @@ Kafka exploits this ability.
## Checkpointing ## Checkpointing
The central part of Flink's fault tolerance mechanism is drawing consistent snapshots of the distributed data stream and operator state. The central part of Flink's fault tolerance mechanism is drawing consistent snapshots of the distributed data stream and operator state.
These shapshots act as consistent checkpoints to which the system can fall back in case of a failure. Flink's mechanism for drawing these These snapshots act as consistent checkpoints to which the system can fall back in case of a failure. Flink's mechanism for drawing these
snapshots is described in "[Lightweight Asynchronous Snapshots for Distributed Dataflows](http://arxiv.org/abs/1506.08603)". It is inspired by snapshots is described in "[Lightweight Asynchronous Snapshots for Distributed Dataflows](http://arxiv.org/abs/1506.08603)". It is inspired by
the standard [Chandy-Lamport algorithm](http://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf) for distributed snapshots and is the standard [Chandy-Lamport algorithm](http://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf) for distributed snapshots and is
specifically tailored to Flink's execution model. specifically tailored to Flink's execution model.
......
...@@ -51,7 +51,7 @@ public interface KvState<K, V, Backend extends StateBackend<Backend>> extends Op ...@@ -51,7 +51,7 @@ public interface KvState<K, V, Backend extends StateBackend<Backend>> extends Op
* @throws Exception Exceptions during snapshotting the state should be forwarded, so the system * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
* can react to failed snapshots. * can react to failed snapshots.
*/ */
KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception; KvStateSnapshot<K, V, Backend> snapshot(long checkpointId, long timestamp) throws Exception;
/** /**
* Gets the number of key/value pairs currently stored in the state. Note that is a key * Gets the number of key/value pairs currently stored in the state. Note that is a key
......
...@@ -69,7 +69,7 @@ public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBacken ...@@ -69,7 +69,7 @@ public class FsHeapKvState<K, V> extends AbstractHeapKvState<K, V, FsStateBacken
@Override @Override
public FsHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception { public FsHeapKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throws Exception {
// first, create an output stream to write to // first, create an output stream to write to
try (FsStateBackend.FsCheckpointStateOutputStream out = try (FsStateBackend.FsCheckpointStateOutputStream out =
backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {
......
...@@ -42,7 +42,7 @@ public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateB ...@@ -42,7 +42,7 @@ public class MemHeapKvState<K, V> extends AbstractHeapKvState<K, V, MemoryStateB
} }
@Override @Override
public MemoryHeapKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws Exception { public MemoryHeapKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throws Exception {
DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16)); DataOutputSerializer ser = new DataOutputSerializer(Math.max(size() * 16, 16));
writeStateToOutputView(ser); writeStateToOutputView(ser);
byte[] bytes = ser.getCopyOfBuffer(); byte[] bytes = ser.getCopyOfBuffer();
......
...@@ -247,7 +247,7 @@ public class FileStateBackendTest { ...@@ -247,7 +247,7 @@ public class FileStateBackendTest {
// draw a snapshot // draw a snapshot
KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 = KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
kv.shapshot(682375462378L, System.currentTimeMillis()); kv.snapshot(682375462378L, System.currentTimeMillis());
// make some more modifications // make some more modifications
kv.setCurrentKey(1); kv.setCurrentKey(1);
...@@ -259,7 +259,7 @@ public class FileStateBackendTest { ...@@ -259,7 +259,7 @@ public class FileStateBackendTest {
// draw another snapshot // draw another snapshot
KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 = KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
kv.shapshot(682375462379L, System.currentTimeMillis()); kv.snapshot(682375462379L, System.currentTimeMillis());
// validate the original state // validate the original state
assertEquals(3, kv.size()); assertEquals(3, kv.size());
...@@ -325,7 +325,7 @@ public class FileStateBackendTest { ...@@ -325,7 +325,7 @@ public class FileStateBackendTest {
kv.update("2"); kv.update("2");
KvStateSnapshot<Integer, String, FsStateBackend> snapshot = KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
kv.shapshot(682375462378L, System.currentTimeMillis()); kv.snapshot(682375462378L, System.currentTimeMillis());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
......
...@@ -166,7 +166,7 @@ public class MemoryStateBackendTest { ...@@ -166,7 +166,7 @@ public class MemoryStateBackendTest {
// draw a snapshot // draw a snapshot
KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 = KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 =
kv.shapshot(682375462378L, System.currentTimeMillis()); kv.snapshot(682375462378L, System.currentTimeMillis());
// make some more modifications // make some more modifications
kv.setCurrentKey(1); kv.setCurrentKey(1);
...@@ -178,7 +178,7 @@ public class MemoryStateBackendTest { ...@@ -178,7 +178,7 @@ public class MemoryStateBackendTest {
// draw another snapshot // draw another snapshot
KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 = KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
kv.shapshot(682375462379L, System.currentTimeMillis()); kv.snapshot(682375462379L, System.currentTimeMillis());
// validate the original state // validate the original state
assertEquals(3, kv.size()); assertEquals(3, kv.size());
...@@ -230,7 +230,7 @@ public class MemoryStateBackendTest { ...@@ -230,7 +230,7 @@ public class MemoryStateBackendTest {
kv.update("2"); kv.update("2");
KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot = KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
kv.shapshot(682375462378L, System.currentTimeMillis()); kv.snapshot(682375462378L, System.currentTimeMillis());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
......
...@@ -161,7 +161,7 @@ public abstract class AbstractStreamOperator<OUT> ...@@ -161,7 +161,7 @@ public abstract class AbstractStreamOperator<OUT>
HashMap<String, KvStateSnapshot<?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size()); HashMap<String, KvStateSnapshot<?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
for (Map.Entry<String, KvState<?, ?, ?>> entry : keyValueStatesByName.entrySet()) { for (Map.Entry<String, KvState<?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
KvStateSnapshot<?, ?, ?> snapshot = entry.getValue().shapshot(checkpointId, timestamp); KvStateSnapshot<?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
snapshots.put(entry.getKey(), snapshot); snapshots.put(entry.getKey(), snapshot);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册