[FLINK-7109] [hadoop] Remove GlobalConfiguration.loadConfiguration from...

[FLINK-7109] [hadoop] Remove GlobalConfiguration.loadConfiguration from HadoopUtils#getHadoopConfiguration

HadoopUtils#getHadoopConfiguration should not load the global configuration. Instead,
we pass it in as a parameter.

This closes #4265.
Parent 6c05abe3
......@@ -54,7 +54,11 @@ public final class HadoopUtils {
* Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
*/
public static void mergeHadoopConf(JobConf jobConf) {
org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration();
// we have to load the global configuration here, because the HadoopInputFormatBase does not
// have access to a Flink configuration object
org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
Configuration hadoopConf = getHadoopConfiguration(flinkConfiguration);
for (Map.Entry<String, String> e : hadoopConf) {
if (jobConf.get(e.getKey()) == null) {
jobConf.set(e.getKey(), e.getValue());
......@@ -109,13 +113,13 @@ public final class HadoopUtils {
* Returns a new Hadoop Configuration object using the path to the hadoop conf configured
* in the main configuration (flink-conf.yaml).
* This method is public because its being used in the HadoopDataSource.
*
* @param flinkConfiguration Flink configuration object
* @return A Hadoop configuration instance
*/
public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
org.apache.flink.configuration.Configuration flinkConfiguration =
GlobalConfiguration.loadConfiguration();
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
Configuration retConf = new org.apache.hadoop.conf.Configuration();
Configuration retConf = new Configuration();
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
// the hdfs configuration
......
......@@ -22,6 +22,8 @@ import java.lang.reflect.Constructor;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
......@@ -39,8 +41,12 @@ public final class HadoopUtils {
*/
public static void mergeHadoopConf(Configuration hadoopConfig) {
// we have to load the global configuration here, because the HadoopInputFormatBase does not
// have access to a Flink configuration object
org.apache.flink.configuration.Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
Configuration hadoopConf =
org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(flinkConfiguration);
for (Map.Entry<String, String> e : hadoopConf) {
if (hadoopConfig.get(e.getKey()) == null) {
......
......@@ -146,7 +146,7 @@ public class SecurityUtils {
* @param flinkConf the Flink global configuration.
*/
/**
 * Creates a security configuration from the given Flink configuration,
 * deriving the Hadoop configuration from it as well.
 *
 * <p>Per FLINK-7109, the Hadoop configuration is no longer loaded via
 * {@code GlobalConfiguration.loadConfiguration()}; the Flink configuration
 * is passed through explicitly instead.
 *
 * @param flinkConf the Flink global configuration.
 */
public SecurityConfiguration(Configuration flinkConf) {
	// Diff residue removed: the pre-change call without the flinkConf argument
	// duplicated the delegating this(...) call, which is invalid Java.
	this(flinkConf, HadoopUtils.getHadoopConfiguration(flinkConf));
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register