diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 9344faf02ae0a00ab9fc7c5bed46233abaac6cd9..8ea3f2c078ebf36f2092b2c1e5aef97f976887a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.security.modules;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +32,7 @@ import javax.security.auth.Subject;
 import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Collection;
 /**
  * Responsible for installing a Hadoop login user.
  */
@@ -61,15 +65,27 @@ public class HadoopModule implements SecurityModule {
 				 * used in the context of reading the stored tokens from UGI.
 				 * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
 				 * loginUser.addCredentials(cred);
+				 * Note: if the UGI logged in from a keytab, do not load the HDFS delegation token.
 				 */
 				try {
 					Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
 						File.class, org.apache.hadoop.conf.Configuration.class);
 					Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
 						securityConfig.getHadoopConfiguration());
+					Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
+					Credentials credentials = new Credentials();
+					final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+					Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
+					// If the UGI logged in from a keytab, do not load the HDFS delegation token.
+					for (Token<? extends TokenIdentifier> token : usrTok) {
+						if (!token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+							final Text id = new Text(token.getIdentifier());
+							credentials.addToken(id, token);
+						}
+					}
 					Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
 						Credentials.class);
-					addCredentialsMethod.invoke(loginUser, cred);
+					addCredentialsMethod.invoke(loginUser, credentials);
 				} catch (NoSuchMethodException e) {
 					LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e);
 				} catch (InvocationTargetException e) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 2315c706d2e8511b6586265e8e7f56f1ca979b02..3b73feceee921946adbc574350531b4be2368d46 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -793,8 +793,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
 
-		if (UserGroupInformation.isSecurityEnabled() && keytab == null) {
-			//set tokens only when keytab is not provided
+		if (UserGroupInformation.isSecurityEnabled()) {
+			// set tokens whenever security is enabled
 			LOG.info("Adding delegation token to the AM container..");
 			Utils.setTokensFor(amContainer, paths, conf);
 		}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 698b69e0b4288ad7764dc2bf018884556820e9e2..33da78c2804d05d2e86b9d715225bf03c98ed58d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.security.SecurityUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -455,9 +457,17 @@ public final class Utils {
 
 		try (DataOutputBuffer dob = new DataOutputBuffer()) {
 			log.debug("Adding security tokens to Task Executor Container launch Context....");
-			UserGroupInformation user = UserGroupInformation.getCurrentUser();
-			Credentials credentials = user.getCredentials();
-			credentials.writeTokenStorageToStream(dob);
+			/*
+			 * For the TaskManager YARN container context, read the tokens from the JobManager YARN
+			 * container's local token file rather than from the UGI context: if the UGI logged in
+			 * from a keytab, the UGI context holds no HDFS delegation token.
+			 */
+			String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+			Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
+				File.class, org.apache.hadoop.conf.Configuration.class);
+			Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
+				new SecurityUtils.SecurityConfiguration(flinkConfig).getHadoopConfiguration());
+			cred.writeTokenStorageToStream(dob);
 			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 			ctx.setTokens(securityTokens);
 		}