From 38003c2829879f65e26914c0fedb102052fe201f Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Sun, 7 May 2017 20:22:34 +0200 Subject: [PATCH] [FLINK-6475] [checkpoint] Incremental snapshots in RocksDB should not hold lock during async file upload --- .../state/RocksDBKeyedStateBackend.java | 63 +++++++++---------- .../flink/util/AbstractCloseableRegistry.java | 4 ++ .../state/AbstractKeyedStateBackend.java | 4 ++ .../state/DefaultOperatorStateBackend.java | 1 + .../api/operators/AbstractStreamOperator.java | 3 - 5 files changed, 37 insertions(+), 38 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 3cb21acf022..f5dddd6f2f0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -844,54 +844,47 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { KeyedStateHandle materializeSnapshot() throws Exception { - synchronized (stateBackend.asyncSnapshotLock) { + stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); - if (stateBackend.db == null) { - throw new IOException("RocksDB closed."); - } - - stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); + // write meta data + metaStateHandle = materializeMetaData(); - // write meta data - metaStateHandle = materializeMetaData(); + // write state data + Preconditions.checkState(backupFileSystem.exists(backupPath)); - // write state data - Preconditions.checkState(backupFileSystem.exists(backupPath)); + FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); + if (fileStatuses != null) { + for (FileStatus fileStatus : fileStatuses) { + Path filePath = fileStatus.getPath(); + String fileName = filePath.getName(); - FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); - if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - Path filePath = fileStatus.getPath(); - String fileName = filePath.getName(); + if (fileName.endsWith(SST_FILE_SUFFIX)) { + StreamStateHandle fileHandle = + baseSstFiles == null ? null : baseSstFiles.get(fileName); - if (fileName.endsWith(SST_FILE_SUFFIX)) { - StreamStateHandle fileHandle = - baseSstFiles == null ? null : baseSstFiles.get(fileName); + if (fileHandle == null) { + fileHandle = materializeStateData(filePath); - if (fileHandle == null) { - fileHandle = materializeStateData(filePath); - - newSstFiles.put(fileName, fileHandle); - } else { - oldSstFiles.put(fileName, fileHandle); - } + newSstFiles.put(fileName, fileHandle); } else { - StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(fileName, fileHandle); + oldSstFiles.put(fileName, fileHandle); } + } else { + StreamStateHandle fileHandle = materializeStateData(filePath); + miscFiles.put(fileName, fileHandle); } } + } - Map sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); - sstFiles.putAll(newSstFiles); - sstFiles.putAll(oldSstFiles); + Map sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); + sstFiles.putAll(newSstFiles); + sstFiles.putAll(oldSstFiles); - stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + stateBackend.materializedSstFiles.put(checkpointId, sstFiles); - return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, - stateBackend.operatorIdentifier, stateBackend.keyGroupRange, - checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); - } + return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, + stateBackend.operatorIdentifier, stateBackend.keyGroupRange, + checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); } void stop() { diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index 766ede93234..f949779ff18 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -90,6 +90,10 @@ public abstract class AbstractCloseableRegistry implemen public void close() throws IOException { synchronized (getSynchronizationLock()) { + if (closed) { + return; + } + IOUtils.closeAllQuietly(closeableToRef.keySet()); closeableToRef.clear(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 4f3ed016ac0..47ebe3b73f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import java.io.Closeable; @@ -122,6 +123,9 @@ public abstract class AbstractKeyedStateBackend */ @Override public void dispose() { + + IOUtils.closeQuietly(this); + if (kvStateRegistry != null) { kvStateRegistry.unregisterAll(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index ec4aa81dfd6..ab0c1f0bf3e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -120,6 +120,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public void dispose() { + IOUtils.closeQuietly(this); registeredStates.clear(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 8c1caee4dd2..057df2b9d86 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.Map; -import org.apache.commons.io.IOUtils; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; @@ -358,12 +357,10 @@ public abstract class AbstractStreamOperator public void dispose() throws Exception { if (operatorStateBackend != null) { - IOUtils.closeQuietly(operatorStateBackend); operatorStateBackend.dispose(); } if (keyedStateBackend != null) { - IOUtils.closeQuietly(keyedStateBackend); keyedStateBackend.dispose(); } } -- GitLab