diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java index a59b96fac0688eaa364fb2d7bf3ab347279ee7c1..a6a318cb611495a639f1d1aed1864085e7a93a5d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; @@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase implements OutputFormat private transient JobContext jobContext; public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat mapredOutputFormat, JobConf job) { - super(); this.mapredOutputFormat = mapredOutputFormat; HadoopUtils.mergeHadoopConf(job); this.jobConf = job; @@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase implements OutputFormat @Override public void configure(Configuration parameters) { - // nothing to do + if(this.mapredOutputFormat instanceof Configurable){ + ((Configurable)this.mapredOutputFormat).setConf(this.jobConf); + } } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java index 10e631d802ce7c16d5ba0f14608f2c03abed11ca..0e592571e631831dec094df6bf082aa440a923d3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -65,7 +66,9 @@ public abstract class HadoopOutputFormatBase implements OutputFormat @Override public void configure(Configuration parameters) { - // nothing to do + if(this.mapreduceOutputFormat instanceof Configurable){ + ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration); + } } /**