提交 04b97c94 编写于 作者: T Till Rohrmann

Fix race condition in ExecutionGraph which made the job finish before all...

Fix race condition in ExecutionGraph which made the job finish before all vertices have called the finalizeOnMaster method.
上级 ae8fb948
...@@ -92,7 +92,7 @@ public class HadoopMapredCompatWordCount { ...@@ -92,7 +92,7 @@ public class HadoopMapredCompatWordCount {
// normalize and split the line // normalize and split the line
String line = v.toString(); String line = v.toString();
String[] tokens = line.toLowerCase().split("\\W+"); String[] tokens = line.toLowerCase().split("\\W+");
// emit the pairs // emit the pairs
for (String token : tokens) { for (String token : tokens) {
if (token.length() > 0) { if (token.length() > 0) {
...@@ -119,8 +119,8 @@ public class HadoopMapredCompatWordCount { ...@@ -119,8 +119,8 @@ public class HadoopMapredCompatWordCount {
while(vs.hasNext()) { while(vs.hasNext()) {
cnt += vs.next().get(); cnt += vs.next().get();
} }
out.collect(k, new LongWritable(cnt)); out.collect(k, new LongWritable(cnt));
} }
@Override @Override
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.apache.flink.hadoopcompatibility.mapreduce; package org.apache.flink.hadoopcompatibility.mapreduce;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
...@@ -117,9 +116,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement ...@@ -117,9 +116,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
System.out.println("HadoopOutputFormat: Write to " + this.configuration.get("mapred" +
".output.dir"));
this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
try { try {
...@@ -136,21 +133,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement ...@@ -136,21 +133,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Could not create RecordWriter.", e); throw new IOException("Could not create RecordWriter.", e);
} }
File dir = new File(this.configuration.get("mapred.output.dir"));
if(dir.isDirectory()){
File[] files = dir.listFiles();
System.out.println(configuration.get("mapred.output.dir") + " contains the " +
"following files.");
for(File file: files){
System.out.println(file.toURI());
}
}else if(dir.exists()){
System.out.println(configuration.get("mapred.output.dir") + " is not a directory.");
}else{
System.out.println(configuration.get("mapred.output.dir") + " does not yet exists.");
}
} }
...@@ -169,7 +151,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement ...@@ -169,7 +151,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
System.out.println("HadoopOutputFormat: Close");
try { try {
this.recordWriter.close(this.context); this.recordWriter.close(this.context);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -181,22 +162,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement ...@@ -181,22 +162,6 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
} }
Path outputPath = new Path(this.configuration.get("mapred.output.dir")); Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
File dir = new File(this.configuration.get("mapred.output.dir"));
if(dir.isDirectory()){
File[] files = dir.listFiles();
System.out.println(configuration.get("mapred.output.dir") + " contains the " +
"following files.");
for(File file: files){
System.out.println(file.toURI());
}
}else if(dir.exists()){
System.out.println(configuration.get("mapred.output.dir") + " is not a directory.");
}else{
System.out.println(configuration.get("mapred.output.dir") + " does not yet exists.");
}
// rename tmp-file to final name // rename tmp-file to final name
FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
...@@ -206,18 +171,13 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement ...@@ -206,18 +171,13 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr; String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) { if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
System.out.println("Rename file " + new Path(outputPath.toString()+"/"+tmpFile) + " " +
"to " + new Path(outputPath.toString()+"/"+taskNumberStr));
fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr)); fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
}else{
System.out.println("File does not exist?");
} }
} }
@Override @Override
public void finalizeGlobal(int parallelism) throws IOException { public void finalizeGlobal(int parallelism) throws IOException {
System.out.println("Finalize HadoopOutputFormat.");
JobContext jobContext; JobContext jobContext;
TaskAttemptContext taskContext; TaskAttemptContext taskContext;
try { try {
......
...@@ -95,7 +95,7 @@ public class WordCount { ...@@ -95,7 +95,7 @@ public class WordCount {
// normalize and split the line // normalize and split the line
String line = value.f1.toString(); String line = value.f1.toString();
String[] tokens = line.toLowerCase().split("\\W+"); String[] tokens = line.toLowerCase().split("\\W+");
// emit the pairs // emit the pairs
for (String token : tokens) { for (String token : tokens) {
if (token.length() > 0) { if (token.length() > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册