提交 38003c28 编写于 作者: S Stefan Richter

[FLINK-6475] [checkpoint] Incremental snapshots in RocksDB should not hold...

[FLINK-6475] [checkpoint] Incremental snapshots in RocksDB should not hold lock during async file upload
上级 b8ffacb1
...@@ -844,54 +844,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ...@@ -844,54 +844,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyedStateHandle materializeSnapshot() throws Exception { KeyedStateHandle materializeSnapshot() throws Exception {
synchronized (stateBackend.asyncSnapshotLock) { stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
if (stateBackend.db == null) { // write meta data
throw new IOException("RocksDB closed."); metaStateHandle = materializeMetaData();
}
stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry);
// write meta data // write state data
metaStateHandle = materializeMetaData(); Preconditions.checkState(backupFileSystem.exists(backupPath));
// write state data FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
Preconditions.checkState(backupFileSystem.exists(backupPath)); if (fileStatuses != null) {
for (FileStatus fileStatus : fileStatuses) {
Path filePath = fileStatus.getPath();
String fileName = filePath.getName();
FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); if (fileName.endsWith(SST_FILE_SUFFIX)) {
if (fileStatuses != null) { StreamStateHandle fileHandle =
for (FileStatus fileStatus : fileStatuses) { baseSstFiles == null ? null : baseSstFiles.get(fileName);
Path filePath = fileStatus.getPath();
String fileName = filePath.getName();
if (fileName.endsWith(SST_FILE_SUFFIX)) { if (fileHandle == null) {
StreamStateHandle fileHandle = fileHandle = materializeStateData(filePath);
baseSstFiles == null ? null : baseSstFiles.get(fileName);
if (fileHandle == null) { newSstFiles.put(fileName, fileHandle);
fileHandle = materializeStateData(filePath);
newSstFiles.put(fileName, fileHandle);
} else {
oldSstFiles.put(fileName, fileHandle);
}
} else { } else {
StreamStateHandle fileHandle = materializeStateData(filePath); oldSstFiles.put(fileName, fileHandle);
miscFiles.put(fileName, fileHandle);
} }
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
miscFiles.put(fileName, fileHandle);
} }
} }
}
Map<String, StreamStateHandle> sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); Map<String, StreamStateHandle> sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size());
sstFiles.putAll(newSstFiles); sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles); sstFiles.putAll(oldSstFiles);
stateBackend.materializedSstFiles.put(checkpointId, sstFiles); stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId,
stateBackend.operatorIdentifier, stateBackend.keyGroupRange, stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle);
}
} }
void stop() { void stop() {
......
...@@ -90,6 +90,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen ...@@ -90,6 +90,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
public void close() throws IOException { public void close() throws IOException {
synchronized (getSynchronizationLock()) { synchronized (getSynchronizationLock()) {
if (closed) {
return;
}
IOUtils.closeAllQuietly(closeableToRef.keySet()); IOUtils.closeAllQuietly(closeableToRef.keySet());
closeableToRef.clear(); closeableToRef.clear();
......
...@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; ...@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import java.io.Closeable; import java.io.Closeable;
...@@ -122,6 +123,9 @@ public abstract class AbstractKeyedStateBackend<K> ...@@ -122,6 +123,9 @@ public abstract class AbstractKeyedStateBackend<K>
*/ */
@Override @Override
public void dispose() { public void dispose() {
IOUtils.closeQuietly(this);
if (kvStateRegistry != null) { if (kvStateRegistry != null) {
kvStateRegistry.unregisterAll(); kvStateRegistry.unregisterAll();
} }
......
...@@ -120,6 +120,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { ...@@ -120,6 +120,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
@Override @Override
public void dispose() { public void dispose() {
IOUtils.closeQuietly(this);
registeredStates.clear(); registeredStates.clear();
} }
......
...@@ -26,7 +26,6 @@ import java.util.Collection; ...@@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
...@@ -358,12 +357,10 @@ public abstract class AbstractStreamOperator<OUT> ...@@ -358,12 +357,10 @@ public abstract class AbstractStreamOperator<OUT>
public void dispose() throws Exception { public void dispose() throws Exception {
if (operatorStateBackend != null) { if (operatorStateBackend != null) {
IOUtils.closeQuietly(operatorStateBackend);
operatorStateBackend.dispose(); operatorStateBackend.dispose();
} }
if (keyedStateBackend != null) { if (keyedStateBackend != null) {
IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose(); keyedStateBackend.dispose();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册