提交 ab2895fa 编写于 作者: U Ufuk Celebi 提交者: Fabian Hueske

[FLINK-2893] [runtime] Consistent naming of recovery config parameters

Rename config key prefix from 'ha.zookeeper' to 'recovery.zookeeper'
Rename config key from 'state.backend.fs.dir.recovery' => 'state.backend.fs.recoverydir'
Move ZooKeeper file system state backend configuration keys

This closes #1286
上级 3c8a6588
......@@ -390,23 +390,23 @@ Flink supports the 'standalone' mode where only a single JobManager runs and no
The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing.
Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution.
In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state.
In order to use the 'zookeeper' mode, it is mandatory to also define the `ha.zookeeper.quorum` configuration value.
In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.
- `ha.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
- `ha.zookeeper.dir`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes.
- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes.
- `ha.zookeeper.dir.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
- `ha.zookeeper.dir.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
- `recovery.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
- `ha.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
- `recovery.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
- `ha.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
- `recovery.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
- `ha.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
- `recovery.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
- `ha.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
- `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
## Background
......
......@@ -50,13 +50,13 @@ In high availabliity mode, all Flink components try to connect to a JobManager v
- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
<pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
<pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
- The following configuration keys are optional:
- `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for coordination
- `recovery.zookeeper.path.root: /flink [default]`: ZooKeeper directory to use for coordination
- TODO Add client configuration keys
## Starting an HA-cluster
......@@ -93,7 +93,7 @@ The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each
<pre>
recovery.mode: zookeeper
ha.zookeeper.quorum: localhost</pre>
recovery.zookeeper.quorum: localhost</pre>
2. **Configure masters** in `conf/masters`:
......
......@@ -410,12 +410,6 @@ public final class ConfigConstants {
*/
public static final String STATE_BACKEND = "state.backend";
/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
*/
public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery";
// ----------------------------- Miscellaneous ----------------------------
/**
......@@ -433,31 +427,37 @@ public final class ConfigConstants {
// --------------------------- ZooKeeper ----------------------------------
/** ZooKeeper servers. */
public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
*/
public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
/** ZooKeeper root path. */
public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";
public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";
public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";
/** ZooKeeper root path (ZNode) for job graphs. */
public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs";
public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";
/** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints";
public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";
/** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter";
public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";
public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";
public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";
// ------------------------------------------------------------------------
// Default Values
......
......@@ -137,4 +137,4 @@ webclient.port: 8080
#
# recovery.mode: zookeeper
#
# ha.zookeeper.quorum: localhost
# recovery.zookeeper.quorum: localhost
......@@ -100,7 +100,7 @@ public class BlobServer extends Thread implements BlobService {
// Recovery. Check that everything has been setup correctly. This is not clean, but it's
// better to resolve this with some upcoming changes to the state backend setup.
else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) {
config.containsKey(ConfigConstants.ZOOKEEPER_RECOVERY_PATH)) {
this.blobStore = new FileSystemBlobStore(config);
}
......
......@@ -51,12 +51,12 @@ class FileSystemBlobStore implements BlobStore {
FileSystemBlobStore(Configuration config) throws IOException {
String stateBackendBasePath = config.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
if (stateBackendBasePath.equals("")) {
throw new IllegalConfigurationException(String.format("Missing configuration for " +
"file system state backend recovery path. Please specify via " +
"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
"'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
}
stateBackendBasePath += "/blob";
......
......@@ -276,11 +276,11 @@ public class ZooKeeperUtils {
String prefix) throws IOException {
String rootPath = configuration.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
if (rootPath.equals("")) {
throw new IllegalConfigurationException("Missing recovery path. Specify via " +
"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
"configuration key '" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "'.");
} else {
return new FileSystemStateStorageHelper<T>(rootPath, prefix);
}
......
......@@ -70,7 +70,7 @@ public class BlobRecoveryITCase {
Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, recoveryDir.getPath());
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
......
......@@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
......
......@@ -197,7 +197,7 @@ public class JobManagerProcess extends TestJvmProcess {
* <code>--port PORT</code>.
*
* <p>Other arguments are parsed to a {@link Configuration} and passed to the
* JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum
* JobManager, for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum
* "xyz:123:456"</code>.
*/
public static void main(String[] args) {
......
......@@ -99,7 +99,7 @@ public class TaskManagerProcess extends TestJvmProcess {
* and streaming jobs).
*
* <p>All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
* for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>.
* for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
*/
public static void main(String[] args) throws Exception {
try {
......
......@@ -81,7 +81,7 @@ public class ZooKeeperTestUtils {
// File system state backend
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, fsStateHandlePath + "/recovery");
// Akka failure detection and execution retries
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
......
......@@ -556,7 +556,7 @@ public class ChaosMonkeyITCase {
0, files.length);
}
File fsRecovery = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""));
File fsRecovery = new File(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, ""));
LOG.info("Checking " + fsRecovery);
......
......@@ -95,7 +95,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
......@@ -144,7 +144,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
......
......@@ -112,11 +112,11 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
String fsStateHandlePath = tmp.getRoot().getPath();
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
"@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
AbstractFlinkYarnCluster yarnCluster = null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册