提交 74d10cad 编写于 作者: U Ufuk Celebi 提交者: Stephan Ewen

[FLINK-3131] [contrib, runtime, streaming-java] Add long getStateSize() to...

[FLINK-3131] [contrib, runtime, streaming-java] Add long getStateSize() to StateHandle and KvStateSnapshot

In order to report the state sizes, we need to expose them. All currently
available state backends know the state size. Only the LazyDbKvState does
not expose it at the moment, because it serializes the data lazily. This can be
changed in a follow-up fix.
上级 a50eb73e
......@@ -149,14 +149,15 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
long handleId = rnd.nextLong();
String jobIdShort = env.getJobID().toShortString();
byte[] serializedState = InstantiationUtil.serializeObject(state);
dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement,
checkpointID, timestamp, handleId,
InstantiationUtil.serializeObject(state));
serializedState);
insertStatement.executeUpdate();
return new DbStateHandle<S>(jobIdShort, checkpointID, timestamp, handleId,
dbConfig);
dbConfig, serializedState.length);
}
}, numSqlRetries, sqlRetrySleep);
} else {
......
......@@ -17,16 +17,16 @@
package org.apache.flink.contrib.streaming.state;
import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Callable;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
/**
* State handle implementation for storing checkpoints as byte arrays in
......@@ -46,12 +46,22 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
private final long handleId;
public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
private final long stateSize;
/**
 * Creates a handle that identifies a checkpoint stored through the database backend.
 *
 * @param jobId Identifier of the job the checkpoint belongs to.
 * @param checkpointId ID of the checkpoint.
 * @param checkpointTs Timestamp of the checkpoint.
 * @param handleId ID of this handle.
 * @param dbConfig Configuration of the database backend.
 * @param stateSize Size of the state, later reported by {@code getStateSize()}.
 */
public DbStateHandle(
        String jobId,
        long checkpointId,
        long checkpointTs,
        long handleId,
        DbBackendConfig dbConfig,
        long stateSize) {

    // Plain field initialisation, listed in parameter order.
    this.jobId = jobId;
    this.checkpointId = checkpointId;
    this.checkpointTs = checkpointTs;
    this.handleId = handleId;
    this.dbConfig = dbConfig;
    this.stateSize = stateSize;
}
protected byte[] getBytes() throws IOException {
......@@ -87,4 +97,9 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
public S getState(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
// Obtain the raw bytes via getBytes() and deserialize them with the given user-code class loader.
return InstantiationUtil.deserializeObject(getBytes(), userCodeClassLoader);
}
/**
 * Returns the state size that was passed to this handle's constructor.
 *
 * @return The value of the {@code stateSize} field.
 */
@Override
public long getStateSize() {
return stateSize;
}
}
......@@ -395,6 +395,13 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
// Don't discard, it will be compacted by the LazyDbKvState
}
/**
 * Reports the size of this state.
 *
 * @return Always {@code 0}, because the size is not known at this point.
 */
@Override
public long getStateSize() throws Exception {
// Because the state is serialized in a lazy fashion we don't know
// the size of the state yet.
return 0;
}
}
/**
......
......@@ -39,8 +39,7 @@ public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> ex
/**
* Loads the key/value state back from this snapshot.
*
*
*
* @param stateBackend The state backend that created this snapshot and can restore the key/value state
* from this snapshot.
* @param keySerializer The serializer for the keys.
......@@ -60,11 +59,21 @@ public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> ex
ClassLoader classLoader,
long recoveryTimestamp) throws Exception;
/**
* Discards the state snapshot, removing any resources occupied by it.
*
* @throws Exception Exceptions occurring during the state disposal should be forwarded.
*/
void discardState() throws Exception;
/**
 * Returns the size of the state in bytes.
 *
 * <p>If the size is not known, return <code>0</code>.
 *
 * @return Size of the state in bytes, or <code>0</code> if the size is not known.
 *
 * @throws Exception If the operation fails during size retrieval.
 */
long getStateSize() throws Exception;
}
......@@ -41,4 +41,9 @@ public class LocalStateHandle<T extends Serializable> implements StateHandle<T>
// No-op: the method body is intentionally empty.
@Override
public void discardState() {}
/**
 * Reports the size of this state.
 *
 * @return Always {@code 0}.
 */
