提交 f8390181 编写于 作者: Z zjureel 提交者: zentol

[FLINK-6498] Migrate Zookeeper configuration options

This closes #4123.
上级 3d2f3f65
......@@ -218,7 +218,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
......
......@@ -944,65 +944,111 @@ public final class ConfigConstants {
// --------------------------- ZooKeeper ----------------------------------
/** ZooKeeper servers. */
/**
* ZooKeeper servers.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_QUORUM}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum";
/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_STORAGE_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir";
/** ZooKeeper root path. */
/**
* ZooKeeper root path.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";
/** ZooKeeper root path (ZNode) for job graphs. */
/**
* ZooKeeper root path (ZNode) for job graphs.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
/** ZooKeeper root path (ZNode) for completed checkpoints. */
/**
* ZooKeeper root path (ZNode) for completed checkpoints.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints";
/** ZooKeeper root path (ZNode) for checkpoint counters. */
/**
* ZooKeeper root path (ZNode) for checkpoint counters.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";
/** ZooKeeper root path (ZNode) for Mesos workers. */
/**
* ZooKeeper root path (ZNode) for Mesos workers.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "high-availability.zookeeper.path.mesos-workers";
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout";
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT} */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait";
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CLIENT_ACL = "high-availability.zookeeper.client.acl";
/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}. */
@PublicEvolving
@Deprecated
public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";
/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_SERVICE_NAME}. */
@PublicEvolving
@Deprecated
public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";
/** @deprecated Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
......@@ -1632,51 +1678,103 @@ public final class ConfigConstants {
// --------------------------- ZooKeeper ----------------------------------
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH} */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers";
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000;
/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3;
// - Defaults for required ZooKeeper configuration keys -------------------
/** ZooKeeper default client port. */
/**
* ZooKeeper default client port.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_CLIENT_PORT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
/** ZooKeeper default init limit. */
/**
* ZooKeeper default init limit.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_INIT_LIMIT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
/** ZooKeeper default sync limit. */
/**
* ZooKeeper default sync limit.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_SYNC_LIMIT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
/** ZooKeeper default peer port. */
/**
* ZooKeeper default peer port.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_PEER_PORT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
/** ZooKeeper default leader port. */
/**
* ZooKeeper default leader port.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_LEADER_PORT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
/** Defaults for ZK client security **/
/**
* Defaults for ZK client security.
* @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}.
*/
@Deprecated
public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
/** ACL options supported "creator" or "open" */
/**
* ACL options supported "creator" or "open".
* @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}.
*/
@Deprecated
public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open";
// ----------------------------- Metrics ----------------------------
......
......@@ -58,14 +58,6 @@ public class HighAvailabilityOptions {
key("high-availability.storageDir")
.noDefaultValue()
.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");
/**
* The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.quorum");
// ------------------------------------------------------------------------
......@@ -92,6 +84,14 @@ public class HighAvailabilityOptions {
// ZooKeeper Options
// ------------------------------------------------------------------------
/**
* The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.quorum");
/**
* The root path under which Flink stores its entries in ZooKeeper
*/
......@@ -100,6 +100,46 @@ public class HighAvailabilityOptions {
.defaultValue("/flink")
.withDeprecatedKeys("recovery.zookeeper.path.root");
public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE =
key("high-availability.zookeeper.path.namespace")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.path.namespace");
public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")
.withDeprecatedKeys("recovery.zookeeper.path.latch");
/** ZooKeeper root path (ZNode) for job graphs. */
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
key("high-availability.zookeeper.path.jobgraphs")
.defaultValue("/jobgraphs")
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs");
public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
key("high-availability.zookeeper.path.leader")
.defaultValue("/leader")
.withDeprecatedKeys("recovery.zookeeper.path.leader");
/** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
key("high-availability.zookeeper.path.checkpoints")
.defaultValue("/checkpoints")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints");
/** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
key("high-availability.zookeeper.path.checkpoint-counter")
.defaultValue("/checkpoint-counter")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter");
/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
key("high-availability.zookeeper.path.mesos-workers")
.defaultValue("/mesos-workers")
.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers");
// ------------------------------------------------------------------------
// ZooKeeper Client Settings
// ------------------------------------------------------------------------
......@@ -128,6 +168,10 @@ public class HighAvailabilityOptions {
key("high-availability.zookeeper.path.running-registry")
.defaultValue("/running_job_registry/");
public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
key("high-availability.zookeeper.client.acl")
.defaultValue("open");
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
......
......@@ -18,11 +18,10 @@
package org.apache.flink.mesos.runtime.clusterframework.services;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
import org.apache.flink.util.ConfigurationUtil;
/**
* Utilities for the {@link MesosServices}.
......@@ -44,11 +43,8 @@ public class MesosServicesUtils {
return new StandaloneMesosServices();
case ZOOKEEPER:
final String zkMesosRootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH,
ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH);
final String zkMesosRootPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_MESOS_WORKERS_PATH);
ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory(
configuration,
......
......@@ -25,10 +25,10 @@ import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
......@@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.util.ConfigurationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
......@@ -87,8 +86,7 @@ public class ZooKeeperUtils {
String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
boolean disableSaslClient = configuration.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
ACLProvider aclProvider;
......@@ -96,7 +94,7 @@ public class ZooKeeperUtils {
if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
String errorMessage = "Cannot set ACL role to " + aclMode +" since SASL authentication is " +
"disabled through the " + ConfigConstants.ZOOKEEPER_SASL_DISABLE + " property";
"disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() + " property";
LOG.warn(errorMessage);
throw new IllegalConfigurationException(errorMessage);
}
......@@ -185,11 +183,8 @@ public class ZooKeeperUtils {
final Configuration configuration,
final String pathSuffix)
{
String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
String leaderPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
return new ZooKeeperLeaderRetrievalService(client, leaderPath);
}
......@@ -221,16 +216,10 @@ public class ZooKeeperUtils {
final Configuration configuration,
final String pathSuffix)
{
final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix;
final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
final String latchPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
final String leaderPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
......@@ -254,11 +243,7 @@ public class ZooKeeperUtils {
RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
// ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH,
ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
return new ZooKeeperSubmittedJobGraphStore(
client, zooKeeperSubmittedJobsPath, stateStorage, executor);
......@@ -284,11 +269,8 @@ public class ZooKeeperUtils {
checkNotNull(configuration, "Configuration");
String checkpointsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
String checkpointsPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH);
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
configuration,
......@@ -317,11 +299,8 @@ public class ZooKeeperUtils {
Configuration configuration,
JobID jobId) {
String checkpointIdCounterPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
String checkpointIdCounterPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
......@@ -391,11 +370,11 @@ public class ZooKeeperUtils {
* Return the configured {@link ZkClientACLMode}.
*
* @param config The config to parse
* @return Configured ACL mode or {@link ConfigConstants#DEFAULT_HA_ZOOKEEPER_CLIENT_ACL} if not
* @return Configured ACL mode or the default defined by {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL} if not
* configured.
*/
public static ZkClientACLMode fromConfig(Configuration config) {
String aclMode = config.getString(ConfigConstants.HA_ZOOKEEPER_CLIENT_ACL, null);
String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL);
if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) {
return ZkClientACLMode.OPEN;
} else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) {
......
......@@ -298,7 +298,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
final String FAULTY_CONTENDER_URL = "faultyContender";
final String leaderPath = "/leader";
configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
ZooKeeperLeaderElectionService leaderElectionService = null;
ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
......@@ -463,8 +463,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
testingContender = new TestingContender(TEST_URL, leaderElectionService);
listener = new TestingListener();
final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
final String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
cache = new NodeCache(client2, leaderPath);
ExistsCacheListener existsListener = new ExistsCacheListener(cache);
......
......@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
......@@ -392,8 +393,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
// ZooKeeper
String currentJobsPath = config.getString(
ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
......@@ -424,8 +424,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
// ZooKeeper
String currentJobsPath = config.getString(
ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
......
......@@ -66,7 +66,7 @@ import java.util.Properties;
import java.util.concurrent.Callable;
import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
import static org.apache.flink.configuration.HighAvailabilityOptions.HA_ZOOKEEPER_NAMESPACE;
/**
* Class handling the command line interface to the YARN session.
......@@ -597,9 +597,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
cmd.getOptionValue(zookeeperNamespace.getOpt())
: yarnDescriptor.getFlinkConfiguration()
.getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(applicationId.getOpt()));
.getString(HA_ZOOKEEPER_NAMESPACE, cmd.getOptionValue(applicationId.getOpt()));
LOG.info("Going to use the ZK namespace: {}", zkNamespace);
yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE, zkNamespace);
try {
yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册