Commit 8754352f, authored by Fabian Hueske

[FLINK-2617] [hadoop-compat] Added static mutexes for configure, open, close HadoopFormats

This closes #1111
Parent 16fb4e91
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
 
+	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
 	protected Class<K> keyClass;
 	protected Class<V> valueClass;
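The comment block added above is the heart of this change: Hadoop assumes each task runs in its own JVM, so its formats may freely touch JVM-global state in configure(), open(), and close(), while Flink runs parallel tasks as threads inside one JVM, so several format instances can hit that state concurrently. Declaring the mutexes `static` is what makes each lock JVM-wide rather than per-instance. A minimal self-contained sketch of the pattern follows; the wrapper class and the wrapped format are hypothetical stand-ins, not classes from this commit:

```java
// Sketch of the static-mutex pattern introduced by this commit: a static
// field holds one lock per class (per classloader), so the synchronized
// block is serialized across ALL instances in the JVM, mirroring the
// per-JVM isolation that Hadoop formats implicitly rely on.
public class ThreadSafeFormatWrapper {

	// Shared by every instance of this class in the JVM.
	private static final Object OPEN_MUTEX = new Object();

	private final NonThreadSafeFormat wrapped = new NonThreadSafeFormat();

	public void open() {
		// Only one thread in the whole JVM runs this block at a time,
		// even if each Flink task slot holds its own wrapper instance.
		synchronized (OPEN_MUTEX) {
			wrapped.open();
		}
	}

	// Stand-in for a Hadoop format that mutates global state in open().
	static class NonThreadSafeFormat {
		void open() { /* not safe to call concurrently */ }
	}
}
```

Presumably the commit uses three separate locks so that, for example, one task's close() never waits on another task's open(); only calls within the same phase are serialized.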
@@ -91,12 +99,15 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public void configure(Configuration parameters) {
-		// configure MR InputFormat if necessary
-		if(this.mapredInputFormat instanceof Configurable) {
-			((Configurable)this.mapredInputFormat).setConf(this.jobConf);
-		}
-		else if(this.mapredInputFormat instanceof JobConfigurable) {
-			((JobConfigurable)this.mapredInputFormat).configure(this.jobConf);
-		}
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			// configure MR InputFormat if necessary
+			if (this.mapredInputFormat instanceof Configurable) {
+				((Configurable) this.mapredInputFormat).setConf(this.jobConf);
+			} else if (this.mapredInputFormat instanceof JobConfigurable) {
+				((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
+			}
+		}
 	}
@@ -148,13 +159,18 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-		if (this.recordReader instanceof Configurable) {
-			((Configurable) this.recordReader).setConf(jobConf);
-		}
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
+
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+			if (this.recordReader instanceof Configurable) {
+				((Configurable) this.recordReader).setConf(jobConf);
+			}
+			key = this.recordReader.createKey();
+			value = this.recordReader.createValue();
+			this.fetched = false;
+		}
 	}
 
 	@Override
@@ -172,7 +188,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public void close() throws IOException {
-		this.recordReader.close();
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordReader.close();
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -54,6 +54,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	private static final long serialVersionUID = 1L;
 
+	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
@@ -77,12 +85,15 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	@Override
 	public void configure(Configuration parameters) {
-		// configure MR OutputFormat if necessary
-		if(this.mapredOutputFormat instanceof Configurable) {
-			((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
-		}
-		else if(this.mapredOutputFormat instanceof JobConfigurable) {
-			((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf);
-		}
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			// configure MR OutputFormat if necessary
+			if (this.mapredOutputFormat instanceof Configurable) {
+				((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
+			} else if (this.mapredOutputFormat instanceof JobConfigurable) {
+				((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
+			}
+		}
 	}
@@ -94,39 +105,43 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
-				+ Integer.toString(taskNumber + 1)
-				+ "_0");
-
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.outputCommitter = this.jobConf.getOutputCommitter();
-
-		JobContext jobContext;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.outputCommitter.setupJob(jobContext);
-
-		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			if (Integer.toString(taskNumber + 1).length() > 6) {
+				throw new IOException("Task id too large.");
+			}
+
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+					+ Integer.toString(taskNumber + 1)
+					+ "_0");
+
+			this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+			this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+			// for hadoop 2.2
+			this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+			this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+
+			try {
+				this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.outputCommitter = this.jobConf.getOutputCommitter();
+
+			JobContext jobContext;
+			try {
+				jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.outputCommitter.setupJob(jobContext);
+
+			this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+		}
 	}
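The TaskAttemptID string built in open() above deserves a note: Hadoop expects the one-based task number as a six-digit, zero-padded field, and the code produces the padding by formatting a single space to the missing width and then replacing spaces with zeros. A standalone demo of just that string logic follows; the demo class is illustrative, not Flink code, and is shown for task numbers below six digits (the length check above rejects larger ids):

```java
// Demonstrates the zero-padding used above to build a Hadoop TaskAttemptID
// string of the form attempt__0000_r_NNNNNN_0.
public class TaskAttemptIdDemo {

	static String attemptId(int taskNumber) {
		String oneBased = Integer.toString(taskNumber + 1);
		// Format a single space to the missing width (e.g. "%5s" -> "     ")
		// and turn it into zeros, left-padding the task number to six digits.
		String padding = String.format("%" + (6 - oneBased.length()) + "s", " ").replace(" ", "0");
		return "attempt__0000_r_" + padding + oneBased + "_0";
	}

	public static void main(String[] args) {
		System.out.println(attemptId(0));  // attempt__0000_r_000001_0
		System.out.println(attemptId(41)); // attempt__0000_r_000042_0
	}
}
```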
 	/**
@@ -135,10 +150,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void close() throws IOException {
-		this.recordWriter.close(new HadoopDummyReporter());
-
-		if (this.outputCommitter.needsTaskCommit(this.context)) {
-			this.outputCommitter.commitTask(this.context);
-		}
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordWriter.close(new HadoopDummyReporter());
+
+			if (this.outputCommitter.needsTaskCommit(this.context)) {
+				this.outputCommitter.commitTask(this.context);
+			}
+		}
 	}
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
 
+	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	// NOTE: this class is using a custom serialization logic, without a defaultWriteObject() method.
 	// Hence, all fields here are "transient".
 	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
@@ -89,8 +97,12 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public void configure(Configuration parameters) {
-		if (mapreduceInputFormat instanceof Configurable) {
-			((Configurable) mapreduceInputFormat).setConf(configuration);
-		}
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			if (mapreduceInputFormat instanceof Configurable) {
+				((Configurable) mapreduceInputFormat).setConf(configuration);
+			}
+		}
 	}
@@ -169,21 +181,26 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
-		try {
-			this.recordReader = this.mapreduceInputFormat
-					.createRecordReader(split.getHadoopInputSplit(), context);
-			this.recordReader.initialize(split.getHadoopInputSplit(), context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordReader.", e);
-		} finally {
-			this.fetched = false;
-		}
+
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			TaskAttemptContext context;
+			try {
+				context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			try {
+				this.recordReader = this.mapreduceInputFormat
+						.createRecordReader(split.getHadoopInputSplit(), context);
+				this.recordReader.initialize(split.getHadoopInputSplit(), context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not create RecordReader.", e);
+			} finally {
+				this.fetched = false;
+			}
+		}
 	}
@@ -207,7 +224,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public void close() throws IOException {
-		this.recordReader.close();
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordReader.close();
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -49,6 +49,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	private static final long serialVersionUID = 1L;
 
+	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private org.apache.hadoop.conf.Configuration configuration;
 	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
@@ -73,8 +81,12 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	@Override
 	public void configure(Configuration parameters) {
-		if(this.mapreduceOutputFormat instanceof Configurable){
-			((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
-		}
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			if (this.mapreduceOutputFormat instanceof Configurable) {
+				((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
+			}
+		}
 	}
@@ -86,49 +98,53 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		this.taskNumber = taskNumber+1;
-
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.output.basename", "tmp");
-
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
-				+ Integer.toString(taskNumber + 1)
-				+ "_0");
-
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
-			this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.context.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if(currentUserCreds != null) {
-			this.context.getCredentials().addAll(currentUserCreds);
-		}
-
-		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		if(outputCommitter instanceof FileOutputCommitter) {
-			this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
-		}
-
-		try {
-			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordWriter.", e);
-		}
+
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			if (Integer.toString(taskNumber + 1).length() > 6) {
+				throw new IOException("Task id too large.");
+			}
+
+			this.taskNumber = taskNumber + 1;
+
+			// for hadoop 2.2
+			this.configuration.set("mapreduce.output.basename", "tmp");
+
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+					+ Integer.toString(taskNumber + 1)
+					+ "_0");
+
+			this.configuration.set("mapred.task.id", taskAttemptID.toString());
+			this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+			// for hadoop 2.2
+			this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+			this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+
+			try {
+				this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+				this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+				this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.context.getCredentials().addAll(this.credentials);
+			Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+			if (currentUserCreds != null) {
+				this.context.getCredentials().addAll(currentUserCreds);
+			}
+
+			// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+			if (outputCommitter instanceof FileOutputCommitter) {
+				this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter) this.outputCommitter).getWorkPath().toString());
+			}
+
+			try {
+				this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not create RecordWriter.", e);
+			}
+		}
 	}
@@ -138,27 +154,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void close() throws IOException {
-		try {
-			this.recordWriter.close(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not close RecordReader.", e);
-		}
-
-		if (this.outputCommitter.needsTaskCommit(this.context)) {
-			this.outputCommitter.commitTask(this.context);
-		}
-
-		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
-		// rename tmp-file to final name
-		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
-
-		String taskNumberStr = Integer.toString(this.taskNumber);
-		String tmpFileTemplate = "tmp-r-00000";
-		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
-
-		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
-			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
-		}
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			try {
+				this.recordWriter.close(this.context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not close RecordWriter.", e);
+			}
+
+			if (this.outputCommitter.needsTaskCommit(this.context)) {
+				this.outputCommitter.commitTask(this.context);
+			}
+
+			Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+			// rename tmp-file to final name
+			FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+
+			String taskNumberStr = Integer.toString(this.taskNumber);
+			String tmpFileTemplate = "tmp-r-00000";
+			String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
+
+			if (fs.exists(new Path(outputPath.toString() + "/" + tmpFile))) {
+				fs.rename(new Path(outputPath.toString() + "/" + tmpFile), new Path(outputPath.toString() + "/" + taskNumberStr));
+			}
+		}
 	}
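The rename step in this close() pairs with the open() hunk above, which set `mapreduce.output.basename` to "tmp": Hadoop therefore writes files named like `tmp-r-00000`, and close() reconstructs that name by overlaying the task number on the tail of the 11-character template before renaming the file to the bare task number. A standalone sketch of just the name computation follows; the demo class is illustrative and performs no filesystem access:

```java
// Demonstrates how close() derives the temporary file name written by
// Hadoop (basename "tmp" + "-r-" + five-digit part number) for a task.
public class TmpFileRenameDemo {

	static String tmpFileFor(int taskNumber) {
		String taskNumberStr = Integer.toString(taskNumber);
		String tmpFileTemplate = "tmp-r-00000"; // 11 characters
		// Keep the template's prefix and overwrite its trailing digits with
		// the task number, e.g. task 3 -> "tmp-r-00003", task 42 -> "tmp-r-00042".
		return tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
	}

	public static void main(String[] args) {
		System.out.println(tmpFileFor(3));  // tmp-r-00003, renamed to "3"
		System.out.println(tmpFileFor(42)); // tmp-r-00042, renamed to "42"
	}
}
```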