@Override
public long getStateSize() {
return 0;
}
}
......@@ -211,5 +211,10 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
public void discardState() throws Exception {
// Forward the discard request to stream.
stream.discardState();
}
/**
 * Returns the state size as reported by {@code stream}.
 *
 * @return The result of {@code stream.getStateSize()}.
 * @throws Exception If {@code stream} fails to report its size.
 */
@Override
public long getStateSize() throws Exception {
return stream.getStateSize();
}
}
}
......@@ -43,4 +43,15 @@ public interface StateHandle<T> extends Serializable {
* used any more.
*/
void discardState() throws Exception;
/**
 * Returns the size of the state in bytes.
 *
 * <p>If the size is not known, return <code>0</code>.
 *
 * @return Size of the state in bytes, or <code>0</code> if the size is not known.
 *
 * @throws Exception If the operation fails during size retrieval.
 */
long getStateSize() throws Exception;
}
......@@ -82,4 +82,14 @@ public abstract class AbstractFileState implements java.io.Serializable {
}
return fs;
}
/**
 * Returns the file size in bytes.
 *
 * <p>The size is taken from the file status of {@code filePath}, looked up on the
 * file system returned by {@code getFileSystem()}.
 *
 * @return The file size in bytes.
 * @throws IOException Thrown if the file system cannot be accessed.
 */
protected long getFileSize() throws IOException {
return getFileSystem().getFileStatus(filePath).getLen();
}
}
......@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
......@@ -52,4 +53,15 @@ public class FileSerializableStateHandle<T extends Serializable> extends Abstrac
return (T) ois.readObject();
}
}
/**
 * Returns the file size in bytes.
 *
 * <p>Delegates to {@code getFileSize()}.
 *
 * @return The file size in bytes.
 * @throws IOException Thrown if the file system cannot be accessed.
 */
@Override
public long getStateSize() throws IOException {
return getFileSize();
}
}
......@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
......@@ -46,6 +47,17 @@ public class FileStreamStateHandle extends AbstractFileState implements StreamSt
return getFileSystem().open(getFilePath());
}
/**
 * Returns the file size in bytes.
 *
 * <p>Delegates to {@code getFileSize()}.
 *
 * @return The file size in bytes.
 * @throws IOException Thrown if the file system cannot be accessed.
 */
@Override
public long getStateSize() throws IOException {
return getFileSize();
}
@Override
public <T extends Serializable> StateHandle<T> toSerializableHandle() {
return new FileSerializableStateHandle<>(getFilePath());
......
......@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.KvStateSnapshot;
import java.io.IOException;
import java.util.HashMap;
/**
......@@ -92,4 +93,15 @@ public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements Kv
throw new Exception("Failed to restore state from file system", e);
}
}
/**
 * Returns the file size in bytes.
 *
 * <p>Delegates to {@code getFileSize()}.
 *
 * @return The file size in bytes.
 * @throws IOException Thrown if the file system cannot be accessed.
 */
@Override
public long getStateSize() throws IOException {
return getFileSize();
}
}
......@@ -52,7 +52,11 @@ public final class ByteStreamStateHandle implements StreamStateHandle {
// No-op: the method body is intentionally empty.
@Override
public void discardState() {}
/**
 * Returns the length of the {@code data} array held by this handle.
 *
 * @return The value of {@code data.length}.
 */
@Override
public long getStateSize() {
return data.length;
}
@Override
public <T extends Serializable> StateHandle<T> toSerializableHandle() {
return new SerializedStateHandle<T>(data);
......
......@@ -19,8 +19,8 @@
package org.apache.flink.runtime.state.memory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.util.DataInputDeserializer;
import java.util.HashMap;
......@@ -63,7 +63,6 @@ public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, Me
this.numEntries = numEntries;
}
@Override
public MemHeapKvState<K, V> restoreState(
MemoryStateBackend stateBackend,
......@@ -100,4 +99,9 @@ public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, Me
*/
@Override
public void discardState() {}
/**
 * Returns the length of the {@code data} array held by this snapshot.
 *
 * @return The value of {@code data.length}.
 */
@Override
public long getStateSize() {
return data.length;
}
}
......@@ -77,4 +77,9 @@ public class SerializedStateHandle<T extends Serializable> implements StateHandl
*/
@Override
public void discardState() {}
/**
 * Returns the length of the {@code serializedData} array held by this handle.
 *
 * @return The value of {@code serializedData.length}.
 */
@Override
public long getStateSize() {
return serializedData.length;
}
}
......@@ -59,7 +59,7 @@ public class CheckpointMessagesTest {
AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
new JobID(), new ExecutionAttemptID(), 87658976143L,
new SerializedValue<StateHandle<?>>(new MyHandle()));
new SerializedValue<StateHandle<?>>(new MyHandle()), 0);
testSerializabilityEqualsHashCode(noState);
testSerializabilityEqualsHashCode(withState);
......@@ -100,5 +100,10 @@ public class CheckpointMessagesTest {
// No-op: this test handle does nothing on discard.
@Override
public void discardState() throws Exception {
}
/**
 * Reports the size of this test handle's state.
 *
 * @return Always {@code 0}.
 */
@Override
public long getStateSize() {
return 0;
}
}
}
......@@ -578,6 +578,11 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger {
numberOfDiscardCalls++;
}
/**
 * Reports the size of this test handle's state.
 *
 * @return Always {@code 0}.
 */
