From de573cf5cef3bed6c489af85dba2cc61912db4c0 Mon Sep 17 00:00:00 2001 From: fpompermaier Date: Mon, 27 Apr 2015 16:38:51 +0200 Subject: [PATCH] [FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats. This closes #632 This closes #571 --- .../api/java/hadoop/mapred/HadoopOutputFormatBase.java | 6 ++++-- .../api/java/hadoop/mapreduce/HadoopOutputFormatBase.java | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java index a59b96fac06..a6a318cb611 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; @@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase implements OutputFormat private transient JobContext jobContext; public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat mapredOutputFormat, JobConf job) { - super(); this.mapredOutputFormat = mapredOutputFormat; HadoopUtils.mergeHadoopConf(job); this.jobConf = job; @@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase implements OutputFormat @Override public void configure(Configuration parameters) { - // nothing to do + if(this.mapredOutputFormat instanceof Configurable){ + ((Configurable)this.mapredOutputFormat).setConf(this.jobConf); + } } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java index 10e631d802c..0e592571e63 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -65,7 +66,9 @@ public abstract class HadoopOutputFormatBase implements OutputFormat @Override public void configure(Configuration parameters) { - // nothing to do + if(this.mapreduceOutputFormat instanceof Configurable){ + ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration); + } } /** -- GitLab