From 74d10cada89abb4e830b55a95262f3fd68247ed0 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 14 Dec 2015 19:40:10 +0100 Subject: [PATCH] [FLINK-3131] [contrib, runtime, streaming-java] Add long getStateSize() to StateHandle and KvStateSnapshot In order to report the state sizes, we need to expose them. All state backends currently available backends know the state size. Only the LazyDbKvState does not expose it at the moment, because it serializes the data lazily. This can be changed in a follow-up fix. --- .../streaming/state/DbStateBackend.java | 5 ++- .../streaming/state/DbStateHandle.java | 27 +++++++++--- .../streaming/state/LazyDbKvState.java | 7 ++++ .../flink/runtime/state/KvStateSnapshot.java | 15 +++++-- .../flink/runtime/state/LocalStateHandle.java | 5 +++ .../flink/runtime/state/StateBackend.java | 5 +++ .../flink/runtime/state/StateHandle.java | 11 +++++ .../state/filesystem/AbstractFileState.java | 10 +++++ .../FileSerializableStateHandle.java | 12 ++++++ .../filesystem/FileStreamStateHandle.java | 12 ++++++ .../filesystem/FsHeapKvStateSnapshot.java | 12 ++++++ .../state/memory/ByteStreamStateHandle.java | 6 ++- .../memory/MemoryHeapKvStateSnapshot.java | 8 +++- .../state/memory/SerializedStateHandle.java | 5 +++ .../messages/CheckpointMessagesTest.java | 7 +++- .../ZooKeeperStateHandleStoreITCase.java | 5 +++ .../runtime/tasks/StreamTaskStateList.java | 42 ++++++++++++++++++- .../tasks/StreamTaskAsyncCheckpointTest.java | 10 +++++ 18 files changed, 188 insertions(+), 16 deletions(-) diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index 0cee8ae8bd5..ef45a4bc541 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -149,14 +149,15 @@ public class DbStateBackend extends StateBackend { long handleId = rnd.nextLong(); String jobIdShort = env.getJobID().toShortString(); + byte[] serializedState = InstantiationUtil.serializeObject(state); dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement, checkpointID, timestamp, handleId, - InstantiationUtil.serializeObject(state)); + serializedState); insertStatement.executeUpdate(); return new DbStateHandle(jobIdShort, checkpointID, timestamp, handleId, - dbConfig); + dbConfig, serializedState.length); } }, numSqlRetries, sqlRetrySleep); } else { diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java index 2ecfcc4b784..cc42b3f0f5a 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java @@ -17,16 +17,16 @@ package org.apache.flink.contrib.streaming.state; -import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.concurrent.Callable; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.util.InstantiationUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; /** * State handle implementation for storing checkpoints as byte arrays in @@ -46,12 +46,22 @@ public class DbStateHandle implements Serializable, StateHandle { private final long handleId; - public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) { + private final long stateSize; + + public DbStateHandle( + String jobId, + long checkpointId, + long checkpointTs, + long handleId, + DbBackendConfig dbConfig, + long stateSize) { + this.checkpointId = checkpointId; this.handleId = handleId; this.jobId = jobId; this.dbConfig = dbConfig; this.checkpointTs = checkpointTs; + this.stateSize = stateSize; } protected byte[] getBytes() throws IOException { @@ -87,4 +97,9 @@ public class DbStateHandle implements Serializable, StateHandle { public S getState(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { return InstantiationUtil.deserializeObject(getBytes(), userCodeClassLoader); } + + @Override + public long getStateSize() { + return stateSize; + } } diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java index aceb3c02a71..87a1f5742e4 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java @@ -395,6 +395,13 @@ public class LazyDbKvState implements KvState, Check // Don't discard, it will be compacted by the LazyDbKvState } + @Override + public long getStateSize() throws Exception { + // Because the state is serialzied in a lazy fashion we don't know + // the size of the state yet. + return 0; + } + } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java index e2e521cb326..682c093a81a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java @@ -39,8 +39,7 @@ public interface KvStateSnapshot> ex /** * Loads the key/value state back from this snapshot. - * - * + * * @param stateBackend The state backend that created this snapshot and can restore the key/value state * from this snapshot. * @param keySerializer The serializer for the keys. @@ -60,11 +59,21 @@ public interface KvStateSnapshot> ex ClassLoader classLoader, long recoveryTimestamp) throws Exception; - /** * Discards the state snapshot, removing any resources occupied by it. * * @throws Exception Exceptions occurring during the state disposal should be forwarded. */ void discardState() throws Exception; + + /** + * Returns the size of the state in bytes. + * + *

If the the size is not known, return 0. + * + * @return Size of the state in bytes. + * + * @throws Exception If the operation fails during size retrieval. + */ + long getStateSize() throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java index f2be70a8614..4e60ab6b31c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java @@ -41,4 +41,9 @@ public class LocalStateHandle implements StateHandle @Override public void discardState() {} + + @Override + public long getStateSize() { + return 0; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index a30be803987..2c431251c3e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -211,5 +211,10 @@ public abstract class StateBackend> implem public void discardState() throws Exception { stream.discardState(); } + + @Override + public long getStateSize() throws Exception { + return stream.getStateSize(); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java index 53d57658d8b..b5d34b0ac3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java @@ -43,4 +43,15 @@ public interface StateHandle extends Serializable { * used any more. */ void discardState() throws Exception; + + /** + * Returns the size of the state in bytes. + * + *

If the the size is not known, return 0. + * + * @return Size of the state in bytes. + * + * @throws Exception If the operation fails during size retrieval. + */ + long getStateSize() throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java index 08cd7dd85ad..8c2b12af7c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileState.java @@ -82,4 +82,14 @@ public abstract class AbstractFileState implements java.io.Serializable { } return fs; } + + /** + * Returns the file size in bytes. + * + * @return The file size in bytes. + * @throws IOException Thrown if the file system cannot be accessed. + */ + protected long getFileSize() throws IOException { + return getFileSystem().getFileStatus(filePath).getLen(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java index edbbe690022..456f2f2b958 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java @@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.InstantiationUtil; +import java.io.IOException; import java.io.ObjectInputStream; import java.io.Serializable; @@ -52,4 +53,15 @@ public class FileSerializableStateHandle extends Abstrac return (T) ois.readObject(); } } + + /** + * Returns the file size in bytes. + * + * @return The file size in bytes. + * @throws IOException Thrown if the file system cannot be accessed. + */ + @Override + public long getStateSize() throws IOException { + return getFileSize(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java index b2f2ecc2b82..3b060b5c901 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StreamStateHandle; +import java.io.IOException; import java.io.InputStream; import java.io.Serializable; @@ -46,6 +47,17 @@ public class FileStreamStateHandle extends AbstractFileState implements StreamSt return getFileSystem().open(getFilePath()); } + /** + * Returns the file size in bytes. + * + * @return The file size in bytes. + * @throws IOException Thrown if the file system cannot be accessed. + */ + @Override + public long getStateSize() throws IOException { + return getFileSize(); + } + @Override public StateHandle toSerializableHandle() { return new FileSerializableStateHandle<>(getFilePath()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java index 5c5896fc668..9c8663a96b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.runtime.state.KvStateSnapshot; +import java.io.IOException; import java.util.HashMap; /** @@ -92,4 +93,15 @@ public class FsHeapKvStateSnapshot extends AbstractFileState implements Kv throw new Exception("Failed to restore state from file system", e); } } + + /** + * Returns the file size in bytes. + * + * @return The file size in bytes. + * @throws IOException Thrown if the file system cannot be accessed. + */ + @Override + public long getStateSize() throws IOException { + return getFileSize(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index def8a36ea70..61473ea48f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -52,7 +52,11 @@ public final class ByteStreamStateHandle implements StreamStateHandle { @Override public void discardState() {} - + @Override + public long getStateSize() { + return data.length; + } + @Override public StateHandle toSerializableHandle() { return new SerializedStateHandle(data); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java index bda0290e749..0cb7fa48255 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.util.DataInputDeserializer; import java.util.HashMap; @@ -63,7 +63,6 @@ public class MemoryHeapKvStateSnapshot implements KvStateSnapshot restoreState( MemoryStateBackend stateBackend, @@ -100,4 +99,9 @@ public class MemoryHeapKvStateSnapshot implements KvStateSnapshot implements StateHandl */ @Override public void discardState() {} + + @Override + public long getStateSize() { + return serializedData.length; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 087e0fd8d7e..afd24050d76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -59,7 +59,7 @@ public class CheckpointMessagesTest { AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint( new JobID(), new ExecutionAttemptID(), 87658976143L, - new SerializedValue>(new MyHandle())); + new SerializedValue>(new MyHandle()), 0); testSerializabilityEqualsHashCode(noState); testSerializabilityEqualsHashCode(withState); @@ -100,5 +100,10 @@ public class CheckpointMessagesTest { @Override public void discardState() throws Exception { } + + @Override + public long getStateSize() { + return 0; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java index 788f70d777e..e166ed5f32d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java @@ -578,6 +578,11 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { numberOfDiscardCalls++; } + @Override + public long getStateSize() { + return 0; + } + public int getNumberOfDiscardCalls() { return numberOfDiscardCalls; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java index 7b8dbd50f29..e9f9ab6f9e5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java @@ -18,8 +18,11 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.runtime.state.KvStateSnapshot; import org.apache.flink.runtime.state.StateHandle; +import java.util.HashMap; + /** * List of task states for a chain of streaming tasks. */ @@ -30,9 +33,41 @@ public class StreamTaskStateList implements StateHandle { /** The states for all operator */ private final StreamTaskState[] states; + private final long stateSize; - public StreamTaskStateList(StreamTaskState[] states) { + public StreamTaskStateList(StreamTaskState[] states) throws Exception { this.states = states; + + long sumStateSize = 0; + + if (states != null) { + for (StreamTaskState state : states) { + if (state != null) { + StateHandle operatorState = state.getOperatorState(); + StateHandle functionState = state.getFunctionState(); + HashMap> kvStates = state.getKvStates(); + + if (operatorState != null) { + sumStateSize += operatorState.getStateSize(); + } + + if (functionState != null) { + sumStateSize += functionState.getStateSize(); + } + + if (kvStates != null) { + for (KvStateSnapshot kvState : kvStates.values()) { + if (kvState != null) { + sumStateSize += kvState.getStateSize(); + } + } + } + } + } + } + + // State size as sum of all state sizes + stateSize = sumStateSize; } public boolean isEmpty() { @@ -57,4 +92,9 @@ public class StreamTaskStateList implements StateHandle { } } } + + @Override + public long getStateSize() throws Exception { + return stateSize; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java index 319cbc8b98d..313188a4847 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java @@ -182,6 +182,11 @@ public class StreamTaskAsyncCheckpointTest { public StateHandle materialize() throws Exception { return new TestStateHandle(checkpointId, timestamp); } + + @Override + public long getStateSize() { + return 0; + } } private static class TestStateHandle implements StateHandle { @@ -202,6 +207,11 @@ public class StreamTaskAsyncCheckpointTest { @Override public void discardState() throws Exception { } + + @Override + public long getStateSize() { + return 0; + } } public static class DummyMapFunction implements MapFunction { -- GitLab