提交 f4ac8522 编写于 作者: M Maximilian Michels

[FLINK-3937] programmatic resuming of clusters

- integrates with and extends the refactoring of FLINK-3667
- enables to resume from Yarn properties or Yarn application id
- introduces additional StandaloneClusterDescriptor
- introduces DefaultCLI to get rid of standalone mode switches in CliFrontend
- various fixes and improvements
- remove legacy code from CliFrontend
- change activation code of CustomCommandLine interface
- use checked exceptions to signal supported operations
- remove all checked exceptions of type Exception
- fix logging and reduce verbosity of per-job clusters
- print 'id' argument in YarnSessionCli
- minor renaming of method names
- improve documentation
- deprecate streaming option
- extend CliFrontendYarnAddressConfigurationTest
- move loading of custom CLIs to CliFrontend

This closes #2085
上级 875d4d23
......@@ -18,8 +18,7 @@
package org.apache.flink.client;
import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
......@@ -30,6 +29,7 @@ import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
......@@ -39,7 +39,6 @@ import org.apache.flink.client.cli.StopOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -56,7 +55,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
......@@ -67,13 +65,12 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
......@@ -81,6 +78,8 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.text.SimpleDateFormat;
......@@ -89,6 +88,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -102,9 +102,11 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepo
*/
public class CliFrontend {
private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
// actions
public static final String ACTION_RUN = "run";
public static final String ACTION_INFO = "info";
private static final String ACTION_RUN = "run";
private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
......@@ -116,19 +118,24 @@ public class CliFrontend {
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
// --------------------------------------------------------------------------------------------
private static final List<CustomCommandLine> customCommandLine = new LinkedList<>();
static {
/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
customCommandLine.add(new DefaultCLI());
}
// --------------------------------------------------------------------------------------------
private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
private final Configuration config;
private final FiniteDuration clientTimeout;
private final FiniteDuration lookupTimeout;
private ActorSystem actorSystem;
/**
*
* @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded
......@@ -146,6 +153,8 @@ public class CliFrontend {
// load the configuration
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();
try {
......@@ -156,7 +165,6 @@ public class CliFrontend {
}
this.clientTimeout = AkkaUtils.getClientTimeout(config);
this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
}
......@@ -798,19 +806,20 @@ public class CliFrontend {
*
* @param options Command line options
*/
protected void updateConfig(CommandLineOptions options) {
if(options.getJobManagerAddress() != null){
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli()
.attachFlinkYarnClient(options.getCommandLine())
.getJobManagerAddress();
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(config, jobManagerAddress);
protected ClusterClient retrieveClient(CommandLineOptions options) {
CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
try {
ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
return client;
} catch (Exception e) {
LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
throw new IllegalConfigurationException("Couldn't retrieve client for cluster", e);
}
}
/**
* Retrieves the {@link ActorGateway} for the JobManager. The JobManager address is retrieved
* Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved
* from the provided {@link CommandLineOptions}.
*
* @param options CommandLineOptions specifying the JobManager URL
......@@ -818,92 +827,41 @@ public class CliFrontend {
* @throws Exception
*/
protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
// overwrite config values with given command line options
updateConfig(options);
// start an actor system if needed
if (this.actorSystem == null) {
LOG.info("Starting actor system to communicate with JobManager");
try {
scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
this.actorSystem = AkkaUtils.createActorSystem(
config,
new Some<scala.Tuple2<String, Object>>(systemEndpoint));
}
catch (Exception e) {
throw new IOException("Could not start actor system to communicate with JobManager", e);
}
LOG.info("Actor system successfully started");
}
LOG.info("Trying to lookup the JobManager gateway");
// Retrieve the ActorGateway from the LeaderRetrievalService
LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout);
return retrieveClient(options).getJobManagerGateway();
}
/**
* Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* Creates a {@link ClusterClient} object from the given command line options and other parameters.
* @param options Command line options
* @param programName Program name
* @throws Exception
*/
protected ClusterClient getClient(
CommandLineOptions options,
String programName)
throws Exception {
InetSocketAddress jobManagerAddress;
// try to get the JobManager address via command-line args
if (options.getJobManagerAddress() != null) {
String programName) throws Exception {
// Get the custom command-lines (e.g. Yarn/Mesos)
CustomCommandLine<?> activeCommandLine =
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
if (activeCommandLine != null) {
logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console");
// Default yarn application name to use, if nothing is specified on the command line
ClusterClient client;
try {
client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
logAndSysout("Cluster retrieved");
} catch (UnsupportedOperationException e) {
try {
String applicationName = "Flink Application: " + programName;
ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine());
client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
logAndSysout("Cluster started");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
} else {
// job manager address supplied on the command-line
LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress());
jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(config, jobManagerAddress);
return new StandaloneClusterClient(config);
}
// try to get the JobManager address via resuming of a cluster
} else {
for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) {
ClusterClient client = cli.retrieveCluster(config);
if (client != null) {
LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
return client;
}
} catch (UnsupportedOperationException e2) {
throw new IllegalConfigurationException(
"The JobManager address is neither provided at the command-line, " +
"nor configured in flink-conf.yaml.");
}
}
// read JobManager address from the config
if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
return new StandaloneClusterClient(config);
// We tried hard but couldn't find a JobManager address
} else {
throw new IllegalConfigurationException(
"The JobManager address is neither provided at the command-line, " +
"nor configured in flink-conf.yaml.");
}
logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
}
// --------------------------------------------------------------------------------------------
......@@ -917,7 +875,7 @@ public class CliFrontend {
* @return The return code for the process.
*/
private int handleArgException(Exception e) {
LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? "" : e.getMessage()));
System.out.println(e.getMessage());
System.out.println();
......@@ -1039,14 +997,6 @@ public class CliFrontend {
}
}
/**
 * Shuts down the actor system that was started to communicate with the JobManager,
 * if one exists. Subsequent calls are no-ops.
 * NOTE(review): not synchronized — concurrent invocations could race; confirm this
 * is only called from a single thread.
 */
