diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 710f5066d5c618e8bd154f0d2d1c377adba3c6f0..cbc275772abe1d60f7c423bcb36a7e546f180ac1 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -20,8 +20,11 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.KvState; import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; @@ -30,7 +33,6 @@ import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; /** @@ -56,7 +58,7 @@ public abstract class AbstractRocksDBState backend; /** The column family of this particular instance of state */ protected ColumnFamilyHandle columnFamily; @@ -69,14 +71,20 @@ public abstract class AbstractRocksDBState namespaceSerializer, SD stateDesc, - RocksDBKeyedStateBackend backend) { + RocksDBKeyedStateBackend backend) { this.namespaceSerializer = namespaceSerializer; this.backend = backend; @@ -85,31 +93,27 @@ public abstract class AbstractRocksDBState des = KvStateRequestSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, + backend.getKeySerializer(), + namespaceSerializer); + + int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0); + writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1); + return backend.db.get(columnFamily, keySerializationStream.toByteArray()); + + } + + protected void writeCurrentKeyWithGroupAndNamespace() throws IOException { + writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace); + } + + protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException { + keySerializationStream.reset(); + writeKeyGroup(keyGroup); + writeKey(key); + writeNameSpace(namespace); + } + + private void writeKeyGroup(int keyGroup) throws IOException { + for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { + keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3)); + } + } + + private void writeKey(K key) throws IOException { + //write key + int beforeWrite = (int) keySerializationStream.getPosition(); + backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView); + + if (ambiguousKeyPossible) { + //write size of key + writeLengthFrom(beforeWrite); + } + } + + private void writeNameSpace(N namespace) throws IOException { + int beforeWrite = (int) keySerializationStream.getPosition(); + namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView); - if (value != null) { - return value; - } else { - return null + if (ambiguousKeyPossible) { + //write length of namespace + writeLengthFrom(beforeWrite); } } + private void writeLengthFrom(int fromPosition) throws IOException { + int length = (int) (keySerializationStream.getPosition() - fromPosition); + writeVariableIntBytes(length); + } + + private void writeVariableIntBytes(int value) throws IOException { + do { + keySerializationDateDataOutputView.writeByte(value); + value >>>= 8; + } while (value != 0); + } }
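For orientation, the composite RocksDB key that the methods above assemble is [key-group prefix | serialized key | key length? | serialized namespace | namespace length?], where the length fields are only appended when variable-length key/namespace serializers would otherwise make the (key, namespace) split ambiguous. A minimal standalone sketch of that layout, using plain java.io streams instead of Flink's ByteArrayOutputStreamWithPos (class and method names here are illustrative, not Flink API):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class CompositeKeySketch {

        static byte[] buildCompositeKey(
                int keyGroup, int prefixBytes,
                byte[] serializedKey, byte[] serializedNamespace,
                boolean ambiguousKeyPossible) throws IOException {

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);

            // 1) key-group prefix, big-endian, 1 or 2 bytes (see getKeyGroupPrefixBytes())
            for (int i = prefixBytes; --i >= 0;) {
                out.writeByte(keyGroup >>> (i << 3));
            }
            // 2) the key, followed by its length when the split could be ambiguous
            out.write(serializedKey);
            if (ambiguousKeyPossible) {
                writeVariableIntBytes(out, serializedKey.length);
            }
            // 3) the namespace, again followed by its length when needed
            out.write(serializedNamespace);
            if (ambiguousKeyPossible) {
                writeVariableIntBytes(out, serializedNamespace.length);
            }
            return bos.toByteArray();
        }

        // mirrors writeVariableIntBytes() above: dumps the int byte-wise, low byte first
        static void writeVariableIntBytes(DataOutputStream out, int value) throws IOException {
            do {
                out.writeByte(value);
                value >>>= 8;
            } while (value != 0);
        }
    }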
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 8c0799b56818ff384c2e5efc003e9938baf3255d..3018f7b5a1372968f3238badc0a059314e6eed23 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -29,7 +29,6 @@ import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; /** @@ -66,7 +65,7 @@ public class RocksDBFoldingState public RocksDBFoldingState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, FoldingStateDescriptor stateDesc, - RocksDBKeyedStateBackend backend) { + RocksDBKeyedStateBackend backend) { super(columnFamily, namespaceSerializer, stateDesc, backend); @@ -79,11 +78,9 @@ public class RocksDBFoldingState @Override public ACC get() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return null; @@ -96,23 +93,21 @@ public class RocksDBFoldingState @Override public void add(T value) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); - + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); if (valueBytes == null) { - baos.reset(); + keySerializationStream.reset(); valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out); - backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } else { ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); ACC newValue = foldFunction.fold(oldValue, value); - baos.reset(); + keySerializationStream.reset(); valueSerializer.serialize(newValue, out); - backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 63f1fa2aedf57526bf080ec1b5c56e5195332974..a1634b2184389f624f75891328c9b7c26a8eb92e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -29,30 +30,49 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; +import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; +import java.util.PriorityQueue; +import java.util.concurrent.RunnableFuture; /** * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to @@ -79,26 +99,30 @@ public class RocksDBKeyedStateBackend extends KeyedStateBackend { /** Path where this configured instance stores its RocksDB data base */ private final File instanceRocksDBPath; + /** + * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous + * checkpoints and when disposing the db. 
Otherwise, the asynchronous snapshot might try + iterating over a disposed db. + */ + private final SerializableObject dbDisposeLock = new SerializableObject(); + /** * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState} * to store state. The different k/v states that we have don't each have their own RocksDB * instance. They all write to this instance but to their own column family. */ + @GuardedBy("dbDisposeLock") protected volatile RocksDB db; - /** - * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous - * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try - * iterating over a disposed db. - */ - private final SerializableObject dbCleanupLock = new SerializableObject(); - /** * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ private Map> kvStateInformation; + /** Number of bytes required to prefix the key groups. */ + private final int keyGroupPrefixBytes; + public RocksDBKeyedStateBackend( JobID jobId, String operatorIdentifier, @@ -108,7 +132,7 @@ public class RocksDBKeyedStateBackend extends KeyedStateBackend { TaskKvStateRegistry kvStateRegistry, TypeSerializer keySerializer, KeyGroupAssigner keyGroupAssigner, - KeyGroupRange keyGroupRange + KeyGroupRange keyGroupRange ) throws Exception { super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange); @@ -147,35 +171,543 @@ public class RocksDBKeyedStateBackend extends KeyedStateBackend { } catch (RocksDBException e) { throw new RuntimeException("Error while opening RocksDB instance.", e); } - + keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; kvStateInformation = new HashMap<>(); } + public RocksDBKeyedStateBackend( + JobID jobId, + String operatorIdentifier, + File instanceBasePath, + DBOptions dbOptions, + ColumnFamilyOptions columnFamilyOptions, + TaskKvStateRegistry kvStateRegistry, + TypeSerializer keySerializer, + KeyGroupAssigner keyGroupAssigner, + KeyGroupRange keyGroupRange, + List restoreState + ) throws Exception { + this( + jobId, + operatorIdentifier, + instanceBasePath, + dbOptions, + columnFamilyOptions, + kvStateRegistry, + keySerializer, + keyGroupAssigner, + keyGroupRange); + + LOG.info("Initializing RocksDB keyed state backend from snapshot."); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); + } + + RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this); + restoreOperation.doRestore(restoreState); + }
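A plausible reason for the 128-key-group threshold above is that the snapshot format further down reserves the first bit of the first key byte as a meta-data-follows flag, so a single prefix byte can only address Byte.MAX_VALUE + 1 key groups without colliding with that flag. A small self-contained round-trip sketch of the prefix encoding (hypothetical helper class, not part of the patch):

    public class KeyGroupPrefixSketch {

        // 1 byte addresses up to 128 key-groups (high bit reserved for the snapshot flag), else 2 bytes
        static int prefixBytes(int numberOfKeyGroups) {
            return numberOfKeyGroups > (Byte.MAX_VALUE + 1) ? 2 : 1;
        }

        // big-endian encode, mirroring writeKeyGroup() in AbstractRocksDBState
        static byte[] encode(int keyGroup, int prefixBytes) {
            byte[] prefix = new byte[prefixBytes];
            for (int i = 0; i < prefixBytes; i++) {
                prefix[i] = (byte) (keyGroup >>> ((prefixBytes - 1 - i) << 3));
            }
            return prefix;
        }

        // big-endian decode, mirroring RocksDBMergeIterator#keyGroup() further down
        static int decode(byte[] key, int prefixBytes) {
            int result = 0;
            for (int i = 0; i < prefixBytes; i++) {
                result = (result << 8) | (key[i] & 0xFF);
            }
            return result;
        }

        public static void main(String[] args) {
            int bytes = prefixBytes(4096);                           // e.g. 4096 key groups -> 2 prefix bytes
            System.out.println(decode(encode(1234, bytes), bytes)); // prints 1234
        }
    }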
+ /** + * @see java.io.Closeable + * + * Should only be called by one thread. + * + * @throws Exception + */ @Override public void close() throws Exception { super.close(); - // we have to lock because we might have an asynchronous checkpoint going on - synchronized (dbCleanupLock) { - if (db != null) { - for (Tuple2 column : kvStateInformation.values()) { - column.f0.dispose(); - } + final RocksDB cleanupRocksDBReference; - db.dispose(); - db = null; + // Acquire the lock on dbDisposeLock, so that no ongoing snapshots access the db during cleanup + synchronized (dbDisposeLock) { + // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as + // working on the disposed object results in SEGFAULTS. Other code has to check field #db for null + // and access it in a synchronized block that locks on #dbDisposeLock. + cleanupRocksDBReference = db; + db = null; + } + + // Dispose the decoupled db + if (cleanupRocksDBReference != null) { + for (Tuple2 column : kvStateInformation.values()) { + column.f0.dispose(); } + cleanupRocksDBReference.dispose(); } FileUtils.deleteDirectory(instanceBasePath); } + public int getKeyGroupPrefixBytes() { + return keyGroupPrefixBytes; + } + + /** + * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and + * is also stopped when the backend is closed through {@link #close()}. For each backend, this method must always + * be called by the same thread. + * + * @param checkpointId The Id of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @param streamFactory The factory that we can use for writing our state to streams. + * + * @return Future to the state handle of the snapshot data. + * @throws Exception + */ @Override - public Future snapshot( - long checkpointId, - long timestamp, - CheckpointStreamFactory streamFactory) throws Exception { - throw new RuntimeException("Not implemented."); + public RunnableFuture snapshot( + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory) throws Exception { + + long startTime = System.currentTimeMillis(); + + if (kvStateInformation.isEmpty()) { + LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + ". Returning null."); + return new DoneFuture<>(null); + } + + final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory); + // hold the db lock while operating on the db to guard us against async db disposal + synchronized (dbDisposeLock) { + if (db != null) { + snapshotOperation.takeDBSnapShot(checkpointId, timestamp); + } else { + throw new IOException("RocksDB closed."); + } + } + + // implementation of the async IO operation, based on FutureTask + AbstractAsyncIOCallable ioCallable = + new AbstractAsyncIOCallable() { + + @Override + public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { + snapshotOperation.openCheckpointStream(); + return snapshotOperation.getOutStream(); + } + + @Override + public KeyGroupsStateHandle performOperation() throws Exception { + long startTime = System.currentTimeMillis(); + try { + // hold the db lock while operating on the db to guard us against async db disposal + synchronized (dbDisposeLock) { + if (db != null) { + snapshotOperation.writeDBSnapshot(); + } else { + throw new IOException("RocksDB closed."); + } + } + + } finally { + snapshotOperation.closeCheckpointStream(); + } + + LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " + + Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); + + return snapshotOperation.getSnapshotResultStateHandle(); + } + + @Override + public void done() { + // hold the db lock while operating on the db to guard us against async db disposal + synchronized (dbDisposeLock) { + if (db != null) { + snapshotOperation.releaseDBSnapshot(); + } + } + } + }; + + LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " + + Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); + + return AsyncStoppableTaskWithCallback.from(ioCallable); + }
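Taken together, the five numbered steps of RocksDBSnapshotOperation (defined below) are driven in exactly this order; only step 1 runs in the synchronous part under the dispose lock. A condensed happy-path sketch, assuming same-package access and omitting the dbDisposeLock handshake that the snapshot() method above performs around each step:

    static KeyGroupsStateHandle snapshotHappyPath(
            RocksDBKeyedStateBackend backend,
            CheckpointStreamFactory streamFactory,
            long checkpointId,
            long timestamp) throws Exception {

        RocksDBKeyedStateBackend.RocksDBSnapshotOperation op =
                new RocksDBKeyedStateBackend.RocksDBSnapshotOperation(backend, streamFactory);

        op.takeDBSnapShot(checkpointId, timestamp); // 1) synchronous part, cheap
        try {
            op.openCheckpointStream();              // 2) asynchronous part starts here
            op.writeDBSnapshot();                   // 3) metadata + merged k/v data
        } finally {
            op.closeCheckpointStream();             // 4) produces the state handle
            op.releaseDBSnapshot();                 // 5) release the RocksDB snapshot
        }
        return op.getSnapshotResultStateHandle();
    }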
+ + /** + * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. + */ + static final class RocksDBSnapshotOperation { + + static final int FIRST_BIT_IN_BYTE_MASK = 0x80; + static final int END_OF_KEY_GROUP_MARK = 0xFFFF; + + private final RocksDBKeyedStateBackend stateBackend; + private final KeyGroupRangeOffsets keyGroupRangeOffsets; + private final CheckpointStreamFactory checkpointStreamFactory; + + private long checkpointId; + private long checkpointTimeStamp; + + private Snapshot snapshot; + private CheckpointStreamFactory.CheckpointStateOutputStream outStream; + private DataOutputView outputView; + private List> kvStateIterators; + private KeyGroupsStateHandle snapshotResultStateHandle; + + public RocksDBSnapshotOperation( + RocksDBKeyedStateBackend stateBackend, + CheckpointStreamFactory checkpointStreamFactory) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); + } + + /** + * 1) Create a snapshot object from RocksDB. + * + * @param checkpointId id of the checkpoint for which we take the snapshot + * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot + */ + public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) throws IOException { + Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); + this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); + this.checkpointId = checkpointId; + this.checkpointTimeStamp = checkpointTimeStamp; + this.snapshot = stateBackend.db.getSnapshot(); + } + + /** + * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. + * + * @throws Exception + */ + public void openCheckpointStream() throws Exception { + Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); + outStream = checkpointStreamFactory. + createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); + outputView = new DataOutputViewStreamWrapper(outStream); + } + + /** + * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). + * + * @throws IOException + */ + public void writeDBSnapshot() throws IOException, InterruptedException { + Preconditions.checkNotNull(snapshot, "No ongoing snapshot to write."); + Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); + writeKVStateMetaData(); + writeKVStateData(); + } + + /** + * 4) Close the CheckpointStateOutputStream after writing and receive a state handle. + * + * @throws IOException + */ + public void closeCheckpointStream() throws IOException { + if (outStream != null) { + snapshotResultStateHandle = closeSnapshotStreamAndGetHandle(); + } + } + + /** + * 5) Release the snapshot object for RocksDB and clean up. + */ + public void releaseDBSnapshot() { + Preconditions.checkNotNull(snapshot, "No ongoing snapshot to release."); + stateBackend.db.releaseSnapshot(snapshot); + snapshot = null; + outStream = null; + outputView = null; + kvStateIterators = null; + } + + /** + * Returns the current CheckpointStateOutputStream (when it was opened and not yet closed) into which we write + * the state snapshot. + * + * @return the current CheckpointStateOutputStream + */ + public CheckpointStreamFactory.CheckpointStateOutputStream getOutStream() { + return outStream; + } + + /** + * Returns a state handle to the snapshot after the snapshot procedure is completed and null before. 
+ * + * @return state handle to the completed snapshot + */ + public KeyGroupsStateHandle getSnapshotResultStateHandle() { + return snapshotResultStateHandle; + } + + private void writeKVStateMetaData() throws IOException, InterruptedException { + //write number of k/v states + outputView.writeInt(stateBackend.kvStateInformation.size()); + + int kvStateId = 0; + //iterate all column families, where each column family holds one k/v state, to write the metadata + for (Map.Entry> column : stateBackend.kvStateInformation.entrySet()) { + + //be cooperative and check for interruption from time to time in the hot loop + checkInterrupted(); + + //write StateDescriptor for this k/v state + ObjectOutputStream ooOut = new ObjectOutputStream(outStream); + ooOut.writeObject(column.getValue().f1); + //retrieve iterator for this k/v states + ReadOptions readOptions = new ReadOptions(); + readOptions.setSnapshot(snapshot); + RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions); + kvStateIterators.add(new Tuple2(iterator, kvStateId)); + ++kvStateId; + } + } + + private void writeKVStateData() throws IOException, InterruptedException { + + RocksDBMergeIterator iterator = new RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes); + + byte[] previousKey = null; + byte[] previousValue = null; + + //preamble: setup with first key-group as our lookahead + if (iterator.isValid()) { + //begin first key-group by recording the offset + keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos()); + //write the k/v-state id as metadata + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(iterator.kvStateId()); + previousKey = iterator.key(); + previousValue = iterator.value(); + iterator.next(); + } + + //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. 
+ while (iterator.isValid()) { + + assert (!hasMetaDataFollowsFlag(previousKey)); + + //set signal in first key byte that meta data will follow in the stream after this k/v pair + if (iterator.isNewKeyGroup() || iterator.isNewKeyValueState()) { + + //be cooperative and check for interruption from time to time in the hot loop + checkInterrupted(); + + setMetaDataFollowsFlagInKey(previousKey); + } + + writeKeyValuePair(previousKey, previousValue); + + //write meta data if we have to + if (iterator.isNewKeyGroup()) { + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(END_OF_KEY_GROUP_MARK); + //begin new key-group + keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos()); + //write the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(iterator.kvStateId()); + } else if (iterator.isNewKeyValueState()) { + //write the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(iterator.kvStateId()); + } + + //request next k/v pair + previousKey = iterator.key(); + previousValue = iterator.value(); + iterator.next(); + } + + //epilogue: write last key-group + if (previousKey != null) { + assert (!hasMetaDataFollowsFlag(previousKey)); + setMetaDataFollowsFlagInKey(previousKey); + writeKeyValuePair(previousKey, previousValue); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + outputView.writeShort(END_OF_KEY_GROUP_MARK); + } + } + + private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException { + StreamStateHandle stateHandle = outStream.closeAndGetHandle(); + outStream = null; + if (stateHandle != null) { + return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + } else { + throw new IOException("Output stream returned null on close."); + } + } + + private void writeKeyValuePair(byte[] key, byte[] value) throws IOException { + BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView); + BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView); + } + + static void setMetaDataFollowsFlagInKey(byte[] key) { + key[0] |= FIRST_BIT_IN_BYTE_MASK; + } + + static void clearMetaDataFollowsFlag(byte[] key) { + key[0] &= (~RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + } + + static boolean hasMetaDataFollowsFlag(byte[] key) { + return 0 != (key[0] & RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + } + + private static void checkInterrupted() throws InterruptedException { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Snapshot canceled."); + } + } + }
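Since the flag juggling above is easy to misread, here is the per-key-group record layout it produces, expressed as a minimal decoder. This is a simplified sketch using a plain DataInputStream and assuming the int-length-prefixed byte arrays that BytePrimitiveArraySerializer writes; the real consumer is restoreKVStateData() in the restore operation below:

    import java.io.DataInputStream;
    import java.io.IOException;

    class SnapshotFormatSketch {

        static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
        static final int FIRST_BIT_IN_BYTE_MASK = 0x80;

        // one key-group: short kvStateId, then k/v pairs; a pair whose first key byte carries
        // the flag announces that the next short is either a new kvStateId or the end mark
        static void readKeyGroup(DataInputStream in) throws IOException {
            int kvStateId = in.readUnsignedShort();
            while (true) {
                byte[] key = readArray(in);
                byte[] value = readArray(in);
                boolean metaDataFollows = (key[0] & FIRST_BIT_IN_BYTE_MASK) != 0;
                if (metaDataFollows) {
                    key[0] &= ~FIRST_BIT_IN_BYTE_MASK; // clear the flag before using the key
                }
                handle(kvStateId, key, value);         // e.g. insert into the right column family
                if (metaDataFollows) {
                    int next = in.readUnsignedShort();
                    if (next == END_OF_KEY_GROUP_MARK) {
                        return;                        // key-group finished
                    }
                    kvStateId = next;                  // same key-group, next k/v state
                }
            }
        }

        static byte[] readArray(DataInputStream in) throws IOException {
            byte[] a = new byte[in.readInt()];
            in.readFully(a);
            return a;
        }

        static void handle(int kvStateId, byte[] key, byte[] value) {
            // placeholder for this sketch; the restore code below writes into RocksDB here
        }
    }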
+ + /** + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. + */ + static final class RocksDBRestoreOperation { + + private final RocksDBKeyedStateBackend rocksDBKeyedStateBackend; + + /** Current key-groups state handle from which we restore key-groups */ + private KeyGroupsStateHandle currentKeyGroupsStateHandle; + /** Current input stream we obtained from currentKeyGroupsStateHandle */ + private FSDataInputStream currentStateHandleInStream; + /** Current data input view that wraps currentStateHandleInStream */ + private DataInputView currentStateHandleInView; + /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle */ + private List currentStateHandleKVStateColumnFamilies; + + /** + * Creates a restore operation object for the given state backend instance. + * + * @param rocksDBKeyedStateBackend the state backend into which we restore + */ + public RocksDBRestoreOperation(RocksDBKeyedStateBackend rocksDBKeyedStateBackend) { + this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); + } + + /** + * Restores all key-groups data that is referenced by the passed state handles. + * + * @param keyGroupsStateHandles List of all key groups state handles that shall be restored. + * @throws IOException + * @throws ClassNotFoundException + * @throws RocksDBException + */ + public void doRestore(List keyGroupsStateHandles) + throws IOException, ClassNotFoundException, RocksDBException { + + for (KeyGroupsStateHandle keyGroupsStateHandle : keyGroupsStateHandles) { + if (keyGroupsStateHandle != null) { + this.currentKeyGroupsStateHandle = keyGroupsStateHandle; + restoreKeyGroupsInStateHandle(); + } + } + } + + /** + * Restores one key-groups state handle. + * + * @throws IOException + * @throws RocksDBException + * @throws ClassNotFoundException + */ + private void restoreKeyGroupsInStateHandle() + throws IOException, RocksDBException, ClassNotFoundException { + try { + currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream(); + currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); + restoreKVStateMetaData(); + restoreKVStateData(); + } finally { + if (currentStateHandleInStream != null) { + currentStateHandleInStream.close(); + } + } + + } + + /** + * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle + * + * @throws IOException + * @throws ClassNotFoundException + * @throws RocksDBException + */ + private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException { + //read number of k/v states + int numColumns = currentStateHandleInView.readInt(); + + //those two lists are aligned and should later have the same size! + currentStateHandleKVStateColumnFamilies = new ArrayList<>(numColumns); + + //restore the empty columns for the k/v states through the metadata + for (int i = 0; i < numColumns; i++) { + ObjectInputStream ooIn = new ObjectInputStream(currentStateHandleInStream); + StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject(); + Tuple2 columnFamily = rocksDBKeyedStateBackend. + kvStateInformation.get(stateDescriptor.getName()); + + if (null == columnFamily) { + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions); + + columnFamily = new Tuple2<>(rocksDBKeyedStateBackend.db. 
+ createColumnFamily(columnFamilyDescriptor), stateDescriptor); + rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily); + } + + currentStateHandleKVStateColumnFamilies.add(columnFamily.f0); + } + } + + /** + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle + * + * @throws IOException + * @throws RocksDBException + */ + private void restoreKVStateData() throws IOException, RocksDBException { + //for all key-groups in the current state handle... + for (Tuple2 keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + long offset = keyGroupOffset.f1; + //not empty key-group? + if (0L != offset) { + currentStateHandleInStream.seek(offset); + boolean keyGroupHasMoreKeys = true; + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + int kvStateId = currentStateHandleInView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView); + if (RocksDBSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + RocksDBSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK + & currentStateHandleInView.readShort(); + if (RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + } + } else { + rocksDBKeyedStateBackend.db.put(handle, key, value); + } + } + } + } + } } // ------------------------------------------------------------------------ @@ -197,12 +729,14 @@ public class RocksDBKeyedStateBackend extends KeyedStateBackend { if (stateInfo != null) { if (!stateInfo.f1.equals(descriptor)) { - throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor); + throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + + " trying access with " + descriptor); } return stateInfo.f0; } - ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), columnOptions); + ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor( + descriptor.getName().getBytes(), columnOptions); try { ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor); @@ -248,4 +782,206 @@ public class RocksDBKeyedStateBackend extends KeyedStateBackend { return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); } + + /** + * Wraps a RocksDB iterator to cache its current key and assign an id for the key/value state to the iterator. + * Used by #MergeIterator. + */ + static final class MergeIterator { + + /** + * @param iterator The #RocksIterator to wrap. + * @param kvStateId Id of the K/V state to which this iterator belongs. + */ + public MergeIterator(RocksIterator iterator, int kvStateId) { + this.iterator = Preconditions.checkNotNull(iterator); + this.currentKey = iterator.key(); + this.kvStateId = kvStateId; + } + + private byte[] currentKey; + private final RocksIterator iterator; + private final int kvStateId; + + public byte[] getCurrentKey() { + return currentKey; + } + + public void setCurrentKey(byte[] currentKey) { + this.currentKey = currentKey; + } + + public RocksIterator getIterator() { + return iterator; + } + + public int getKvStateId() { + return kvStateId; + } + }
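The heap in RocksDBMergeIterator below orders these wrapped iterators by their raw key-group prefix bytes first and by kvStateId second, which is exactly the (key-group, k/v-state) order the snapshot format needs. A tiny worked example of that comparison, as a standalone sketch assuming a 1-byte prefix:

    public class MergeOrderSketch {

        // same logic as compareKeyGroupsForByteArrays plus the kvStateId tie-breaker below
        static int compare(byte[] key1, int stateId1, byte[] key2, int stateId2, int prefixLen) {
            for (int i = 0; i < prefixLen; i++) {
                int diff = (key1[i] & 0xFF) - (key2[i] & 0xFF);
                if (diff != 0) {
                    return diff;
                }
            }
            return stateId1 - stateId2;
        }

        public static void main(String[] args) {
            // same key-group (prefix byte 1): the lower k/v-state id is emitted first
            System.out.println(compare(new byte[]{1, 9}, 0, new byte[]{1, 3}, 1, 1)); // -1
            // different key-groups: key-group 0 always precedes key-group 1
            System.out.println(compare(new byte[]{0, 9}, 1, new byte[]{1, 0}, 0, 1)); // -1
        }
    }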
+ + /** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). + */ + static final class RocksDBMergeIterator { + + private final PriorityQueue heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + + private MergeIterator currentSubIterator; + + RocksDBMergeIterator(List> kvStateIterators, final int keyGroupPrefixByteCount) throws IOException { + Preconditions.checkNotNull(kvStateIterators); + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator iteratorComparator = new Comparator() { + @Override + public int compare(MergeIterator o1, MergeIterator o2) { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.currentKey, o2.currentKey, keyGroupPrefixByteCount); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + } + }; + + if (kvStateIterators.size() > 0) { + this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + + for (Tuple2 rocksIteratorWithKVStateId : kvStateIterators) { + RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; + rocksIterator.seekToFirst(); + if (rocksIterator.isValid()) { + heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } + } + this.valid = !heap.isEmpty(); + this.currentSubIterator = heap.poll(); + } else { + // creating a PriorityQueue of size 0 results in an exception. + this.heap = null; + this.valid = false; + } + + this.newKeyGroup = true; + this.newKVState = true; + } + + /** + * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only change after + * calls to {@link #next()}. + */ + public void next() { + newKeyGroup = false; + newKVState = false; + + final RocksIterator rocksIterator = currentSubIterator.getIterator(); + rocksIterator.next(); + + byte[] oldKey = currentSubIterator.getCurrentKey(); + if (rocksIterator.isValid()) { + currentSubIterator.currentKey = rocksIterator.key(); + + if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { + heap.offer(currentSubIterator); + currentSubIterator = heap.poll(); + newKVState = currentSubIterator.getIterator() != rocksIterator; + detectNewKeyGroup(oldKey); + } + } else if (heap.isEmpty()) { + valid = false; + } else { + currentSubIterator = heap.poll(); + newKVState = true; + detectNewKeyGroup(oldKey); + } + + } + + private boolean isDifferentKeyGroup(byte[] a, byte[] b) { + return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount); + } + + private void detectNewKeyGroup(byte[] oldKey) { + if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) { + newKeyGroup = true; + } + } + + /** + * Returns the key-group for the current key. 
+ * @return key-group for the current key + */ + public int keyGroup() { + int result = 0; + //big endian decode + for (int i = 0; i < keyGroupPrefixByteCount; ++i) { + result <<= 8; + result |= (currentSubIterator.currentKey[i] & 0xFF); + } + return result; + } + + public byte[] key() { + return currentSubIterator.getCurrentKey(); + } + + public byte[] value() { + return currentSubIterator.getIterator().value(); + } + + /** + * Returns the Id of the k/v state to which the current key belongs. + * @return Id of K/V state to which the current key belongs. + */ + public int kvStateId() { + return currentSubIterator.getKvStateId(); + } + + /** + * Indicates if the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor. + * @return true iff the current key belongs to a different k/v-state than its predecessor. + */ + public boolean isNewKeyValueState() { + return newKVState; + } + + /** + * Indicates if the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor. + * @return true iff the current key belongs to a different key-group than its predecessor. + */ + public boolean isNewKeyGroup() { + return newKeyGroup; + } + + /** + * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as + * {@link #next()} should only be called if valid returned true. Should be checked after each call to + * {@link #next()} before accessing iterator state. + * @return True iff this iterator is valid. + */ + public boolean isValid() { + return valid; + } + + private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) { + for (int i = 0; i < len; ++i) { + int diff = (a[i] & 0xFF) - (b[i] & 0xFF); + if (diff != 0) { + return diff; + } + } + return 0; + } + } + + /** + * Only visible for testing, DO NOT USE. 
+ */ + public File getInstanceBasePath() { + return instanceBasePath; + } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index d8f937b7cf238919960a98dbc55317f92d9ccb9b..beea81a87e0cbc7b3c3a3ef5008690ab7540f73f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -28,7 +28,6 @@ import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -67,7 +66,7 @@ public class RocksDBListState public RocksDBListState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc, - RocksDBKeyedStateBackend backend) { + RocksDBKeyedStateBackend backend) { super(columnFamily, namespaceSerializer, stateDesc, backend); this.valueSerializer = stateDesc.getSerializer(); @@ -78,11 +77,9 @@ public class RocksDBListState @Override public Iterable get() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { @@ -107,16 +104,13 @@ public class RocksDBListState @Override public void add(V value) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); - - baos.reset(); - + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); + keySerializationStream.reset(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); valueSerializer.serialize(value, out); - backend.db.merge(columnFamily, writeOptions, key, baos.toByteArray()); + backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index 15ae4933a1d65c4dcbf486f87b6454a5e3602521..068c051b524607007ac609eb2f918d9b8e5ccad0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -29,7 +29,6 @@ import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; /** @@ -65,7 +64,7 @@ public class RocksDBReducingState public RocksDBReducingState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, 
ReducingStateDescriptor stateDesc, - RocksDBKeyedStateBackend backend) { + RocksDBKeyedStateBackend backend) { super(columnFamily, namespaceSerializer, stateDesc, backend); this.valueSerializer = stateDesc.getSerializer(); @@ -77,11 +76,9 @@ public class RocksDBReducingState @Override public V get() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return null; @@ -94,23 +91,22 @@ public class RocksDBReducingState @Override public void add(V value) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); if (valueBytes == null) { - baos.reset(); + keySerializationStream.reset(); valueSerializer.serialize(value, out); - backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } else { V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); V newValue = reduceFunction.reduce(oldValue, value); - baos.reset(); + keySerializationStream.reset(); valueSerializer.serialize(newValue, out); - backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 62b71d90265bdc06b1ba37167158fad2263a2ec4..f950751ac05105e52a164d0d062b70e6ad664b0f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -221,12 +221,6 @@ public class RocksDBStateBackend extends AbstractStateBackend { @Override public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { - return null; - } - - if (fullyAsyncBackup) { - return performFullyAsyncSnapshot(checkpointId, timestamp); - } else { return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier); } @@ -261,10 +255,24 @@ public class RocksDBStateBackend extends AbstractStateBackend { String operatorIdentifier, TypeSerializer keySerializer, KeyGroupAssigner keyGroupAssigner, - KeyGroupRange keyGroupRange, + KeyGroupRange keyGroupRange, List restoredState, TaskKvStateRegistry kvStateRegistry) throws Exception { - throw new RuntimeException("Not implemented."); + + lazyInitializeForJob(env, operatorIdentifier); + + File instanceBasePath = new File(getDbPath(), 
UUID.randomUUID().toString()); + return new RocksDBKeyedStateBackend<>( + jobID, + operatorIdentifier, + instanceBasePath, + getDbOptions(), + getColumnOptions(), + kvStateRegistry, + keySerializer, + keyGroupAssigner, + keyGroupRange, + restoredState); } // ------------------------------------------------------------------------ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index b9c0e83af359322f3011d51d375d8095702aa439..9563ed80efdb5c4fce175bb9fa4664ea61bb93e3 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -24,13 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; import org.rocksdb.WriteOptions; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; /** @@ -63,7 +61,7 @@ public class RocksDBValueState public RocksDBValueState(ColumnFamilyHandle columnFamily, TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc, - RocksDBKeyedStateBackend backend) { + RocksDBKeyedStateBackend backend) { super(columnFamily, namespaceSerializer, stateDesc, backend); this.valueSerializer = stateDesc.getSerializer(); @@ -74,11 +72,9 @@ public class RocksDBValueState @Override public V value() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { return stateDesc.getDefaultValue(); @@ -95,14 +91,13 @@ public class RocksDBValueState clear(); return; } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); try { - writeKeyAndNamespace(out); - byte[] key = baos.toByteArray(); - baos.reset(); + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); + keySerializationStream.reset(); valueSerializer.serialize(value, out); - backend.db.put(columnFamily, writeOptions, key, baos.toByteArray()); + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } catch (Exception e) { throw new RuntimeException("Error while adding data to RocksDB", e); } @@ -110,11 +105,7 @@ public class RocksDBValueState @Override public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { - // Serialized key and namespace is expected to be of the same format - // as writeKeyAndNamespace() - Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - - byte[] value = backend.db.get(columnFamily, 
serializedKeyAndNamespace); + byte[] value = super.getSerializedValue(serializedKeyAndNamespace); if (value != null) { return value; diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java similarity index 67% rename from flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java rename to flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 0e35b60fe38a98708cd324f2960770a5d7eb39d2..624905ce92e4f8ac3eaef16f7748cf8d09856062 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -18,6 +18,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; @@ -28,17 +29,20 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; @@ -47,9 +51,9 @@ import org.apache.flink.util.OperatingSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; +import org.junit.Assert; import org.junit.Assume; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; @@ -58,13 +62,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.File; +import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; +import java.util.Arrays; import java.util.List; 
import java.util.UUID; +import java.util.concurrent.CancellationException; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * Tests for asynchronous RocksDB Key/Value state checkpoints. @@ -73,7 +79,7 @@ import static org.junit.Assert.assertTrue; @PrepareForTest({ResultPartitionWriter.class, FileSystem.class}) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) @SuppressWarnings("serial") -public class RocksDBAsyncKVSnapshotTest { +public class RocksDBAsyncSnapshotTest { @Before public void checkOperatingSystem() { @@ -88,14 +94,12 @@ public class RocksDBAsyncKVSnapshotTest { * test will simply lock forever. */ @Test - public void testAsyncCheckpoints() throws Exception { + public void testFullyAsyncSnapshot() throws Exception { + LocalFileSystem localFS = new LocalFileSystem(); localFS.initialize(new URI("file:///"), new Configuration()); PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS); - final OneShotLatch delayCheckpointLatch = new OneShotLatch(); - final OneShotLatch ensureCheckpointLatch = new OneShotLatch(); - final OneInputStreamTask task = new OneInputStreamTask<>(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); @@ -119,12 +123,15 @@ public class RocksDBAsyncKVSnapshotTest { streamConfig.setStreamOperator(new AsyncCheckpointOperator()); + final OneShotLatch delayCheckpointLatch = new OneShotLatch(); + final OneShotLatch ensureCheckpointLatch = new OneShotLatch(); + StreamMockEnvironment mockEnv = new StreamMockEnvironment( - testHarness.jobConfig, - testHarness.taskConfig, - testHarness.memorySize, - new MockInputSplitProvider(), - testHarness.bufferSize) { + testHarness.jobConfig, + testHarness.taskConfig, + testHarness.memorySize, + new MockInputSplitProvider(), + testHarness.bufferSize) { @Override public void acknowledgeCheckpoint(long checkpointId) { @@ -133,8 +140,8 @@ public class RocksDBAsyncKVSnapshotTest { @Override public void acknowledgeCheckpoint(long checkpointId, - ChainedStateHandle chainedStateHandle, - List keyGroupStateHandles) { + ChainedStateHandle chainedStateHandle, + List keyGroupStateHandles) { super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles); // block on the latch, to verify that triggerCheckpoint returns below, @@ -145,9 +152,7 @@ public class RocksDBAsyncKVSnapshotTest { e.printStackTrace(); } - // should be only one k/v state - assertEquals(1, keyGroupStateHandles.size()); // we now know that the checkpoint went through @@ -183,23 +188,20 @@ public class RocksDBAsyncKVSnapshotTest { } /** - * This ensures that asynchronous state handles are actually materialized asynchonously. - * - *

We use latches to block at various stages and see if the code still continues through - * the parts that are not asynchronous. If the checkpoint is not done asynchronously the - * test will simply lock forever. + * This test ensures that canceling of asynchronous snapshots works as expected and does not block. + * @throws Exception */ @Test - public void testFullyAsyncCheckpoints() throws Exception { + public void testCancelFullyAsyncCheckpoints() throws Exception { LocalFileSystem localFS = new LocalFileSystem(); localFS.initialize(new URI("file:///"), new Configuration()); PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS); - final OneShotLatch delayCheckpointLatch = new OneShotLatch(); - final OneShotLatch ensureCheckpointLatch = new OneShotLatch(); - final OneInputStreamTask task = new OneInputStreamTask<>(); + //ensure that the async threads complete before the invoke method of the task returns. + task.setThreadPoolTerminationTimeout(Long.MAX_VALUE); + final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); testHarness.configureForKeyedStream(new KeySelector() { @@ -214,9 +216,10 @@ public class RocksDBAsyncKVSnapshotTest { File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state"); File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots"); - RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()); + BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend(); + + RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend); backend.setDbStoragePath(dbDir.getAbsolutePath()); -// backend.enableFullyAsyncSnapshots(); streamConfig.setStateBackend(backend); @@ -227,34 +230,7 @@ public class RocksDBAsyncKVSnapshotTest { testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), - testHarness.bufferSize) { - - @Override - public void acknowledgeCheckpoint(long checkpointId) { - super.acknowledgeCheckpoint(checkpointId); - } - - @Override - public void acknowledgeCheckpoint(long checkpointId, - ChainedStateHandle chainedStateHandle, - List keyGroupStateHandles) { - super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles); - - // block on the latch, to verify that triggerCheckpoint returns below, - // even though the async checkpoint would not finish - try { - delayCheckpointLatch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - // should be only one k/v state - assertEquals(1, keyGroupStateHandles.size()); - - // we now know that the checkpoint went through - ensureCheckpointLatch.trigger(); - } - }; + testHarness.bufferSize); testHarness.invoke(mockEnv); @@ -273,19 +249,110 @@ public class RocksDBAsyncKVSnapshotTest { task.triggerCheckpoint(42, 17); - // now we allow the checkpoint - delayCheckpointLatch.trigger(); + BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await(); + task.cancel(); - // wait for the checkpoint to go through - ensureCheckpointLatch.await(); + BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger(); testHarness.endInput(); - testHarness.waitForTaskCompletion(); + try { + testHarness.waitForTaskCompletion(); + Assert.fail("Operation completed. 
Cancel failed."); + } catch (Exception expected) { + // we expect the exception from canceling snapshots + Throwable cause = expected.getCause(); + if(cause instanceof AsynchronousException) { + AsynchronousException asynchronousException = (AsynchronousException) cause; + cause = asynchronousException.getCause(); + Assert.assertTrue("Unexpected Exception: " + cause, + cause instanceof CancellationException //future canceled + || cause instanceof InterruptedException); //thread interrupted + + } else { + Assert.fail(); + } + } } + @Test + public void testConsistentSnapshotSerializationFlagsAndMasks() { + + Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK); + Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + + byte[] expectedKey = new byte[] {42, 42}; + byte[] modKey = expectedKey.clone(); + + Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + + RocksDBKeyedStateBackend.RocksDBSnapshotOperation.setMetaDataFollowsFlagInKey(modKey); + Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + + RocksDBKeyedStateBackend.RocksDBSnapshotOperation.clearMetaDataFollowsFlag(modKey); + Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + + Assert.assertTrue(Arrays.equals(expectedKey, modKey)); + } // ------------------------------------------------------------------------ + /** + * Creates us a CheckpointStateOutputStream that blocks write ops on a latch to delay writing of snapshots. + */ + static class BlockingStreamMemoryStateBackend extends MemoryStateBackend { + + public static OneShotLatch waitFirstWriteLatch = new OneShotLatch(); + + public static OneShotLatch unblockCancelLatch = new OneShotLatch(); + + volatile boolean closed = false; + + @Override + public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { + return new MemCheckpointStreamFactory(4 * 1024 * 1024) { + @Override + public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + + return new MemoryCheckpointOutputStream(4 * 1024 * 1024) { + @Override + public void write(int b) throws IOException { + waitFirstWriteLatch.trigger(); + try { + unblockCancelLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if(closed) { + throw new IOException("Stream closed."); + } + super.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + waitFirstWriteLatch.trigger(); + try { + unblockCancelLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if(closed) { + throw new IOException("Stream closed."); + } + super.write(b, off, len); + } + + @Override + public void close() { + closed = true; + super.close(); + } + }; + } + }; + } + } + public static class AsyncCheckpointOperator extends AbstractStreamOperator implements OneInputStreamOperator { diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1cb3b2b6be4b2015b4d121cd800924ec2f04dbf6 --- /dev/null +++ 
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1cb3b2b6be4b2015b4d121cd800924ec2f04dbf6 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +public class RocksDBMergeIteratorTest { + + private static final int NUM_KEY_VAL_STATES = 50; + private static final int MAX_NUM_KEYS = 20; + + @Test + public void testEmptyMergeIterator() throws IOException { + RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator = + new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2); + Assert.assertFalse(emptyIterator.isValid()); + } + + @Test + public void testMergeIterator() throws Exception { + Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE); + + testMergeIterator(Byte.MAX_VALUE); + testMergeIterator(Short.MAX_VALUE); + } + + public void testMergeIterator(int maxParallelism) throws Exception { + Random random = new Random(1234); + + File tmpDir = CommonTestUtils.createTempDirectory(); + + RocksDB rocksDB = RocksDB.open(tmpDir.getAbsolutePath()); + try { + List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); + List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>(); + + int totalKeysExpected = 0; + + for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) { + ColumnFamilyHandle handle = rocksDB.createColumnFamily( + new ColumnFamilyDescriptor(("column-" + c).getBytes())); + + ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos(); + DataOutputStream dos = new DataOutputStream(bos); + + int numKeys = random.nextInt(MAX_NUM_KEYS + 1); + + for (int i = 0; i < numKeys; ++i) { + if (maxParallelism <= Byte.MAX_VALUE) { + dos.writeByte(i); + } else { + dos.writeShort(i); + } + dos.writeInt(i); + byte[] key = bos.toByteArray(); + byte[] val = new byte[]{42}; + rocksDB.put(handle, key, val); + + bos.reset(); + } + columnFamilyHandlesWithKeyCount.add(new Tuple2<>(handle, numKeys)); + totalKeysExpected += numKeys; + } + + int id = 0; + for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) { + rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id)); + ++id; + }
+ RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId, maxParallelism <= Byte.MAX_VALUE ? 1 : 2); + + int prevKVState = -1; + int prevKey = -1; + int prevKeyGroup = -1; + int totalKeysActual = 0; + + while (mergeIterator.isValid()) { + ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key()); + + int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get(); + int key = bb.getInt(); + + Assert.assertTrue(keyGroup >= prevKeyGroup); + Assert.assertTrue(key >= prevKey); + Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup()); + Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState()); + + prevKeyGroup = keyGroup; + prevKey = key; + prevKVState = mergeIterator.kvStateId(); + + mergeIterator.next(); + ++totalKeysActual; + } + + Assert.assertEquals(totalKeysExpected, totalKeysActual); + + for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount) { + rocksDB.dropColumnFamily(handleWithCount.f0); + } + } finally { + rocksDB.close(); + } + } + +} diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 6f4a983b89e250ec209b76af5189f7093791c3d5..acf6cb80ac27ed4a527b07efb52536913b479238 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -1,322 +1,368 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License.
-// */ -// -//package org.apache.flink.contrib.streaming.state; -// -//import org.apache.commons.io.FileUtils; -//import org.apache.flink.api.common.JobID; -//import org.apache.flink.api.common.TaskInfo; -//import org.apache.flink.api.common.state.ValueStateDescriptor; -//import org.apache.flink.api.common.typeutils.TypeSerializer; -//import org.apache.flink.api.common.typeutils.base.IntSerializer; -//import org.apache.flink.runtime.execution.Environment; -//import org.apache.flink.runtime.io.disk.iomanager.IOManager; -//import org.apache.flink.runtime.state.AbstractStateBackend; -// -//import org.apache.flink.runtime.state.VoidNamespace; -//import org.apache.flink.runtime.state.VoidNamespaceSerializer; -//import org.apache.flink.util.OperatingSystem; -//import org.junit.Assume; -//import org.junit.Before; -//import org.junit.Test; -// -//import org.rocksdb.ColumnFamilyOptions; -//import org.rocksdb.CompactionStyle; -//import org.rocksdb.DBOptions; -// -//import java.io.File; -//import java.util.UUID; -// -//import static org.junit.Assert.*; -//import static org.mockito.Mockito.*; -// -///** -// * Tests for configuring the RocksDB State Backend -// */ -//@SuppressWarnings("serial") -//public class RocksDBStateBackendConfigTest { -// -// private static final String TEMP_URI = new File(System.getProperty("java.io.tmpdir")).toURI().toString(); -// -// @Before -// public void checkOperatingSystem() { -// Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); -// } -// -// // ------------------------------------------------------------------------ -// // RocksDB local file directory -// // ------------------------------------------------------------------------ -// -// @Test -// public void testSetDbPath() throws Exception { -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// -// assertNull(rocksDbBackend.getDbStoragePaths()); -// -// rocksDbBackend.setDbStoragePath("/abc/def"); -// assertArrayEquals(new String[] { "/abc/def" }, rocksDbBackend.getDbStoragePaths()); -// -// rocksDbBackend.setDbStoragePath(null); -// assertNull(rocksDbBackend.getDbStoragePaths()); -// -// rocksDbBackend.setDbStoragePaths("/abc/def", "/uvw/xyz"); -// assertArrayEquals(new String[] { "/abc/def", "/uvw/xyz" }, rocksDbBackend.getDbStoragePaths()); -// -// //noinspection NullArgumentToVariableArgMethod -// rocksDbBackend.setDbStoragePaths(null); -// assertNull(rocksDbBackend.getDbStoragePaths()); -// } -// -// @Test(expected = IllegalArgumentException.class) -// public void testSetNullPaths() throws Exception { -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// rocksDbBackend.setDbStoragePaths(); -// } -// -// @Test(expected = IllegalArgumentException.class) -// public void testNonFileSchemePath() throws Exception { -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition"); -// } -// -// // ------------------------------------------------------------------------ -// // RocksDB local file automatic from temp directories -// // ------------------------------------------------------------------------ -// -// @Test -// public void testUseTempDirectories() throws Exception { -// File dir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); -// File dir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); -// -// File[] tempDirs = new File[] { dir1, dir2 }; -// -// try { -// 
assertTrue(dir1.mkdirs()); -// assertTrue(dir2.mkdirs()); -// -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// assertNull(rocksDbBackend.getDbStoragePaths()); -// -// rocksDbBackend.initializeForJob(getMockEnvironment(tempDirs), "foobar", IntSerializer.INSTANCE); -// assertArrayEquals(tempDirs, rocksDbBackend.getStoragePaths()); -// } -// finally { -// FileUtils.deleteDirectory(dir1); -// FileUtils.deleteDirectory(dir2); -// } -// } -// -// // ------------------------------------------------------------------------ -// // RocksDB local file directory initialization -// // ------------------------------------------------------------------------ -// -// @Test -// public void testFailWhenNoLocalStorageDir() throws Exception { -// File targetDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); -// try { -// assertTrue(targetDir.mkdirs()); -// -// if (!targetDir.setWritable(false, false)) { -// System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable"); -// return; -// } -// -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); -// -// boolean hasFailure = false; -// try { -// rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE); -// } -// catch (Exception e) { -// assertTrue(e.getMessage().contains("No local storage directories available")); -// assertTrue(e.getMessage().contains(targetDir.getAbsolutePath())); -// hasFailure = true; -// } -// assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure); -// } -// finally { -// //noinspection ResultOfMethodCallIgnored -// targetDir.setWritable(true, false); -// FileUtils.deleteDirectory(targetDir); -// } -// } -// -// @Test -// public void testContinueOnSomeDbDirectoriesMissing() throws Exception { -// File targetDir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); -// File targetDir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); -// -// try { -// assertTrue(targetDir1.mkdirs()); -// assertTrue(targetDir2.mkdirs()); -// -// if (!targetDir1.setWritable(false, false)) { -// System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable"); -// return; -// } -// -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath()); -// -// try { -// rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE); -// -// // actually get a state to see whether we can write to the storage directory -// rocksDbBackend.getPartitionedState( -// VoidNamespace.INSTANCE, -// VoidNamespaceSerializer.INSTANCE, -// new ValueStateDescriptor<>("test", String.class, "")); -// } -// catch (Exception e) { -// e.printStackTrace(); -// fail("Backend initialization failed even though some paths were available"); -// } -// } finally { -// //noinspection ResultOfMethodCallIgnored -// targetDir1.setWritable(true, false); -// FileUtils.deleteDirectory(targetDir1); -// FileUtils.deleteDirectory(targetDir2); -// } -// } -// -// // ------------------------------------------------------------------------ -// // RocksDB Options -// // ------------------------------------------------------------------------ -// -// @Test -// public void testPredefinedOptions() throws 
Exception { -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// -// assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); -// -// rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); -// assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); -// -// DBOptions opt1 = rocksDbBackend.getDbOptions(); -// DBOptions opt2 = rocksDbBackend.getDbOptions(); -// -// assertEquals(opt1, opt2); -// -// ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions(); -// ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions(); -// -// assertEquals(columnOpt1, columnOpt2); -// -// assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle()); -// } -// -// @Test -// public void testOptionsFactory() throws Exception { -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// -// rocksDbBackend.setOptions(new OptionsFactory() { -// @Override -// public DBOptions createDBOptions(DBOptions currentOptions) { -// return currentOptions; -// } -// -// @Override -// public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { -// return currentOptions.setCompactionStyle(CompactionStyle.FIFO); -// } -// }); -// -// assertNotNull(rocksDbBackend.getOptions()); -// assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle()); -// } -// -// @Test -// public void testPredefinedAndOptionsFactory() throws Exception { -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); -// -// assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); -// -// rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); -// rocksDbBackend.setOptions(new OptionsFactory() { -// @Override -// public DBOptions createDBOptions(DBOptions currentOptions) { -// return currentOptions; -// } -// -// @Override -// public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { -// return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL); -// } -// }); -// -// assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); -// assertNotNull(rocksDbBackend.getOptions()); -// assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle()); -// } -// -// @Test -// public void testPredefinedOptionsEnum() { -// for (PredefinedOptions o : PredefinedOptions.values()) { -// DBOptions opt = o.createDBOptions(); -// try { -// assertNotNull(opt); -// } finally { -// opt.dispose(); -// } -// } -// } -// -// // ------------------------------------------------------------------------ -// // Contained Non-partitioned State Backend -// // ------------------------------------------------------------------------ -// -// @Test -// public void testCallsForwardedToNonPartitionedBackend() throws Exception { -// AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class); -// RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI, nonPartBackend); -// -// rocksDbBackend.initializeForJob(getMockEnvironment(), "foo", IntSerializer.INSTANCE); -// verify(nonPartBackend, times(1)).initializeForJob(any(Environment.class), anyString(), any(TypeSerializer.class)); -// -// rocksDbBackend.disposeAllStateForCurrentJob(); -// verify(nonPartBackend, times(1)).disposeAllStateForCurrentJob(); -// -// rocksDbBackend.close(); -// verify(nonPartBackend, times(1)).close(); -// } -// -// // 
------------------------------------------------------------------------ -// // Utilities -// // ------------------------------------------------------------------------ -// -// private static Environment getMockEnvironment() { -// return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); -// } -// -// private static Environment getMockEnvironment(File[] tempDirs) { -// IOManager ioMan = mock(IOManager.class); -// when(ioMan.getSpillingDirectories()).thenReturn(tempDirs); -// -// Environment env = mock(Environment.class); -// when(env.getJobID()).thenReturn(new JobID()); -// when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader()); -// when(env.getIOManager()).thenReturn(ioMan); -// -// TaskInfo taskInfo = mock(TaskInfo.class); -// when(env.getTaskInfo()).thenReturn(taskInfo); -// -// when(taskInfo.getIndexOfThisSubtask()).thenReturn(0); -// return env; -// } -//} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.query.KvStateRegistry; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.HashKeyGroupAssigner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.util.OperatingSystem; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; + +import java.io.File; + +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * Tests for configuring the RocksDB State Backend + */ +@SuppressWarnings("serial") +public class RocksDBStateBackendConfigTest { + + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); + } + + // ------------------------------------------------------------------------ + // RocksDB local file directory + // ------------------------------------------------------------------------ + + @Test + public void 
testSetDbPath() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + File testDir1 = tempFolder.newFolder(); + File testDir2 = tempFolder.newFolder(); + + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath()); + assertArrayEquals(new String[] { testDir1.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePath(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath()); + assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths()); + + Environment env = getMockEnvironment(new File[] {}); + RocksDBKeyedStateBackend keyedBackend = (RocksDBKeyedStateBackend) rocksDbBackend.createKeyedStateBackend(env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner(1), + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); + + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath()))); + + //noinspection NullArgumentToVariableArgMethod + rocksDbBackend.setDbStoragePaths(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetNullPaths() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + rocksDbBackend.setDbStoragePaths(); + } + + @Test(expected = IllegalArgumentException.class) + public void testNonFileSchemePath() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition"); + } + + // ------------------------------------------------------------------------ + // RocksDB local file automatic from temp directories + // ------------------------------------------------------------------------ + + /** + * This test checks whether the RocksDB backend uses the temp directories that are provided + * by the {@link Environment} when no db storage path is set. + */ + @Test + public void testUseTempDirectories() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + File dir1 = tempFolder.newFolder(); + File dir2 = tempFolder.newFolder(); + + File[] tempDirs = new File[] { dir1, dir2 }; + + assertNull(rocksDbBackend.getDbStoragePaths()); + + Environment env = getMockEnvironment(tempDirs); + RocksDBKeyedStateBackend keyedBackend = (RocksDBKeyedStateBackend) rocksDbBackend.createKeyedStateBackend(env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner(1), + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); + + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath()))); + } + + // ------------------------------------------------------------------------ + // RocksDB local file directory initialization + // ------------------------------------------------------------------------ + + @Test + public void testFailWhenNoLocalStorageDir() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + File targetDir = tempFolder.newFolder(); + + try { + if (!targetDir.setWritable(false, false)) { + System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because the directory cannot be marked non-writable"); + return; + } + + rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); + + boolean hasFailure = false; + try { + Environment env = getMockEnvironment(); + rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "foobar", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner(1), + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + } + catch (Exception e) { + assertTrue(e.getMessage().contains("No local storage directories available")); + assertTrue(e.getMessage().contains(targetDir.getAbsolutePath())); + hasFailure = true; + } + assertTrue("We must see a failure because no storage directory is feasible.", hasFailure); + } + finally { + //noinspection ResultOfMethodCallIgnored + targetDir.setWritable(true, false); + FileUtils.deleteDirectory(targetDir); + } + } + + @Test + public void testContinueOnSomeDbDirectoriesMissing() throws Exception { + File targetDir1 = tempFolder.newFolder(); + File targetDir2 = tempFolder.newFolder(); + + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + try { + + if (!targetDir1.setWritable(false, false)) { + System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because the directory cannot be marked non-writable"); + return; + } + + rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath()); + + try { + Environment env = getMockEnvironment(); + rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "foobar", + IntSerializer.INSTANCE, + new HashKeyGroupAssigner(1), + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + } + catch (Exception e) { + e.printStackTrace(); + fail("Backend initialization failed even though some paths were available"); + } + } finally { + //noinspection ResultOfMethodCallIgnored
targetDir1.setWritable(true, false); + FileUtils.deleteDirectory(targetDir1); + FileUtils.deleteDirectory(targetDir2); + } + } + + // ------------------------------------------------------------------------ + // RocksDB Options + // ------------------------------------------------------------------------ + + @Test + public void testPredefinedOptions() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); + + rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); + assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); + + DBOptions opt1 = rocksDbBackend.getDbOptions(); + DBOptions opt2 = rocksDbBackend.getDbOptions(); + + assertEquals(opt1, opt2); + + ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions(); + ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions(); + + assertEquals(columnOpt1, columnOpt2); + + assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle()); + } + + @Test + public void testOptionsFactory() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + rocksDbBackend.setOptions(new OptionsFactory() { + @Override + public DBOptions createDBOptions(DBOptions currentOptions) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + return currentOptions.setCompactionStyle(CompactionStyle.FIFO); + } + }); + + assertNotNull(rocksDbBackend.getOptions()); + assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle()); + } + + @Test + public void testPredefinedAndOptionsFactory() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); + + rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); + rocksDbBackend.setOptions(new OptionsFactory() { + @Override + public DBOptions createDBOptions(DBOptions currentOptions) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL); + } + }); + + assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); + assertNotNull(rocksDbBackend.getOptions()); + assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle()); + } + + @Test + public void testPredefinedOptionsEnum() { + for (PredefinedOptions o : PredefinedOptions.values()) { + DBOptions opt = o.createDBOptions(); + try { + assertNotNull(opt); + } finally { + opt.dispose(); + } + } + } + + // ------------------------------------------------------------------------ + // Contained Non-partitioned State Backend + // ------------------------------------------------------------------------ + + @Test + public void testCallsForwardedToNonPartitionedBackend() throws Exception { + AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class); + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath, nonPartBackend); + + Environment env = getMockEnvironment(); + rocksDbBackend.createStreamFactory(env.getJobID(), "foobar"); + + verify(nonPartBackend, times(1)).createStreamFactory(any(JobID.class), anyString()); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static Environment getMockEnvironment() { + return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); + } + + private static Environment getMockEnvironment(File[] tempDirs) { + IOManager ioMan = mock(IOManager.class); + when(ioMan.getSpillingDirectories()).thenReturn(tempDirs); + + Environment env = mock(Environment.class); + when(env.getJobID()).thenReturn(new JobID()); + when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader()); + when(env.getIOManager()).thenReturn(ioMan); + when(env.getTaskKvStateRegistry()).thenReturn(new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + + TaskInfo taskInfo = mock(TaskInfo.class); + when(env.getTaskInfo()).thenReturn(taskInfo); + + when(taskInfo.getIndexOfThisSubtask()).thenReturn(0); + return env; + } +}
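As the options tests above demonstrate, an OptionsFactory is applied on top of the chosen predefined options. A minimal user-side sketch, mirroring what testPredefinedAndOptionsFactory verifies; the checkpoint URI is a placeholder:

    // Sketch: combining predefined options with a custom OptionsFactory.
    // "file:///tmp/checkpoints" is a placeholder checkpoint URI, not from this patch.
    RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");
    backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    backend.setOptions(new OptionsFactory() {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            return currentOptions; // keep the predefined DB options unchanged
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // the factory sees the predefined options and may override them
            return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
        }
    });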
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java new file mode 100644 index 0000000000000000000000000000000000000000..989e868d4772b2aa91ea36c7818e414e22a55a35 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.async; + +import java.io.Closeable; +import java.io.IOException; + +/** + * The abstract class encapsulates the lifecycle and execution strategy for asynchronous IO operations. + * + * @param <V> return type of the asynchronous call + * @param <D> type of the IO handle + */ +public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements StoppableCallbackCallable<V> { + + private volatile boolean stopped; + + /** + * Closeable handle to IO, e.g. an InputStream + */ + private volatile D ioHandle; + + /** + * Stores the exception that might happen during close + */ + private volatile IOException stopException; + + public AbstractAsyncIOCallable() { + this.stopped = false; + } + + /** + * This method implements the strategy for the actual IO operation: + * + * 1) Open the IO handle + * 2) Perform the IO operation + * 3) Close the IO handle + * + * @return Result of the IO operation, e.g. a deserialized object. + * @throws Exception exception that happened during the call. + */ + @Override + public V call() throws Exception { + + synchronized (this) { + if (isStopped()) { + throw new IOException("Task was already stopped. No I/O handle opened."); + } + + ioHandle = openIOHandle(); + } + + try { + return performOperation(); + } finally { + closeIOHandle(); + } + } + + /** + * Open the IO Handle (e.g. a stream) on which the operation will be performed. + * + * @return the opened IO handle that implements {@link Closeable} + * @throws Exception if opening the IO handle fails + */ + protected abstract D openIOHandle() throws Exception; + + /** + * Implements the actual IO operation on the opened IO handle. + * + * @return Result of the IO operation + * @throws Exception if the IO operation fails + */ + protected abstract V performOperation() throws Exception; + + /** + * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via + * {@link #getStopException()}. + */ + @Override + public void stop() { + closeIOHandle(); + } + + private synchronized void closeIOHandle() { + + if (!stopped) { + stopped = true; + + final D handle = ioHandle; + if (handle != null) { + try { + handle.close(); + } catch (IOException ex) { + stopException = ex; + } + } + } + } + + /** + * Returns the IO handle. + * @return the IO handle + */ + protected D getIoHandle() { + return ioHandle; + } + + /** + * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because + * it finished or was stopped. + */ + @Override + public void done() { + //optional callback hook + } + + /** + * Check if the IO operation is stopped. + * + * @return true if {@link #stop()} was called + */ + @Override + public boolean isStopped() { + return stopped; + } + + /** + * Returns the exception that might have happened during stop. + * + * @return potential exception that happened upon stopping + */ + @Override + public IOException getStopException() { + return stopException; + } +}
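To make the open/perform/close contract above concrete, here is a hypothetical subclass that counts the bytes of a file; the class name and usage are illustrative only, not part of this patch:

    // Sketch: a minimal AbstractAsyncIOCallable. The base class opens the handle,
    // runs performOperation(), and closes the handle afterwards; a concurrent
    // stop() closes the handle early, making the blocked read fail fast.
    import java.io.File;
    import java.io.FileInputStream;

    class CountingReadCallable extends AbstractAsyncIOCallable<Long, FileInputStream> {

        private final File file;

        CountingReadCallable(File file) {
            this.file = file;
        }

        @Override
        protected FileInputStream openIOHandle() throws Exception {
            return new FileInputStream(file);
        }

        @Override
        protected Long performOperation() throws Exception {
            long count = 0;
            // getIoHandle() returns the handle opened in openIOHandle()
            while (getIoHandle().read() != -1) {
                count++;
            }
            return count;
        }
    }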
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java new file mode 100644 index 0000000000000000000000000000000000000000..13d9057cc39d0a0f335f9c5307cf21d7d81f0763 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.async; + +/** + * Callback for an asynchronous operation that is called on termination. + */ +public interface AsyncDoneCallback { + + /** + * The callback that is invoked when the asynchronous operation has terminated. + */ + void done(); + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java new file mode 100644 index 0000000000000000000000000000000000000000..560e56ad675787de936570783dcbcaf5e921c01e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.async; + +import java.io.IOException; + +/** + * An asynchronous operation that can be stopped. + */ +public interface AsyncStoppable { + + /** + * Stop the operation. + */ + void stop(); + + /** + * Check whether the operation is stopped. + * + * @return true iff the operation is stopped + */ + boolean isStopped(); + + /** + * Delivers the exception that might happen during {@link #stop()}. + * + * @return exception that can happen during stop + */ + IOException getStopException(); + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java new file mode 100644 index 0000000000000000000000000000000000000000..8316e4f1f8eea91dfd27d7833b5a5fddb6bb1e95 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.io.async; + +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.FutureTask; + +/** + * A {@link FutureTask} that wraps a {@link StoppableCallbackCallable}, stops the callable when the task is + * cancelled with interruption, and forwards the termination callback. + * + * @param <V> return type of the callable function + */ +public class AsyncStoppableTaskWithCallback<V> extends FutureTask<V> { + + protected final StoppableCallbackCallable<V> stoppableCallbackCallable; + + public AsyncStoppableTaskWithCallback(StoppableCallbackCallable<V> callable) { + super(Preconditions.checkNotNull(callable)); + this.stoppableCallbackCallable = callable; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + + if (mayInterruptIfRunning) { + stoppableCallbackCallable.stop(); + } + + return super.cancel(mayInterruptIfRunning); + } + + @Override + protected void done() { + stoppableCallbackCallable.done(); + } + + public static <V> AsyncStoppableTaskWithCallback<V> from(StoppableCallbackCallable<V> callable) { + return new AsyncStoppableTaskWithCallback<>(callable); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java new file mode 100644 index 0000000000000000000000000000000000000000..d45931691fce164a309625318e8c99c17e1f9ad5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.async; + +import java.util.concurrent.Callable; + +/** + * A {@link Callable} that can be stopped and offers a callback on termination. + * + * @param <V> return value of the call operation. + */ +public interface StoppableCallbackCallable<V> extends Callable<V>, AsyncStoppable, AsyncDoneCallback { + +}
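Taken together, the new io.async classes let a caller make a blocking IO operation cancellable. A hedged wiring sketch, reusing the hypothetical CountingReadCallable from the earlier example; the executor setup and file path are illustrative:

    // Sketch: cancel(true) first stops the callable (closing its IO handle) and
    // then interrupts the worker thread; done() fires on termination either way.
    import java.io.File;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    ExecutorService executor = Executors.newSingleThreadExecutor();

    AsyncStoppableTaskWithCallback<Long> task =
            AsyncStoppableTaskWithCallback.from(new CountingReadCallable(new File("/tmp/data")));

    executor.submit(task);
    task.cancel(true); // unblocks the IO instead of waiting for it to finish
    executor.shutdown();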
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 4801d855f31073bc383922eba88cc45cc307153a..6f0a8147c78be4c02eb20b2f99b3f9b82b0e0af6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -71,7 +71,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { /** * A {@code CheckpointStateOutputStream} that writes into a byte array. */ - public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { + public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream { private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos(); @@ -86,13 +86,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { } @Override - public void write(int b) { + public void write(int b) throws IOException { os.write(b); isEmpty = false; } @Override - public void write(byte[] b, int off, int len) { + public void write(byte[] b, int off, int len) throws IOException { os.write(b, off, len); isEmpty = false; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 02579aa4401e98dcebe6997cda9044684aca43d1..13f650c044080e0f0c52d4d3c22e786adf1d061d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -66,6 +66,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Base class for all streaming tasks. A task is the unit of local processing that is deployed @@ -176,8 +177,12 @@ public abstract class StreamTask> private long lastCheckpointSize = 0; + /** Thread pool for async snapshot workers */ private ExecutorService asyncOperationsThreadPool; + /** Timeout to await the termination of the thread pool, in milliseconds */ + private long threadPoolTerminationTimeout = 0L; + // ------------------------------------------------------------------------ // Life cycle methods for specific implementations // ------------------------------------------------------------------------ @@ -441,6 +446,10 @@ public abstract class StreamTask> if (!asyncOperationsThreadPool.isShutdown()) { asyncOperationsThreadPool.shutdownNow(); } + + if (threadPoolTerminationTimeout > 0L) { + asyncOperationsThreadPool.awaitTermination(threadPoolTerminationTimeout, TimeUnit.MILLISECONDS); + } } /** @@ -861,6 +870,15 @@ public abstract class StreamTask> }; } + /** + * Sets a timeout for the async thread pool. The default should always be 0, to avoid blocking restarts of the task. + * + * @param threadPoolTerminationTimeout timeout for the async thread pool in milliseconds + */ + public void setThreadPoolTerminationTimeout(long threadPoolTerminationTimeout) { + this.threadPoolTerminationTimeout = threadPoolTerminationTimeout; + } + // ------------------------------------------------------------------------ /**