提交 ec6d9752 编写于 作者: M Maximilian Michels

[FLINK-4079] YARN properties file used for per-job cluster

上级 f4ac8522
......@@ -105,7 +105,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
/**
* Dynamic properties allow the user to specify additional configuration values with -D, such as
* -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
* -D fs.overwrite-files=true -D taskmanager.network.numberOfBuffers=16368
*/
private final Option DYNAMIC_PROPERTIES;
......@@ -155,10 +155,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
/**
* Resumes from a Flink Yarn properties file
* @param cmdLine The command-line parameters
* @param flinkConfiguration The flink configuration
* @return True if the properties were loaded, false otherwise
*/
private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
private boolean resumeFromYarnProperties(CommandLine cmdLine, Configuration flinkConfiguration) {
String jobManagerOption = cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
if (jobManagerOption != null) {
// don't resume from properties file if a JobManager has been specified
return false;
}
for (Option option : cmdLine.getOptions()) {
if (ALL_OPTIONS.hasOption(option.getOpt())) {
if (!option.getOpt().equals(DETACHED.getOpt())) {
// don't resume from properties file if yarn options have been specified
return false;
}
}
}
// load the YARN properties
File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
if (!propertiesFile.exists()) {
......@@ -439,7 +456,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
public boolean isActive(CommandLine commandLine, Configuration configuration) {
String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
boolean yarnJobManager = ID.equals(jobManagerOption);
return yarnJobManager || resumeFromYarnProperties(configuration);
return yarnJobManager || resumeFromYarnProperties(commandLine, configuration);
}
@Override
......@@ -471,7 +488,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
yarnDescriptor.setFlinkConfiguration(config);
return yarnDescriptor.retrieve(applicationID);
// then try to load from yarn properties
} else if (resumeFromYarnProperties(config)) {
} else if (resumeFromYarnProperties(cmdLine, config)) {
AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
yarnDescriptor.setFlinkConfiguration(config);
return yarnDescriptor.retrieveFromConfig(config);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册