public void shutdown() {
// copy the field and null it before shutting down, so a repeated call
// does not attempt to shut the same system down twice
ActorSystem sys = this.actorSystem;
if (sys != null) {
this.actorSystem = null;
sys.shutdown();
}
}
/**
* Submits the job based on the arguments
*/
......@@ -1070,7 +1020,8 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
public static String getConfigurationDirectoryFromEnv() {
String location = System.getenv(ENV_CONFIG_DIRECTORY);
String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY);
if (location != null) {
if (new File(location).exists()) {
......@@ -1102,9 +1053,65 @@ public class CliFrontend {
* @param address Address to write to the configuration
* @param config The config to write to
*/
public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) {
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
}
// --------------------------------------------------------------------------------------------
// Custom command-line
// --------------------------------------------------------------------------------------------
/**
 * Finds the custom command-line that declares itself active for the given
 * parsed arguments. At most one command-line may be active at a time; the
 * first active one in registration order wins.
 *
 * @param commandLine The parsed command-line input.
 * @return the active custom command-line
 * @throws IllegalStateException if no registered command-line is active
 */
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
	for (final CustomCommandLine candidate : customCommandLine) {
		if (candidate.isActive(commandLine, config)) {
			return candidate;
		}
	}
	throw new IllegalStateException("No command-line ran.");
}
/**
 * Retrieves the loaded custom command-lines.
 * @return An unmodifiable list of loaded custom command-lines.
 */
public static List<CustomCommandLine> getCustomCommandLineList() {
return Collections.unmodifiableList(customCommandLine);
}
/**
 * Loads a class from the classpath that implements the CustomCommandLine interface,
 * instantiates it with the given constructor parameters, and registers it in the
 * static command-line list. Failures are logged and swallowed on purpose, so that
 * an optional CLI (e.g. the YARN session CLI) may be absent from the classpath
 * without breaking the client.
 *
 * @param className The fully-qualified class name to load.
 * @param params The constructor parameters; matched by their exact runtime classes,
 *               so they must be non-null and must match a declared constructor.
 */
private static void loadCustomCommandLine(String className, Object... params) {
	try {
		Class<? extends CustomCommandLine> customCliClass =
			Class.forName(className).asSubclass(CustomCommandLine.class);

		// construct class types from the parameters
		Class<?>[] types = new Class<?>[params.length];
		for (int i = 0; i < params.length; i++) {
			Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
			types[i] = params[i].getClass();
		}

		Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
		final CustomCommandLine cli = constructor.newInstance(params);
		customCommandLine.add(cli);
	} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
		| InvocationTargetException e) {
		// "load" (not "locate"): this branch also covers a present class whose
		// constructor lookup or instantiation failed, not only a missing class
		LOG.warn("Unable to load custom CLI class {}. " +
			"Flink is not compiled with support for this class.", className, e);
	}
}
}
......@@ -24,16 +24,10 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.util.Preconditions;
import org.apache.flink.client.CliFrontend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* A simple command line parser (based on Apache Commons CLI) that extracts command
......@@ -44,16 +38,6 @@ public class CliFrontendParser {
private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
/** command line interface of the YARN session, with a special initialization here
* to prefix all options with y/yarn. */
private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1);
static {
// we could easily add more here in the future
loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
}
static final Option HELP_OPTION = new Option("h", "help", false,
"Show the help message for the CLI Frontend or the action.");
......@@ -82,9 +66,8 @@ public class CliFrontendParser {
static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
public static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
"Address of the JobManager (master) to which to connect. " +
"Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " +
"Use this flag to connect to a different JobManager than the one specified in the configuration.");
static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
......@@ -146,6 +129,10 @@ public class CliFrontendParser {
options.addOption(HELP_OPTION);
// backwards compatibility: ignore verbose flag (-v)
options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
// add general options of all CLIs
for (CustomCommandLine customCLI : CliFrontend.getCustomCommandLineList()) {
customCLI.addGeneralOptions(options);
}
return options;
}
......@@ -158,11 +145,6 @@ public class CliFrontendParser {
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SAVEPOINT_PATH_OPTION);
for (CustomCommandLine customCLI : customCommandLine.values()) {
customCLI.addOptions(options);
}
return options;
}
......@@ -177,62 +159,85 @@ public class CliFrontendParser {
}
private static Options getRunOptions(Options options) {
Options o = getProgramSpecificOptions(options);
return getJobManagerAddressOption(o);
options = getProgramSpecificOptions(options);
options = getJobManagerAddressOption(options);
return addCustomCliOptions(options, true);
}
private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
return getJobManagerAddressOption(o);
}
private static Options getJobManagerAddressOption(Options options) {
options.addOption(ADDRESS_OPTION);
yarnSessionCLi.getYARNAttachCLIOptions(options);
return options;
}
private static Options getInfoOptions(Options options) {
options = getProgramSpecificOptions(options);
options = getJobManagerAddressOption(options);
return options;
return addCustomCliOptions(options, false);
}
/** Builds the options for the "list" action: job-state filters, the JobManager address, and the custom CLI general options. */
private static Options getListOptions(Options options) {
	// job-state filters specific to "list"
	options.addOption(RUNNING_OPTION);
	options.addOption(SCHEDULED_OPTION);
	return addCustomCliOptions(getJobManagerAddressOption(options), false);
}
/** Builds the options for the "cancel" action: the JobManager address plus the custom CLI general options. */
private static Options getCancelOptions(Options options) {
	return addCustomCliOptions(getJobManagerAddressOption(options), false);
}
/** Builds the options for the "stop" action: the JobManager address plus the custom CLI general options. */
private static Options getStopOptions(Options options) {
	return addCustomCliOptions(getJobManagerAddressOption(options), false);
}
/** Builds the options for the "savepoint" action: JobManager address, the dispose flag, and the custom CLI general options. */
private static Options getSavepointOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
return addCustomCliOptions(options, false);
}
// --------------------------------------------------------------------------------------------
// Help
// --------------------------------------------------------------------------------------------
/** Builds the "run" help options without deprecated entries: program-specific options plus the JobManager address. */
private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
	final Options programOptions = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
	return getJobManagerAddressOption(programOptions);
}
/** Builds the "info" help options without deprecated entries: entry class, parallelism, and the JobManager address. */
private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(CLASS_OPTION);
options.addOption(PARALLELISM_OPTION);
options = getJobManagerAddressOption(options);
return options;
}
private static Options getListOptions(Options options) {
private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
options.addOption(RUNNING_OPTION);
options.addOption(SCHEDULED_OPTION);
options = getJobManagerAddressOption(options);
return options;
}
private static Options getCancelOptions(Options options) {
private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
return options;
}
private static Options getStopOptions(Options options) {
private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
return options;
}
private static Options getSavepointOptions(Options options) {
private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
return options;
}
// --------------------------------------------------------------------------------------------
// Help
// --------------------------------------------------------------------------------------------
/**
* Prints the help for the client.
*/
......@@ -261,14 +266,7 @@ public class CliFrontendParser {
formatter.setSyntaxPrefix(" \"run\" action options:");
formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
// prints options from all available command-line classes
for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) {
formatter.setSyntaxPrefix(" Additional arguments if -m " + entry.getKey() + " is set:");
Options customOpts = new Options();
entry.getValue().addOptions(customOpts);
formatter.printHelp(" ", customOpts);
System.out.println();
}
printCustomCliOptions(formatter, true);
System.out.println();
}
......@@ -282,10 +280,9 @@ public class CliFrontendParser {
System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"info\" action options:");
formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
Options yarnOpts = new Options();
yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
formatter.printHelp(" ", yarnOpts);
printCustomCliOptions(formatter, false);
System.out.println();
}
......@@ -297,7 +294,10 @@ public class CliFrontendParser {
System.out.println("\nAction \"list\" lists running and scheduled programs.");
System.out.println("\n Syntax: list [OPTIONS]");
formatter.setSyntaxPrefix(" \"list\" action options:");
formatter.printHelp(" ", getListOptions(new Options()));
formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(formatter, false);
System.out.println();
}
......@@ -309,7 +309,10 @@ public class CliFrontendParser {
System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"stop\" action options:");
formatter.printHelp(" ", getStopOptions(new Options()));
formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(formatter, false);
System.out.println();
}
......@@ -321,11 +324,10 @@ public class CliFrontendParser {
System.out.println("\nAction \"cancel\" cancels a running program.");
System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"cancel\" action options:");
formatter.printHelp(" ", getCancelOptions(new Options()));
formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
Options yarnOpts = new Options();
yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
formatter.printHelp(" ", yarnOpts);
formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(formatter, false);
System.out.println();
}
......@@ -337,10 +339,50 @@ public class CliFrontendParser {
System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptions(new Options()));
formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));
printCustomCliOptions(formatter, false);
System.out.println();
}
/**
 * Appends the options of every loaded custom command-line to the given options.
 *
 * @param options The options instance to extend (mutated in place and returned).
 * @param runOptions Whether run-specific options should be included as well.
 * @return the same options instance, extended with the custom CLI options
 */
private static Options addCustomCliOptions(Options options, boolean runOptions) {
	for (final CustomCommandLine commandLine : CliFrontend.getCustomCommandLineList()) {
		commandLine.addGeneralOptions(options);
		if (runOptions) {
			commandLine.addRunOptions(options);
		}
	}
	return options;
}
/**
 * Prints a help section for every loaded custom command-line that advertises an id.
 * Command-lines whose id is null (e.g. the default CLI) are skipped.
 *
 * @param formatter The help formatter used for printing.
 * @param runOptions True to also print run-specific options, false for general options only.
 */
