diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java index da8244fd6d85cd9145a69db87a31063cab5902e3..07746feda8fc2f4fa1008859b553d569fe301973 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -54,7 +54,11 @@ public final class HadoopUtils { * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration. */ public static void mergeHadoopConf(JobConf jobConf) { - org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration(); + // we have to load the global configuration here, because the HadoopInputFormatBase does not + // have access to a Flink configuration object + org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + + Configuration hadoopConf = getHadoopConfiguration(flinkConfiguration); for (Map.Entry e : hadoopConf) { if (jobConf.get(e.getKey()) == null) { jobConf.set(e.getKey(), e.getValue()); @@ -109,13 +113,13 @@ public final class HadoopUtils { * Returns a new Hadoop Configuration object using the path to the hadoop conf configured * in the main configuration (flink-conf.yaml). * This method is public because its being used in the HadoopDataSource. + * + * @param flinkConfiguration Flink configuration object + * @return A Hadoop configuration instance */ - public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() { - - org.apache.flink.configuration.Configuration flinkConfiguration = - GlobalConfiguration.loadConfiguration(); + public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) { - Configuration retConf = new org.apache.hadoop.conf.Configuration(); + Configuration retConf = new Configuration(); // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and // the hdfs configuration diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java index 52fd734afb2dc030bb64d2c7951086fdf8ceb0ea..cf0d057675540a37d8dbdef2c8f5cf4f3a1f03f4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java @@ -22,6 +22,8 @@ import java.lang.reflect.Constructor; import java.util.Map; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.GlobalConfiguration; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -39,8 +41,12 @@ public final class HadoopUtils { */ public static void mergeHadoopConf(Configuration hadoopConfig) { + // we have to load the global configuration here, because the HadoopInputFormatBase does not + // have access to a Flink configuration object + org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + Configuration hadoopConf = - org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(); + org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(flinkConfiguration); for (Map.Entry e : hadoopConf) { if (hadoopConfig.get(e.getKey()) == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java index df4982272e5e7b66dba968e0cb2766db10afaebc..9e6f40258a09fa97b873762ca1d4af0df638e925 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java @@ -146,7 +146,7 @@ public class SecurityUtils { * @param flinkConf the Flink global configuration. */ public SecurityConfiguration(Configuration flinkConf) { - this(flinkConf, HadoopUtils.getHadoopConfiguration()); + this(flinkConf, HadoopUtils.getHadoopConfiguration(flinkConf)); } /**