From f839018131024860a1b25b13cea7e1313add28d5 Mon Sep 17 00:00:00 2001 From: zjureel Date: Wed, 14 Jun 2017 17:38:25 +0800 Subject: [PATCH] [FLINK-6498] Migrate Zookeeper configuration options This closes #4123. --- .../fs/RollingSinkSecuredITCase.java | 2 +- .../flink/configuration/ConfigConstants.java | 124 ++++++++++++++++-- .../HighAvailabilityOptions.java | 60 +++++++-- .../services/MesosServicesUtils.java | 10 +- .../flink/runtime/util/ZooKeeperUtils.java | 53 +++----- .../ZooKeeperLeaderElectionTest.java | 5 +- .../JobManagerHAJobGraphRecoveryITCase.java | 7 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 6 +- 8 files changed, 191 insertions(+), 76 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index 6bd75d4dc6a..866b2f3e868 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -218,7 +218,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString(CoreOptions.STATE_BACKEND, "filesystem"); - config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 35d3d139766..d467dfa1192 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -944,65 +944,111 @@ public final class ConfigConstants { // --------------------------- ZooKeeper ---------------------------------- - /** ZooKeeper servers. */ + /** + * ZooKeeper servers. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_QUORUM}. + */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum"; /** * File system state backend base path for recoverable state handles. Recovery state is written * to this path and the file state handles are persisted for recovery. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_STORAGE_PATH}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir"; - /** ZooKeeper root path. */ + /** + * ZooKeeper root path. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. + */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch"; - /** ZooKeeper root path (ZNode) for job graphs. */ + /** + * ZooKeeper root path (ZNode) for job graphs. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. + */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader"; - /** ZooKeeper root path (ZNode) for completed checkpoints. */ + /** + * ZooKeeper root path (ZNode) for completed checkpoints. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. + */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints"; - /** ZooKeeper root path (ZNode) for checkpoint counters. */ + /** + * ZooKeeper root path (ZNode) for checkpoint counters. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}. + */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter"; - /** ZooKeeper root path (ZNode) for Mesos workers. */ + /** + * ZooKeeper root path (ZNode) for Mesos workers. + * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}. + */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "high-availability.zookeeper.path.mesos-workers"; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout"; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout"; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT} */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait"; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts"; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}. */ @PublicEvolving + @Deprecated public static final String HA_ZOOKEEPER_CLIENT_ACL = "high-availability.zookeeper.client.acl"; + /** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}. */ @PublicEvolving + @Deprecated public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable"; + /** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_SERVICE_NAME}. */ @PublicEvolving + @Deprecated public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name"; /** @deprecated Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */ @@ -1632,51 +1678,103 @@ public final class ConfigConstants { // --------------------------- ZooKeeper ---------------------------------- + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH} */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter"; + /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}. */ + @Deprecated public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers"; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT}. */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000; + /** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3; // - Defaults for required ZooKeeper configuration keys ------------------- - /** ZooKeeper default client port. */ + /** + * ZooKeeper default client port. + * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_CLIENT_PORT}. + */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181; - /** ZooKeeper default init limit. */ + /** + * ZooKeeper default init limit. + * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_INIT_LIMIT}. + */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10; - /** ZooKeeper default sync limit. */ + /** + * ZooKeeper default sync limit. + * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_SYNC_LIMIT}. + */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5; - /** ZooKeeper default peer port. */ + /** + * ZooKeeper default peer port. + * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_PEER_PORT}. + */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888; - /** ZooKeeper default leader port. */ + /** + * ZooKeeper default leader port. + * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_LEADER_PORT}. + */ + @Deprecated public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888; - /** Defaults for ZK client security **/ + /** + * Defaults for ZK client security. + * @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}. + */ + @Deprecated public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true; - /** ACL options supported "creator" or "open" */ + /** + * ACL options supported "creator" or "open". + * @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}. + */ + @Deprecated public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open"; // ----------------------------- Metrics ---------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 2cfb25a6e7a..2b026b935ea 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -58,14 +58,6 @@ public class HighAvailabilityOptions { key("high-availability.storageDir") .noDefaultValue() .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir"); - - /** - * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper. - */ - public static final ConfigOption HA_ZOOKEEPER_QUORUM = - key("high-availability.zookeeper.quorum") - .noDefaultValue() - .withDeprecatedKeys("recovery.zookeeper.quorum"); // ------------------------------------------------------------------------ @@ -92,6 +84,14 @@ public class HighAvailabilityOptions { // ZooKeeper Options // ------------------------------------------------------------------------ + /** + * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper. + */ + public static final ConfigOption HA_ZOOKEEPER_QUORUM = + key("high-availability.zookeeper.quorum") + .noDefaultValue() + .withDeprecatedKeys("recovery.zookeeper.quorum"); + /** * The root path under which Flink stores its entries in ZooKeeper */ @@ -100,6 +100,46 @@ public class HighAvailabilityOptions { .defaultValue("/flink") .withDeprecatedKeys("recovery.zookeeper.path.root"); + public static final ConfigOption HA_ZOOKEEPER_NAMESPACE = + key("high-availability.zookeeper.path.namespace") + .noDefaultValue() + .withDeprecatedKeys("recovery.zookeeper.path.namespace"); + + public static final ConfigOption HA_ZOOKEEPER_LATCH_PATH = + key("high-availability.zookeeper.path.latch") + .defaultValue("/leaderlatch") + .withDeprecatedKeys("recovery.zookeeper.path.latch"); + + /** ZooKeeper root path (ZNode) for job graphs. */ + public static final ConfigOption HA_ZOOKEEPER_JOBGRAPHS_PATH = + key("high-availability.zookeeper.path.jobgraphs") + .defaultValue("/jobgraphs") + .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs"); + + public static final ConfigOption HA_ZOOKEEPER_LEADER_PATH = + key("high-availability.zookeeper.path.leader") + .defaultValue("/leader") + .withDeprecatedKeys("recovery.zookeeper.path.leader"); + + /** ZooKeeper root path (ZNode) for completed checkpoints. */ + public static final ConfigOption HA_ZOOKEEPER_CHECKPOINTS_PATH = + key("high-availability.zookeeper.path.checkpoints") + .defaultValue("/checkpoints") + .withDeprecatedKeys("recovery.zookeeper.path.checkpoints"); + + /** ZooKeeper root path (ZNode) for checkpoint counters. */ + public static final ConfigOption HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = + key("high-availability.zookeeper.path.checkpoint-counter") + .defaultValue("/checkpoint-counter") + .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter"); + + /** ZooKeeper root path (ZNode) for Mesos workers. */ + @PublicEvolving + public static final ConfigOption HA_ZOOKEEPER_MESOS_WORKERS_PATH = + key("high-availability.zookeeper.path.mesos-workers") + .defaultValue("/mesos-workers") + .withDeprecatedKeys("recovery.zookeeper.path.mesos-workers"); + // ------------------------------------------------------------------------ // ZooKeeper Client Settings // ------------------------------------------------------------------------ @@ -128,6 +168,10 @@ public class HighAvailabilityOptions { key("high-availability.zookeeper.path.running-registry") .defaultValue("/running_job_registry/"); + public static final ConfigOption ZOOKEEPER_CLIENT_ACL = + key("high-availability.zookeeper.client.acl") + .defaultValue("open"); + // ------------------------------------------------------------------------ /** Not intended to be instantiated */ diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java index a28020af89a..370a760465a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java @@ -18,11 +18,10 @@ package org.apache.flink.mesos.runtime.clusterframework.services; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory; -import org.apache.flink.util.ConfigurationUtil; /** * Utilities for the {@link MesosServices}. @@ -44,11 +43,8 @@ public class MesosServicesUtils { return new StandaloneMesosServices(); case ZOOKEEPER: - final String zkMesosRootPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH, - ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH); + final String zkMesosRootPath = configuration.getString( + HighAvailabilityOptions.HA_ZOOKEEPER_MESOS_WORKERS_PATH); ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory( configuration, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 9ade5ece532..a7ac500a1d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -25,10 +25,10 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter; @@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper; -import org.apache.flink.util.ConfigurationUtil; import org.apache.flink.util.Preconditions; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -87,8 +86,7 @@ public class ZooKeeperUtils { String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); - boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, - ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE); + boolean disableSaslClient = configuration.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE); ACLProvider aclProvider; @@ -96,7 +94,7 @@ public class ZooKeeperUtils { if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) { String errorMessage = "Cannot set ACL role to " + aclMode +" since SASL authentication is " + - "disabled through the " + ConfigConstants.ZOOKEEPER_SASL_DISABLE + " property"; + "disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() + " property"; LOG.warn(errorMessage); throw new IllegalConfigurationException(errorMessage); } @@ -185,11 +183,8 @@ public class ZooKeeperUtils { final Configuration configuration, final String pathSuffix) { - String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, - ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix; + String leaderPath = configuration.getString( + HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderRetrievalService(client, leaderPath); } @@ -221,16 +216,10 @@ public class ZooKeeperUtils { final Configuration configuration, final String pathSuffix) { - final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH, - ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix; - final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH, - ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix; + final String latchPath = configuration.getString( + HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix; + final String leaderPath = configuration.getString( + HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath); } @@ -254,11 +243,7 @@ public class ZooKeeperUtils { RetrievableStateStorageHelper stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph"); // ZooKeeper submitted jobs root dir - String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH, - ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH); + String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH); return new ZooKeeperSubmittedJobGraphStore( client, zooKeeperSubmittedJobsPath, stateStorage, executor); @@ -284,11 +269,8 @@ public class ZooKeeperUtils { checkNotNull(configuration, "Configuration"); - String checkpointsPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH, - ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH); + String checkpointsPath = configuration.getString( + HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH); RetrievableStateStorageHelper stateStorage = createFileSystemStateStorage( configuration, @@ -317,11 +299,8 @@ public class ZooKeeperUtils { Configuration configuration, JobID jobId) { - String checkpointIdCounterPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH, - ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH); + String checkpointIdCounterPath = configuration.getString( + HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); @@ -391,11 +370,11 @@ public class ZooKeeperUtils { * Return the configured {@link ZkClientACLMode}. * * @param config The config to parse - * @return Configured ACL mode or {@link ConfigConstants#DEFAULT_HA_ZOOKEEPER_CLIENT_ACL} if not + * @return Configured ACL mode or the default defined by {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL} if not * configured. */ public static ZkClientACLMode fromConfig(Configuration config) { - String aclMode = config.getString(ConfigConstants.HA_ZOOKEEPER_CLIENT_ACL, null); + String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL); if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) { return ZkClientACLMode.OPEN; } else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 6efd2700ee8..73cf0630163 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -298,7 +298,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { final String FAULTY_CONTENDER_URL = "faultyContender"; final String leaderPath = "/leader"; - configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath); ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -463,8 +463,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { testingContender = new TestingContender(TEST_URL, leaderElectionService); listener = new TestingListener(); - final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH); + final String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH); cache = new NodeCache(client2, leaderPath); ExistsCacheListener existsListener = new ExistsCacheListener(cache); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 80b8e188041..3f2eea333f3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -392,8 +393,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { // ZooKeeper String currentJobsPath = config.getString( - ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH); Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); @@ -424,8 +424,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { // ZooKeeper String currentJobsPath = config.getString( - ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, - ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH); Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index f15314aca0b..73279da1a44 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -66,7 +66,7 @@ import java.util.Properties; import java.util.concurrent.Callable; import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; -import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY; +import static org.apache.flink.configuration.HighAvailabilityOptions.HA_ZOOKEEPER_NAMESPACE; /** * Class handling the command line interface to the YARN session. @@ -597,9 +597,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? cmd.getOptionValue(zookeeperNamespace.getOpt()) : yarnDescriptor.getFlinkConfiguration() - .getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(applicationId.getOpt())); + .getString(HA_ZOOKEEPER_NAMESPACE, cmd.getOptionValue(applicationId.getOpt())); LOG.info("Going to use the ZK namespace: {}", zkNamespace); - yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); + yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE, zkNamespace); try { yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); -- GitLab