提交 f1ac0f27 编写于 作者: S Stefan Richter

[FLINK-9799][state] Generalize and unify state meta infos

This closes #6308.
上级 5363595d
......@@ -38,6 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
......@@ -111,12 +112,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
* <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredOperatorStateMetaInfos;
private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;
/**
* Map of state names to their corresponding restored broadcast state meta info.
*/
private final Map<String, RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> restoredBroadcastStateMetaInfos;
private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;
/**
* Cache of already accessed states.
......@@ -180,14 +181,16 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
// State access methods
// -------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
Preconditions.checkNotNull(stateDescriptor);
String name = Preconditions.checkNotNull(stateDescriptor.getName());
@SuppressWarnings("unchecked")
BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
BackendWritableBroadcastState<K, V> previous =
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
if (previous != null) {
checkStateNameAndMode(
previous.getStateMetaInfo().getName(),
......@@ -201,7 +204,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
BackendWritableBroadcastState<K, V> broadcastState =
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
......@@ -220,21 +224,24 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
broadcastState.getStateMetaInfo().getAssignmentMode(),
OperatorStateHandle.Mode.BROADCAST);
final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
@SuppressWarnings("unchecked")
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> restoredMetaInfo =
(RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V>) restoredBroadcastStateMetaInfos.get(name);
RegisteredBroadcastBackendStateMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastBackendStateMetaInfo<K, V>(metaInfoSnapshot);
// check compatibility to determine if state migration is required
CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getKeySerializer(),
UnloadableDummyTypeSerializer.class,
restoredMetaInfo.getKeySerializerConfigSnapshot(),
//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
broadcastStateKeySerializer);
CompatibilityResult<V> valueCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getValueSerializer(),
UnloadableDummyTypeSerializer.class,
restoredMetaInfo.getValueSerializerConfigSnapshot(),
//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
broadcastStateValueSerializer);
if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
......@@ -387,7 +394,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
// get the registered operator state infos ...
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =
List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
new ArrayList<>(registeredOperatorStatesDeepCopies.size());
for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {
......@@ -395,7 +402,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
// ... get the registered broadcast operator state infos ...
List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastMetaInfoSnapshots =
List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
new ArrayList<>(registeredBroadcastStatesDeepCopies.size());
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStatesDeepCopies.entrySet()) {
......@@ -497,36 +504,35 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredOperatorMetaInfoSnapshots =
List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
// Recreate all PartitionableListStates from the meta info
for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredOperatorMetaInfoSnapshots) {
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
final RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
if (restoredMetaInfo.getPartitionStateSerializer() == null ||
restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
// must fail now if the previous serializer cannot be restored because there is no serializer
// capable of reading previous state
// TODO when eager state registration is in place, we can try to get a convert deserializer
// TODO from the newly registered serializer instead of simply failing here
throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]." +
throw new IOException("Unable to restore operator state [" + restoredSnapshot.getName() + "]." +
" The previous serializer of the operator state must be present; the serializer could" +
" have been removed from the classpath, or its implementation have changed and could" +
" not be loaded. This is a temporary restriction that will be fixed in future versions.");
}
restoredOperatorStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
restoredOperatorStateMetaInfos.put(restoredSnapshot.getName(), restoredSnapshot);
PartitionableListState<?> listState = registeredOperatorStates.get(restoredMetaInfo.getName());
PartitionableListState<?> listState = registeredOperatorStates.get(restoredSnapshot.getName());
if (null == listState) {
listState = new PartitionableListState<>(
new RegisteredOperatorBackendStateMetaInfo<>(
restoredMetaInfo.getName(),
restoredMetaInfo.getPartitionStateSerializer(),
restoredMetaInfo.getAssignmentMode()));
listState = new PartitionableListState<>(restoredMetaInfo);
registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
} else {
......@@ -535,37 +541,35 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
// ... and then get back the broadcast state.
List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> restoredBroadcastMetaInfoSnapshots =
List<StateMetaInfoSnapshot> restoredBroadcastMetaInfoSnapshots =
backendSerializationProxy.getBroadcastStateMetaInfoSnapshots();
for (RegisteredBroadcastBackendStateMetaInfo.Snapshot<? ,?> restoredMetaInfo : restoredBroadcastMetaInfoSnapshots) {
for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {
final RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
new RegisteredBroadcastBackendStateMetaInfo<>(restoredSnapshot);
if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null ||
restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) {
restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) {
// must fail now if the previous serializer cannot be restored because there is no serializer
// capable of reading previous state
// TODO when eager state registration is in place, we can try to get a convert deserializer
// TODO from the newly registered serializer instead of simply failing here
throw new IOException("Unable to restore broadcast state [" + restoredMetaInfo.getName() + "]." +
throw new IOException("Unable to restore broadcast state [" + restoredSnapshot.getName() + "]." +
" The previous key and value serializers of the state must be present; the serializers could" +
" have been removed from the classpath, or their implementations have changed and could" +
" not be loaded. This is a temporary restriction that will be fixed in future versions.");
}
restoredBroadcastStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
restoredBroadcastStateMetaInfos.put(restoredSnapshot.getName(), restoredSnapshot);
BackendWritableBroadcastState<? ,?> broadcastState = registeredBroadcastStates.get(restoredMetaInfo.getName());
BackendWritableBroadcastState<? ,?> broadcastState = registeredBroadcastStates.get(restoredSnapshot.getName());
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
new RegisteredBroadcastBackendStateMetaInfo<>(
restoredMetaInfo.getName(),
restoredMetaInfo.getAssignmentMode(),
restoredMetaInfo.getKeySerializer(),
restoredMetaInfo.getValueSerializer()));
broadcastState = new HeapBroadcastState<>(restoredMetaInfo);
registeredBroadcastStates.put(broadcastState.getStateMetaInfo().getName(), broadcastState);
} else {
......@@ -752,16 +756,17 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
partitionableListState.getStateMetaInfo().getAssignmentMode(),
mode);
@SuppressWarnings("unchecked")
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> restoredMetaInfo =
(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredOperatorStateMetaInfos.get(name);
StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
RegisteredOperatorBackendStateMetaInfo<S> metaInfo =
new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getPartitionStateSerializer(),
metaInfo.getPartitionStateSerializer(),
UnloadableDummyTypeSerializer.class,
restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
//TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
newPartitionStateSerializer);
if (!stateCompatibility.isRequiresMigration()) {
......
......@@ -26,6 +26,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
......@@ -33,13 +36,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
/**
* Serialization proxy for all meta data in keyed state backends. In the future we might also requiresMigration the actual state
* serialization logic here.
*/
public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
public static final int VERSION = 4;
public static final int VERSION = 5;
//TODO allow for more (user defined) compression formats + backwards compatibility story.
/** This specifies if we use a compressed format write the key-groups */
......@@ -51,7 +56,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
private TypeSerializer<K> keySerializer;
private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
private ClassLoader userCodeClassLoader;
......@@ -62,7 +67,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
public KeyedBackendSerializationProxy(
TypeSerializer<K> keySerializer,
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots,
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
boolean compression) {
this.usingKeyGroupCompression = compression;
......@@ -75,7 +80,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
}
public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> getStateMetaInfoSnapshots() {
public List<StateMetaInfoSnapshot> getStateMetaInfoSnapshots() {
return stateMetaInfoSnapshots;
}
......@@ -98,8 +103,7 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
@Override
public int[] getCompatibleVersions() {
// we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
return new int[] {VERSION, 3, 2, 1};
return new int[]{VERSION, 4, 3, 2, 1};
}
@Override
......@@ -112,15 +116,12 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
// write in a way to be fault tolerant of read failures when deserializing the key serializer
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Collections.singletonList(
new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(keySerializer, keySerializerConfigSnapshot)));
Collections.singletonList(new Tuple2<>(keySerializer, keySerializerConfigSnapshot)));
// write individual registered keyed state metainfos
out.writeShort(stateMetaInfoSnapshots.size());
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) {
KeyedBackendStateMetaInfoSnapshotReaderWriters
.getWriterForVersion(VERSION, metaInfo)
.writeStateMetaInfo(out);
for (StateMetaInfoSnapshot metaInfoSnapshot : stateMetaInfoSnapshots) {
StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);
}
}
......@@ -152,16 +153,21 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
checkSerializerPresence(keySerializer);
}
int metaInfoVersion = readVersion > 4 ? CURRENT_STATE_META_INFO_SNAPSHOT_VERSION : readVersion;
final StateMetaInfoReader stateMetaInfoReader = StateMetaInfoSnapshotReadersWriters.getReader(
metaInfoVersion,
StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE);
int numKvStates = in.readShort();
stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> snapshot = KeyedBackendStateMetaInfoSnapshotReaderWriters
.getReaderForVersion(getReadVersion(), userCodeClassLoader)
.readStateMetaInfo(in);
StateMetaInfoSnapshot snapshot = stateMetaInfoReader.readStateMetaInfoSnapshot(in, userCodeClassLoader);
if (isSerializerPresenceRequired) {
checkSerializerPresence(snapshot.getNamespaceSerializer());
checkSerializerPresence(snapshot.getStateSerializer());
checkSerializerPresence(
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
checkSerializerPresence(
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
}
stateMetaInfoSnapshots.add(snapshot);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
* Readers and writers for different versions of the {@link RegisteredKeyedBackendStateMetaInfo.Snapshot}.
* Outdated formats are also kept here for documentation of history backlog.
*/
public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
// -------------------------------------------------------------------------------
// Writers
// - v1: Flink 1.2.x
// - v2: Flink 1.3.x
// -------------------------------------------------------------------------------
public static <N, S> KeyedBackendStateMetaInfoWriter getWriterForVersion(
int version, RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
switch (version) {
case 1:
case 2:
return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
case 3:
// current version
case KeyedBackendSerializationProxy.VERSION:
return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
default:
// guard for future
throw new IllegalStateException(
"Unrecognized keyed backend state meta info writer version: " + version);
}
}
public interface KeyedBackendStateMetaInfoWriter {
void writeStateMetaInfo(DataOutputView out) throws IOException;
}
static abstract class AbstractKeyedBackendStateMetaInfoWriter<N, S> implements KeyedBackendStateMetaInfoWriter {
protected final RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo;
public AbstractKeyedBackendStateMetaInfoWriter(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
}
}
static class KeyedBackendStateMetaInfoWriterV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> {
public KeyedBackendStateMetaInfoWriterV1V2(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
super(stateMetaInfo);
}
@Override
public void writeStateMetaInfo(DataOutputView out) throws IOException {
out.writeInt(stateMetaInfo.getStateType().ordinal());
out.writeUTF(stateMetaInfo.getName());
TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getNamespaceSerializer());
TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getStateSerializer());
}
}
static class KeyedBackendStateMetaInfoWriterV3<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> {
public KeyedBackendStateMetaInfoWriterV3(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
super(stateMetaInfo);
}
@Override
public void writeStateMetaInfo(DataOutputView out) throws IOException {
out.writeInt(stateMetaInfo.getStateType().ordinal());
out.writeUTF(stateMetaInfo.getName());
// write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Arrays.asList(
new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
stateMetaInfo.getNamespaceSerializer(), stateMetaInfo.getNamespaceSerializerConfigSnapshot()),
new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
stateMetaInfo.getStateSerializer(), stateMetaInfo.getStateSerializerConfigSnapshot())));
}
}
// -------------------------------------------------------------------------------
// Readers
// - v1: Flink 1.2.x
// - v2: Flink 1.3.x
// -------------------------------------------------------------------------------
public static KeyedBackendStateMetaInfoReader getReaderForVersion(
int version, ClassLoader userCodeClassLoader) {
switch (version) {
case 1:
case 2:
return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
// current version
case 3:
case KeyedBackendSerializationProxy.VERSION:
return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
default:
// guard for future
throw new IllegalStateException(
"Unrecognized keyed backend state meta info reader version: " + version);
}
}
public interface KeyedBackendStateMetaInfoReader<N, S> {
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException;
}
static abstract class AbstractKeyedBackendStateMetaInfoReader implements KeyedBackendStateMetaInfoReader {
protected final ClassLoader userCodeClassLoader;
public AbstractKeyedBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
}
}
static class KeyedBackendStateMetaInfoReaderV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoReader {
public KeyedBackendStateMetaInfoReaderV1V2(ClassLoader userCodeClassLoader) {
super(userCodeClassLoader);
}
@Override
public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException {
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo =
new RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
metaInfo.setName(in.readUTF());
metaInfo.setNamespaceSerializer(TypeSerializerSerializationUtil.<N>tryReadSerializer(in, userCodeClassLoader, true));
metaInfo.setStateSerializer(TypeSerializerSerializationUtil.<S>tryReadSerializer(in, userCodeClassLoader, true));
// older versions do not contain the configuration snapshot
metaInfo.setNamespaceSerializerConfigSnapshot(null);
metaInfo.setStateSerializerConfigSnapshot(null);
return metaInfo;
}
}
@SuppressWarnings("unchecked")
static class KeyedBackendStateMetaInfoReaderV3<N, S> extends AbstractKeyedBackendStateMetaInfoReader {
public KeyedBackendStateMetaInfoReaderV3(ClassLoader userCodeClassLoader) {
super(userCodeClassLoader);
}
@Override
public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException {
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo =
new RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
metaInfo.setName(in.readUTF());
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
metaInfo.setNamespaceSerializer((TypeSerializer<N>) serializersAndConfigs.get(0).f0);
metaInfo.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(0).f1);
metaInfo.setStateSerializer((TypeSerializer<S>) serializersAndConfigs.get(1).f0);
metaInfo.setStateSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
return metaInfo;
}
}
}
......@@ -21,22 +21,27 @@ package org.apache.flink.runtime.state;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
/**
* Serialization proxy for all meta data in operator state backends. In the future we might also requiresMigration the actual state
* serialization logic here.
*/
public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
public static final int VERSION = 3;
public static final int VERSION = 4;
private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorStateMetaInfoSnapshots;
private List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastStateMetaInfoSnapshots;
private List<StateMetaInfoSnapshot> operatorStateMetaInfoSnapshots;
private List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots;
private ClassLoader userCodeClassLoader;
public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
......@@ -44,8 +49,8 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
}
public OperatorBackendSerializationProxy(
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorStateMetaInfoSnapshots,
List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastStateMetaInfoSnapshots) {
List<StateMetaInfoSnapshot> operatorStateMetaInfoSnapshots,
List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots) {
this.operatorStateMetaInfoSnapshots = Preconditions.checkNotNull(operatorStateMetaInfoSnapshots);
this.broadcastStateMetaInfoSnapshots = Preconditions.checkNotNull(broadcastStateMetaInfoSnapshots);
......@@ -62,26 +67,22 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
@Override
public int[] getCompatibleVersions() {
// we are compatible with version 3 (Flink 1.5.x), 2 (Flink 1.4.x, Flink 1.3.x) and version 1 (Flink 1.2.x)
return new int[] {VERSION, 2, 1};
return new int[] {VERSION, 3, 2, 1};
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
writeStateMetaInfoSnapshots(operatorStateMetaInfoSnapshots, out);
writeStateMetaInfoSnapshots(broadcastStateMetaInfoSnapshots, out);
}
out.writeShort(operatorStateMetaInfoSnapshots.size());
for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> state : operatorStateMetaInfoSnapshots) {
OperatorBackendStateMetaInfoSnapshotReaderWriters
.getOperatorStateWriterForVersion(VERSION, state)
.writeOperatorStateMetaInfo(out);
}
out.writeShort(broadcastStateMetaInfoSnapshots.size());
for (RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> state : broadcastStateMetaInfoSnapshots) {
OperatorBackendStateMetaInfoSnapshotReaderWriters
.getBroadcastStateWriterForVersion(VERSION, state)
.writeBroadcastStateMetaInfo(out);
private void writeStateMetaInfoSnapshots(
List<StateMetaInfoSnapshot> snapshots,
DataOutputView out) throws IOException {
out.writeShort(snapshots.size());
for (StateMetaInfoSnapshot state : snapshots) {
StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(state, out);
}
}
......@@ -89,35 +90,39 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
public void read(DataInputView in) throws IOException {
super.read(in);
final int proxyReadVersion = getReadVersion();
final int metaInfoReadVersion = proxyReadVersion > 3 ?
CURRENT_STATE_META_INFO_SNAPSHOT_VERSION : proxyReadVersion;
final StateMetaInfoReader stateMetaInfoReader = StateMetaInfoSnapshotReadersWriters.getReader(
metaInfoReadVersion,
StateMetaInfoSnapshotReadersWriters.StateTypeHint.OPERATOR_STATE);
int numOperatorStates = in.readShort();
operatorStateMetaInfoSnapshots = new ArrayList<>(numOperatorStates);
for (int i = 0; i < numOperatorStates; i++) {
operatorStateMetaInfoSnapshots.add(
OperatorBackendStateMetaInfoSnapshotReaderWriters
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
.readOperatorStateMetaInfo(in));
stateMetaInfoReader.readStateMetaInfoSnapshot(in, userCodeClassLoader));
}
if (getReadVersion() >= 3) {
if (proxyReadVersion >= 3) {
// broadcast states did not exist prior to version 3
int numBroadcastStates = in.readShort();
broadcastStateMetaInfoSnapshots = new ArrayList<>(numBroadcastStates);
for (int i = 0; i < numBroadcastStates; i++) {
broadcastStateMetaInfoSnapshots.add(
OperatorBackendStateMetaInfoSnapshotReaderWriters
.getBroadcastStateReaderForVersion(getReadVersion(), userCodeClassLoader)
.readBroadcastStateMetaInfo(in));
stateMetaInfoReader.readStateMetaInfoSnapshot(in, userCodeClassLoader));
}
} else {
broadcastStateMetaInfoSnapshots = new ArrayList<>();
}
}
public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> getOperatorStateMetaInfoSnapshots() {
public List<StateMetaInfoSnapshot> getOperatorStateMetaInfoSnapshots() {
return operatorStateMetaInfoSnapshots;
}
public List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> getBroadcastStateMetaInfoSnapshots() {
public List<StateMetaInfoSnapshot> getBroadcastStateMetaInfoSnapshots() {
return broadcastStateMetaInfoSnapshots;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Readers and writers for different versions of the {@link RegisteredOperatorBackendStateMetaInfo.Snapshot}.
* Outdated formats are also kept here for documentation of history backlog.
*/
public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
private static final Logger LOG = LoggerFactory.getLogger(OperatorBackendStateMetaInfoSnapshotReaderWriters.class);
// -------------------------------------------------------------------------------
// Writers
// - v1: Flink 1.2.x
// - v2: Flink 1.3.x
// - v3: Flink 1.5.x
// -------------------------------------------------------------------------------
public static <S> OperatorBackendStateMetaInfoWriter getOperatorStateWriterForVersion(
int version, RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
switch (version) {
case 1:
return new OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo);
// current version
case 2:
case OperatorBackendSerializationProxy.VERSION:
return new OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo);
default:
// guard for future
throw new IllegalStateException(
"Unrecognized operator backend state meta info writer version: " + version);
}
}
public static <K, V> BroadcastStateMetaInfoWriter getBroadcastStateWriterForVersion(
final int version,
final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
switch (version) {
// current version
case OperatorBackendSerializationProxy.VERSION:
return new BroadcastStateMetaInfoWriterV3<>(broadcastStateMetaInfo);
default:
// guard for future
throw new IllegalStateException(
"Unrecognized broadcast state meta info writer version: " + version);
}
}
public interface OperatorBackendStateMetaInfoWriter {
void writeOperatorStateMetaInfo(DataOutputView out) throws IOException;
}
public interface BroadcastStateMetaInfoWriter {
void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException;
}
public static abstract class AbstractOperatorBackendStateMetaInfoWriter<S>
implements OperatorBackendStateMetaInfoWriter {
protected final RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo;
public AbstractOperatorBackendStateMetaInfoWriter(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
}
}
public abstract static class AbstractBroadcastStateMetaInfoWriter<K, V>
implements BroadcastStateMetaInfoWriter {
protected final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo;
public AbstractBroadcastStateMetaInfoWriter(final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
this.broadcastStateMetaInfo = Preconditions.checkNotNull(broadcastStateMetaInfo);
}
}
public static class OperatorBackendStateMetaInfoWriterV1<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> {
public OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
super(stateMetaInfo);
}
@Override
public void writeOperatorStateMetaInfo(DataOutputView out) throws IOException {
out.writeUTF(stateMetaInfo.getName());
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getPartitionStateSerializer());
}
}
public static class OperatorBackendStateMetaInfoWriterV2<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> {
public OperatorBackendStateMetaInfoWriterV2(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
super(stateMetaInfo);
}
@Override
public void writeOperatorStateMetaInfo(DataOutputView out) throws IOException {
out.writeUTF(stateMetaInfo.getName());
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
// write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Collections.singletonList(new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
stateMetaInfo.getPartitionStateSerializer(),
stateMetaInfo.getPartitionStateSerializerConfigSnapshot())));
}
}
public static class BroadcastStateMetaInfoWriterV3<K, V> extends AbstractBroadcastStateMetaInfoWriter<K, V> {
public BroadcastStateMetaInfoWriterV3(
final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
super(broadcastStateMetaInfo);
}
@Override
public void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException {
out.writeUTF(broadcastStateMetaInfo.getName());
out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
// write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
out,
Arrays.asList(
Tuple2.of(
broadcastStateMetaInfo.getKeySerializer(),
broadcastStateMetaInfo.getKeySerializerConfigSnapshot()
),
Tuple2.of(
broadcastStateMetaInfo.getValueSerializer(),
broadcastStateMetaInfo.getValueSerializerConfigSnapshot()
)
)
);
}
}
// -------------------------------------------------------------------------------
// Readers
// - v1: Flink 1.2.x
// - v2: Flink 1.3.x
// - v3: Flink 1.5.x
// -------------------------------------------------------------------------------
public static <S> OperatorBackendStateMetaInfoReader<S> getOperatorStateReaderForVersion(
int version, ClassLoader userCodeClassLoader) {
switch (version) {
case 1:
return new OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader);
// version 2 and version 3 (current)
case 2:
case OperatorBackendSerializationProxy.VERSION:
return new OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader);
default:
// guard for future
throw new IllegalStateException(
"Unrecognized operator backend state meta info reader version: " + version);
}
}
public static <K, V> BroadcastStateMetaInfoReader<K, V> getBroadcastStateReaderForVersion(
int version, ClassLoader userCodeClassLoader) {
switch (version) {
// current version
case OperatorBackendSerializationProxy.VERSION:
return new BroadcastStateMetaInfoReaderV3<>(userCodeClassLoader);
default:
// guard for future
throw new IllegalStateException(
"Unrecognized broadcast state meta info reader version: " + version);
}
}
public interface OperatorBackendStateMetaInfoReader<S> {
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readOperatorStateMetaInfo(DataInputView in) throws IOException;
}
public interface BroadcastStateMetaInfoReader<K, V> {
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException;
}
public static abstract class AbstractOperatorBackendStateMetaInfoReader<S>
implements OperatorBackendStateMetaInfoReader<S> {
protected final ClassLoader userCodeClassLoader;
public AbstractOperatorBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
}
}
public abstract static class AbstractBroadcastStateMetaInfoReader<K, V>
implements BroadcastStateMetaInfoReader<K, V> {
protected final ClassLoader userCodeClassLoader;
public AbstractBroadcastStateMetaInfoReader(final ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
}
}
public static class OperatorBackendStateMetaInfoReaderV1<S> extends AbstractOperatorBackendStateMetaInfoReader<S> {
public OperatorBackendStateMetaInfoReaderV1(ClassLoader userCodeClassLoader) {
super(userCodeClassLoader);
}
@SuppressWarnings("unchecked")
@Override
public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readOperatorStateMetaInfo(DataInputView in) throws IOException {
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
new RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
stateMetaInfo.setName(in.readUTF());
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
DataInputViewStream dis = new DataInputViewStream(in);
ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
try (
InstantiationUtil.FailureTolerantObjectInputStream ois =
new InstantiationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
TypeSerializer<S> stateSerializer = (TypeSerializer<S>) ois.readObject();
stateMetaInfo.setPartitionStateSerializer(stateSerializer);
} catch (ClassNotFoundException exception) {
throw new IOException(exception);
} finally {
Thread.currentThread().setContextClassLoader(previousClassLoader);
}
// old versions do not contain the partition state serializer's configuration snapshot
stateMetaInfo.setPartitionStateSerializerConfigSnapshot(null);
return stateMetaInfo;
}
}
@SuppressWarnings("unchecked")
public static class OperatorBackendStateMetaInfoReaderV2<S> extends AbstractOperatorBackendStateMetaInfoReader<S> {
public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
super(userCodeClassLoader);
}
@Override
public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readOperatorStateMetaInfo(DataInputView in) throws IOException {
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
new RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
stateMetaInfo.setName(in.readUTF());
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> stateSerializerAndConfig =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
stateMetaInfo.setPartitionStateSerializer((TypeSerializer<S>) stateSerializerAndConfig.f0);
stateMetaInfo.setPartitionStateSerializerConfigSnapshot(stateSerializerAndConfig.f1);
return stateMetaInfo;
}
}
public static class BroadcastStateMetaInfoReaderV3<K, V> extends AbstractBroadcastStateMetaInfoReader<K, V> {
public BroadcastStateMetaInfoReaderV3(final ClassLoader userCodeClassLoader) {
super(userCodeClassLoader);
}
@Override
public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> stateMetaInfo =
new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
stateMetaInfo.setName(in.readUTF());
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializers =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = serializers.get(0);
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> valueSerializerAndConfig = serializers.get(1);
stateMetaInfo.setKeySerializer((TypeSerializer<K>) keySerializerAndConfig.f0);
stateMetaInfo.setKeySerializerConfigSnapshot(keySerializerAndConfig.f1);
stateMetaInfo.setValueSerializer((TypeSerializer<V>) valueSerializerAndConfig.f0);
stateMetaInfo.setValueSerializerConfigSnapshot(valueSerializerAndConfig.f1);
return stateMetaInfo;
}
}
}
......@@ -20,14 +20,17 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
import javax.annotation.Nonnull;
public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/** The name of the state, as registered by the user. */
private final String name;
public class RegisteredBroadcastBackendStateMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
/** The mode how elements in this state are assigned to tasks during restore. */
private final OperatorStateHandle.Mode assignmentMode;
......@@ -44,22 +47,30 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
final TypeSerializer<K> keySerializer,
final TypeSerializer<V> valueSerializer) {
super(name);
Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.BROADCAST);
this.name = Preconditions.checkNotNull(name);
this.assignmentMode = assignmentMode;
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
}
public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
this(
Preconditions.checkNotNull(copy).name,
copy.assignmentMode,
copy.keySerializer.duplicate(),
copy.valueSerializer.duplicate());
}
Preconditions.checkNotNull(copy);
this.name = copy.name;
this.assignmentMode = copy.assignmentMode;
this.keySerializer = copy.keySerializer.duplicate();
this.valueSerializer = copy.valueSerializer.duplicate();
@SuppressWarnings("unchecked")
public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
OperatorStateHandle.Mode.valueOf(
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
(TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
(TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
}
/**
......@@ -69,8 +80,27 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
return new RegisteredBroadcastBackendStateMetaInfo<>(this);
}
public String getName() {
return name;
@Nonnull
@Override
public StateMetaInfoSnapshot snapshot() {
Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
assignmentMode.toString());
Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
serializerMap.put(keySerializerKey, keySerializer.duplicate());
serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
return new StateMetaInfoSnapshot(
name,
StateMetaInfoSnapshot.BackendStateType.BROADCAST,
optionsMap,
serializerConfigSnapshotsMap,
serializerMap);
}
public TypeSerializer<K> getKeySerializer() {
......@@ -85,16 +115,6 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
return assignmentMode;
}
public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() {
return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
name,
assignmentMode,
keySerializer.duplicate(),
valueSerializer.duplicate(),
keySerializer.snapshotConfiguration(),
valueSerializer.snapshotConfiguration());
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
......@@ -132,116 +152,4 @@ public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
", assignmentMode=" + assignmentMode +
'}';
}
/**
* A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}.
*/
public static class Snapshot<K, V> {
private String name;
private OperatorStateHandle.Mode assignmentMode;
private TypeSerializer<K> keySerializer;
private TypeSerializer<V> valueSerializer;
private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
private TypeSerializerConfigSnapshot valueSerializerConfigSnapshot;
/** Empty constructor used when restoring the state meta info snapshot. */
Snapshot() {}
private Snapshot(
final String name,
final OperatorStateHandle.Mode assignmentMode,
final TypeSerializer<K> keySerializer,
final TypeSerializer<V> valueSerializer,
final TypeSerializerConfigSnapshot keySerializerConfigSnapshot,
final TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) {
this.name = Preconditions.checkNotNull(name);
this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot);
this.valueSerializerConfigSnapshot = Preconditions.checkNotNull(valueSerializerConfigSnapshot);
}
public String getName() {
return name;
}
void setName(String name) {
this.name = name;
}
public OperatorStateHandle.Mode getAssignmentMode() {
return assignmentMode;
}
void setAssignmentMode(OperatorStateHandle.Mode mode) {
this.assignmentMode = mode;
}
public TypeSerializer<K> getKeySerializer() {
return keySerializer;
}
void setKeySerializer(TypeSerializer<K> serializer) {
this.keySerializer = serializer;
}
public TypeSerializer<V> getValueSerializer() {
return valueSerializer;
}
void setValueSerializer(TypeSerializer<V> serializer) {
this.valueSerializer = serializer;
}
public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
return keySerializerConfigSnapshot;
}
void setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) {
this.keySerializerConfigSnapshot = configSnapshot;
}
public TypeSerializerConfigSnapshot getValueSerializerConfigSnapshot() {
return valueSerializerConfigSnapshot;
}
void setValueSerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) {
this.valueSerializerConfigSnapshot = configSnapshot;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo.Snapshot)) {
return false;
}
RegisteredBroadcastBackendStateMetaInfo.Snapshot snapshot =
(RegisteredBroadcastBackendStateMetaInfo.Snapshot) obj;
return name.equals(snapshot.getName())
&& assignmentMode.ordinal() == snapshot.getAssignmentMode().ordinal()
&& Objects.equals(keySerializer, snapshot.getKeySerializer())
&& Objects.equals(valueSerializer, snapshot.getValueSerializer())
&& keySerializerConfigSnapshot.equals(snapshot.getKeySerializerConfigSnapshot())
&& valueSerializerConfigSnapshot.equals(snapshot.getValueSerializerConfigSnapshot());
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + assignmentMode.hashCode();
result = 31 * result + ((keySerializer != null) ? keySerializer.hashCode() : 0);
result = 31 * result + ((valueSerializer != null) ? valueSerializer.hashCode() : 0);
result = 31 * result + keySerializerConfigSnapshot.hashCode();
result = 31 * result + valueSerializerConfigSnapshot.hashCode();
return result;
}
}
}
......@@ -24,9 +24,15 @@ import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -38,10 +44,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <N> Type of namespace
* @param <S> Type of state value
*/
public class RegisteredKeyedBackendStateMetaInfo<N, S> {
public class RegisteredKeyedBackendStateMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
private final StateDescriptor.Type stateType;
private final String name;
private final TypeSerializer<N> namespaceSerializer;
private final TypeSerializer<S> stateSerializer;
......@@ -51,18 +56,24 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
TypeSerializer<N> namespaceSerializer,
TypeSerializer<S> stateSerializer) {
super(name);
this.stateType = checkNotNull(stateType);
this.name = checkNotNull(name);
this.namespaceSerializer = checkNotNull(namespaceSerializer);
this.stateSerializer = checkNotNull(stateSerializer);
}
public StateDescriptor.Type getStateType() {
return stateType;
@SuppressWarnings("unchecked")
public RegisteredKeyedBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
this(
StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
snapshot.getName(),
(TypeSerializer<N>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
(TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
}
public String getName() {
return name;
public StateDescriptor.Type getStateType() {
return stateType;
}
public TypeSerializer<N> getNamespaceSerializer() {
......@@ -73,16 +84,6 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
return stateSerializer;
}
public Snapshot<N, S> snapshot() {
return new Snapshot<>(
stateType,
name,
namespaceSerializer.duplicate(),
stateSerializer.duplicate(),
namespaceSerializer.snapshotConfiguration(),
stateSerializer.snapshotConfiguration());
}
@Override
public boolean equals(Object o) {
if (this == o) {
......@@ -126,132 +127,13 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
return result;
}
/**
* A consistent snapshot of a {@link RegisteredKeyedBackendStateMetaInfo}.
*/
public static class Snapshot<N, S> {
private StateDescriptor.Type stateType;
private String name;
private TypeSerializer<N> namespaceSerializer;
private TypeSerializer<S> stateSerializer;
private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot;
private TypeSerializerConfigSnapshot stateSerializerConfigSnapshot;
/** Empty constructor used when restoring the state meta info snapshot. */
Snapshot() {}
private Snapshot(
StateDescriptor.Type stateType,
String name,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<S> stateSerializer,
TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot,
TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) {
this.stateType = Preconditions.checkNotNull(stateType);
this.name = Preconditions.checkNotNull(name);
this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
this.namespaceSerializerConfigSnapshot = Preconditions.checkNotNull(namespaceSerializerConfigSnapshot);
this.stateSerializerConfigSnapshot = Preconditions.checkNotNull(stateSerializerConfigSnapshot);
}
public StateDescriptor.Type getStateType() {
return stateType;
}
void setStateType(StateDescriptor.Type stateType) {
this.stateType = stateType;
}
public String getName() {
return name;
}
void setName(String name) {
this.name = name;
}
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
void setNamespaceSerializer(TypeSerializer<N> namespaceSerializer) {
this.namespaceSerializer = namespaceSerializer;
}
public TypeSerializer<S> getStateSerializer() {
return stateSerializer;
}
void setStateSerializer(TypeSerializer<S> stateSerializer) {
this.stateSerializer = stateSerializer;
}
public TypeSerializerConfigSnapshot getNamespaceSerializerConfigSnapshot() {
return namespaceSerializerConfigSnapshot;
}
void setNamespaceSerializerConfigSnapshot(TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot) {
this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot;
}
public TypeSerializerConfigSnapshot getStateSerializerConfigSnapshot() {
return stateSerializerConfigSnapshot;
}
void setStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) {
this.stateSerializerConfigSnapshot = stateSerializerConfigSnapshot;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Snapshot<?, ?> that = (Snapshot<?, ?>) o;
if (!stateType.equals(that.stateType)) {
return false;
}
if (!getName().equals(that.getName())) {
return false;
}
// need to check for nulls because serializer and config snapshots may be null on restore
return Objects.equals(getStateSerializer(), that.getStateSerializer())
&& Objects.equals(getNamespaceSerializer(), that.getNamespaceSerializer())
&& Objects.equals(getNamespaceSerializerConfigSnapshot(), that.getNamespaceSerializerConfigSnapshot())
&& Objects.equals(getStateSerializerConfigSnapshot(), that.getStateSerializerConfigSnapshot());
}
@Override
public int hashCode() {
// need to check for nulls because serializer and config snapshots may be null on restore
int result = getName().hashCode();
result = 31 * result + getStateType().hashCode();
result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
result = 31 * result + (getNamespaceSerializerConfigSnapshot() != null ? getNamespaceSerializerConfigSnapshot().hashCode() : 0);
result = 31 * result + (getStateSerializerConfigSnapshot() != null ? getStateSerializerConfigSnapshot().hashCode() : 0);
return result;
}
}
/**
* Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
* This checks that the descriptor specifies identical names and state types, as well as
* serializers that are compatible for the restored k/v state bytes.
*/
public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredStateMetaInfoSnapshot,
public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
TypeSerializer<N> newNamespaceSerializer,
StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
......@@ -261,28 +143,36 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
"registered with [" + newStateDescriptor.getName() + "].");
final StateDescriptor.Type restoredType =
StateDescriptor.Type.valueOf(
restoredStateMetaInfoSnapshot.getOption(
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
&& !Objects.equals(restoredStateMetaInfoSnapshot.getStateType(), StateDescriptor.Type.UNKNOWN)) {
&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
Preconditions.checkState(
newStateDescriptor.getType() == restoredStateMetaInfoSnapshot.getStateType(),
newStateDescriptor.getType() == restoredType,
"Incompatible state types. " +
"Was [" + restoredStateMetaInfoSnapshot.getStateType() + "], " +
"Was [" + restoredType + "], " +
"registered with [" + newStateDescriptor.getType() + "].");
}
// check compatibility results to determine if state migration is required
CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredStateMetaInfoSnapshot.getNamespaceSerializer(),
restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
null,
restoredStateMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
newNamespaceSerializer);
TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredStateMetaInfoSnapshot.getStateSerializer(),
restoredStateMetaInfoSnapshot.getTypeSerializer(
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
UnloadableDummyTypeSerializer.class,
restoredStateMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
newStateSerializer);
if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
......@@ -296,4 +186,27 @@ public class RegisteredKeyedBackendStateMetaInfo<N, S> {
newStateSerializer);
}
}
@Nonnull
@Override
public StateMetaInfoSnapshot snapshot() {
Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
stateType.toString());
Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
return new StateMetaInfoSnapshot(
name,
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
optionsMap,
serializerConfigSnapshotsMap,
serializerMap);
}
}
......@@ -20,9 +20,13 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.Map;
/**
* Compound meta information for a registered state in an operator state backend.
......@@ -30,12 +34,7 @@ import java.util.Objects;
*
* @param <S> Type of the state.
*/
public class RegisteredOperatorBackendStateMetaInfo<S> {
/**
* The name of the state, as registered by the user
*/
private final String name;
public class RegisteredOperatorBackendStateMetaInfo<S> extends RegisteredStateMetaInfoBase {
/**
* The mode how elements in this state are assigned to tasks during restore
......@@ -51,19 +50,26 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
String name,
TypeSerializer<S> partitionStateSerializer,
OperatorStateHandle.Mode assignmentMode) {
this.name = Preconditions.checkNotNull(name);
super(Preconditions.checkNotNull(name));
this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
}
private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
this(
Preconditions.checkNotNull(copy).name,
copy.partitionStateSerializer.duplicate(),
copy.assignmentMode);
}
Preconditions.checkNotNull(copy);
this.name = copy.name;
this.partitionStateSerializer = copy.partitionStateSerializer.duplicate();
this.assignmentMode = copy.assignmentMode;
@SuppressWarnings("unchecked")
public RegisteredOperatorBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
(TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
OperatorStateHandle.Mode.valueOf(
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
}
/**
......@@ -73,8 +79,24 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
return new RegisteredOperatorBackendStateMetaInfo<>(this);
}
public String getName() {
return name;
@Nonnull
@Override
public StateMetaInfoSnapshot snapshot() {
Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
assignmentMode.toString());
String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
Map<String, TypeSerializer<?>> serializerMap =
Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
return new StateMetaInfoSnapshot(
name,
StateMetaInfoSnapshot.BackendStateType.OPERATOR,
optionsMap,
serializerConfigSnapshotsMap,
serializerMap);
}
public OperatorStateHandle.Mode getAssignmentMode() {
......@@ -85,14 +107,6 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
return partitionStateSerializer;
}
public Snapshot<S> snapshot() {
return new Snapshot<>(
name,
assignmentMode,
partitionStateSerializer.duplicate(),
partitionStateSerializer.snapshotConfiguration());
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
......@@ -125,95 +139,4 @@ public class RegisteredOperatorBackendStateMetaInfo<S> {
", partitionStateSerializer=" + partitionStateSerializer +
'}';
}
/**
* A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}.
*/
public static class Snapshot<S> {
private String name;
private OperatorStateHandle.Mode assignmentMode;
private TypeSerializer<S> partitionStateSerializer;
private TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot;
/** Empty constructor used when restoring the state meta info snapshot. */
Snapshot() {}
private Snapshot(
String name,
OperatorStateHandle.Mode assignmentMode,
TypeSerializer<S> partitionStateSerializer,
TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) {
this.name = Preconditions.checkNotNull(name);
this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
this.partitionStateSerializerConfigSnapshot = Preconditions.checkNotNull(partitionStateSerializerConfigSnapshot);
}
public String getName() {
return name;
}
void setName(String name) {
this.name = name;
}
public OperatorStateHandle.Mode getAssignmentMode() {
return assignmentMode;
}
void setAssignmentMode(OperatorStateHandle.Mode assignmentMode) {
this.assignmentMode = assignmentMode;
}
public TypeSerializer<S> getPartitionStateSerializer() {
return partitionStateSerializer;
}
void setPartitionStateSerializer(TypeSerializer<S> partitionStateSerializer) {
this.partitionStateSerializer = partitionStateSerializer;
}
public TypeSerializerConfigSnapshot getPartitionStateSerializerConfigSnapshot() {
return partitionStateSerializerConfigSnapshot;
}
void setPartitionStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) {
this.partitionStateSerializerConfigSnapshot = partitionStateSerializerConfigSnapshot;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof Snapshot)) {
return false;
}
Snapshot snapshot = (Snapshot)obj;
// need to check for nulls because serializer and config snapshots may be null on restore
return name.equals(snapshot.getName())
&& assignmentMode.equals(snapshot.getAssignmentMode())
&& Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer())
&& Objects.equals(partitionStateSerializerConfigSnapshot, snapshot.getPartitionStateSerializerConfigSnapshot());
}
@Override
public int hashCode() {
// need to check for nulls because serializer and config snapshots may be null on restore
int result = getName().hashCode();
result = 31 * result + getAssignmentMode().hashCode();
result = 31 * result + (getPartitionStateSerializer() != null ? getPartitionStateSerializer().hashCode() : 0);
result = 31 * result + (getPartitionStateSerializerConfigSnapshot() != null ? getPartitionStateSerializerConfigSnapshot().hashCode() : 0);
return result;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import javax.annotation.Nonnull;
/**
* Base class for all registered state in state backends.
*/
public abstract class RegisteredStateMetaInfoBase {
/** The name of the state */
@Nonnull
protected final String name;
public RegisteredStateMetaInfoBase(@Nonnull String name) {
this.name = name;
}
@Nonnull
public String getName() {
return name;
}
@Nonnull
public abstract StateMetaInfoSnapshot snapshot();
}
......@@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
......@@ -144,7 +145,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
private final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos;
/**
* The configuration for local recovery.
......@@ -198,8 +199,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo;
if (stateTable != null) {
@SuppressWarnings("unchecked")
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfoSnapshot =
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateDesc.getName());
StateMetaInfoSnapshot restoredMetaInfoSnapshot =
restoredKvStateMetaInfos.get(stateDesc.getName());
Preconditions.checkState(
restoredMetaInfoSnapshot != null,
......@@ -332,10 +333,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
keySerializerRestored = true;
}
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
List<StateMetaInfoSnapshot> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
......@@ -344,11 +345,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (null == stateTable) {
RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
restoredMetaInfo.getStateType(),
restoredMetaInfo.getName(),
restoredMetaInfo.getNamespaceSerializer(),
restoredMetaInfo.getStateSerializer());
new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
stateTables.put(restoredMetaInfo.getName(), stateTable);
......@@ -558,7 +555,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Too many KV-States: " + stateTables.size() +
". Currently at most " + Short.MAX_VALUE + " states are supported");
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
List<StateMetaInfoSnapshot> metaInfoSnapshots =
new ArrayList<>(stateTables.size());
final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
......
......@@ -49,6 +49,7 @@ class StateTableByKeyGroupReaders {
case 2:
case 3:
case 4:
case 5:
return new StateTableByKeyGroupReaderV2V3<>(table);
default:
throw new IllegalArgumentException("Unknown version: " + version);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.metainfo;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.InstantiationUtil;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class holds the deprecated implementations of readers for state meta infos. They can be removed when we drop
* backwards compatibility.
*/
public class LegacyStateMetaInfoReaders {
private LegacyStateMetaInfoReaders() {
}
/**
* Implementation of {@link StateMetaInfoReader} for version 3 of keyed state.
* - v3: Flink 1.4.x, 1.5.x
*/
static class KeyedBackendStateMetaInfoReaderV3V4 implements StateMetaInfoReader {
static final KeyedBackendStateMetaInfoReaderV3V4 INSTANCE = new KeyedBackendStateMetaInfoReaderV3V4();
@Nonnull
@Override
public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@Nonnull DataInputView in, @Nonnull ClassLoader userCodeClassLoader) throws IOException {
final StateDescriptor.Type stateDescType = StateDescriptor.Type.values()[in.readInt()];
final String stateName = in.readUTF();
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
stateDescType.toString());
Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
serializerMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
serializersAndConfigs.get(0).f0);
serializerMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
serializersAndConfigs.get(1).f0);
Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotMap = new HashMap<>(2);
serializerConfigSnapshotMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
serializersAndConfigs.get(0).f1);
serializerConfigSnapshotMap.put(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
serializersAndConfigs.get(1).f1);
return new StateMetaInfoSnapshot(
stateName,
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
optionsMap,
serializerConfigSnapshotMap,
serializerMap);
}
}
/**
* Implementation of {@link StateMetaInfoReader} for version 1 and 2 of keyed state.
* - v1: Flink 1.2.x
* - v2: Flink 1.3.x
*/
static class KeyedBackendStateMetaInfoReaderV1V2 implements StateMetaInfoReader {
static final KeyedBackendStateMetaInfoReaderV1V2 INSTANCE = new KeyedBackendStateMetaInfoReaderV1V2();
@Nonnull
@Override
public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@Nonnull DataInputView in,
@Nonnull ClassLoader userCodeClassLoader) throws IOException {
final StateDescriptor.Type stateDescType = StateDescriptor.Type.values()[in.readInt()];
final String stateName = in.readUTF();
Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
stateDescType.toString());
Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
serializerMap.put(
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(),
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true));
serializerMap.put(
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true));
return new StateMetaInfoSnapshot(
stateName,
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
optionsMap,
Collections.emptyMap(),
serializerMap);
}
}
/**
* Unified reader for older versions of operator (version 2 and 3) AND broadcast state (version 3).
* <p>
* - v2: Flink 1.3.x, 1.4.x
* - v3: Flink 1.5.x
*/
static class OperatorBackendStateMetaInfoReaderV2V3 implements StateMetaInfoReader {
static final OperatorBackendStateMetaInfoReaderV2V3 INSTANCE = new OperatorBackendStateMetaInfoReaderV2V3();
private static final String[] ORDERED_KEY_STRINGS =
new String[]{
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(),
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()};
@Nonnull
@Override
public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@Nonnull DataInputView in,
@Nonnull ClassLoader userCodeClassLoader) throws IOException {
final String name = in.readUTF();
final OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[in.readByte()];
Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
mode.toString());
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> stateSerializerAndConfigList =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
final int listSize = stateSerializerAndConfigList.size();
StateMetaInfoSnapshot.BackendStateType stateType = listSize == 1 ?
StateMetaInfoSnapshot.BackendStateType.OPERATOR : StateMetaInfoSnapshot.BackendStateType.BROADCAST;
Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(listSize);
Map<String, TypeSerializerConfigSnapshot> serializerConfigsMap = new HashMap<>(listSize);
for (int i = 0; i < listSize; ++i) {
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serializerAndConf =
stateSerializerAndConfigList.get(i);
// this particular mapping happens to support both, V2 and V3
String serializerKey = ORDERED_KEY_STRINGS[ORDERED_KEY_STRINGS.length - 1 - i];
serializerMap.put(
serializerKey,
serializerAndConf.f0);
serializerConfigsMap.put(
serializerKey,
serializerAndConf.f1);
}
return new StateMetaInfoSnapshot(
name,
stateType,
optionsMap,
serializerConfigsMap,
serializerMap);
}
}
/**
* Reader for older versions of operator state (version 1).
* - v1: Flink 1.2.x
*/
public static class OperatorBackendStateMetaInfoReaderV1 implements StateMetaInfoReader {
static final OperatorBackendStateMetaInfoReaderV1 INSTANCE = new OperatorBackendStateMetaInfoReaderV1();
@Nonnull
@Override
public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@Nonnull DataInputView in,
@Nonnull ClassLoader userCodeClassLoader) throws IOException {
final String name = in.readUTF();
final OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[in.readByte()];
final Map<String, String> optionsMap = Collections.singletonMap(
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
mode.toString());
DataInputViewStream dis = new DataInputViewStream(in);
ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
try (
InstantiationUtil.FailureTolerantObjectInputStream ois =
new InstantiationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
TypeSerializer<?> stateSerializer = (TypeSerializer<?>) ois.readObject();
return new StateMetaInfoSnapshot(
name,
StateMetaInfoSnapshot.BackendStateType.OPERATOR,
optionsMap,
Collections.emptyMap(),
Collections.singletonMap(
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
stateSerializer));
} catch (ClassNotFoundException exception) {
throw new IOException(exception);
} finally {
Thread.currentThread().setContextClassLoader(previousClassLoader);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.metainfo;
import org.apache.flink.core.memory.DataInputView;
import javax.annotation.Nonnull;
import java.io.IOException;
/**
* Functional interface to read {@link StateMetaInfoSnapshot}.
*/
@FunctionalInterface
public interface StateMetaInfoReader {
/**
* Reads a snapshot from the given input view.
*
* @param inputView the input to read from.
* @param userCodeClassLoader user classloader to deserialize the objects in the snapshot.
* @return the deserialized snapshot.
* @throws IOException on deserialization problems.
*/
@Nonnull
StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@Nonnull DataInputView inputView,
@Nonnull ClassLoader userCodeClassLoader) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.metainfo;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
/**
* Generalized snapshot for meta information about one state in a state backend
* (e.g. {@link org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo}).
*/
public class StateMetaInfoSnapshot {
/**
* Enum that defines the different types of state that live in Flink backends.
*/
public enum BackendStateType {
KEY_VALUE,
OPERATOR,
BROADCAST,
TIMER
}
/**
* Predefined keys for the most common options in the meta info.
*/
public enum CommonOptionsKeys {
/** Key to define the {@link StateDescriptor.Type} of a key/value keyed-state */
KEYED_STATE_TYPE,
/**
* Key to define {@link org.apache.flink.runtime.state.OperatorStateHandle.Mode}, about how operator state is
* distributed on restore
*/
OPERATOR_STATE_DISTRIBUTION_MODE,
}
/**
* Predefined keys for the most common serializer types in the meta info.
*/
public enum CommonSerializerKeys {
KEY_SERIALIZER,
NAMESPACE_SERIALIZER,
VALUE_SERIALIZER
}
/** The name of the state. */
@Nonnull
private final String name;
@Nonnull
private final BackendStateType backendStateType;
/** Map of options (encoded as strings) for the state. */
@Nonnull
private final Map<String, String> options;
/** The configurations of all the type serializers used with the state. */
@Nonnull
private final Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshots;
// TODO this will go awy again after FLINK-9377 is merged, that is why it is currently duplicated here.
/** The serializers used by the state. */
@Nonnull
private final Map<String, TypeSerializer<?>> serializers;
public StateMetaInfoSnapshot(
@Nonnull String name,
@Nonnull BackendStateType backendStateType,
@Nonnull Map<String, String> options,
@Nonnull Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshots,
@Nonnull Map<String, TypeSerializer<?>> serializers) {
this.name = name;
this.backendStateType = backendStateType;
this.options = options;
this.serializerConfigSnapshots = serializerConfigSnapshots;
this.serializers = serializers;
}
@Nonnull
public BackendStateType getBackendStateType() {
return backendStateType;
}
@Nullable
public TypeSerializerConfigSnapshot getTypeSerializerConfigSnapshot(@Nonnull String key) {
return serializerConfigSnapshots.get(key);
}
@Nullable
public TypeSerializerConfigSnapshot getTypeSerializerConfigSnapshot(@Nonnull CommonSerializerKeys key) {
return getTypeSerializerConfigSnapshot(key.toString());
}
@Nullable
public String getOption(@Nonnull String key) {
return options.get(key);
}
@Nullable
public String getOption(@Nonnull StateMetaInfoSnapshot.CommonOptionsKeys key) {
return getOption(key.toString());
}
@Nonnull
public Map<String, String> getOptionsImmutable() {
return Collections.unmodifiableMap(options);
}
@Nonnull
public String getName() {
return name;
}
@Nullable
public TypeSerializer<?> getTypeSerializer(@Nonnull String key) {
return serializers.get(key);
}
@Nullable
public TypeSerializer<?> getTypeSerializer(@Nonnull CommonSerializerKeys key) {
return getTypeSerializer(key.toString());
}
@Nonnull
public Map<String, TypeSerializerConfigSnapshot> getSerializerConfigSnapshotsImmutable() {
return Collections.unmodifiableMap(serializerConfigSnapshots);
}
@Nonnull
public Map<String, TypeSerializer<?>> getSerializersImmutable() {
return Collections.unmodifiableMap(serializers);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.metainfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Static factory that gives out the write and readers for different versions of {@link StateMetaInfoSnapshot}.
*/
public class StateMetaInfoSnapshotReadersWriters {
/**
* Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}.
* - v5: Flink 1.6.x
*/
public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
/**
* Enum for backeards compatibility. This gives a hint about the expected state type for which a
* {@link StateMetaInfoSnapshot} should be deserialized.
*
* TODO this can go away after we eventually drop backwards compatibility with all versions < 5.
*/
public enum StateTypeHint {
KEYED_STATE,
OPERATOR_STATE
}
/**
* Returns the writer for {@link StateMetaInfoSnapshot}.
*/
@Nonnull
public static StateMetaInfoWriter getWriter() {
return CurrentWriterImpl.INSTANCE;
}
/**
* Returns a reader for {@link StateMetaInfoSnapshot} with the requested state type and version number.
*
* @param readVersion the format version to read.
* @param stateTypeHint a hint about the expected type to read.
* @return the requested reader.
*/
@Nonnull
public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
// latest version shortcut
return CurrentReaderImpl.INSTANCE;
}
if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
}
switch (stateTypeHint) {
case KEYED_STATE:
return getLegacyKeyedStateMetaInfoReader(readVersion);
case OPERATOR_STATE:
return getLegacyOperatorStateMetaInfoReader(readVersion);
default:
throw new IllegalArgumentException("Unsupported state type hint: " + stateTypeHint);
}
}
@Nonnull
private static StateMetaInfoReader getLegacyKeyedStateMetaInfoReader(int readVersion) {
switch (readVersion) {
case 1:
case 2:
return LegacyStateMetaInfoReaders.KeyedBackendStateMetaInfoReaderV1V2.INSTANCE;
case 3:
case 4:
return LegacyStateMetaInfoReaders.KeyedBackendStateMetaInfoReaderV3V4.INSTANCE;
default:
// guard for future
throw new IllegalStateException(
"Unrecognized keyed backend state meta info writer version: " + readVersion);
}
}
@Nonnull
private static StateMetaInfoReader getLegacyOperatorStateMetaInfoReader(int readVersion) {
switch (readVersion) {
case 1:
return LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV1.INSTANCE;
case 2:
case 3:
return LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3.INSTANCE;
default:
// guard for future
throw new IllegalStateException(
"Unrecognized operator backend state meta info writer version: " + readVersion);
}
}
//----------------------------------------------------------
/**
* Implementation of {@link StateMetaInfoWriter}.
*/
static class CurrentWriterImpl implements StateMetaInfoWriter {
private static final CurrentWriterImpl INSTANCE = new CurrentWriterImpl();
@Override
public void writeStateMetaInfoSnapshot(
@Nonnull StateMetaInfoSnapshot snapshot,
@Nonnull DataOutputView outputView) throws IOException {
final Map<String, String> optionsMap = snapshot.getOptionsImmutable();
final Map<String, TypeSerializer<?>> serializerMap = snapshot.getSerializersImmutable();
final Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
snapshot.getSerializerConfigSnapshotsImmutable();
Preconditions.checkState(serializerMap.size() == serializerConfigSnapshotsMap.size());
outputView.writeUTF(snapshot.getName());
outputView.writeInt(snapshot.getBackendStateType().ordinal());
outputView.writeInt(optionsMap.size());
for (Map.Entry<String, String> entry : optionsMap.entrySet()) {
outputView.writeUTF(entry.getKey());
outputView.writeUTF(entry.getValue());
}
outputView.writeInt(serializerMap.size());
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersWithConfig =
new ArrayList<>(serializerMap.size());
for (Map.Entry<String, TypeSerializer<?>> entry : serializerMap.entrySet()) {
final String key = entry.getKey();
outputView.writeUTF(key);
TypeSerializerConfigSnapshot configForSerializer =
Preconditions.checkNotNull(serializerConfigSnapshotsMap.get(key));
serializersWithConfig.add(new Tuple2<>(entry.getValue(), configForSerializer));
}
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(outputView, serializersWithConfig);
}
}
/**
* Implementation of {@link StateMetaInfoReader} for the current version and generic for all state types.
*/
static class CurrentReaderImpl implements StateMetaInfoReader {
private static final CurrentReaderImpl INSTANCE = new CurrentReaderImpl();
@Nonnull
@Override
public StateMetaInfoSnapshot readStateMetaInfoSnapshot(
@Nonnull DataInputView inputView,
@Nonnull ClassLoader userCodeClassLoader) throws IOException {
final String stateName = inputView.readUTF();
final StateMetaInfoSnapshot.BackendStateType stateType =
StateMetaInfoSnapshot.BackendStateType.values()[inputView.readInt()];
final int numOptions = inputView.readInt();
HashMap<String, String> optionsMap = new HashMap<>(numOptions);
for (int i = 0; i < numOptions; ++i) {
String key = inputView.readUTF();
String value = inputView.readUTF();
optionsMap.put(key, value);
}
final int numSerializer = inputView.readInt();
final ArrayList<String> serializerKeys = new ArrayList<>(numSerializer);
final HashMap<String, TypeSerializer<?>> serializerMap = new HashMap<>(numSerializer);
final HashMap<String, TypeSerializerConfigSnapshot> serializerConfigsMap = new HashMap<>(numSerializer);
for (int i = 0; i < numSerializer; ++i) {
serializerKeys.add(inputView.readUTF());
}
final List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersWithConfig =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(inputView, userCodeClassLoader);
for (int i = 0; i < numSerializer; ++i) {
String key = serializerKeys.get(i);
final Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serializerConfigTuple =
serializersWithConfig.get(i);
serializerMap.put(key, serializerConfigTuple.f0);
serializerConfigsMap.put(key, serializerConfigTuple.f1);
}
return new StateMetaInfoSnapshot(stateName, stateType, optionsMap, serializerConfigsMap, serializerMap);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.metainfo;
import org.apache.flink.core.memory.DataOutputView;
import javax.annotation.Nonnull;
import java.io.IOException;
/**
* Functional interface to write {@link StateMetaInfoSnapshot}.
*/
@FunctionalInterface
public interface StateMetaInfoWriter {
/**
* Writes the given snapshot to the output view.
*
* @param snapshot the snapshot to write.
* @param outputView the output to write into.
* @throws IOException on write problems.
*/
void writeStateMetaInfoSnapshot(
@Nonnull StateMetaInfoSnapshot snapshot,
@Nonnull DataOutputView outputView) throws IOException;
}
......@@ -54,16 +54,16 @@ public class CopyOnWriteStateTableTest extends TestLogger {
@Test
public void testPutGetRemoveContainsTransform() throws Exception {
RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
new CopyOnWriteStateTable<>(keyContext, metaInfo);
new CopyOnWriteStateTable<>(keyContext, metaInfo);
ArrayList<Integer> state_1_1 = new ArrayList<>();
state_1_1.add(41);
......@@ -106,13 +106,13 @@ public class CopyOnWriteStateTableTest extends TestLogger {
Assert.assertEquals(1, stateTable.size());
StateTransformationFunction<ArrayList<Integer>, Integer> function =
new StateTransformationFunction<ArrayList<Integer>, Integer>() {
@Override
public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
previousState.add(value);
return previousState;
}
};
new StateTransformationFunction<ArrayList<Integer>, Integer>() {
@Override
public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
previousState.add(value);
return previousState;
}
};
final int value = 4711;
stateTable.transform(1, 1, value, function);
......@@ -126,16 +126,16 @@ public class CopyOnWriteStateTableTest extends TestLogger {
@Test
public void testIncrementalRehash() {
RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
new CopyOnWriteStateTable<>(keyContext, metaInfo);
new CopyOnWriteStateTable<>(keyContext, metaInfo);
int insert = 0;
int remove = 0;
......@@ -171,16 +171,16 @@ public class CopyOnWriteStateTableTest extends TestLogger {
public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
final RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
new CopyOnWriteStateTable<>(keyContext, metaInfo);
new CopyOnWriteStateTable<>(keyContext, metaInfo);
final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap = new HashMap<>();
......@@ -200,17 +200,17 @@ public class CopyOnWriteStateTableTest extends TestLogger {
int referencedSnapshotId = 0;
final StateTransformationFunction<ArrayList<Integer>, Integer> transformationFunction =
new StateTransformationFunction<ArrayList<Integer>, Integer>() {
@Override
public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
if (previousState == null) {
previousState = new ArrayList<>();
}
previousState.add(value);
// we give back the original, attempting to spot errors in to copy-on-write
return previousState;
new StateTransformationFunction<ArrayList<Integer>, Integer>() {
@Override
public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
if (previousState == null) {
previousState = new ArrayList<>();
}
};
previousState.add(value);
// we give back the original, attempting to spot errors in to copy-on-write
return previousState;
}
};
// the main loop for modifications
for (int i = 0; i < 10_000_000; ++i) {
......@@ -261,7 +261,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
final int updateValue = random.nextInt(1000);
stateTable.transform(key, namespace, updateValue, transformationFunction);
referenceMap.put(compositeKey, transformationFunction.apply(
referenceMap.remove(compositeKey), updateValue));
referenceMap.remove(compositeKey), updateValue));
break;
}
default: {
......@@ -326,16 +326,16 @@ public class CopyOnWriteStateTableTest extends TestLogger {
@Test
public void testCopyOnWriteContracts() {
RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
new CopyOnWriteStateTable<>(keyContext, metaInfo);
new CopyOnWriteStateTable<>(keyContext, metaInfo);
ArrayList<Integer> originalState1 = new ArrayList<>(1);
ArrayList<Integer> originalState2 = new ArrayList<>(1);
......@@ -477,8 +477,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
@SuppressWarnings("unchecked")
private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(
HashMap<Tuple2<Integer, Integer>,
ArrayList<Integer>> map) {
HashMap<Tuple2<Integer, Integer>,
ArrayList<Integer>> map) {
Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new Tuple3[map.size()];
int pos = 0;
......@@ -491,8 +491,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
}
private void deepCheck(
Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
if (a == b) {
return;
......@@ -501,14 +501,14 @@ public class CopyOnWriteStateTableTest extends TestLogger {
Assert.assertEquals(a.length, b.length);
Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator =
new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() {
new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() {
@Override
public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
int namespaceDiff = o1.f1 - o2.f1;
return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0;
}
};
@Override
public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
int namespaceDiff = o1.f1 - o2.f1;
return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0;
}
};
Arrays.sort(a, comparator);
Arrays.sort(b, comparator);
......
......@@ -47,17 +47,17 @@ public class StateTableSnapshotCompatibilityTest {
public void checkCompatibleSerializationFormats() throws IOException {
final Random r = new Random(42);
RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE));
new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
"test",
IntSerializer.INSTANCE,
new ArrayListSerializer<>(IntSerializer.INSTANCE));
final CopyOnWriteStateTableTest.MockInternalKeyContext<Integer> keyContext =
new CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE);
new CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE);
CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> cowStateTable =
new CopyOnWriteStateTable<>(keyContext, metaInfo);
new CopyOnWriteStateTable<>(keyContext, metaInfo);
for (int i = 0; i < 100; ++i) {
ArrayList<Integer> list = new ArrayList<>(5);
......@@ -72,7 +72,7 @@ public class StateTableSnapshotCompatibilityTest {
StateSnapshot snapshot = cowStateTable.createSnapshot();
final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
new NestedMapsStateTable<>(keyContext, metaInfo);
new NestedMapsStateTable<>(keyContext, metaInfo);
restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, keyContext.getKeyGroupRange());
snapshot.release();
......@@ -96,9 +96,9 @@ public class StateTableSnapshotCompatibilityTest {
}
private static <K, N, S> void restoreStateTableFromSnapshot(
StateTable<K, N, S> stateTable,
StateSnapshot snapshot,
KeyGroupRange keyGroupRange) throws IOException {
StateTable<K, N, S> stateTable,
StateSnapshot snapshot,
KeyGroupRange keyGroupRange) throws IOException {
final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024);
final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
......@@ -111,7 +111,7 @@ public class StateTableSnapshotCompatibilityTest {
final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
final StateTableByKeyGroupReader keyGroupReader =
StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION);
StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION);
for (Integer keyGroup : keyGroupRange) {
keyGroupReader.readMappingsInKeyGroup(div, keyGroup);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.metainfo;
import org.junit.Assert;
import org.junit.Test;
/**
* This test fixes the enum constants in {@link StateMetaInfoSnapshot} because any changes can break backwards
* compatibility. Consider this before changing this test.
*/
public class StateMetaInfoSnapshotEnumConstantsTest {
@Test
public void testFixedBackendStateTypeEnumConstants() {
Assert.assertEquals(4, StateMetaInfoSnapshot.BackendStateType.values().length);
Assert.assertEquals(0, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.ordinal());
Assert.assertEquals(1, StateMetaInfoSnapshot.BackendStateType.OPERATOR.ordinal());
Assert.assertEquals(2, StateMetaInfoSnapshot.BackendStateType.BROADCAST.ordinal());
Assert.assertEquals(3, StateMetaInfoSnapshot.BackendStateType.TIMER.ordinal());
Assert.assertEquals("KEY_VALUE", StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.toString());
Assert.assertEquals("OPERATOR", StateMetaInfoSnapshot.BackendStateType.OPERATOR.toString());
Assert.assertEquals("BROADCAST", StateMetaInfoSnapshot.BackendStateType.BROADCAST.toString());
Assert.assertEquals("TIMER", StateMetaInfoSnapshot.BackendStateType.TIMER.toString());
}
@Test
public void testFixedOptionsEnumConstants() {
Assert.assertEquals(2, StateMetaInfoSnapshot.CommonOptionsKeys.values().length);
Assert.assertEquals(0, StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.ordinal());
Assert.assertEquals(1, StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.ordinal());
Assert.assertEquals("KEYED_STATE_TYPE", StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString());
Assert.assertEquals("OPERATOR_STATE_DISTRIBUTION_MODE", StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString());
}
@Test
public void testFixedSerializerEnumConstants() {
Assert.assertEquals(3, StateMetaInfoSnapshot.CommonSerializerKeys.values().length);
Assert.assertEquals(0, StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.ordinal());
Assert.assertEquals(1, StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.ordinal());
Assert.assertEquals(2, StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.ordinal());
Assert.assertEquals("KEY_SERIALIZER", StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString());
Assert.assertEquals("NAMESPACE_SERIALIZER", StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString());
Assert.assertEquals("VALUE_SERIALIZER", StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString());
}
}
......@@ -58,12 +58,13 @@ class RocksDBFoldingState<K, N, T, ACC>
* @param foldFunction The fold function used for folding state.
* @param backend The backend for which this state is bind to.
*/
private RocksDBFoldingState(ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<ACC> valueSerializer,
ACC defaultValue,
FoldFunction<T, ACC> foldFunction,
RocksDBKeyedStateBackend<K> backend) {
private RocksDBFoldingState(
ColumnFamilyHandle columnFamily,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<ACC> valueSerializer,
ACC defaultValue,
FoldFunction<T, ACC> foldFunction,
RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
......
......@@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
......@@ -230,7 +231,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
private final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
......@@ -337,6 +338,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = kvStateInformation.get(state);
......@@ -668,11 +670,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
List<StateMetaInfoSnapshot> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
......@@ -685,11 +687,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
rocksDBKeyedStateBackend.columnOptions);
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
restoredMetaInfo.getStateType(),
restoredMetaInfo.getName(),
restoredMetaInfo.getNamespaceSerializer(),
restoredMetaInfo.getStateSerializer());
new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
......@@ -796,7 +794,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
IncrementalLocalKeyedStateHandle localKeyedStateHandle;
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
List<ColumnFamilyDescriptor> columnFamilyDescriptors;
// Recovery from remote incremental state.
......@@ -930,13 +928,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
@Nonnull
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
RestoredDBInstance(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
@Nonnull List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
this.db = db;
this.columnFamilyHandles = columnFamilyHandles;
this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0);
......@@ -964,7 +962,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
// read meta data
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
readMetaData(restoreStateHandle.getMetaStateHandle());
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
......@@ -984,18 +982,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
ColumnFamilyDescriptor columnFamilyDescriptor,
ColumnFamilyHandle columnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot) throws RocksDBException {
StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
if (null == registeredStateMetaInfoEntry) {
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
stateMetaInfoSnapshot.getStateType(),
stateMetaInfoSnapshot.getName(),
stateMetaInfoSnapshot.getNamespaceSerializer(),
stateMetaInfoSnapshot.getStateSerializer());
new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
registeredStateMetaInfoEntry =
new Tuple2<>(
......@@ -1071,12 +1065,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot.
*/
private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(stateMetaInfoSnapshots.size());
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
......@@ -1094,7 +1088,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void restoreLocalStateIntoFullInstance(
IncrementalLocalKeyedStateHandle restoreStateHandle,
List<ColumnFamilyDescriptor> columnFamilyDescriptors,
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception {
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {
// pick up again the old backend id, so the we can reference existing state
stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
......@@ -1120,15 +1114,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);
for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredKeyedBackendStateMetaInfo<>(
stateMetaInfoSnapshot.getStateType(),
stateMetaInfoSnapshot.getName(),
stateMetaInfoSnapshot.getNamespaceSerializer(),
stateMetaInfoSnapshot.getStateSerializer());
new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
stateBackend.kvStateInformation.put(
stateMetaInfoSnapshot.getName(),
......@@ -1177,7 +1167,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Reads Flink's state meta data file from the state handle.
*/
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
private List<StateMetaInfoSnapshot> readMetaData(
StreamStateHandle metaStateHandle) throws Exception {
FSDataInputStream inputStream = null;
......@@ -1299,7 +1289,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
StateDescriptor<?, S> stateDesc,
TypeSerializer<N> namespaceSerializer) throws StateMigrationException, IOException {
TypeSerializer<N> namespaceSerializer) throws StateMigrationException {
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
kvStateInformation.get(stateDesc.getName());
......@@ -1308,8 +1298,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (stateInfo != null) {
@SuppressWarnings("unchecked")
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfoSnapshot =
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(stateDesc.getName());
StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName());
Preconditions.checkState(
restoredMetaInfoSnapshot != null,
......@@ -1946,7 +1935,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* The state meta data.
*/
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
/**
* The copied column handle.
......@@ -2291,7 +2280,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private Set<StateHandleID> baseSstFiles;
/** The state meta data. */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();
/** Local directory for the RocksDB native backup. */
private SnapshotDirectory localBackupDirectory;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册