提交 7ab6837f 编写于 作者: M Maximilian Michels

[FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id

This closes #2191
上级 f722b737
......@@ -109,8 +109,7 @@ public class CliFrontendYarnAddressConfigurationTest {
private static final ApplicationId TEST_YARN_APPLICATION_ID =
ApplicationId.newInstance(System.currentTimeMillis(), 42);
private static final String validPropertiesFile =
"jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT;
private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;
private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
......
......@@ -22,7 +22,6 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
......@@ -302,43 +301,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return yarnClient;
}
/**
* Retrieves the Yarn application and cluster from the config
* @param config The config with entries to retrieve the cluster
* @return YarnClusterClient
* @deprecated This should be removed in the future
*/
public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config)
throws UnsupportedOperationException {
String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
if (jobManagerHost != null && jobManagerPort != -1) {
YarnClient yarnClient = getYarnClient();
final List<ApplicationReport> applicationReports;
try {
applicationReports = yarnClient.getApplications();
} catch (Exception e) {
throw new RuntimeException("Couldn't get Yarn application reports", e);
}
for (ApplicationReport report : applicationReports) {
if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) {
LOG.info("Found application '{}' " +
"with JobManager host name '{}' and port '{}' from Yarn properties file.",
report.getApplicationId(), jobManagerHost, jobManagerPort);
return retrieve(report.getApplicationId().toString());
}
}
}
LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'",
jobManagerHost, jobManagerPort);
throw new IllegalConfigurationException("Could not resume Yarn cluster from config.");
}
@Override
public YarnClusterClient retrieve(String applicationID) {
......
......@@ -24,18 +24,18 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,7 +47,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
......@@ -75,7 +74,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// YARN-session related constants
private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
private static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
static final String YARN_APPLICATION_ID_KEY = "applicationID";
private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
......@@ -152,24 +151,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
/**
* Resumes from a Flink Yarn properties file
* Tries to load a Flink Yarn properties file and returns the Yarn application id if successful
* @param cmdLine The command-line parameters
* @param flinkConfiguration The flink configuration
* @return True if the properties were loaded, false otherwise
* @return Yarn application id or null if none could be retrieved
*/
private boolean resumeFromYarnProperties(CommandLine cmdLine, Configuration flinkConfiguration) {
private String loadYarnPropertiesFile(CommandLine cmdLine, Configuration flinkConfiguration) {
String jobManagerOption = cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
if (jobManagerOption != null) {
// don't resume from properties file if a JobManager has been specified
return false;
return null;
}
for (Option option : cmdLine.getOptions()) {
if (ALL_OPTIONS.hasOption(option.getOpt())) {
if (!option.getOpt().equals(DETACHED.getOpt())) {
// don't resume from properties file if yarn options have been specified
return false;
return null;
}
}
}
......@@ -177,7 +176,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// load the YARN properties
File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
if (!propertiesFile.exists()) {
return false;
return null;
}
logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
......@@ -192,6 +191,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
throw new RuntimeException("Cannot read the YARN properties file", e);
}
// get the Yarn application id from the properties file
String applicationID = yarnProperties.getProperty(YARN_APPLICATION_ID_KEY);
if (applicationID == null) {
throw new IllegalConfigurationException("Yarn properties file found but doesn't contain a " +
"Yarn applicaiton id. Please delete the file at " + propertiesFile.getAbsolutePath());
}
try {
// try converting id to ApplicationId
ConverterUtils.toApplicationId(applicationID);
}
catch (Exception e) {
throw new RuntimeException("YARN properties contains an invalid entry for " +
"application id: " + applicationID, e);
}
logAndSysout("Using Yarn application id from YARN properties " + applicationID);
// configure the default parallelism from YARN
String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
if (propParallelism != null) { // maybe the property is not set
......@@ -207,22 +224,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
}
// get the JobManager address from the YARN properties
String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
InetSocketAddress jobManagerAddress;
if (address != null) {
try {
jobManagerAddress = ClientUtils.parseHostPortAddress(address);
// store address in config from where it is retrieved by the retrieval service
CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
}
catch (Exception e) {
throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
}
logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
}
// handle the YARN client's dynamic properties
String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
......@@ -230,7 +231,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
}
return true;
return applicationID;
}
public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
......@@ -449,7 +450,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
boolean yarnJobManager = ID.equals(jobManagerOption);
boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt());
return yarnJobManager || yarnAppId || resumeFromYarnProperties(commandLine, configuration);
return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null;
}
@Override
......@@ -481,10 +482,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
yarnDescriptor.setFlinkConfiguration(config);
return yarnDescriptor.retrieve(applicationID);
// then try to load from yarn properties
} else if (resumeFromYarnProperties(cmdLine, config)) {
AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
yarnDescriptor.setFlinkConfiguration(config);
return yarnDescriptor.retrieveFromConfig(config);
} else {
String applicationId = loadYarnPropertiesFile(cmdLine, config);
if (applicationId != null) {
AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
yarnDescriptor.setFlinkConfiguration(config);
return yarnDescriptor.retrieve(applicationId);
}
}
throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
......@@ -581,7 +585,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
Properties yarnProps = new Properties();
yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
if (yarnDescriptor.getTaskManagerSlots() != -1) {
String parallelism =
Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册