Commit 6e1de988 authored by Fabian Hueske, committed by Ufuk Celebi

[FLINK-2394] HadoopOutputFormats use correct OutputCommitters.

This closes #1056.
Parent bc627871
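In short: instead of always instantiating a `FileOutputCommitter`, the mapred wrapper now uses the committer registered on the `JobConf`, and the mapreduce wrapper asks the `OutputFormat` for its committer. A minimal sketch of what this enables on the mapred side (the `TextOutputFormat`, output path, and types below are illustrative, not part of this commit):

```java
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class CustomCommitterExample {
	public static void main(String[] args) {
		JobConf jobConf = new JobConf();
		FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///tmp/out"));

		// Any committer registered on the JobConf is now honored:
		// open() calls jobConf.getOutputCommitter() instead of
		// hard-coding `new FileOutputCommitter()`.
		jobConf.setOutputCommitter(FileOutputCommitter.class);

		HadoopOutputFormat<Text, IntWritable> format =
				new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf);
	}
}
```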
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCommitter;
 
 public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
@@ -31,6 +32,11 @@ public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
 		super(mapredOutputFormat, job);
 	}
 
+	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, Class<OutputCommitter> outputCommitterClass, JobConf job) {
+		this(mapredOutputFormat, job);
+		super.getJobConf().setOutputCommitter(outputCommitterClass);
+	}
+
 	@Override
 	public void writeRecord(Tuple2<K, V> record) throws IOException {
 		this.recordWriter.write(record.f0, record.f1);
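The new constructor simply forwards to `JobConf.setOutputCommitter(...)`. A sketch of a custom committer that could be plugged in this way (the `NoOpOutputCommitter` class is hypothetical, shown only to illustrate the hook):

```java
import java.io.IOException;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

// Hypothetical committer that skips the commit protocol entirely,
// e.g. for output formats that write directly to their final location.
public class NoOpOutputCommitter extends OutputCommitter {
	@Override public void setupJob(JobContext jobContext) throws IOException {}
	@Override public void setupTask(TaskAttemptContext taskContext) throws IOException {}
	@Override public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
		return false; // close() will then never call commitTask()
	}
	@Override public void commitTask(TaskAttemptContext taskContext) throws IOException {}
	@Override public void abortTask(TaskAttemptContext taskContext) throws IOException {}
}
```

Note that the parameter is declared as `Class<OutputCommitter>` rather than `Class<? extends OutputCommitter>`, so passing `NoOpOutputCommitter.class` to the new constructor needs an unchecked cast; calling `jobConf.setOutputCommitter(NoOpOutputCommitter.class)` before constructing the format avoids it.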
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
@@ -48,7 +48,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
+	private transient OutputCommitter outputCommitter;
 	private transient TaskAttemptContext context;
 	private transient JobContext jobContext;
@@ -106,7 +106,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 			throw new RuntimeException(e);
 		}
 
-		this.fileOutputCommitter = new FileOutputCommitter();
+		this.outputCommitter = this.jobConf.getOutputCommitter();
 
 		try {
 			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
@@ -114,7 +114,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 			throw new RuntimeException(e);
 		}
 
-		this.fileOutputCommitter.setupJob(jobContext);
+		this.outputCommitter.setupJob(jobContext);
 
 		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
 	}
@@ -127,8 +127,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 	public void close() throws IOException {
 		this.recordWriter.close(new HadoopDummyReporter());
 
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
+		if (this.outputCommitter.needsTaskCommit(this.context)) {
+			this.outputCommitter.commitTask(this.context);
 		}
 	}
@@ -137,10 +137,10 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 		try {
 			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+			OutputCommitter outputCommitter = this.jobConf.getOutputCommitter();
 
 			// finalize HDFS output format
-			fileOutputCommitter.commitJob(jobContext);
+			outputCommitter.commitJob(jobContext);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
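Taken together, the mapred hunks route the whole commit protocol through the configured committer. Condensed into one place (a sketch, not code from this commit; the contexts are assumed to be built the same way as in `open()`):

```java
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

class CommitProtocolSketch {
	static void run(JobConf jobConf, JobContext jobContext, TaskAttemptContext taskContext)
			throws IOException {
		// open(): was `new FileOutputCommitter()`, now whatever the JobConf provides
		OutputCommitter committer = jobConf.getOutputCommitter();
		committer.setupJob(jobContext);

		// close(): per-task commit, guarded so committers can veto it
		if (committer.needsTaskCommit(taskContext)) {
			committer.commitTask(taskContext);
		}

		// finalizeGlobal(): job-level commit once all tasks are done
		committer.commitJob(jobContext);
	}
}
```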
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -45,7 +46,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 	private org.apache.hadoop.conf.Configuration configuration;
 	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
+	private transient OutputCommitter outputCommitter;
 	private transient TaskAttemptContext context;
 	private transient int taskNumber;
@@ -101,20 +102,16 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 		try {
 			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
-
-		try {
-			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+			this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
 
-		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+		if(outputCommitter instanceof FileOutputCommitter) {
+			this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
+		}
 
 		try {
 			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
@@ -135,8 +132,8 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 			throw new IOException("Could not close RecordReader.", e);
 		}
 
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
+		if (this.outputCommitter.needsTaskCommit(this.context)) {
+			this.outputCommitter.commitTask(this.context);
 		}
 
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
@@ -152,28 +149,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T>
 			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
 		}
 	}
 
 	@Override
 	public void finalizeGlobal(int parallelism) throws IOException {
 
 		JobContext jobContext;
 		TaskAttemptContext taskContext;
 		try {
-			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
-					+ Integer.toString(1)
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
+					+ Integer.toString(1)
 					+ "_0");
 			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
 			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(taskContext);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
 
 		// finalize HDFS output format
-		this.fileOutputCommitter.commitJob(jobContext);
+		if(this.outputCommitter != null) {
+			this.outputCommitter.commitJob(jobContext);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
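On the mapreduce side the committer comes from the `OutputFormat` itself, and only file-based committers expose a work path, which is why the diff guards the `mapreduce.task.output.dir` assignment with `instanceof`. A sketch of that lookup in isolation (the `TaskAttemptContext` is assumed to be built as in `open()` above):

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

class WorkPathSketch {
	static void exposeWorkPath(OutputFormat<?, ?> format, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// Ask the format for its committer instead of constructing one.
		OutputCommitter committer = format.getOutputCommitter(context);

		// getWorkPath() only exists on FileOutputCommitter, so committers
		// that do not stage files in a temporary directory skip this property.
		if (committer instanceof FileOutputCommitter) {
			context.getConfiguration().set("mapreduce.task.output.dir",
					((FileOutputCommitter) committer).getWorkPath().toString());
		}
	}
}
```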
@@ -18,11 +18,20 @@
 package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
 
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
   extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
 
+  def this(
+      mapredOutputFormat: OutputFormat[K, V],
+      outputCommitterClass: Class[OutputCommitter],
+      job: JobConf) {
+    this(mapredOutputFormat, job)
+    this.getJobConf.setOutputCommitter(outputCommitterClass)
+  }
+
   def writeRecord(record: (K, V)) {
     this.recordWriter.write(record._1, record._2)
   }