提交 de573cf5 编写于 作者: F fpompermaier 提交者: Fabian Hueske

[FLINK-1828] [hadoop] Fixed missing call to configure() for Configurable HadoopOutputFormats.

This closes #632
This closes #571
上级 11f1dd64
...@@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; ...@@ -25,6 +25,7 @@ import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.JobContext;
...@@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T> ...@@ -51,7 +52,6 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
private transient JobContext jobContext; private transient JobContext jobContext;
public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) { public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
super();
this.mapredOutputFormat = mapredOutputFormat; this.mapredOutputFormat = mapredOutputFormat;
HadoopUtils.mergeHadoopConf(job); HadoopUtils.mergeHadoopConf(job);
this.jobConf = job; this.jobConf = job;
...@@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T> ...@@ -67,7 +67,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
@Override @Override
public void configure(Configuration parameters) { public void configure(Configuration parameters) {
// nothing to do if(this.mapredOutputFormat instanceof Configurable){
((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
}
} }
/** /**
......
...@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster; ...@@ -22,6 +22,7 @@ import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
...@@ -65,7 +66,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T> ...@@ -65,7 +66,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
@Override @Override
public void configure(Configuration parameters) { public void configure(Configuration parameters) {
// nothing to do if(this.mapreduceOutputFormat instanceof Configurable){
((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
}
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册