[hotfix] Remove explicit YarnClusterDescriptor.zookeeperNamespace

The YarnClusterDescriptor.zookeeperNamespace has been replaced by the configuration which
is deployed together with the Yarn cluster. Hence, it is no longer needed.
上级 1c719c92
...@@ -153,8 +153,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { ...@@ -153,8 +153,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
private final String applicationType; private final String applicationType;
private String zookeeperNamespace;
private YarnConfigOptions.UserJarInclusion userJarInclusion; private YarnConfigOptions.UserJarInclusion userJarInclusion;
public YarnClusterDescriptor( public YarnClusterDescriptor(
...@@ -183,10 +181,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { ...@@ -183,10 +181,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME); this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE); this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL); this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
// we want to ignore the default value at this point.
this.zookeeperNamespace =
flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
} }
private Optional<List<File>> decodeFilesToShipToCluster( private Optional<List<File>> decodeFilesToShipToCluster(
...@@ -358,14 +352,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { ...@@ -358,14 +352,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
} }
} }
public String getZookeeperNamespace() {
return zookeeperNamespace;
}
private void setZookeeperNamespace(String zookeeperNamespace) {
this.zookeeperNamespace = zookeeperNamespace;
}
public String getNodeLabel() { public String getNodeLabel() {
return nodeLabel; return nodeLabel;
} }
...@@ -824,17 +810,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { ...@@ -824,17 +810,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
final ApplicationId appId = appContext.getApplicationId(); final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------ // ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
String zkNamespace = getZookeeperNamespace(); setHAClusterIdIfNotSet(configuration, appId);
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
zkNamespace =
configuration.getString(
HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) { if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications // activate re-execution of failed applications
...@@ -1133,7 +1109,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { ...@@ -1133,7 +1109,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
YarnConfigKeys.ENV_CLIENT_SHIP_FILES, YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
encodeYarnLocalResourceDescriptorListToString( encodeYarnLocalResourceDescriptorListToString(
fileUploader.getEnvShipResourceList())); fileUploader.getEnvShipResourceList()));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put( appMasterEnv.put(
YarnConfigKeys.FLINK_YARN_FILES, YarnConfigKeys.FLINK_YARN_FILES,
fileUploader.getApplicationDir().toUri().toString()); fileUploader.getApplicationDir().toUri().toString());
...@@ -1781,9 +1756,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { ...@@ -1781,9 +1756,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId)); flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));
setHAClusterIdIfNotSet(flinkConfiguration, appId);
}
private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
// set cluster-id to app id if not specified // set cluster-id to app id if not specified
if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) { if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
flinkConfiguration.set( configuration.set(
HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId)); HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
} }
} }
......
...@@ -40,7 +40,6 @@ public class YarnConfigKeys { ...@@ -40,7 +40,6 @@ public class YarnConfigKeys {
public static final String LOCAL_KEYTAB_PATH = "_LOCAL_KEYTAB_PATH"; public static final String LOCAL_KEYTAB_PATH = "_LOCAL_KEYTAB_PATH";
public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL"; public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME"; public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";
public static final String ENV_KRB5_PATH = "_KRB5_PATH"; public static final String ENV_KRB5_PATH = "_KRB5_PATH";
public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH"; public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH";
......
...@@ -21,7 +21,6 @@ package org.apache.flink.yarn.entrypoint; ...@@ -21,7 +21,6 @@ package org.apache.flink.yarn.entrypoint;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.RestOptions;
...@@ -58,8 +57,6 @@ public class YarnEntrypointUtils { ...@@ -58,8 +57,6 @@ public class YarnEntrypointUtils {
final String keytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); final String keytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
final String zooKeeperNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key()); final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
Preconditions.checkState( Preconditions.checkState(
hostname != null, hostname != null,
...@@ -69,10 +66,6 @@ public class YarnEntrypointUtils { ...@@ -69,10 +66,6 @@ public class YarnEntrypointUtils {
configuration.setString(JobManagerOptions.ADDRESS, hostname); configuration.setString(JobManagerOptions.ADDRESS, hostname);
configuration.setString(RestOptions.ADDRESS, hostname); configuration.setString(RestOptions.ADDRESS, hostname);
if (zooKeeperNamespace != null) {
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zooKeeperNamespace);
}
// if a web monitor shall be started, set the port to random binding // if a web monitor shall be started, set the port to random binding
if (configuration.getInteger(WebOptions.PORT, 0) >= 0) { if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
configuration.setInteger(WebOptions.PORT, 0); configuration.setInteger(WebOptions.PORT, 0);
......
...@@ -160,11 +160,9 @@ public class FlinkYarnSessionCliTest extends TestLogger { ...@@ -160,11 +160,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
Configuration executorConfig = yarnCLI.toConfiguration(commandLine); Configuration executorConfig = yarnCLI.toConfiguration(commandLine);
ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig);
YarnClusterDescriptor descriptor =
(YarnClusterDescriptor) clientFactory.createClusterDescriptor(executorConfig);
assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace()); assertThat(
executorConfig.get(HighAvailabilityOptions.HA_CLUSTER_ID), is(zkNamespaceCliInput));
} }
@Test @Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册