private static void printCustomCliOptions(HelpFormatter formatter, boolean runOptions) {
	// prints options from all available command-line classes
	for (final CustomCommandLine commandLine : CliFrontend.getCustomCommandLineList()) {
		if (commandLine.getId() == null) {
			continue;
		}
		formatter.setSyntaxPrefix(" Options for " + commandLine.getId() + " mode:");
		final Options customOptions = new Options();
		commandLine.addGeneralOptions(customOptions);
		if (runOptions) {
			commandLine.addRunOptions(customOptions);
		}
		formatter.printHelp(" ", customOptions);
		System.out.println();
	}
}
// --------------------------------------------------------------------------------------------
// Line Parsing
// --------------------------------------------------------------------------------------------
......@@ -410,63 +452,4 @@ public class CliFrontendParser {
}
}
public static Map<String, CustomCommandLine> getAllCustomCommandLine() {
if (customCommandLine.isEmpty()) {
LOG.warn("No custom command-line classes were loaded.");
}
return Collections.unmodifiableMap(customCommandLine);
}
private static String getCliIdentifierString() {
StringBuilder builder = new StringBuilder();
boolean first = true;
for (String identifier : customCommandLine.keySet()) {
if (!first) {
builder.append(", ");
}
first = false;
builder.append("'").append(identifier).append("'");
}
return builder.toString();
}
/**
* Gets the custom command-line for this identifier.
* @param identifier The unique identifier for this command-line implementation.
* @return CustomCommandLine or null if none was found
*/
public static CustomCommandLine getActiveCustomCommandLine(String identifier) {
return CliFrontendParser.getAllCustomCommandLine().get(identifier);
}
private static void loadCustomCommandLine(String className, Object... params) {
try {
Class<? extends CustomCommandLine> customCliClass =
Class.forName(className).asSubclass(CustomCommandLine.class);
// construct class types from the parameters
Class<?>[] types = new Class<?>[params.length];
for (int i = 0; i < params.length; i++) {
Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
types[i] = params[i].getClass();
}
Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
final CustomCommandLine cli = constructor.newInstance(params);
String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier());
CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli);
if (existing != null) {
throw new IllegalStateException("Attempted to register " + cliIdentifier +
" but there is already a command-line with this identifier.");
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
| InvocationTargetException e) {
LOG.warn("Unable to locate custom CLI class {}. " +
"Flink is not compiled with support for this class.", className, e);
}
}
}
......@@ -29,29 +29,47 @@ import org.apache.flink.configuration.Configuration;
public interface CustomCommandLine<ClusterType extends ClusterClient> {
/**
* Returns a unique identifier for this custom command-line.
* @return An unique identifier string
* Signals whether the custom command-line wants to execute or not
* @param commandLine The command-line options
* @param configuration The Flink configuration
* @return True if the command-line wants to run, False otherwise
*/
String getIdentifier();
boolean isActive(CommandLine commandLine, Configuration configuration);
/**
* Adds custom options to the existing options.
* Gets the unique identifier of this CustomCommandLine
* @return A unique identifier
*/
String getId();
/**
* Adds custom options to the existing run options.
* @param baseOptions The existing options.
*/
void addRunOptions(Options baseOptions);
/**
* Adds custom options to the existing general options.
* @param baseOptions The existing options.
*/
void addOptions(Options baseOptions);
void addGeneralOptions(Options baseOptions);
/**
* Retrieves a client for a running cluster
* @param commandLine The command-line parameters from the CliFrontend
* @param config The Flink config
* @return Client if a cluster could be retrieve, null otherwise
* @return Client if a cluster could be retrieved
* @throws UnsupportedOperationException if the operation is not supported
*/
ClusterClient retrieveCluster(Configuration config) throws Exception;
ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
/**
* Creates the client for the cluster
* @param applicationName The application name to use
* @param commandLine The command-line options parsed by the CliFrontend
* @param config The Flink config to use
* @return The client to communicate with the cluster which the CustomCommandLine brought up.
* @throws UnsupportedOperationException if the operation is not supported
*/
ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception;
ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config) throws UnsupportedOperationException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import java.net.InetSocketAddress;
import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
/**
* The default CLI which is used for interaction with standalone clusters.
*/
// Fallback command-line used for interaction with standalone clusters; registered
// last so that mode-specific CLIs (e.g. YARN) take precedence.
public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
@Override
public boolean isActive(CommandLine commandLine, Configuration configuration) {
// always active because we can try to read a JobManager address from the config
return true;
}
@Override
public String getId() {
// no id: the default CLI is not advertised as a separate mode in help output
// (printCustomCliOptions skips command-lines with a null id)
return null;
}
@Override
public void addRunOptions(Options baseOptions) {
// standalone mode contributes no run-specific options
}
@Override
public void addGeneralOptions(Options baseOptions) {
// standalone mode contributes no general options
}
@Override
public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configuration config) {
// an -m/--jobmanager address given on the command line overrides the configured one
if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
setJobManagerAddressInConfig(config, jobManagerAddress);
}
// null: standalone clusters have no application id; the address comes from the config
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
return descriptor.retrieve(null);
}
@Override
// applicationName and commandLine are ignored here — standalone deployment
// is driven entirely by the Flink configuration
public StandaloneClusterClient createCluster(
String applicationName,
CommandLine commandLine,
Configuration config) throws UnsupportedOperationException {
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
return descriptor.deploy();
}
}
......@@ -30,12 +30,20 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
* Returns a String containing details about the cluster (NodeManagers, available memory, ...)
*
*/
String getClusterDescription() throws Exception;
String getClusterDescription();
/**
* Retrieves an existing Flink Cluster.
* @param applicationID The unique application identifier of the running cluster
* @return Client for the cluster
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
ClientType retrieve(String applicationID) throws UnsupportedOperationException;
/**
* Triggers deployment of a cluster
* @return Client for the cluster
* @throws Exception
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
ClientType deploy() throws Exception;
ClientType deploy() throws UnsupportedOperationException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.deployment;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
/**
 * A cluster descriptor for standalone Flink clusters that are already running.
 * Only {@link #retrieve(String)} is supported; a standalone cluster cannot be
 * deployed programmatically.
 */
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {

	/** Flink configuration holding the JobManager address of the existing cluster */
	private final Configuration config;

	public StandaloneClusterDescriptor(Configuration config) {
		this.config = config;
	}

	@Override
	public String getClusterDescription() {
		final String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "");
		final int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
		return String.format("Standalone cluster at %s:%d", host, port);
	}

	@Override
	public StandaloneClusterClient retrieve(String applicationID) {
		// the application id is ignored; the client is built from the configuration alone
		try {
			return new StandaloneClusterClient(config);
		} catch (Exception e) {
			throw new RuntimeException("Couldn't retrieve standalone cluster", e);
		}
	}

	@Override
	public StandaloneClusterClient deploy() {
		throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
	}
}
......@@ -76,7 +76,7 @@ import akka.actor.ActorSystem;
*/
public abstract class ClusterClient {
private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
private final Logger LOG = LoggerFactory.getLogger(getClass());
/** The optimizer used in the optimization of batch programs */
final Optimizer compiler;
......@@ -203,9 +203,9 @@ public abstract class ClusterClient {
*/
public InetSocketAddress getJobManagerAddressFromConfig() {
try {
String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
return new InetSocketAddress(hostName, port);
String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
return new InetSocketAddress(hostName, port);
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve JobManager address", e);
}
......@@ -255,11 +255,13 @@ public abstract class ClusterClient {
}
public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
Logger log = LoggerFactory.getLogger(ClusterClient.class);
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
return compiler.compile(p);
}
......@@ -603,7 +605,7 @@ public abstract class ClusterClient {
* @return ActorGateway of the current job manager leader
* @throws Exception
*/
protected ActorGateway getJobManagerGateway() throws Exception {
public ActorGateway getJobManagerGateway() throws Exception {
LOG.info("Looking up JobManager");
return LeaderRetrievalUtils.retrieveLeaderGateway(
......
......@@ -23,9 +23,13 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
......@@ -57,14 +61,12 @@ public class CliFrontendAddressConfigurationTest {
public void testValidConfig() {
try {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
CommandLineOptions options = mock(CommandLineOptions.class);
frontend.updateConfig(options);
Configuration config = frontend.getConfiguration();
ClusterClient clusterClient = frontend.retrieveClient(options);
checkJobManagerAddress(
config,
clusterClient.getFlinkConfiguration(),
CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
}
......@@ -74,43 +76,12 @@ public class CliFrontendAddressConfigurationTest {
}
}
@Test
public void testInvalidConfigAndNoOption() {
try {
@Test(expected = IllegalConfigurationException.class)
public void testInvalidConfigAndNoOption() throws Exception {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
CommandLineOptions options = mock(CommandLineOptions.class);
frontend.updateConfig(options);
Configuration config = frontend.getConfiguration();
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
checkJobManagerAddress(config, null, -1);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testInvalidConfigAndOption() {
try {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
CommandLineOptions options = mock(CommandLineOptions.class);
when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
frontend.updateConfig(options);
Configuration config = frontend.getConfiguration();
InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
frontend.retrieveClient(options);
}
@Test
......@@ -118,12 +89,10 @@ public class CliFrontendAddressConfigurationTest {
try {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
CommandLineOptions options = mock(CommandLineOptions.class);
when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
frontend.updateConfig(options);
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
Configuration config = frontend.getConfiguration();
ClusterClient client = frontend.retrieveClient(options);
Configuration config = client.getFlinkConfiguration();
InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.scala
import java.io._
import org.apache.commons.cli.CommandLine
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.client.CliFrontend
......@@ -245,11 +246,13 @@ object FlinkShell {
yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))
val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
val options = CliFrontendParser.parseRunCommand(args.toArray)
val frontend = new CliFrontend()
val config = frontend.getConfiguration
val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
val cluster = customCLI.createClient("Flink Scala Shell", options.getCommandLine)
val cluster = customCLI.createCluster("Flink Scala Shell", options.getCommandLine, config)
val address = cluster.getJobManagerAddress.getAddress.getHostAddress
val port = cluster.getJobManagerAddress.getPort
......@@ -259,12 +262,21 @@ object FlinkShell {
def fetchDeployedYarnClusterInfo() = {
// load configuration
val globalConfig = GlobalConfiguration.getConfiguration
val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
val args = ArrayBuffer[String](
"-m", "yarn-cluster"
)
val cluster = customCLI.retrieveCluster(globalConfig)
val options = CliFrontendParser.parseRunCommand(args.toArray)
val frontend = new CliFrontend()
val config = frontend.getConfiguration
val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
val cluster = customCLI.retrieveCluster(options.getCommandLine, config)
if (cluster == null) {
throw new RuntimeException("Yarn Cluster could not be retrieved.")
}
val jobManager = cluster.getJobManagerAddress
......
......@@ -18,27 +18,45 @@
package org.apache.flink.yarn;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.junit.*;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests that verify that the CLI client picks up the correct address for the JobManager
......@@ -80,8 +98,10 @@ public class CliFrontendYarnAddressConfigurationTest {
private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
private static final ApplicationId TEST_YARN_APPLICATION_ID =
ApplicationId.newInstance(System.currentTimeMillis(), 42);
private static final String propertiesFile =
private static final String validPropertiesFile =
"jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT;
......@@ -101,110 +121,292 @@ public class CliFrontendYarnAddressConfigurationTest {
* Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
*/
@Test
public void testYarnConfig() {
try {
File tmpFolder = temporaryFolder.newFolder();
String currentUser = System.getProperty("user.name");
public void testResumeFromYarnPropertiesFile() throws Exception {
// copy .yarn-properties-<username>
File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
Files.write(testPropertiesFile.toPath(), propertiesFile.getBytes(), StandardOpenOption.CREATE);
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
// start CLI Frontend
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
// start CLI Frontend
TestCLI frontend = new TestCLI(tmpFolder.getAbsolutePath());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
CommandLineOptions options = mock(CommandLineOptions.class);
frontend.retrieveClient(options);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
frontend.getClient(options, "Program name");
}
frontend.updateConfig(options);
Configuration config = frontend.getConfiguration();
@Test(expected = IllegalConfigurationException.class)
public void testResumeFromYarnPropertiesFileWithFinishedApplication() throws Exception {
checkJobManagerAddress(
config,
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
// start CLI Frontend
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
frontend.retrieveClient(options);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
}
public static class TestCLI extends CliFrontend {
TestCLI(String configDir) throws Exception {
super(configDir);
}
@Override
public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
return super.getClient(options, programName);
}
@Test(expected = IllegalConfigurationException.class)
public void testInvalidYarnPropertiesFile() throws Exception {
@Override
public void updateConfig(CommandLineOptions options) {
super.updateConfig(options);
}
File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
frontend.retrieveClient(options);
Configuration config = frontend.getConfiguration();
checkJobManagerAddress(
config,
TEST_JOB_MANAGER_ADDRESS,
TEST_JOB_MANAGER_PORT);
}
@Test
public void testInvalidYarnConfig() {
try {
File tmpFolder = temporaryFolder.newFolder();
public void testResumeFromYarnID() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
// copy invalid .yarn-properties-<username>
File testPropertiesFile = new File(tmpFolder, ".yarn-properties");
Files.write(testPropertiesFile.toPath(), invalidPropertiesFile.getBytes(), StandardOpenOption.CREATE);
// start CLI Frontend
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
TestCLI cli = new TestCLI(tmpFolder.getAbsolutePath());
frontend.retrieveClient(options);
CommandLineOptions options = mock(CommandLineOptions.class);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
}
cli.updateConfig(options);
@Test(expected = IllegalConfigurationException.class)
public void testResumeFromInvalidYarnID() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
Configuration config = cli.getConfiguration();
// start CLI Frontend
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
checkJobManagerAddress(
config,
TEST_JOB_MANAGER_ADDRESS,
TEST_JOB_MANAGER_PORT);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", ApplicationId.newInstance(0, 666).toString()});
frontend.retrieveClient(options);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
}
@Test(expected = IllegalConfigurationException.class)
public void testResumeFromYarnIDWithFinishedApplication() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
// start CLI Frontend
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
frontend.retrieveClient(options);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
}
@Test
public void testManualOptionsOverridesYarn() {
try {
File emptyFolder = temporaryFolder.newFolder();
TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
public void testYarnIDOverridesPropertiesFile() throws Exception {
File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
// start CLI Frontend
TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
frontend.retrieveClient(options);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
TEST_YARN_JOB_MANAGER_PORT);
}
@Test
public void testManualOptionsOverridesYarn() throws Exception {
File emptyFolder = temporaryFolder.newFolder();
File testConfFile = new File(emptyFolder.getAbsolutePath(), "flink-conf.yaml");
Files.createFile(testConfFile.toPath());
CommandLineOptions options = mock(CommandLineOptions.class);
when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
frontend.updateConfig(options);
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
Configuration config = frontend.getConfiguration();
frontend.retrieveClient(options);
InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
Configuration config = frontend.getConfiguration();
checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
}
///////////
// Utils //
///////////
/**
 * Writes a Yarn properties file with the given contents into a fresh temporary
 * folder and places a flink-conf.yaml next to it whose
 * "yarn.properties-file.location" entry points at that folder.
 *
 * @param contents contents of the {@code .yarn-properties-<username>} file to write
 * @return the temporary directory containing both files
 * @throws IOException if either file cannot be written
 */
private File writeYarnPropertiesFile(String contents) throws IOException {
	File tmpFolder = temporaryFolder.newFolder();
	String currentUser = System.getProperty("user.name");
	// copy .yarn-properties-<username>
	File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
	Files.write(testPropertiesFile.toPath(), contents.getBytes(), StandardOpenOption.CREATE);
	// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
	String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
	File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
	Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
	return tmpFolder.getAbsoluteFile();
}
/**
 * CliFrontend subclass that widens the visibility of {@code getClient} and
 * {@code retrieveClient} to public so the tests can call them directly.
 */
private static class TestCLI extends CliFrontend {
	TestCLI(String configDir) throws Exception {
		super(configDir);
	}
	@Override
	// make method public
	public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
		return super.getClient(options, programName);
	}
	@Override
	// make method public
	public ClusterClient retrieveClient(CommandLineOptions options) {
		return super.retrieveClient(options);
	}
}
/**
* Injects an extended FlinkYarnSessionCli that deals with mocking Yarn communication
*/
private static class CustomYarnTestCLI extends TestCLI {
// the default application status for yarn applications to be retrieved
private final FinalApplicationStatus finalApplicationStatus;
CustomYarnTestCLI(String configDir) throws Exception {
this(configDir, FinalApplicationStatus.UNDEFINED);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
CustomYarnTestCLI(String configDir, FinalApplicationStatus finalApplicationStatus) throws Exception {
super(configDir);
this.finalApplicationStatus = finalApplicationStatus;
}
@Override
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
// inject the testing FlinkYarnSessionCli
return new TestingYarnSessionCli();
}
/**
* Testing FlinkYarnSessionCli which returns a modified cluster descriptor for testing.
*/
private class TestingYarnSessionCli extends FlinkYarnSessionCli {
TestingYarnSessionCli() {
super("y", "yarn");
}
@Override
// override cluster descriptor to replace the YarnClient
protected AbstractYarnClusterDescriptor getClusterDescriptor() {
return new TestingYarnClusterDescriptor();
}
/**
* Replace the YarnClient for this test.
*/
private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
@Override
protected YarnClient getYarnClient() {
return new TestYarnClient();
}
@Override
protected YarnClusterClient createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
YarnClient yarnClient,
ApplicationReport report,
Configuration flinkConfiguration,
Path sessionFilesDir,
boolean perJobCluster) throws IOException, YarnException {
return Mockito.mock(YarnClusterClient.class);
}
/**
 * YarnClient stub that serves two canned application reports: the report of
 * the application the tests resume from, plus one unrelated report as noise.
 */
private class TestYarnClient extends YarnClientImpl {
	// canned reports returned instead of talking to a real Yarn cluster
	private final List<ApplicationReport> reports = new LinkedList<>();
	TestYarnClient() {
		{ // a report of the Yarn application we want to resume from
			ApplicationReport report = Mockito.mock(ApplicationReport.class);
			Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
			Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
			Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
			Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
			this.reports.add(report);
		}
		{ // a second report, just for noise
			ApplicationReport report = Mockito.mock(ApplicationReport.class);
			Mockito.when(report.getHost()).thenReturn("1.2.3.4");
			Mockito.when(report.getRpcPort()).thenReturn(-123);
			Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
			Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
			this.reports.add(report);
		}
	}
	@Override
	public List<ApplicationReport> getApplications() throws YarnException, IOException {
		return reports;
	}
	// returns the canned report matching the given id, or throws for unknown ids
	@Override
	public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
		for (ApplicationReport report : reports) {
			if (report.getApplicationId().equals(appId)) {
				return report;
			}
		}
		throw new YarnException();
	}
}
}
}
}
......
......@@ -23,7 +23,6 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
......@@ -37,8 +36,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
public class FlinkYarnSessionCliTest {
@Rule
......@@ -53,9 +50,10 @@ public class FlinkYarnSessionCliTest {
fakeConf.createNewFile();
map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
Options options = new Options();
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
cli.addOptions(options);
Options options = new Options();
cli.addGeneralOptions(options);
cli.addRunOptions(options);
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
......@@ -66,7 +64,7 @@ public class FlinkYarnSessionCliTest {
Assert.fail("Parsing failed with " + e.getMessage());
}
YarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
Assert.assertNotNull(flinkYarnDescriptor);
......
......@@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
......@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
......@@ -73,18 +75,8 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_
import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
/**
* All classes in this package contain code taken from
* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
* and
* https://github.com/hortonworks/simple-yarn-app
* and
* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
*
* The Flink jar is uploaded to HDFS by this client.
* The application master and all the TaskManager containers get the jar file downloaded
* by YARN into their local fs.
*
*/
 * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}.
*/
public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
......@@ -132,7 +124,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private boolean detached;
private String customName = null;
private String customName;
public AbstractYarnClusterDescriptor() {
// for unit tests only
......@@ -321,49 +314,112 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
* Gets a Hadoop Yarn client
* @return Returns a YarnClient which has to be shutdown manually
*/
public static YarnClient getYarnClient(Configuration conf) {
protected YarnClient getYarnClient() {
	// create a fresh client, initialize it with this descriptor's Hadoop
	// configuration and start it; the caller is responsible for stopping it
	YarnClient yarnClient = YarnClient.createYarnClient();
	yarnClient.init(conf);
	yarnClient.start();
	return yarnClient;
}
@Override
public YarnClusterClient deploy() throws Exception {
/**
* Retrieves the Yarn application and cluster from the config
* @param config The config with entries to retrieve the cluster
* @return YarnClusterClient
* @deprecated This should be removed in the future
*/
public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config)
throws UnsupportedOperationException {
String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (jobManagerHost != null && jobManagerPort != -1) {
if (UserGroupInformation.isSecurityEnabled()) {
if (!ugi.hasKerberosCredentials()) {
throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
"You may use kinit to authenticate and request a TGT from the Kerberos server.");
YarnClient yarnClient = getYarnClient();
final List<ApplicationReport> applicationReports;
try {
applicationReports = yarnClient.getApplications();
} catch (Exception e) {
throw new RuntimeException("Couldn't get Yarn application reports", e);
}
return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
@Override
public YarnClusterClient run() throws Exception {
return deployInternal();
for (ApplicationReport report : applicationReports) {
if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) {
LOG.info("Found application '{}' " +
"with JobManager host name '{}' and port '{}' from Yarn properties file.",
report.getApplicationId(), jobManagerHost, jobManagerPort);
return retrieve(report.getApplicationId().toString());
}
});
} else {
return deployInternal();
}
}
LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'",
jobManagerHost, jobManagerPort);
throw new IllegalConfigurationException("Could not resume Yarn cluster from config.");
}
@Override
public AbstractFlinkYarnCluster attach(String appId) throws Exception {
// check if required Hadoop environment variables are set. If not, warn user
if(System.getenv("HADOOP_CONF_DIR") == null &&
System.getenv("YARN_CONF_DIR") == null) {
LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
"configuration for accessing YARN.");
/**
 * Retrieves a running Yarn-based Flink cluster by its Yarn application id.
 *
 * <p>Looks up the application report via a Yarn client, rejects applications
 * that have already reached a final status, writes the reported JobManager
 * host/port into the Flink configuration, and creates a cluster client.
 *
 * @param applicationID string form of the Yarn application id to attach to
 * @return a client connected to the running cluster
 * @throws RuntimeException if the application cannot be looked up, has
 *         finished, or the cluster client cannot be created
 */
public YarnClusterClient retrieve(String applicationID) {
	try {
		// check if required Hadoop environment variables are set. If not, warn user
		if (System.getenv("HADOOP_CONF_DIR") == null &&
			System.getenv("YARN_CONF_DIR") == null) {
			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
				"configuration for accessing YARN.");
		}
		final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
		final YarnClient yarnClient = getYarnClient();
		final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
		if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
			// Flink cluster is not running anymore
			LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}",
				applicationID, appReport.getFinalApplicationStatus());
			throw new RuntimeException("The Yarn application " + applicationID + " doesn't run anymore.");
		}
		LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
			appReport.getHost(), appReport.getRpcPort(), applicationID);
		// make the retrieved JobManager address the one used by this client
		flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
		return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false);
	} catch (Exception e) {
		throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
	}
}
final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId);
/**
 * Deploys a new Flink cluster on YARN.
 *
 * <p>When Hadoop security is enabled, Kerberos credentials are required and the
 * deployment is executed as the current Hadoop user via
 * {@code UserGroupInformation.doAs}; otherwise {@code deployInternal()} is called
 * directly.
 *
 * @return a client for the newly deployed cluster
 * @throws RuntimeException if the deployment fails
 */
@Override
public YarnClusterClient deploy() {
	// NOTE(review): the following line appears to be stale pre-refactoring code (it
	// references identifiers not defined in this method and precedes the real body) —
	// presumably merge/diff residue; verify and remove.
	return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached);
	try {
		// make sure UGI reads the Hadoop configuration held by this descriptor
		UserGroupInformation.setConfiguration(conf);
		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
		if (UserGroupInformation.isSecurityEnabled()) {
			// in secure mode a Kerberos TGT must be present before we can talk to YARN
			if (!ugi.hasKerberosCredentials()) {
				throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
					"You may use kinit to authenticate and request a TGT from the Kerberos server.");
			}
			// run the deployment with the privileges of the authenticated user
			return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() {
				@Override
				public YarnClusterClient run() throws Exception {
					return deployInternal();
				}
			});
		} else {
			return deployInternal();
		}
	} catch (Exception e) {
		throw new RuntimeException("Couldn't deploy Yarn cluster", e);
	}
}
/**
* This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN.
......@@ -377,7 +433,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
// Create application via yarnClient
final YarnClient yarnClient = getYarnClient(conf);
final YarnClient yarnClient = getYarnClient();
final YarnClientApplication yarnApplication = yarnClient.createApplication();
GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
......@@ -726,7 +782,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
// the Flink cluster is deployed in YARN. Represent cluster
return new YarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir);
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
}
/**
......@@ -780,40 +836,44 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
@Override
public String getClusterDescription() throws Exception {
public String getClusterDescription() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
YarnClient yarnClient = getYarnClient(conf);
YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
YarnClient yarnClient = getYarnClient();
YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
final String format = "|%-16s |%-16s %n";
ps.printf("|Property |Value %n");
ps.println("+---------------------------------------+");
int totalMemory = 0;
int totalCores = 0;
for(NodeReport rep : nodes) {
final Resource res = rep.getCapability();
totalMemory += res.getMemory();
totalCores += res.getVirtualCores();
ps.format(format, "NodeID", rep.getNodeId());
ps.format(format, "Memory", res.getMemory() + " MB");
ps.format(format, "vCores", res.getVirtualCores());
ps.format(format, "HealthReport", rep.getHealthReport());
ps.format(format, "Containers", rep.getNumContainers());
ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
final String format = "|%-16s |%-16s %n";
ps.printf("|Property |Value %n");
ps.println("+---------------------------------------+");
int totalMemory = 0;
int totalCores = 0;
for (NodeReport rep : nodes) {
final Resource res = rep.getCapability();
totalMemory += res.getMemory();
totalCores += res.getVirtualCores();
ps.format(format, "NodeID", rep.getNodeId());
ps.format(format, "Memory", res.getMemory() + " MB");
ps.format(format, "vCores", res.getVirtualCores());
ps.format(format, "HealthReport", rep.getHealthReport());
ps.format(format, "Containers", rep.getNumContainers());
ps.println("+---------------------------------------+");
}
ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
List<QueueInfo> qInfo = yarnClient.getAllQueues();
for (QueueInfo q : qInfo) {
ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
}
yarnClient.stop();
return baos.toString();
} catch (Exception e) {
throw new RuntimeException("Couldn't get cluster description", e);
}
ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
List<QueueInfo> qInfo = yarnClient.getAllQueues();
for(QueueInfo q : qInfo) {
ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
}
yarnClient.stop();
return baos.toString();
}
public String getSessionFilesDir() {
......@@ -918,9 +978,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private static class YarnDeploymentException extends RuntimeException {
private static final long serialVersionUID = -812040641215388943L;
public YarnDeploymentException() {
}
public YarnDeploymentException(String message) {
super(message);
}
......@@ -954,5 +1011,24 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
}
/**
 * Creates a {@link YarnClusterClient}; may be overridden in tests.
 *
 * @param descriptor the cluster descriptor that deployed or retrieved the application
 * @param yarnClient the YARN client used to communicate with the ResourceManager
 * @param report the application report of the YARN application
 * @param flinkConfiguration the Flink configuration to hand to the client
 * @param sessionFilesDir (HDFS) location of the session files
 * @param perJobCluster whether the cluster only exists for a single job
 * @return the created client
 * @throws IOException if client creation fails with an I/O error
 * @throws YarnException if communication with YARN fails
 */
protected YarnClusterClient createYarnClusterClient(
	AbstractYarnClusterDescriptor descriptor,
	YarnClient yarnClient,
	ApplicationReport report,
	org.apache.flink.configuration.Configuration flinkConfiguration,
	Path sessionFilesDir,
	boolean perJobCluster) throws IOException, YarnException {
	return new YarnClusterClient(
		descriptor,
		yarnClient,
		report,
		flinkConfiguration,
		sessionFilesDir,
		perJobCluster);
}
}
......@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
......@@ -55,6 +56,7 @@ import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
......@@ -77,9 +79,6 @@ public class YarnClusterClient extends ClusterClient {
// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
private final Path sessionFilesDir;
/** The leader retrieval service for connecting to the cluster and finding the active leader. */
private final LeaderRetrievalService leaderRetrievalService;
//---------- Class internal fields -------------------
private final AbstractYarnClusterDescriptor clusterDescriptor;
......@@ -92,6 +91,7 @@ public class YarnClusterClient extends ClusterClient {
private boolean isConnected = false;
private final boolean perJobCluster;
/**
* Create a new Flink on YARN cluster.
......@@ -101,6 +101,7 @@ public class YarnClusterClient extends ClusterClient {
* @param appReport the YARN application ID
* @param flinkConfig Flink configuration
* @param sessionFilesDir Location of files required for YARN session
* @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown
* @throws IOException
* @throws YarnException
*/
......@@ -109,7 +110,8 @@ public class YarnClusterClient extends ClusterClient {
final YarnClient yarnClient,
final ApplicationReport appReport,
org.apache.flink.configuration.Configuration flinkConfig,
Path sessionFilesDir) throws IOException, YarnException {
Path sessionFilesDir,
boolean perJobCluster) throws IOException, YarnException {
super(flinkConfig);
......@@ -122,18 +124,16 @@ public class YarnClusterClient extends ClusterClient {
this.applicationId = appReport;
this.appId = appReport.getApplicationId();
this.trackingURL = appReport.getTrackingUrl();
this.perJobCluster = perJobCluster;
/* The leader retrieval service for connecting to the cluster and finding the active leader. */
LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
} catch (Exception e) {
throw new IOException("Could not create the leader retrieval service.", e);
}
if (isConnected) {
throw new IllegalStateException("Already connected to the cluster.");
}
// start application client
LOG.info("Start application client.");
......@@ -182,28 +182,31 @@ public class YarnClusterClient extends ClusterClient {
isConnected = true;
logAndSysout("Waiting until all TaskManagers have connected");
if (perJobCluster) {
while(true) {
GetClusterStatusResponse status = getClusterStatus();
if (status != null) {
if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+ clusterDescriptor.getTaskManagerCount() + ")");
logAndSysout("Waiting until all TaskManagers have connected");
while (true) {
GetClusterStatusResponse status = getClusterStatus();
if (status != null) {
if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+ clusterDescriptor.getTaskManagerCount() + ")");
} else {
logAndSysout("All TaskManagers are connected");
break;
}
} else {
logAndSysout("All TaskManagers are connected");
break;
logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}
} else {
logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for TaskManagers");
System.err.println("Thread is interrupted");
throw new IOException("Interrupted while waiting for TaskManagers", e);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for TaskManagers");
System.err.println("Thread is interrupted");
throw new IOException("Interrupted while waiting for TaskManagers", e);
}
}
}
}
......@@ -214,9 +217,12 @@ public class YarnClusterClient extends ClusterClient {
}
LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) {
LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally");
try {
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
} catch (IllegalStateException e) {
// we are already in the shutdown hook
}
// tell the actor to shut down.
applicationClient.tell(PoisonPill.getInstance(), applicationClient);
......@@ -265,12 +271,30 @@ public class YarnClusterClient extends ClusterClient {
@Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
if (isDetached()) {
JobSubmissionResult result = super.runDetached(jobGraph, classLoader);
if (perJobCluster) {
stopAfterJob(jobGraph.getJobID());
return result;
}
if (isDetached()) {
return super.runDetached(jobGraph, classLoader);
} else {
return super.run(jobGraph, classLoader);
try {
return super.run(jobGraph, classLoader);
} finally {
// show cluster status
List<String> msgs = getNewMessages();
if (msgs != null && msgs.size() > 1) {
logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
logAndSysout(msg);
}
}
if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
logAndSysout("YARN Diagnostics: " + getDiagnostics());
}
}
}
}
......@@ -298,8 +322,9 @@ public class YarnClusterClient extends ClusterClient {
throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
}
if(hasBeenShutdown()) {
throw new RuntimeException("The YarnClusterClient has already been stopped");
return null;
}
Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
Object clusterStatus;
try {
......@@ -417,32 +442,20 @@ public class YarnClusterClient extends ClusterClient {
@Override
public void finalizeCluster() {
if (!isConnected) {
throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
}
if (isDetached()) {
// only disconnect if we are running detached
if (isDetached() || !perJobCluster) {
// only disconnect if we are not running a per job cluster
disconnect();
return;
} else {
shutdownCluster();
}
}
// show cluster status
List<String> msgs = getNewMessages();
if (msgs != null && msgs.size() > 1) {
public void shutdownCluster() {
logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
logAndSysout(msg);
}
}
if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
logAndSysout("YARN Diagnostics: " + getDiagnostics());
if (!isConnected) {
throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
}
if(hasBeenShutDown.getAndSet(true)) {
return;
}
......@@ -471,13 +484,30 @@ public class YarnClusterClient extends ClusterClient {
actorSystem.awaitTermination();
}
LOG.info("Deleting files in " + sessionFilesDir);
try {
FileSystem shutFS = FileSystem.get(hadoopConfig);
shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
shutFS.close();
}catch(IOException e){
LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig);
if (propertiesFile.isFile()) {
if (propertiesFile.delete()) {
LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
} else {
LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
}
}
} catch (Exception e) {
LOG.warn("Exception while deleting the JobManager address file", e);
}
if (sessionFilesDir != null) {
LOG.info("Deleting files in " + sessionFilesDir);
try {
FileSystem shutFS = FileSystem.get(hadoopConfig);
shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
shutFS.close();
} catch (IOException e) {
LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
}
} else {
LOG.warn("Session file directory not set. Not deleting session files");
}
try {
......@@ -571,7 +601,6 @@ public class YarnClusterClient extends ClusterClient {
/**
 * Returns whether this client operates in detached mode, set either through the
 * generic client flag or the YARN-specific one on the descriptor.
 */
@Override
public boolean isDetached() {
	// either we have set detached mode using the general '-d' flag or using the Yarn CLI flag 'yd'
	return super.isDetached() || clusterDescriptor.isDetachedMode();
}
}
......@@ -17,10 +17,12 @@
*/
package org.apache.flink.yarn;
/**
* Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
*/
public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
@Override
protected Class<?> getApplicationMasterClass() {
return YarnApplicationMasterRunner.class;
......
......@@ -28,11 +28,9 @@ import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
......@@ -59,6 +57,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
/**
* Class handling the command line interface to the YARN session.
*/
......@@ -97,8 +97,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
private final Option CONTAINER;
private final Option SLOTS;
private final Option DETACHED;
@Deprecated
private final Option STREAMING;
private final Option NAME;
private final Options ALL_OPTIONS;
/**
* Dynamic properties allow the user to specify additional configuration values with -D, such as
......@@ -118,7 +121,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
this.acceptInteractiveInput = acceptInteractiveInput;
QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
......@@ -132,37 +135,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
ALL_OPTIONS = new Options();
ALL_OPTIONS.addOption(FLINK_JAR);
ALL_OPTIONS.addOption(JM_MEMORY);
ALL_OPTIONS.addOption(TM_MEMORY);
ALL_OPTIONS.addOption(CONTAINER);
ALL_OPTIONS.addOption(QUEUE);
ALL_OPTIONS.addOption(QUERY);
ALL_OPTIONS.addOption(SHIP_PATH);
ALL_OPTIONS.addOption(SLOTS);
ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
ALL_OPTIONS.addOption(DETACHED);
ALL_OPTIONS.addOption(STREAMING);
ALL_OPTIONS.addOption(NAME);
ALL_OPTIONS.addOption(APPLICATION_ID);
}
/**
* Attaches a new Yarn Client to running YARN application.
*
*/
public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) {
AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
if (flinkYarnClient == null) {
return null;
}
if (!cmd.hasOption(APPLICATION_ID.getOpt())) {
LOG.error("Missing required argument " + APPLICATION_ID.getOpt());
printUsage();
return null;
}
String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
GlobalConfiguration.loadConfiguration(confDirPath);
Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
flinkYarnClient.setConfigurationDirectory(confDirPath);
try {
return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt()));
} catch (Exception e) {
LOG.error("Could not attach to YARN session", e);
return null;
}
}
/**
* Resumes from a Flink Yarn properties file
* @param flinkConfiguration The flink configuration
......@@ -170,7 +160,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
*/
private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
// load the YARN properties
File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration));
File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
if (!propertiesFile.exists()) {
return false;
}
......@@ -209,7 +199,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
try {
jobManagerAddress = ClientUtils.parseHostPortAddress(address);
// store address in config from where it is retrieved by the retrieval service
CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress);
CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
}
catch (Exception e) {
throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
......@@ -228,10 +218,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return true;
}
public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor();
AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
LOG.error("Missing required argument {}", CONTAINER.getOpt());
......@@ -343,19 +332,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return yarnClusterDescriptor;
}
@Override
public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception {
YarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
try {
return yarnClusterDescriptor.deploy();
} catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}
}
private void printUsage() {
System.out.println("Usage:");
HelpFormatter formatter = new HelpFormatter();
......@@ -367,17 +343,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
formatter.printHelp(" ", req);
formatter.setSyntaxPrefix(" Optional");
Options opt = new Options();
opt.addOption(JM_MEMORY);
opt.addOption(TM_MEMORY);
opt.addOption(QUERY);
opt.addOption(QUEUE);
opt.addOption(SLOTS);
opt.addOption(DYNAMIC_PROPERTIES);
opt.addOption(DETACHED);
opt.addOption(STREAMING);
opt.addOption(NAME);
formatter.printHelp(" ", opt);
Options options = new Options();
addGeneralOptions(options);
addRunOptions(options);
formatter.printHelp(" ", options);
}
private static void writeYarnProperties(Properties properties, File propertiesFile) {
......@@ -439,6 +408,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
switch (command) {
case "quit":
case "stop":
yarnCluster.shutdownCluster();
break label;
case "help":
......@@ -466,38 +436,62 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
@Override
public String getIdentifier() {
/**
 * Decides whether this YARN command line should handle the given invocation.
 *
 * <p>It is active when the job manager address option matches this CLI's id, or when
 * a YARN properties file from an earlier session can be resumed.
 */
public boolean isActive(CommandLine commandLine, Configuration configuration) {
	final String managerAddress = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
	if (ID.equals(managerAddress)) {
		// the user explicitly addressed the YARN cluster
		return true;
	}
	// otherwise fall back to resuming from a previously written properties file
	return resumeFromYarnProperties(configuration);
}
/**
 * Returns the identifier under which this custom command line is registered.
 */
@Override
public String getId() {
	return ID;
}
public void addOptions(Options options) {
options.addOption(FLINK_JAR);
options.addOption(JM_MEMORY);
options.addOption(TM_MEMORY);
options.addOption(CONTAINER);
options.addOption(QUEUE);
options.addOption(QUERY);
options.addOption(SHIP_PATH);
options.addOption(SLOTS);
options.addOption(DYNAMIC_PROPERTIES);
options.addOption(DETACHED);
options.addOption(STREAMING);
options.addOption(NAME);
/**
 * Adds all YARN session options (jar, memory, containers, queue, slots, etc.)
 * to the given options object.
 *
 * @param baseOptions the options object to extend
 */
@Override
public void addRunOptions(Options baseOptions) {
	// commons-cli getOptions() returns a raw collection, hence the explicit cast
	for (Object option : ALL_OPTIONS.getOptions()) {
		baseOptions.addOption((Option) option);
	}
}
/**
 * Adds the general options, i.e. the application id option used to attach to a
 * running YARN session.
 *
 * @param baseOptions the options object to extend
 */
@Override
public void addGeneralOptions(Options baseOptions) {
	baseOptions.addOption(APPLICATION_ID);
}
public void getYARNAttachCLIOptions(Options options) {
options.addOption(APPLICATION_ID);
/**
 * Retrieves a client for an existing YARN cluster.
 *
 * <p>Resumption order: first an application id supplied on the command line, then the
 * YARN properties file written by a previous session.
 *
 * @param cmdLine the parsed command line
 * @param config the Flink configuration passed on to the cluster descriptor
 * @return a client for the resumed cluster
 * @throws UnsupportedOperationException if no cluster can be resumed from either source
 */
@Override
public YarnClusterClient retrieveCluster(
		CommandLine cmdLine,
		Configuration config) throws UnsupportedOperationException {

	// first check for an application id
	if (cmdLine.hasOption(APPLICATION_ID.getOpt())) {
		String applicationID = cmdLine.getOptionValue(APPLICATION_ID.getOpt());
		AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
		yarnDescriptor.setFlinkConfiguration(config);
		return yarnDescriptor.retrieve(applicationID);
	// then try to load from yarn properties
	} else if (resumeFromYarnProperties(config)) {
		AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
		yarnDescriptor.setFlinkConfiguration(config);
		return yarnDescriptor.retrieveFromConfig(config);
	}
	throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
}
@Override
public ClusterClient retrieveCluster(Configuration config) throws Exception {
public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) {
AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
yarnClusterDescriptor.setFlinkConfiguration(config);
if(resumeFromYarnProperties(config)) {
return new StandaloneClusterClient(config);
try {
return yarnClusterDescriptor.deploy();
} catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}
return null;
}
public int run(String[] args) {
......@@ -505,7 +499,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// Command Line Options
//
Options options = new Options();
addOptions(options);
addGeneralOptions(options);
addRunOptions(options);
CommandLineParser parser = new PosixParser();
CommandLine cmd;
......@@ -519,10 +514,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// Query cluster for metrics
if (cmd.hasOption(QUERY.getOpt())) {
YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
String description;
try {
description = flinkYarnClient.getClusterDescription();
description = yarnDescriptor.getClusterDescription();
} catch (Exception e) {
System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
e.printStackTrace(System.err);
......@@ -531,56 +526,61 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
System.out.println(description);
return 0;
} else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
yarnCluster = attachFlinkYarnClient(cmd);
AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
try {
yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt()));
} catch (Exception e) {
throw new RuntimeException("Could not retrieve existing Yarn application", e);
}
if (detachedMode) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId());
"yarn application -kill "+yarnCluster.getClusterIdentifier());
yarnCluster.disconnect();
} else {
runInteractiveCli(yarnCluster);
if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown(false);
}
runInteractiveCli(yarnCluster, true);
}
} else {
YarnClusterDescriptor flinkYarnClient;
AbstractYarnClusterDescriptor yarnDescriptor;
try {
flinkYarnClient = createDescriptor(null, cmd);
yarnDescriptor = createDescriptor(null, cmd);
} catch (Exception e) {
System.err.println("Error while starting the YARN Client. Please check log output!");
return 1;
}
try {
yarnCluster = flinkYarnClient.deploy();
yarnCluster = yarnDescriptor.deploy();
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage());
e.printStackTrace(System.err);
return 1;
}
//------------------ ClusterClient deployed, handle connection details
String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
String jobManagerAddress =
yarnCluster.getJobManagerAddress().getAddress().getHostAddress() +
":" + yarnCluster.getJobManagerAddress().getPort();
System.out.println("Flink JobManager is now running on " + jobManagerAddress);
System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
// file that we write into the conf/ dir containing the jobManager address and the dop.
File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
Properties yarnProps = new Properties();
yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
if (flinkYarnClient.getTaskManagerSlots() != -1) {
if (yarnDescriptor.getTaskManagerSlots() != -1) {
String parallelism =
Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());
yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
}
// add dynamic properties
if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
flinkYarnClient.getDynamicPropertiesEncoded());
yarnDescriptor.getDynamicPropertiesEncoded());
}
writeYarnProperties(yarnProps, yarnPropertiesFile);
......@@ -592,21 +592,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
flinkYarnClient.getSessionFilesDir());
yarnDescriptor.getSessionFilesDir());
yarnCluster.disconnect();
} else {
runInteractiveCli(yarnCluster, acceptInteractiveInput);
if (!yarnCluster.hasBeenShutdown()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown();
}
try {
yarnPropertiesFile.delete();
} catch (Exception e) {
LOG.warn("Exception while deleting the JobManager address file", e);
}
}
}
return 0;
......@@ -649,11 +638,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
}
private static String getYarnPropertiesLocation(Configuration conf) {
/**
 * Resolves the location of the YARN properties file for the current user.
 *
 * <p>The directory is read from {@code ConfigConstants.YARN_PROPERTIES_FILE_LOCATION}
 * when configured, otherwise the JVM temporary directory is used. The file name is
 * suffixed with the current user name so that users on the same machine do not clash.
 *
 * @param conf Flink configuration possibly carrying a custom location
 * @return the YARN properties file (which may not exist yet)
 */
public static File getYarnPropertiesLocation(Configuration conf) {
	final String fallbackDir = System.getProperty("java.io.tmpdir");
	final String propertiesDir =
		conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, fallbackDir);
	final String fileName = YARN_PROPERTIES_FILE + System.getProperty("user.name");
	return new File(propertiesDir, fileName);
}
return propertiesFileLocation + File.separator + YARN_PROPERTIES_FILE + currentUser;
/**
 * Creates the cluster descriptor; protected so subclasses (e.g. tests) can substitute
 * a different {@link AbstractYarnClusterDescriptor} implementation.
 */
protected AbstractYarnClusterDescriptor getClusterDescriptor() {
	return new YarnClusterDescriptor();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册