From 24f33e75100b94a3c6b00bcf9c2945a7e1322909 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 7 Jan 2021 12:39:59 +0100 Subject: [PATCH] [hotfix] Remove explicit YarnClusterDescriptor.zookeeperNamespace The YarnClusterDescriptor.zookeeperNamespace has been replaced by the configuration which is deployed together with the Yarn cluster. Hence, it is no longer needed. --- .../flink/yarn/YarnClusterDescriptor.java | 35 ++++--------------- .../org/apache/flink/yarn/YarnConfigKeys.java | 1 - .../yarn/entrypoint/YarnEntrypointUtils.java | 7 ---- .../flink/yarn/FlinkYarnSessionCliTest.java | 6 ++-- 4 files changed, 9 insertions(+), 40 deletions(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index e2b373a82c0..f31cfc1c2d5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -153,8 +153,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor { private final String applicationType; - private String zookeeperNamespace; - private YarnConfigOptions.UserJarInclusion userJarInclusion; public YarnClusterDescriptor( @@ -183,10 +181,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor { this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME); this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE); this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL); - - // we want to ignore the default value at this point. - this.zookeeperNamespace = - flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null); } private Optional> decodeFilesToShipToCluster( @@ -358,14 +352,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor { } } - public String getZookeeperNamespace() { - return zookeeperNamespace; - } - - private void setZookeeperNamespace(String zookeeperNamespace) { - this.zookeeperNamespace = zookeeperNamespace; - } - public String getNodeLabel() { return nodeLabel; } @@ -824,17 +810,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor { final ApplicationId appId = appContext.getApplicationId(); // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------ - String zkNamespace = getZookeeperNamespace(); - // no user specified cli argument for namespace? - if (zkNamespace == null || zkNamespace.isEmpty()) { - // namespace defined in config? else use applicationId as default. - zkNamespace = - configuration.getString( - HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId)); - setZookeeperNamespace(zkNamespace); - } - - configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); + setHAClusterIdIfNotSet(configuration, appId); if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) { // activate re-execution of failed applications @@ -1133,7 +1109,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor { YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString( fileUploader.getEnvShipResourceList())); - appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace()); appMasterEnv.put( YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString()); @@ -1781,9 +1756,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor { flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId)); + setHAClusterIdIfNotSet(flinkConfiguration, appId); + } + + private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) { // set cluster-id to app id if not specified - if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) { - flinkConfiguration.set( + if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) { + configuration.set( HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId)); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java index fa5c0f0a5ef..96bf808bf85 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java @@ -40,7 +40,6 @@ public class YarnConfigKeys { public static final String LOCAL_KEYTAB_PATH = "_LOCAL_KEYTAB_PATH"; public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL"; public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME"; - public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE"; public static final String ENV_KRB5_PATH = "_KRB5_PATH"; public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH"; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index a0136247a87..959459e9bd4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -21,7 +21,6 @@ package org.apache.flink.yarn.entrypoint; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.RestOptions; @@ -58,8 +57,6 @@ public class YarnEntrypointUtils { final String keytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - final String zooKeeperNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE); - final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key()); Preconditions.checkState( hostname != null, @@ -69,10 +66,6 @@ public class YarnEntrypointUtils { configuration.setString(JobManagerOptions.ADDRESS, hostname); configuration.setString(RestOptions.ADDRESS, hostname); - if (zooKeeperNamespace != null) { - configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zooKeeperNamespace); - } - // if a web monitor shall be started, set the port to random binding if (configuration.getInteger(WebOptions.PORT, 0) >= 0) { configuration.setInteger(WebOptions.PORT, 0); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 9132cecd4c9..74c9f227adb 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -160,11 +160,9 @@ public class FlinkYarnSessionCliTest extends TestLogger { CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); Configuration executorConfig = yarnCLI.toConfiguration(commandLine); - ClusterClientFactory clientFactory = getClusterClientFactory(executorConfig); - YarnClusterDescriptor descriptor = - (YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig); - assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace()); + assertThat( + executorConfig.get(HighAvailabilityOptions.HA_CLUSTER_ID), is(zkNamespaceCliInput)); } @Test -- GitLab