提交 c8f81ba3 编写于 作者: U Ufuk Celebi

[FLINK-3556] [runtime] Remove false check in HA blob store configuration

This closes #1749.
上级 e8e88afd
......@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
......@@ -99,16 +100,11 @@ public class BlobServer extends Thread implements BlobService {
if (recoveryMode == RecoveryMode.STANDALONE) {
this.blobStore = new VoidBlobStore();
}
// Recovery. Check that everything has been setup correctly. This is not clean, but it's
// better to resolve this with some upcoming changes to the state backend setup.
else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
config.containsKey(ConfigConstants.ZOOKEEPER_RECOVERY_PATH)) {
// Recovery.
else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
this.blobStore = new FileSystemBlobStore(config);
}
// Fallback.
else {
this.blobStore = new VoidBlobStore();
} else {
throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + ".");
}
// configure the maximum number of concurrent connections
......
......@@ -40,6 +40,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* Blob store backed by {@link FileSystem}.
*
* <p>This is used in addition to the local blob storage
*/
class FileSystemBlobStore implements BlobStore {
......@@ -49,18 +51,15 @@ class FileSystemBlobStore implements BlobStore {
private final String basePath;
FileSystemBlobStore(Configuration config) throws IOException {
String stateBackendBasePath = config.getString(
ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
if (stateBackendBasePath.equals("")) {
if (recoveryPath == null) {
throw new IllegalConfigurationException(String.format("Missing configuration for " +
"file system state backend recovery path. Please specify via " +
"'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
"file system state backend recovery path. Please specify via " +
"'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
}
stateBackendBasePath += "/blob";
this.basePath = stateBackendBasePath;
this.basePath = recoveryPath + "/blob";
FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath));
LOG.info("Created blob directory {}.", basePath);
......
......@@ -225,7 +225,6 @@ public class ZooKeeperUtils {
ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
configuration,
"completedCheckpoint");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册