提交 b1f3408f 编写于 作者: Z z00376786 提交者: Tzu-Li (Gordon) Tai

[FLINK-6376] [yarn] Always upload HDFS delegation token for secured YARN deployments

Previously, YARN log aggregation fails because it depends on the HDFS
delegation token, which we do not upload if Kerberos keytabs are used.
We did not include HDFS delegation tokens when keytabs are used because
the UGI would prioritize the delegation token (which expires) if both
are present.

To address this, changes include:

1. Change Flink YARN client to always upload delegation tokens when
security is enabled. This would then allow log aggregation.

2. Filter out HDFS delegation token from the tokens fetched from HDFS
when populating the UGI. This allows the UGI to always use Kerberos
tickets instead of the HDFS delegation token.
上级 67bf467a
......@@ -20,8 +20,11 @@ package org.apache.flink.runtime.security.modules;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,6 +32,7 @@ import javax.security.auth.Subject;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
/**
* Responsible for installing a Hadoop login user.
......@@ -61,15 +65,27 @@ public class HadoopModule implements SecurityModule {
* used in the context of reading the stored tokens from UGI.
* Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
* loginUser.addCredentials(cred);
* Note: If the UGI uses the keytab for login, do not load the HDFS delegation token.
*/
try {
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
securityConfig.getHadoopConfiguration());
Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
Credentials credentials = new Credentials();
final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
//If the UGI uses the keytab for login, do not load the HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
if (!token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
final Text id = new Text(token.getIdentifier());
credentials.addToken(id, token);
}
}
Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
Credentials.class);
addCredentialsMethod.invoke(loginUser, cred);
addCredentialsMethod.invoke(loginUser, credentials);
} catch (NoSuchMethodException e) {
LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
} catch (InvocationTargetException e) {
......
......@@ -793,8 +793,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
if (UserGroupInformation.isSecurityEnabled() && keytab == null) {
//set tokens only when keytab is not provided
if (UserGroupInformation.isSecurityEnabled()) {
//set tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, conf);
}
......
......@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
......@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
......@@ -455,9 +457,17 @@ public final class Utils {
try (DataOutputBuffer dob = new DataOutputBuffer()) {
log.debug("Adding security tokens to Task Executor Container launch Context....");
UserGroupInformation user = UserGroupInformation.getCurrentUser();
Credentials credentials = user.getCredentials();
credentials.writeTokenStorageToStream(dob);
/*
* For the taskmanager YARN container context, read the tokens from the jobmanager YARN container's local file.
* Note: the tokens must be read from the local file, not from the UGI context, because if the UGI is logged in
* from a keytab, there is no HDFS delegation token in the UGI context.
*/
String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
File.class, org.apache.hadoop.conf.Configuration.class);
Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
new SecurityUtils.SecurityConfiguration(flinkConfig).getHadoopConfiguration());
cred.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ctx.setTokens(securityTokens);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册