From 934774f0d2005ed93feb3e688d60a06076b94e92 Mon Sep 17 00:00:00 2001 From: Johannes Date: Fri, 29 Jan 2016 00:59:49 +0100 Subject: [PATCH] [FLINK-3293] [yarn] Respect custom CLI Yarn name in JobManager mode Added a method to set a default application name for the Flink Yarn session CLI. Switched the order, such that this name can now be overwritten by the command line. This closes #1558 --- .../org/apache/flink/client/CliFrontend.java | 11 +++++++-- .../flink/client/FlinkYarnSessionCli.java | 24 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 23df133351e..da91bcad8f4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -935,13 +935,20 @@ public class CliFrontend { if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); + // Default yarn application name to use, if nothing is specified on the command line + String applicationName = "Flink Application: " + programName; + // user wants to run Flink in YARN cluster. CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine); + AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser + .getFlinkYarnSessionCli() + .withDefaultApplicationName(applicationName) + .createFlinkYarnClient(commandLine); + if (flinkYarnClient == null) { throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); } - flinkYarnClient.setName("Flink Application: " + programName); + // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded // from yarn options. if (detachedMode) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 8183df4fac9..4f540a65abb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -87,6 +87,9 @@ public class FlinkYarnSessionCli { private AbstractFlinkYarnCluster yarnCluster = null; private boolean detachedMode = false; + /** Default yarn application name. */ + private String defaultApplicationName = null; + public FlinkYarnSessionCli(String shortPrefix, String longPrefix) { QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); @@ -102,6 +105,11 @@ public class FlinkYarnSessionCli { NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); } + /** + * Creates a new Yarn Client. + * @param cmd the command line to parse options from + * @return an instance of the client or null if there was an error + */ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); @@ -222,7 +230,13 @@ public class FlinkYarnSessionCli { if(cmd.hasOption(NAME.getOpt())) { flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt())); + } else { + // set the default application name, if none is specified + if(defaultApplicationName != null) { + flinkYarnClient.setName(defaultApplicationName); + } } + return flinkYarnClient; } @@ -466,6 +480,16 @@ public class FlinkYarnSessionCli { return 0; } + /** + * Sets the default Yarn Application Name. + * @param defaultApplicationName the name of the yarn application to use + * @return FlinkYarnSessionCli instance, for chaining + */ + public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) { + this.defaultApplicationName = defaultApplicationName; + return this; + } + /** * Utility method for tests. */ -- GitLab