@Override
public long getStateSize() {
return 0;
}
/**
 * Returns the current value of the {@code numberOfDiscardCalls} counter.
 *
 * @return The value of {@code numberOfDiscardCalls}.
 */
public int getNumberOfDiscardCalls() {
return numberOfDiscardCalls;
}
......
......@@ -18,8 +18,11 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateHandle;
import java.util.HashMap;
/**
* List of task states for a chain of streaming tasks.
*/
......@@ -30,9 +33,41 @@ public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
/** The states for all operators */
private final StreamTaskState[] states;
private final long stateSize;
public StreamTaskStateList(StreamTaskState[] states) {
/**
 * Creates the state list and pre-computes the combined size of all contained states.
 *
 * <p>For every non-null entry, the sizes reported by its operator state, its function
 * state and each of its key/value state snapshots are added up. Null entries, null
 * handles and null snapshots contribute nothing to the sum.
 *
 * @param states The task states to wrap; may be null and may contain null entries.
 * @throws Exception If a state handle or snapshot fails to report its size.
 */
public StreamTaskStateList(StreamTaskState[] states) throws Exception {
    this.states = states;

    long total = 0;
    final int count = (states == null) ? 0 : states.length;

    for (int i = 0; i < count; i++) {
        final StreamTaskState taskState = states[i];
        if (taskState == null) {
            continue;
        }

        // Fetch all three state holders first, then query their sizes in the same order.
        final StateHandle<?> opState = taskState.getOperatorState();
        final StateHandle<?> fnState = taskState.getFunctionState();
        final HashMap<String, KvStateSnapshot<?, ?, ?>> kvSnapshots = taskState.getKvStates();

        total += (opState == null) ? 0 : opState.getStateSize();
        total += (fnState == null) ? 0 : fnState.getStateSize();

        if (kvSnapshots == null) {
            continue;
        }
        for (KvStateSnapshot<?, ?, ?> snapshot : kvSnapshots.values()) {
            if (snapshot != null) {
                total += snapshot.getStateSize();
            }
        }
    }

    // State size as sum of all state sizes
    this.stateSize = total;
}
public boolean isEmpty() {
......@@ -57,4 +92,9 @@ public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
}
}
}
/**
 * Returns the total state size that was computed once in the constructor.
 *
 * @return The value of the {@code stateSize} field.
 * @throws Exception Declared by the overridden method; this implementation only reads a field.
 */
@Override
public long getStateSize() throws Exception {
return stateSize;
}
}
......@@ -182,6 +182,11 @@ public class StreamTaskAsyncCheckpointTest {
public StateHandle<String> materialize() throws Exception {
// Create a fresh TestStateHandle from this object's checkpoint ID and timestamp.
return new TestStateHandle(checkpointId, timestamp);
}
/**
 * Reports the size of this test state.
 *
 * @return Always {@code 0}.
 */
@Override
public long getStateSize() {
return 0;
}
}
private static class TestStateHandle implements StateHandle<String> {
......@@ -202,6 +207,11 @@ public class StreamTaskAsyncCheckpointTest {
// No-op: this test handle does nothing on discard.
@Override
public void discardState() throws Exception {
}
/**
 * Reports the size of this test handle's state.
 *
 * @return Always {@code 0}.
 */
@Override
public long getStateSize() {
return 0;
}
}
public static class DummyMapFunction<T> implements MapFunction<T, T> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册