From 7ab6837fde3adb588273ef6bb8f4f7a215fe9c03 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 1 Jul 2016 18:54:44 +0200 Subject: [PATCH] [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id This closes #2191 --- ...iFrontendYarnAddressConfigurationTest.java | 3 +- .../yarn/AbstractYarnClusterDescriptor.java | 38 ---------- .../flink/yarn/cli/FlinkYarnSessionCli.java | 70 ++++++++++--------- 3 files changed, 38 insertions(+), 73 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index a99c835a987..c3328a264e2 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -109,8 +109,7 @@ public class CliFrontendYarnAddressConfigurationTest { private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42); - private static final String validPropertiesFile = - "jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT; + private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID; private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33"; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 641182e4c7d..5d47b13fffe 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -22,7 +22,6 @@ import org.apache.flink.client.CliFrontend; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.RecoveryMode; @@ -302,43 +301,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor return yarnClient; } - /** - * Retrieves the Yarn application and cluster from the config - * @param config The config with entries to retrieve the cluster - * @return YarnClusterClient - * @deprecated This should be removed in the future - */ - public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) - throws UnsupportedOperationException { - String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - - if (jobManagerHost != null && jobManagerPort != -1) { - - YarnClient yarnClient = getYarnClient(); - final List applicationReports; - try { - applicationReports = yarnClient.getApplications(); - } catch (Exception e) { - throw new RuntimeException("Couldn't get Yarn application reports", e); - } - for (ApplicationReport report : applicationReports) { - if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { - LOG.info("Found application '{}' " + - "with JobManager host name '{}' and port '{}' from Yarn properties file.", - report.getApplicationId(), jobManagerHost, jobManagerPort); - return retrieve(report.getApplicationId().toString()); - } - } - - } - - LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'", - jobManagerHost, jobManagerPort); - - throw new IllegalConfigurationException("Could not resume Yarn cluster from config."); - } - @Override public YarnClusterClient retrieve(String applicationID) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 126f0f1fdcc..989bee4ab49 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -24,18 +24,18 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +47,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -75,7 +74,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine // YARN-session related constants private static final String YARN_PROPERTIES_FILE = ".yarn-properties-"; - private static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager"; + static final String YARN_APPLICATION_ID_KEY = "applicationID"; private static final String YARN_PROPERTIES_PARALLELISM = "parallelism"; private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; @@ -152,24 +151,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine /** - * Resumes from a Flink Yarn properties file + * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful * @param cmdLine The command-line parameters * @param flinkConfiguration The flink configuration - * @return True if the properties were loaded, false otherwise + * @return Yarn application id or null if none could be retrieved */ - private boolean resumeFromYarnProperties(CommandLine cmdLine, Configuration flinkConfiguration) { + private String loadYarnPropertiesFile(CommandLine cmdLine, Configuration flinkConfiguration) { String jobManagerOption = cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); if (jobManagerOption != null) { // don't resume from properties file if a JobManager has been specified - return false; + return null; } for (Option option : cmdLine.getOptions()) { if (ALL_OPTIONS.hasOption(option.getOpt())) { if (!option.getOpt().equals(DETACHED.getOpt())) { // don't resume from properties file if yarn options have been specified - return false; + return null; } } } @@ -177,7 +176,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine // load the YARN properties File propertiesFile = getYarnPropertiesLocation(flinkConfiguration); if (!propertiesFile.exists()) { - return false; + return null; } logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); @@ -192,6 +191,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine throw new RuntimeException("Cannot read the YARN properties file", e); } + // get the Yarn application id from the properties file + String applicationID = yarnProperties.getProperty(YARN_APPLICATION_ID_KEY); + if (applicationID == null) { + throw new IllegalConfigurationException("Yarn properties file found but doesn't contain a " + + "Yarn applicaiton id. Please delete the file at " + propertiesFile.getAbsolutePath()); + } + + try { + // try converting id to ApplicationId + ConverterUtils.toApplicationId(applicationID); + } + catch (Exception e) { + throw new RuntimeException("YARN properties contains an invalid entry for " + + "application id: " + applicationID, e); + } + + logAndSysout("Using Yarn application id from YARN properties " + applicationID); + // configure the default parallelism from YARN String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); if (propParallelism != null) { // maybe the property is not set @@ -207,22 +224,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine } } - // get the JobManager address from the YARN properties - String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); - InetSocketAddress jobManagerAddress; - if (address != null) { - try { - jobManagerAddress = ClientUtils.parseHostPortAddress(address); - // store address in config from where it is retrieved by the retrieval service - CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress); - } - catch (Exception e) { - throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e); - } - - logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); - } - // handle the YARN client's dynamic properties String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); Map dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); @@ -230,7 +231,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); } - return true; + return applicationID; } public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { @@ -449,7 +450,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); boolean yarnJobManager = ID.equals(jobManagerOption); boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt()); - return yarnJobManager || yarnAppId || resumeFromYarnProperties(commandLine, configuration); + return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null; } @Override @@ -481,10 +482,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine yarnDescriptor.setFlinkConfiguration(config); return yarnDescriptor.retrieve(applicationID); // then try to load from yarn properties - } else if (resumeFromYarnProperties(cmdLine, config)) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); - yarnDescriptor.setFlinkConfiguration(config); - return yarnDescriptor.retrieveFromConfig(config); + } else { + String applicationId = loadYarnPropertiesFile(cmdLine, config); + if (applicationId != null) { + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + yarnDescriptor.setFlinkConfiguration(config); + return yarnDescriptor.retrieve(applicationId); + } } throw new UnsupportedOperationException("Could not resume a Yarn cluster."); @@ -581,7 +585,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()); Properties yarnProps = new Properties(); - yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); + yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString()); if (yarnDescriptor.getTaskManagerSlots() != -1) { String parallelism = Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount()); -- GitLab