提交 976bc573 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] fault tolerance improvement and bugfix

上级 32c6701e
...@@ -37,7 +37,8 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; ...@@ -37,7 +37,8 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
*/ */
public class FaultToleranceBuffer { public class FaultToleranceBuffer {
private static final Log log = LogFactory.getLog(FaultToleranceBuffer.class); private static final Log log = LogFactory
.getLog(FaultToleranceBuffer.class);
private long TIMEOUT = 10000; private long TIMEOUT = 10000;
private Long timeOfLastUpdate; private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer; private Map<String, StreamRecord> recordBuffer;
...@@ -55,11 +56,11 @@ public class FaultToleranceBuffer { ...@@ -55,11 +56,11 @@ public class FaultToleranceBuffer {
* channel ID * channel ID
* *
* @param outputs * @param outputs
* List of outputs * List of outputs
* @param channelID * @param channelID
* ID of the task object that uses this buffer * ID of the task object that uses this buffer
* @param numberOfChannels * @param numberOfChannels
* Number of output channels for the component * Number of output channels for the component
*/ */
public FaultToleranceBuffer(List<RecordWriter<StreamRecord>> outputs, public FaultToleranceBuffer(List<RecordWriter<StreamRecord>> outputs,
...@@ -88,18 +89,20 @@ public class FaultToleranceBuffer { ...@@ -88,18 +89,20 @@ public class FaultToleranceBuffer {
} }
/** /**
* Checks for records that have timed out since the last check and fails them. * Checks for records that have timed out since the last check and fails
* them.
* *
* @param currentTime * @param currentTime
* Time when the check should be made, usually current system time. * Time when the check should be made, usually current system
* time.
* @return Returns the list of the records that have timed out. * @return Returns the list of the records that have timed out.
*/ */
List<String> timeoutRecords(Long currentTime) { List<String> timeoutRecords(Long currentTime) {
if (timeOfLastUpdate + TIMEOUT < currentTime) { if (timeOfLastUpdate + TIMEOUT < currentTime) {
log.trace("Updating record buffer"); log.trace("Updating record buffer");
List<String> timedOutRecords = new LinkedList<String>(); List<String> timedOutRecords = new LinkedList<String>();
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L,
- TIMEOUT); currentTime - TIMEOUT);
for (Set<String> recordSet : timedOut.values()) { for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) { if (!recordSet.isEmpty()) {
...@@ -109,11 +112,12 @@ public class FaultToleranceBuffer { ...@@ -109,11 +112,12 @@ public class FaultToleranceBuffer {
} }
} }
recordsByTime.keySet().removeAll(timedOut.keySet());
for (String recordID : timedOutRecords) { for (String recordID : timedOutRecords) {
failRecord(recordID); failRecord(recordID);
} }
timedOut.clear();
timeOfLastUpdate = currentTime; timeOfLastUpdate = currentTime;
return timedOutRecords; return timedOutRecords;
} }
...@@ -122,13 +126,13 @@ public class FaultToleranceBuffer { ...@@ -122,13 +126,13 @@ public class FaultToleranceBuffer {
/** /**
* Stores time stamp for a record by recordID and also adds the record to a * Stores time stamp for a record by recordID and also adds the record to a
* map which maps a time stamp to the IDs of records that were emitted at that * map which maps a time stamp to the IDs of records that were emitted at
* time. * that time.
* <p> * <p>
* Later used for timeouts. * Later used for timeouts.
* *
* @param recordID * @param recordID
* ID of the record * ID of the record
*/ */
public void addTimestamp(String recordID) { public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis(); Long currentTime = System.currentTimeMillis();
...@@ -146,37 +150,21 @@ public class FaultToleranceBuffer { ...@@ -146,37 +150,21 @@ public class FaultToleranceBuffer {
} }
/** /**
* Returns a StreamRecord after removing it from the buffer * Removes a StreamRecord by ID from the fault tolerance buffer, further
* acks will have no effects for this record.
* *
* @param recordID * @param recordID
* The ID of the record that will be popped * The ID of the record that will be removed
*/
public StreamRecord popRecord(String recordID) {
//System.out.println("Pop ID: " + recordID);
return removeRecord(recordID);
}
/**
* Removes a StreamRecord by ID from the fault tolerance buffer, further acks
* will have no effects for this record.
*
* @param recordID
* The ID of the record that will be removed
* *
*/ */
StreamRecord removeRecord(String recordID) { public StreamRecord removeRecord(String recordID) {
StreamRecord recordToRemove = null;
ackCounter.remove(recordID); ackCounter.remove(recordID);
try {
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID); recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
recordToRemove = recordBuffer.remove(recordID);
log.trace("Record removed from buffer: " + recordID); log.trace("Record removed from buffer: " + recordID);
} catch (NullPointerException e) {
return recordBuffer.remove(recordID);
} catch (Exception e) {
log.error("Cannot remove record from buffer: " + recordID, e);
}
return recordToRemove;
} }
/** /**
...@@ -184,7 +172,7 @@ public class FaultToleranceBuffer { ...@@ -184,7 +172,7 @@ public class FaultToleranceBuffer {
* acknowledgments, removes it from the buffer * acknowledgments, removes it from the buffer
* *
* @param recordID * @param recordID
* ID of the record that has been acknowledged * ID of the record that has been acknowledged
*/ */
// TODO: find a place to call timeoutRecords // TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) { public void ackRecord(String recordID) {
...@@ -204,12 +192,12 @@ public class FaultToleranceBuffer { ...@@ -204,12 +192,12 @@ public class FaultToleranceBuffer {
* stores it with a new ID. * stores it with a new ID.
* *
* @param recordID * @param recordID
* ID of the record that has been failed * ID of the record that has been failed
*/ */
public void failRecord(String recordID) { public void failRecord(String recordID) {
// Create new id to avoid double counting acks // Create new id to avoid double counting acks
log.warn("Fail ID: " + recordID); log.warn("Fail ID: " + recordID);
StreamRecord newRecord = popRecord(recordID).setId(channelID); StreamRecord newRecord = removeRecord(recordID).setId(channelID);
addRecord(newRecord); addRecord(newRecord);
reEmit(newRecord); reEmit(newRecord);
} }
...@@ -218,7 +206,7 @@ public class FaultToleranceBuffer { ...@@ -218,7 +206,7 @@ public class FaultToleranceBuffer {
* Emit give record to all output channels * Emit give record to all output channels
* *
* @param record * @param record
* Record to be re-emitted * Record to be re-emitted
*/ */
public void reEmit(StreamRecord record) { public void reEmit(StreamRecord record) {
for (RecordWriter<StreamRecord> output : outputs) { for (RecordWriter<StreamRecord> output : outputs) {
......
...@@ -15,14 +15,22 @@ ...@@ -15,14 +15,22 @@
package eu.stratosphere.streaming.test.batch; package eu.stratosphere.streaming.test.batch;
import java.net.InetSocketAddress;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
public class BatchForward extends TestBase2{ public class BatchForwardLocal {
@Override public static JobGraph getJobGraph() {
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", BatchForwardSource.class); graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BatchForwardSink.class); graphBuilder.setSink("StreamSink", BatchForwardSink.class);
...@@ -31,4 +39,53 @@ public class BatchForward extends TestBase2{ ...@@ -31,4 +39,53 @@ public class BatchForward extends TestBase2{
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
} }
public static void main(String[] args) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.DEBUG);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
client.run(null, jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(null, jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
} }
...@@ -15,15 +15,23 @@ ...@@ -15,15 +15,23 @@
package eu.stratosphere.streaming.test.batch.wordcount; package eu.stratosphere.streaming.test.batch.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
public class BatchWordCount extends TestBase2 { public class BatchWordCountLocal {
@Override public static JobGraph getJobGraph() {
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource", graphBuilder.setSource("BatchWordCountSource",
BatchWordCountSource.class); BatchWordCountSource.class);
...@@ -42,4 +50,52 @@ public class BatchWordCount extends TestBase2 { ...@@ -42,4 +50,52 @@ public class BatchWordCount extends TestBase2 {
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
} }
public static void main(String[] args) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.DEBUG);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
client.run(null, jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(null, jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
} }
...@@ -87,7 +87,7 @@ public class CellInfo { ...@@ -87,7 +87,7 @@ public class CellInfo {
Client client = new Client(new InetSocketAddress( Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration); "hadoop02.ilab.sztaki.hu", 6123), configuration);
exec.start(); exec.start();
client.run(jG, true); client.run(null, jG, true);
exec.stop(); exec.stop();
} catch (Exception e) { } catch (Exception e) {
System.out.println(e); System.out.println(e);
......
...@@ -15,17 +15,24 @@ ...@@ -15,17 +15,24 @@
package eu.stratosphere.streaming.test.window.wordcount; package eu.stratosphere.streaming.test.window.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
//TODO: window operator remains unfinished. //TODO: window operator remains unfinished.
public class WindowWordCount extends TestBase2 { public class WindowWordCountLocal {
@Override public static JobGraph getJobGraph() {
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", graphBuilder.setSource("WindowWordCountSource",
WindowWordCountSource.class); WindowWordCountSource.class);
...@@ -44,4 +51,52 @@ public class WindowWordCount extends TestBase2 { ...@@ -44,4 +51,52 @@ public class WindowWordCount extends TestBase2 {
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
} }
public static void main(String[] args) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.DEBUG);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
client.run(null, jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(null, jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
} }
...@@ -33,15 +33,17 @@ public class WordCountCounter extends UserTaskInvokable { ...@@ -33,15 +33,17 @@ public class WordCountCounter extends UserTaskInvokable {
private int i = 0; private int i = 0;
private long time; private long time;
private long prevTime = System.currentTimeMillis(); private long prevTime = System.currentTimeMillis();
@Override @Override
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getRecord(0)[0]; wordValue = (StringValue) record.getRecord(0)[0];
word = wordValue.getValue(); word = wordValue.getValue();
i++; i++;
if (i % 50000 == 0) { if (i % 50000 == 0) {
time= System.currentTimeMillis(); time = System.currentTimeMillis();
System.out.println("Counter:\t" + i + "\t----Time: "+(time-prevTime)); System.out.println("Counter:\t" + i + "\t----Time: "
prevTime=time; + (time - prevTime));
prevTime = time;
} }
if (wordCounts.containsKey(word)) { if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1; count = wordCounts.get(word) + 1;
......
...@@ -74,7 +74,7 @@ public class WordCountLocal { ...@@ -74,7 +74,7 @@ public class WordCountLocal {
Client client = new Client(new InetSocketAddress("localhost", Client client = new Client(new InetSocketAddress("localhost",
6498), configuration); 6498), configuration);
client.run(jG, true); client.run(null, jG, true);
exec.stop(); exec.stop();
...@@ -84,7 +84,7 @@ public class WordCountLocal { ...@@ -84,7 +84,7 @@ public class WordCountLocal {
Client client = new Client(new InetSocketAddress( Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration); "hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(jG, true); client.run(null, jG, true);
} }
......
...@@ -33,7 +33,6 @@ import eu.stratosphere.types.StringValue; ...@@ -33,7 +33,6 @@ import eu.stratosphere.types.StringValue;
public class WordCountRemote { public class WordCountRemote {
private static JobGraph getJobGraph() throws Exception { private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource2.class); graphBuilder.setSource("WordCountSource", WordCountDummySource2.class);
...@@ -59,7 +58,6 @@ public class WordCountRemote { ...@@ -59,7 +58,6 @@ public class WordCountRemote {
root.addAppender(appender); root.addAppender(appender);
root.setLevel(Level.DEBUG); root.setLevel(Level.DEBUG);
try { try {
File file = new File( File file = new File(
...@@ -75,8 +73,7 @@ public class WordCountRemote { ...@@ -75,8 +73,7 @@ public class WordCountRemote {
Client client = new Client(new InetSocketAddress( Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration); "hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(jG, true); client.run(null, jG, true);
} catch (Exception e) { } catch (Exception e) {
System.out.println(e); System.out.println(e);
......
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.test.util;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.util.LogUtils;
public abstract class TestBase2 {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
protected final Configuration config;
private final List<File> tempFiles;
private NepheleMiniCluster executor;
protected boolean printPlan = false;
private JobExecutionResult jobExecutionResult;
public TestBase2() {
this(new Configuration());
}
public TestBase2(Configuration config) {
verifyJvmOptions();
this.config = config;
this.tempFiles = new ArrayList<File>();
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
@Before
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
this.executor.start();
}
@After
public void stopCluster() throws Exception {
try {
if (this.executor != null) {
this.executor.stop();
this.executor = null;
FileSystem.closeAll();
System.gc();
}
} finally {
deleteAllTempFiles();
}
}
@Test
public void testJob() throws Exception {
// pre-submit
try {
preSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Pre-submit work caused an error: " + e.getMessage());
}
// submit job
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = null;
try {
client = this.executor.getJobClient(jobGraph); }
catch(Exception e) {
System.err.println("here");
}
client.setConsoleStreamForReporting(getNullPrintStream());
this.jobExecutionResult = client.submitJobAndWait();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
// post-submit
try {
postSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Post-submit work caused an error: " + e.getMessage());
}
}
public String getTempDirPath(String dirName) throws IOException {
File f = createAndRegisterTempFile(dirName);
return f.toURI().toString();
}
public String getTempFilePath(String fileName) throws IOException {
File f = createAndRegisterTempFile(fileName);
return f.toURI().toString();
}
public String createTempFile(String fileName, String contents) throws IOException {
File f = createAndRegisterTempFile(fileName);
Files.write(contents, f, Charsets.UTF_8);
return f.toURI().toString();
}
private File createAndRegisterTempFile(String fileName) throws IOException {
File baseDir = new File(System.getProperty("java.io.tmpdir"));
File f = new File(baseDir, fileName);
if (f.exists()) {
deleteRecursively(f);
}
File parentToDelete = f;
while (true) {
File parent = parentToDelete.getParentFile();
if (parent == null) {
throw new IOException("Missed temp dir while traversing parents of a temp file.");
}
if (parent.equals(baseDir)) {
break;
}
parentToDelete = parent;
}
Files.createParentDirs(f);
this.tempFiles.add(parentToDelete);
return f;
}
public BufferedReader[] getResultReader(String resultPath) throws IOException {
return getResultReader(resultPath, false);
}
public BufferedReader[] getResultReader(String resultPath, boolean inOrderOfFiles) throws IOException {
File[] files = getAllInvolvedFiles(resultPath);
if (inOrderOfFiles) {
// sort the files after their name (1, 2, 3, 4)...
// we cannot sort by path, because strings sort by prefix
Arrays.sort(files, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
try {
int f1 = Integer.parseInt(o1.getName());
int f2 = Integer.parseInt(o2.getName());
return f1 < f2 ? -1 : (f1 > f2 ? 1 : 0);
}
catch (NumberFormatException e) {
throw new RuntimeException("The file names are no numbers and cannot be ordered: " +
o1.getName() + "/" + o2.getName());
}
}
});
}
BufferedReader[] readers = new BufferedReader[files.length];
for (int i = 0; i < files.length; i++) {
readers[i] = new BufferedReader(new FileReader(files[i]));
}
return readers;
}
public BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
File[] files = getAllInvolvedFiles(resultPath);
BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
for (int i = 0; i < files.length; i++) {
inStreams[i] = new BufferedInputStream(new FileInputStream(files[i]));
}
return inStreams;
}
public void readAllResultLines(List<String> target, String resultPath) throws IOException {
readAllResultLines(target, resultPath, false);
}
public void readAllResultLines(List<String> target, String resultPath, boolean inOrderOfFiles) throws IOException {
for (BufferedReader reader : getResultReader(resultPath, inOrderOfFiles)) {
String s = null;
while ((s = reader.readLine()) != null) {
target.add(s);
}
}
}
public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, false);
String[] result = (String[]) list.toArray(new String[list.size()]);
Arrays.sort(result);
String[] expected = expectedResultStr.split("\n");
Arrays.sort(expected);
Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
Assert.assertArrayEquals(expected, result);
}
public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, true);
String[] result = (String[]) list.toArray(new String[list.size()]);
String[] expected = expectedResultStr.split("\n");
Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
Assert.assertArrayEquals(expected, result);
}
private File[] getAllInvolvedFiles(String resultPath) {
File result = asFile(resultPath);
if (!result.exists()) {
Assert.fail("Result file was not written");
}
if (result.isDirectory()) {
return result.listFiles();
} else {
return new File[] { result };
}
}
public File asFile(String path) {
try {
URI uri = new URI(path);
if (uri.getScheme().equals("file")) {
return new File(uri.getPath());
} else {
throw new IllegalArgumentException("This path does not denote a local file.");
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("This path does not describe a valid local file URI.");
}
}
private void deleteAllTempFiles() throws IOException {
for (File f : this.tempFiles) {
if (f.exists()) {
deleteRecursively(f);
}
}
}
// --------------------------------------------------------------------------------------------
// Methods to create the test program and for pre- and post- test work
// --------------------------------------------------------------------------------------------
protected JobGraph getJobGraph() throws Exception {
Plan p = getTestJob();
if (p == null) {
Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
}
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(p);
if (printPlan) {
System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op));
}
NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
return jgg.compileJobGraph(op);
}
protected Plan getTestJob() {
return null;
}
protected void preSubmit() throws Exception {}
protected void postSubmit() throws Exception {}
public JobExecutionResult getJobExecutionResult() {
return jobExecutionResult;
}
// --------------------------------------------------------------------------------------------
// Miscellaneous helper methods
// --------------------------------------------------------------------------------------------
protected static Collection<Object[]> toParameterList(Configuration ... testConfigs) {
ArrayList<Object[]> configs = new ArrayList<Object[]>();
for (Configuration testConfig : testConfigs) {
Object[] c = { testConfig };
configs.add(c);
}
return configs;
}
private static void deleteRecursively (File f) throws IOException {
if (f.isDirectory()) {
FileUtils.deleteDirectory(f);
} else {
f.delete();
}
}
public static PrintStream getNullPrintStream() {
return new PrintStream(new OutputStream() {
@Override
public void write(int b) throws IOException {}
});
}
}
...@@ -107,7 +107,7 @@ public class FaultToleranceBufferTest { ...@@ -107,7 +107,7 @@ public class FaultToleranceBufferTest {
faultTolerancyBuffer.addRecord(record1); faultTolerancyBuffer.addRecord(record1);
assertArrayEquals(record1.getRecord(0), assertArrayEquals(record1.getRecord(0),
faultTolerancyBuffer.popRecord(record1.getId()).getRecord(0)); faultTolerancyBuffer.removeRecord(record1.getId()).getRecord(0));
System.out.println("---------"); System.out.println("---------");
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册