From b1f3408f4cc5cca4536fe85300efcd5267eba73a Mon Sep 17 00:00:00 2001 From: z00376786 Date: Wed, 26 Apr 2017 11:36:43 +0800 Subject: [PATCH] [FLINK-6376] [yarn] Always upload HDFS delegation token for secured YARN deployments Previously, YARN log aggregation failed because it depends on the HDFS delegation token, which we do not upload if Kerberos keytabs are used. We did not include HDFS delegation tokens when keytabs are used because the UGI would prioritize the delegation token (which expires) if both are present. To address this, changes include: 1. Change Flink YARN client to always upload delegation tokens when security is enabled. This would then allow log aggregation. 2. Filter out HDFS delegation token from the tokens fetched from HDFS when populating the UGI. This allows the UGI to always use Kerberos tickets instead of the HDFS delegation token. --- .../runtime/security/modules/HadoopModule.java | 18 +++++++++++++++++- .../yarn/AbstractYarnClusterDescriptor.java | 4 ++-- .../main/java/org/apache/flink/yarn/Utils.java | 16 +++++++++++++--- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 9344faf02ae..8ea3f2c078e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -20,8 +20,11 @@ package org.apache.flink.runtime.security.modules; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import 
org.apache.hadoop.security.token.TokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +32,7 @@ import javax.security.auth.Subject; import java.io.File; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Collection; /** * Responsible for installing a Hadoop login user. @@ -61,15 +65,27 @@ public class HadoopModule implements SecurityModule { * used in the context of reading the stored tokens from UGI. * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); * loginUser.addCredentials(cred); + * Note: if the UGI uses a keytab for login, do not load the HDFS delegation token. */ try { Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class); Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), securityConfig.getHadoopConfiguration()); + Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens"); + Credentials credentials = new Credentials(); + final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN"); + Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred); + // If the UGI uses a keytab for login, do not load the HDFS delegation token. + for (Token<? extends TokenIdentifier> token : usrTok) { + if (!token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) { + final Text id = new Text(token.getIdentifier()); + credentials.addToken(id, token); + } + } Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); + addCredentialsMethod.invoke(loginUser, credentials); } catch (NoSuchMethodException e) { LOG.warn("Could not find method implementations in the shaded jar. 
Exception: {}", e); } catch (InvocationTargetException e) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 2315c706d2e..3b73feceee9 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -793,8 +793,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5); - if (UserGroupInformation.isSecurityEnabled() && keytab == null) { - //set tokens only when keytab is not provided + if (UserGroupInformation.isSecurityEnabled()) { + //set tokens when security is enabled LOG.info("Adding delegation token to the AM container.."); Utils.setTokensFor(amContainer, paths, conf); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 698b69e0b42..33da78c2804 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.security.SecurityUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -455,9 +457,17 @@ public final class Utils { try 
(DataOutputBuffer dob = new DataOutputBuffer()) { log.debug("Adding security tokens to Task Executor Container launch Context...."); - UserGroupInformation user = UserGroupInformation.getCurrentUser(); - Credentials credentials = user.getCredentials(); - credentials.writeTokenStorageToStream(dob); + /* + * For the TaskManager YARN container context, read the tokens from the JobManager YARN container's local file. + * Note: the tokens must be read from the local file, not from the UGI context, because if the UGI + * is logged in from a keytab, there is no HDFS delegation token in the UGI context. + */ + String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); + Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", + File.class, org.apache.hadoop.conf.Configuration.class); + Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), + new SecurityUtils.SecurityConfiguration(flinkConfig).getHadoopConfiguration()); + cred.writeTokenStorageToStream(dob); ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ctx.setTokens(securityTokens); } -- GitLab