diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 3cb21acf0221511fc98c72eb746d0a02a35c8b19..f5dddd6f2f0e434a82beaf8730cb68ff8a2a69f1 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -844,54 +844,47 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { KeyedStateHandle materializeSnapshot() throws Exception { - synchronized (stateBackend.asyncSnapshotLock) { + stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); - if (stateBackend.db == null) { - throw new IOException("RocksDB closed."); - } - - stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); + // write meta data + metaStateHandle = materializeMetaData(); - // write meta data - metaStateHandle = materializeMetaData(); + // write state data + Preconditions.checkState(backupFileSystem.exists(backupPath)); - // write state data - Preconditions.checkState(backupFileSystem.exists(backupPath)); + FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); + if (fileStatuses != null) { + for (FileStatus fileStatus : fileStatuses) { + Path filePath = fileStatus.getPath(); + String fileName = filePath.getName(); - FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); - if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - Path filePath = fileStatus.getPath(); - String fileName = filePath.getName(); + if (fileName.endsWith(SST_FILE_SUFFIX)) { + StreamStateHandle fileHandle = + baseSstFiles == null ? null : baseSstFiles.get(fileName); - if (fileName.endsWith(SST_FILE_SUFFIX)) { - StreamStateHandle fileHandle = - baseSstFiles == null ? null : baseSstFiles.get(fileName); + if (fileHandle == null) { + fileHandle = materializeStateData(filePath); - if (fileHandle == null) { - fileHandle = materializeStateData(filePath); - - newSstFiles.put(fileName, fileHandle); - } else { - oldSstFiles.put(fileName, fileHandle); - } + newSstFiles.put(fileName, fileHandle); } else { - StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(fileName, fileHandle); + oldSstFiles.put(fileName, fileHandle); } + } else { + StreamStateHandle fileHandle = materializeStateData(filePath); + miscFiles.put(fileName, fileHandle); } } + } - Map sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); - sstFiles.putAll(newSstFiles); - sstFiles.putAll(oldSstFiles); + Map sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); + sstFiles.putAll(newSstFiles); + sstFiles.putAll(oldSstFiles); - stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + stateBackend.materializedSstFiles.put(checkpointId, sstFiles); - return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, - stateBackend.operatorIdentifier, stateBackend.keyGroupRange, - checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); - } + return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, + stateBackend.operatorIdentifier, stateBackend.keyGroupRange, + checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); } void stop() { diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java index 766ede9323460e3e55d7fb7f57798e6ab6d1751a..f949779ff1831b0c9b089842f892d818f75ec0cb 100644 --- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java +++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java @@ -90,6 +90,10 @@ public abstract class AbstractCloseableRegistry implemen public void close() throws IOException { synchronized (getSynchronizationLock()) { + if (closed) { + return; + } + IOUtils.closeAllQuietly(closeableToRef.keySet()); closeableToRef.clear(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 4f3ed016ac0367262ac7f0e147661a0b5f0b22e0..47ebe3b73f558584591f91049e807dab8825e791 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import java.io.Closeable; @@ -122,6 +123,9 @@ public abstract class AbstractKeyedStateBackend */ @Override public void dispose() { + + IOUtils.closeQuietly(this); + if (kvStateRegistry != null) { kvStateRegistry.unregisterAll(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index ec4aa81dfd62dcb9e1fb23ca142b07198a4fc9d1..ab0c1f0bf3ed24c4ea58cf5215ff52cc2634d002 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -120,6 +120,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public void dispose() { + IOUtils.closeQuietly(this); registeredStates.clear(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 8c1caee4dd2c36e949cf3e342a7e57f314836291..057df2b9d860060fe7c6066d52ba7f5abc021bd8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -26,7 +26,6 @@ import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.Map; -import org.apache.commons.io.IOUtils; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; @@ -358,12 +357,10 @@ public abstract class AbstractStreamOperator public void dispose() throws Exception { if (operatorStateBackend != null) { - IOUtils.closeQuietly(operatorStateBackend); operatorStateBackend.dispose(); } if (keyedStateBackend != null) { - IOUtils.closeQuietly(keyedStateBackend); keyedStateBackend.dispose(); } }