提交 875d4d23 编写于 作者: S Sebastian Klemke 提交者: Maximilian Michels

[FLINK-3937] implement -yid option to Flink CLI

- enables to use list, savepoint, cancel and stop subcommands
- adapt FlinkYarnSessionCli to also accept YARN application Id to attach to
- update documentation

This closes #2034
上级 9e984241
......@@ -105,6 +105,10 @@ The command line can be used to
./bin/flink list -r
- List running Flink jobs inside Flink YARN session:
./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
- Cancel a job:
./bin/flink cancel <jobID>
......@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
-r,--running Show only running programs and their JobIDs
-s,--scheduled Show only scheduled programs and their JobIDs
Additional arguments if -m yarn-cluster is set:
-yid <yarnApplicationId> YARN application ID of Flink YARN session to
connect to. Must not be set if JobManager HA
is used. In this case, JobManager RPC
location is automatically retrieved from
Zookeeper.
Action "cancel" cancels a running program.
......@@ -264,6 +274,12 @@ Action "cancel" cancels a running program.
job. Use this flag to connect to a different
JobManager than the one specified in the
configuration.
Additional arguments if -m yarn-cluster is set:
-yid <yarnApplicationId> YARN application ID of Flink YARN session to
connect to. Must not be set if JobManager HA
is used. In this case, JobManager RPC
location is automatically retrieved from
Zookeeper.
Action "stop" stops a running program (streaming jobs only). There are no strong consistency
......@@ -275,6 +291,12 @@ guarantees for a stop request.
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
Additional arguments if -m yarn-cluster is set:
-yid <yarnApplicationId> YARN application ID of Flink YARN session to
connect to. Must not be set if JobManager HA
is used. In this case, JobManager RPC
location is automatically retrieved from
Zookeeper.
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
......@@ -288,4 +310,10 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on
job. Use this flag to connect to a different
JobManager than the one specified in the
configuration.
Additional arguments if -m yarn-cluster is set:
-yid <yarnApplicationId> YARN application ID of Flink YARN session to
connect to. Must not be set if JobManager HA
is used. In this case, JobManager RPC
location is automatically retrieved from
Zookeeper.
~~~
......@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink.
Use the YARN utilities (`yarn application -kill <appId>`) to stop the YARN session.
#### Attach to an existing Session
Use the following command to start a session
~~~bash
./bin/yarn-session.sh
~~~
This command will show you the following overview:
~~~bash
Usage:
Required
-id,--applicationId <yarnAppId> YARN application Id
~~~
As already mentioned, `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable must be set to read the YARN and HDFS configuration.
**Example:** Issue the following command to attach to running Flink YARN session `application_1463870264508_0029`:
~~~bash
./bin/yarn-session.sh -id application_1463870264508_0029
~~~
Attaching to a running session uses YARN ResourceManager to determine Job Manager RPC port.
Stop the YARN session by stopping the unix process (using CTRL+C) or by entering 'stop' into the client.
### Submit Job to Flink
Use the following command to submit a Flink program to the YARN cluster:
......
......@@ -800,6 +800,10 @@ public class CliFrontend {
*/
protected void updateConfig(CommandLineOptions options) {
if(options.getJobManagerAddress() != null){
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
jobManagerAddress = CliFrontendParser.getFlinkYarnSessionCli()
.attachFlinkYarnClient(options.getCommandLine())
.getJobManagerAddress();
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(config, jobManagerAddress);
}
......
......@@ -188,6 +188,8 @@ public class CliFrontendParser {
private static Options getJobManagerAddressOption(Options options) {
options.addOption(ADDRESS_OPTION);
yarnSessionCLi.getYARNAttachCLIOptions(options);
return options;
}
......@@ -280,6 +282,10 @@ public class CliFrontendParser {
System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>");
formatter.setSyntaxPrefix(" \"info\" action options:");
formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));
formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
Options yarnOpts = new Options();
yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
formatter.printHelp(" ", yarnOpts);
System.out.println();
}
......@@ -316,6 +322,10 @@ public class CliFrontendParser {
System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>");
formatter.setSyntaxPrefix(" \"cancel\" action options:");
formatter.printHelp(" ", getCancelOptions(new Options()));
formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
Options yarnOpts = new Options();
yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
formatter.printHelp(" ", yarnOpts);
System.out.println();
}
......
......@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
......@@ -349,6 +350,20 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
@Override
public AbstractFlinkYarnCluster attach(String appId) throws Exception {
// check if required Hadoop environment variables are set. If not, warn user
if(System.getenv("HADOOP_CONF_DIR") == null &&
System.getenv("YARN_CONF_DIR") == null) {
LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
"configuration for accessing YARN.");
}
final ApplicationId yarnAppId = ConverterUtils.toApplicationId(appId);
return new FlinkYarnCluster(yarnClient, yarnAppId, conf, flinkConfiguration, sessionFilesDir, detached);
}
/**
* This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN.
......
......@@ -32,6 +32,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
......@@ -86,6 +87,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// the prefix transformation is used by the CliFrontend static constructor.
private final Option QUERY;
// --- or ---
private final Option APPLICATION_ID;
// --- or ---
private final Option QUEUE;
private final Option SHIP_PATH;
private final Option FLINK_JAR;
......@@ -117,6 +120,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
this.acceptInteractiveInput = acceptInteractiveInput;
QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session");
QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
......@@ -130,6 +134,35 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
}
/**
* Attaches a new Yarn Client to running YARN application.
*
*/
public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) {
AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
if (flinkYarnClient == null) {
return null;
}
if (!cmd.hasOption(APPLICATION_ID.getOpt())) {
LOG.error("Missing required argument " + APPLICATION_ID.getOpt());
printUsage();
return null;
}
String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
GlobalConfiguration.loadConfiguration(confDirPath);
Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
flinkYarnClient.setConfigurationDirectory(confDirPath);
try {
return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt()));
} catch (Exception e) {
LOG.error("Could not attach to YARN session", e);
return null;
}
}
/**
* Resumes from a Flink Yarn properties file
* @param flinkConfiguration The flink configuration
......@@ -452,6 +485,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
options.addOption(NAME);
}
public void getYARNAttachCLIOptions(Options options) {
options.addOption(APPLICATION_ID);
}
@Override
public ClusterClient retrieveCluster(Configuration config) throws Exception {
......@@ -478,7 +516,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
printUsage();
return 1;
}
// Query cluster for metrics
if (cmd.hasOption(QUERY.getOpt())) {
YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
......@@ -492,6 +530,21 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
System.out.println(description);
return 0;
} else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
yarnCluster = attachFlinkYarnClient(cmd);
if (detachedMode) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId());
} else {
runInteractiveCli(yarnCluster);
if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown(false);
}
}
} else {
YarnClusterDescriptor flinkYarnClient;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册