Commit 17ac128c authored by Max, committed by Robert Metzger

[FLINK-592] Add support for Kerberos secured YARN setups to Flink.

Parent 7ac6447f
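A note on the change as a whole: the commit makes three coordinated moves. The client wraps the actual deployment in a `UserGroupInformation.doAs(...)` call so it runs with the caller's Kerberos credentials, HDFS delegation tokens for the uploaded files are handed to the ApplicationMaster container via `Utils.setTokensFor`, and the launch scripts put `$HADOOP_CONF_DIR`/`$YARN_CONF_DIR` on the classpath so a plain `new YarnConfiguration()` picks up the cluster settings. As a hypothetical sketch that is not part of this commit, a caller-side pre-flight check for a Kerberos ticket could look like this (`hasKerberosCredentials()` is standard Hadoop UGI API):

~~~java
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical pre-flight check (not part of this commit): verify that the
// user has obtained a Kerberos ticket (e.g. via `kinit`) before deploying.
public final class KerberosPreflight {
    public static void check() throws IOException {
        // The security layer decides from core-site.xml whether Kerberos is on.
        UserGroupInformation.setConfiguration(new YarnConfiguration());
        if (UserGroupInformation.isSecurityEnabled()) {
            UserGroupInformation user = UserGroupInformation.getCurrentUser();
            if (!user.hasKerberosCredentials()) {
                throw new IOException("Kerberos security is enabled, but no valid "
                        + "Kerberos credentials were found for " + user.getUserName());
            }
        }
    }
}
~~~

On a secured cluster the user would typically obtain a ticket with `kinit` before starting the YARN session.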
@@ -90,7 +90,7 @@ Usage:
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
 ~~~
-Please note that the Client requires the `HADOOP_HOME` (or `YARN_CONF_DIR` or `HADOOP_CONF_DIR`) environment variable to be set to read the YARN and HDFS configuration.
+Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.
 **Example:** Issue the following command to allocate 10 Task Managers, with 8 GB of memory and 32 processing slots each:
......
@@ -202,7 +202,7 @@ under the License.
             <transformer
                 implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                 <manifestEntries>
-                    <Main-Class>org.apache.flink.yarn.FlinkYarnClient</Main-Class>
+                    <Main-Class>org.apache.flink.client.FlinkYarnSessionCli</Main-Class>
                 </manifestEntries>
             </transformer>
         </transformers>
......
@@ -51,4 +51,4 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 export FLINK_CONF_DIR

 # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
-$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH org.apache.flink.client.CliFrontend $*
+$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR org.apache.flink.client.CliFrontend $*
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 export FLINK_CONF_DIR

-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../ship/ -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../ship/ -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
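The classpath additions above are what replaces Flink's hand-rolled Hadoop config discovery (the `Utils.initializeYarnConfiguration` method removed further down): a plain `YarnConfiguration` resolves `core-site.xml`/`yarn-site.xml` as classpath resources. A minimal sketch of that behavior, assuming a standard Hadoop 2.x client on the classpath (the queried property is just an example):

~~~java
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: YarnConfiguration loads core-site.xml and yarn-site.xml as classpath
// resources, which is why the scripts append $HADOOP_CONF_DIR and $YARN_CONF_DIR.
public final class ConfFromClasspath {
    public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Resolves from yarn-site.xml if the conf dir is on the classpath,
        // otherwise falls back to the hard-coded default (0.0.0.0:8032).
        System.out.println(conf.get(YarnConfiguration.RM_ADDRESS));
    }
}
~~~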
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,16 +32,17 @@ import java.util.Map;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
@@ -137,12 +139,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
     public FlinkYarnClient() {
-        // Check if security is enabled
-        if(UserGroupInformation.isSecurityEnabled()) {
-            throw new RuntimeException("Flink YARN client does not have security support right now."
-                    + "File a bug, we will fix it asap");
-        }
-        conf = Utils.initializeYarnConfiguration();
+        conf = new YarnConfiguration();
         if(this.yarnClient == null) {
             // Create yarnClient
             yarnClient = YarnClient.createYarnClient();
@@ -276,12 +273,26 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
         return false;
     }

+    public AbstractFlinkYarnCluster deploy(final String clusterName) throws Exception {
+        UserGroupInformation.setConfiguration(conf);
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        if (UserGroupInformation.isSecurityEnabled()) {
+            return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
+                @Override
+                public AbstractFlinkYarnCluster run() throws Exception {
+                    return deployInternal(clusterName);
+                }
+            });
+        } else {
+            return deployInternal(clusterName);
+        }
+    }
+
     /**
      * This method will block until the ApplicationMaster/JobManager have been
      * deployed on YARN.
      */
     @Override
-    public AbstractFlinkYarnCluster deploy(String clusterName) throws Exception {
+    public AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exception {
         isReadyForDepoyment();

         LOG.info("Using values:");
@@ -460,18 +471,18 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
         // setup security tokens (code from apache storm)
-        final Path[] paths = new Path[3 + shipFiles.size()];
+        final Path[] paths = new Path[2 + shipFiles.size()];
         StringBuffer envShipFileList = new StringBuffer();

         // upload ship files
         for (int i = 0; i < shipFiles.size(); i++) {
             File shipFile = shipFiles.get(i);
             LocalResource shipResources = Records.newRecord(LocalResource.class);
             Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-            paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+            paths[2 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
                     shipLocalPath, shipResources, fs.getHomeDirectory());
             localResources.put(shipFile.getName(), shipResources);
-            envShipFileList.append(paths[3 + i]);
+            envShipFileList.append(paths[2 + i]);
             if(i+1 < shipFiles.size()) {
                 envShipFileList.append(',');
             }
@@ -480,10 +491,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
         paths[0] = remotePathJar;
         paths[1] = remotePathConf;
         sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-        FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+        FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
         fs.setPermission(sessionFilesDir, permission); // set permission for path.
-        Utils.setTokensFor(amContainer, paths, this.conf);
+        Utils.setTokensFor(amContainer, paths, conf);

         amContainer.setLocalResources(localResources);
+        fs.close();
@@ -497,7 +509,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
         appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
         appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
         appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-        appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
+        appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
         appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
         appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
         if(dynamicPropertiesEncoded != null) {
@@ -521,7 +533,6 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
         appContext.setResource(capability);
         appContext.setQueue(yarnQueue);

         LOG.info("Submitting application master " + appId);
         yarnClient.submitApplication(appContext);
......
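The `Utils.setTokensFor(amContainer, paths, conf)` call above is the delegation-token plumbing the secured setup depends on: it must obtain HDFS delegation tokens for the jar, the configuration, and the ship files, and attach them to the ApplicationMaster's launch context. Judging from the imports visible in `Utils.java` below (`TokenCache`, `Credentials`, `DataOutputBuffer`), the mechanics are roughly the following sketch; the method shape is assumed, not the verbatim commit code:

~~~java
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

public final class TokenSketch {
    // Roughly what Utils.setTokensFor has to do: obtain HDFS delegation
    // tokens for the given paths and attach them to the AM's launch context.
    public static void setTokensFor(ContainerLaunchContext amContainer,
            Path[] paths, Configuration conf) throws Exception {
        Credentials credentials = new Credentials();
        // Ask each namenode involved in `paths` for a delegation token.
        TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
        // Serialize the credentials into the wire format YARN expects.
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(securityTokens);
    }
}
~~~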
@@ -20,13 +20,11 @@ package org.apache.flink.yarn;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;

-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
@@ -37,12 +35,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -78,68 +74,7 @@ public class Utils {
         }
         return heapLimit;
     }

-    private static void addPathToConfig(Configuration conf, File path) {
-        // chain-in a new classloader
-        URL fileUrl = null;
-        try {
-            fileUrl = path.toURI().toURL();
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("Erroneous config file path", e);
-        }
-        URL[] urls = {fileUrl};
-        ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader());
-        conf.setClassLoader(cl);
-    }
-
-    private static void setDefaultConfValues(Configuration conf) {
-        if(conf.get("fs.hdfs.impl",null) == null) {
-            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        }
-        if(conf.get("fs.file.impl",null) == null) {
-            conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
-        }
-    }
-
-    public static Configuration initializeYarnConfiguration() {
-        Configuration conf = new YarnConfiguration();
-        String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-        if(configuredHadoopConfig != null) {
-            LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
-            addPathToConfig(conf, new File(configuredHadoopConfig));
-            setDefaultConfValues(conf);
-            return conf;
-        }
-        String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" };
-        for(int i = 0; i < envs.length; ++i) {
-            String confPath = System.getenv(envs[i]);
-            if (confPath != null) {
-                LOG.info("Found "+envs[i]+", adding it to configuration");
-                addPathToConfig(conf, new File(confPath));
-                setDefaultConfValues(conf);
-                return conf;
-            }
-        }
-        LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
-        String hadoopHome = null;
-        try {
-            hadoopHome = Shell.getHadoopHome();
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
-        }
-        File tryConf = new File(hadoopHome+"/etc/hadoop");
-        if(tryConf.exists()) {
-            LOG.info("Found configuration using hadoop home.");
-            addPathToConfig(conf, tryConf);
-        } else {
-            tryConf = new File(hadoopHome+"/conf");
-            if(tryConf.exists()) {
-                addPathToConfig(conf, tryConf);
-            }
-        }
-        setDefaultConfValues(conf);
-        return conf;
-    }

     public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
         addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
......
@@ -23,11 +23,12 @@ import java.security.PrivilegedAction
 import akka.actor._
 import org.apache.flink.client.CliFrontend
-import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants}
+import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.slf4j.LoggerFactory
 import scala.io.Source
@@ -58,7 +59,7 @@ object ApplicationMaster {
     var jobManager: ActorRef = ActorRef.noSender

     try {
-      val conf = Utils.initializeYarnConfiguration()
+      val conf = new YarnConfiguration()
       val env = System.getenv()
......
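On the cluster side, the ApplicationMaster has to act as the submitting user as well; the `ENV_CLIENT_USERNAME` variable set by the client and the `java.security.PrivilegedAction` import in the hunk above point to that. A Java sketch of the usual pattern follows; the commit's Scala code will differ in detail, but `createRemoteUser`, `addToken`, and `doAs` are standard UGI API:

~~~java
import java.io.IOException;
import java.security.PrivilegedAction;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public final class RunAsClientSketch {
    // Sketch: re-establish the client's identity inside the ApplicationMaster
    // and run the JobManager startup under it.
    public static void runAsClient(String clientUsername,
            final Runnable startJobManager) throws IOException {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(clientUsername);
        // Hand the delegation tokens YARN localized for this container to the new UGI.
        for (Token<? extends TokenIdentifier> token
                : UserGroupInformation.getCurrentUser().getTokens()) {
            ugi.addToken(token);
        }
        ugi.doAs(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                startJobManager.run();
                return null;
            }
        });
    }
}
~~~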