diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 2a74660033f8156703767d0af38763b28f0d9d9d..d863e27b4524b5cb845e4ce2513af2020a92d522 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -181,6 +181,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * * @param fieldNumber * Position of the field in the record + * @param value + * Value to set the given field to */ public void setField(int fieldNumber, Value value) { try { @@ -253,7 +255,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Checks if the number of fields are equal to the batch field size then * adds the Value array to the end of the batch * - * @param record + * @param fields * Value array to be added as the next record of the batch */ public void addRecord(Value... fields) { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/BatchForward.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/BatchForward.java new file mode 100644 index 0000000000000000000000000000000000000000..6652fe2182063da06b8b9be8906b244c30a305c7 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/BatchForward.java @@ -0,0 +1,34 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.test.batch; + +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.test.util.TestBase2; + +public class BatchForward extends TestBase2{ + + @Override + public JobGraph getJobGraph() { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + graphBuilder.setSource("StreamSource", BatchForwardSource.class); + graphBuilder.setSink("StreamSink", BatchForwardSink.class); + + graphBuilder.shuffleConnect("StreamSource", "StreamSink"); + + return graphBuilder.getJobGraph(); + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/BatchForwardLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/BatchForwardLocal.java deleted file mode 100644 index ca0917e489f5f258db49b22dbc4c6d0c76a6e3ba..0000000000000000000000000000000000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/BatchForwardLocal.java +++ /dev/null @@ -1,91 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.streaming.test.batch; - -import java.net.InetSocketAddress; - -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; - -import eu.stratosphere.client.minicluster.NepheleMiniCluster; -import eu.stratosphere.client.program.Client; -import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.nephele.jobgraph.JobGraph; -import eu.stratosphere.streaming.api.JobGraphBuilder; - -public class BatchForwardLocal { - - public static JobGraph getJobGraph() { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); - graphBuilder.setSource("StreamSource", BatchForwardSource.class); - graphBuilder.setSink("StreamSink", BatchForwardSink.class); - - graphBuilder.shuffleConnect("StreamSource", "StreamSink"); - - return graphBuilder.getJobGraph(); - } - - public static void main(String[] args) { - - Logger root = Logger.getRootLogger(); - root.removeAllAppenders(); - PatternLayout layout = new PatternLayout( - "%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"); - ConsoleAppender appender = new ConsoleAppender(layout, "System.err"); - root.addAppender(appender); - root.setLevel(Level.DEBUG); - - try { - - JobGraph jG = getJobGraph(); - Configuration configuration = jG.getJobConfiguration(); - - if (args.length == 0) { - args = new String[] { "local" }; - } - - if (args[0].equals("local")) { - System.out.println("Running in Local mode"); - NepheleMiniCluster exec = new NepheleMiniCluster(); - - exec.start(); - - Client client = new Client(new InetSocketAddress("localhost", - 6498), configuration); - - client.run(null, jG, true); - - exec.stop(); - - } else if (args[0].equals("cluster")) { - System.out.println("Running in Cluster2 mode"); - - Client client = new Client(new InetSocketAddress( - "hadoop02.ilab.sztaki.hu", 6123), configuration); - - client.run(null, jG, true); - - } - - } catch (Exception e) { - System.out.println(e); - } - - } - -} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/wordcount/BatchWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/wordcount/BatchWordCount.java similarity index 53% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/wordcount/BatchWordCountLocal.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/wordcount/BatchWordCount.java index e5423d07c7a8d0a66c69693854a5a1c106c7b7bc..3ff1d6a137ebdc2eabac2774f0344d005872c626 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/wordcount/BatchWordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/batch/wordcount/BatchWordCount.java @@ -15,23 +15,15 @@ package eu.stratosphere.streaming.test.batch.wordcount; -import java.net.InetSocketAddress; - -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; - -import eu.stratosphere.client.minicluster.NepheleMiniCluster; -import eu.stratosphere.client.program.Client; -import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.test.util.TestBase2; import eu.stratosphere.types.StringValue; -public class BatchWordCountLocal { +public class BatchWordCount extends TestBase2 { - public static JobGraph getJobGraph() { + @Override + public JobGraph getJobGraph() { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class); @@ -50,52 +42,4 @@ public class BatchWordCountLocal { return graphBuilder.getJobGraph(); } - - public static void main(String[] args) { - - Logger root = Logger.getRootLogger(); - root.removeAllAppenders(); - PatternLayout layout = new PatternLayout( - "%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"); - ConsoleAppender appender = new ConsoleAppender(layout, "System.err"); - root.addAppender(appender); - root.setLevel(Level.DEBUG); - - try { - - JobGraph jG = getJobGraph(); - Configuration configuration = jG.getJobConfiguration(); - - if (args.length == 0) { - args = new String[] { "local" }; - } - - if (args[0].equals("local")) { - System.out.println("Running in Local mode"); - NepheleMiniCluster exec = new NepheleMiniCluster(); - - exec.start(); - - Client client = new Client(new InetSocketAddress("localhost", - 6498), configuration); - - client.run(null, jG, true); - - exec.stop(); - - } else if (args[0].equals("cluster")) { - System.out.println("Running in Cluster2 mode"); - - Client client = new Client(new InetSocketAddress( - "hadoop02.ilab.sztaki.hu", 6123), configuration); - - client.run(null, jG, true); - - } - - } catch (Exception e) { - System.out.println(e); - } - - } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java index 7da79c61b66c7093e29bf0ee5506e5b202012e5c..cac23a673b3ac05bf757c9d8ffa75ad246bb5a39 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/cellinfo/CellInfo.java @@ -87,7 +87,7 @@ public class CellInfo { Client client = new Client(new InetSocketAddress( "hadoop02.ilab.sztaki.hu", 6123), configuration); exec.start(); - client.run(null, jG, true); + client.run(jG, true); exec.stop(); } catch (Exception e) { System.out.println(e); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java similarity index 54% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountLocal.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java index 42a3061c168bb39752642875036cedcd9cef0041..1b261199c763d4c3242ca4a96fb599ad5a12fb70 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java @@ -15,24 +15,17 @@ package eu.stratosphere.streaming.test.window.wordcount; -import java.net.InetSocketAddress; - -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; - -import eu.stratosphere.client.minicluster.NepheleMiniCluster; -import eu.stratosphere.client.program.Client; -import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; + +import eu.stratosphere.test.util.TestBase2; import eu.stratosphere.types.StringValue; //TODO: window operator remains unfinished. -public class WindowWordCountLocal { +public class WindowWordCount extends TestBase2 { - public static JobGraph getJobGraph() { + @Override + public JobGraph getJobGraph() { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class); @@ -51,52 +44,4 @@ public class WindowWordCountLocal { return graphBuilder.getJobGraph(); } - - public static void main(String[] args) { - - Logger root = Logger.getRootLogger(); - root.removeAllAppenders(); - PatternLayout layout = new PatternLayout( - "%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"); - ConsoleAppender appender = new ConsoleAppender(layout, "System.err"); - root.addAppender(appender); - root.setLevel(Level.DEBUG); - - try { - - JobGraph jG = getJobGraph(); - Configuration configuration = jG.getJobConfiguration(); - - if (args.length == 0) { - args = new String[] { "local" }; - } - - if (args[0].equals("local")) { - System.out.println("Running in Local mode"); - NepheleMiniCluster exec = new NepheleMiniCluster(); - - exec.start(); - - Client client = new Client(new InetSocketAddress("localhost", - 6498), configuration); - - client.run(null, jG, true); - - exec.stop(); - - } else if (args[0].equals("cluster")) { - System.out.println("Running in Cluster2 mode"); - - Client client = new Client(new InetSocketAddress( - "hadoop02.ilab.sztaki.hu", 6123), configuration); - - client.run(null, jG, true); - - } - - } catch (Exception e) { - System.out.println(e); - } - - } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java index 377873548c4d7f00ce18d7461512f8334bd07a04..7a6c47258938178575ff4f57f439a7291216c0cd 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java @@ -33,17 +33,15 @@ public class WordCountCounter extends UserTaskInvokable { private int i = 0; private long time; private long prevTime = System.currentTimeMillis(); - @Override public void invoke(StreamRecord record) throws Exception { wordValue = (StringValue) record.getRecord(0)[0]; word = wordValue.getValue(); i++; if (i % 50000 == 0) { - time = System.currentTimeMillis(); - System.out.println("Counter:\t" + i + "\t----Time: " - + (time - prevTime)); - prevTime = time; + time= System.currentTimeMillis(); + System.out.println("Counter:\t" + i + "\t----Time: "+(time-prevTime)); + prevTime=time; } if (wordCounts.containsKey(word)) { count = wordCounts.get(word) + 1; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java index 7cf946cc75dda85c7d5b25b689f7efe29d194613..eb36e7cd2781cc473209758ae6d6fe6bc9eb0fc3 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java @@ -74,7 +74,7 @@ public class WordCountLocal { Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); - client.run(null, jG, true); + client.run(jG, true); exec.stop(); @@ -84,7 +84,7 @@ public class WordCountLocal { Client client = new Client(new InetSocketAddress( "hadoop02.ilab.sztaki.hu", 6123), configuration); - client.run(null, jG, true); + client.run(jG, true); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java index 203570f356fdcf483be7d805089eb45b53362229..b69d796864793b05085bc0c843571cabfd779308 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountRemote.java @@ -33,6 +33,7 @@ import eu.stratosphere.types.StringValue; public class WordCountRemote { + private static JobGraph getJobGraph() throws Exception { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); graphBuilder.setSource("WordCountSource", WordCountDummySource2.class); @@ -58,6 +59,7 @@ public class WordCountRemote { root.addAppender(appender); root.setLevel(Level.DEBUG); + try { File file = new File( @@ -73,7 +75,8 @@ public class WordCountRemote { Client client = new Client(new InetSocketAddress( "hadoop02.ilab.sztaki.hu", 6123), configuration); - client.run(null, jG, true); + client.run(jG, true); + } catch (Exception e) { System.out.println(e); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/test/util/TestBase2.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/test/util/TestBase2.java new file mode 100644 index 0000000000000000000000000000000000000000..fb30e10158937b564650f61be8fbf847e465de98 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/test/util/TestBase2.java @@ -0,0 +1,383 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.test.util; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +import eu.stratosphere.api.common.JobExecutionResult; +import eu.stratosphere.api.common.Plan; +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.compiler.DataStatistics; +import eu.stratosphere.compiler.PactCompiler; +import eu.stratosphere.compiler.plan.OptimizedPlan; +import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator; +import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.client.JobClient; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.util.LogUtils; + +public abstract class TestBase2 { + + private static final int MINIMUM_HEAP_SIZE_MB = 192; + + protected final Configuration config; + + private final List tempFiles; + + private NepheleMiniCluster executor; + + protected boolean printPlan = false; + + private JobExecutionResult jobExecutionResult; + + + public TestBase2() { + this(new Configuration()); + } + + public TestBase2(Configuration config) { + verifyJvmOptions(); + this.config = config; + this.tempFiles = new ArrayList(); + + LogUtils.initializeDefaultConsoleLogger(Level.WARN); + } + + + private void verifyJvmOptions() { + long heap = Runtime.getRuntime().maxMemory() >> 20; + Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + + "m", heap > MINIMUM_HEAP_SIZE_MB - 50); + } + + + @Before + public void startCluster() throws Exception { + this.executor = new NepheleMiniCluster(); + this.executor.setDefaultOverwriteFiles(true); + + this.executor.start(); + } + + @After + public void stopCluster() throws Exception { + try { + if (this.executor != null) { + this.executor.stop(); + this.executor = null; + FileSystem.closeAll(); + System.gc(); + } + } finally { + deleteAllTempFiles(); + } + } + + @Test + public void testJob() throws Exception { + // pre-submit + try { + preSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // submit job + JobGraph jobGraph = null; + try { + jobGraph = getJobGraph(); + } + catch(Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Failed to obtain JobGraph!"); + } + + Assert.assertNotNull("Obtained null JobGraph", jobGraph); + + try { + JobClient client = null; + try { + client = this.executor.getJobClient(jobGraph); } + catch(Exception e) { + System.err.println("here"); + } + client.setConsoleStreamForReporting(getNullPrintStream()); + this.jobExecutionResult = client.submitJobAndWait(); + } + catch(Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Job execution failed!"); + } + + // post-submit + try { + postSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail("Post-submit work caused an error: " + e.getMessage()); + } + } + + public String getTempDirPath(String dirName) throws IOException { + File f = createAndRegisterTempFile(dirName); + return f.toURI().toString(); + } + + public String getTempFilePath(String fileName) throws IOException { + File f = createAndRegisterTempFile(fileName); + return f.toURI().toString(); + } + + public String createTempFile(String fileName, String contents) throws IOException { + File f = createAndRegisterTempFile(fileName); + Files.write(contents, f, Charsets.UTF_8); + return f.toURI().toString(); + } + + private File createAndRegisterTempFile(String fileName) throws IOException { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File f = new File(baseDir, fileName); + + if (f.exists()) { + deleteRecursively(f); + } + + File parentToDelete = f; + while (true) { + File parent = parentToDelete.getParentFile(); + if (parent == null) { + throw new IOException("Missed temp dir while traversing parents of a temp file."); + } + if (parent.equals(baseDir)) { + break; + } + parentToDelete = parent; + } + + Files.createParentDirs(f); + this.tempFiles.add(parentToDelete); + return f; + } + + public BufferedReader[] getResultReader(String resultPath) throws IOException { + return getResultReader(resultPath, false); + } + + public BufferedReader[] getResultReader(String resultPath, boolean inOrderOfFiles) throws IOException { + File[] files = getAllInvolvedFiles(resultPath); + + if (inOrderOfFiles) { + // sort the files after their name (1, 2, 3, 4)... + // we cannot sort by path, because strings sort by prefix + Arrays.sort(files, new Comparator() { + + @Override + public int compare(File o1, File o2) { + try { + int f1 = Integer.parseInt(o1.getName()); + int f2 = Integer.parseInt(o2.getName()); + return f1 < f2 ? -1 : (f1 > f2 ? 1 : 0); + } + catch (NumberFormatException e) { + throw new RuntimeException("The file names are no numbers and cannot be ordered: " + + o1.getName() + "/" + o2.getName()); + } + } + }); + } + + BufferedReader[] readers = new BufferedReader[files.length]; + for (int i = 0; i < files.length; i++) { + readers[i] = new BufferedReader(new FileReader(files[i])); + } + return readers; + } + + public BufferedInputStream[] getResultInputStream(String resultPath) throws IOException { + File[] files = getAllInvolvedFiles(resultPath); + BufferedInputStream[] inStreams = new BufferedInputStream[files.length]; + for (int i = 0; i < files.length; i++) { + inStreams[i] = new BufferedInputStream(new FileInputStream(files[i])); + } + return inStreams; + } + + public void readAllResultLines(List target, String resultPath) throws IOException { + readAllResultLines(target, resultPath, false); + } + + public void readAllResultLines(List target, String resultPath, boolean inOrderOfFiles) throws IOException { + for (BufferedReader reader : getResultReader(resultPath, inOrderOfFiles)) { + String s = null; + while ((s = reader.readLine()) != null) { + target.add(s); + } + } + } + + public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception { + ArrayList list = new ArrayList(); + readAllResultLines(list, resultPath, false); + + String[] result = (String[]) list.toArray(new String[list.size()]); + Arrays.sort(result); + + String[] expected = expectedResultStr.split("\n"); + Arrays.sort(expected); + + Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length); + Assert.assertArrayEquals(expected, result); + } + + public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception { + ArrayList list = new ArrayList(); + readAllResultLines(list, resultPath, true); + + String[] result = (String[]) list.toArray(new String[list.size()]); + + String[] expected = expectedResultStr.split("\n"); + + Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length); + Assert.assertArrayEquals(expected, result); + } + + private File[] getAllInvolvedFiles(String resultPath) { + File result = asFile(resultPath); + if (!result.exists()) { + Assert.fail("Result file was not written"); + } + if (result.isDirectory()) { + return result.listFiles(); + } else { + return new File[] { result }; + } + } + + public File asFile(String path) { + try { + URI uri = new URI(path); + if (uri.getScheme().equals("file")) { + return new File(uri.getPath()); + } else { + throw new IllegalArgumentException("This path does not denote a local file."); + } + } catch (URISyntaxException e) { + throw new IllegalArgumentException("This path does not describe a valid local file URI."); + } + } + + private void deleteAllTempFiles() throws IOException { + for (File f : this.tempFiles) { + if (f.exists()) { + deleteRecursively(f); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected JobGraph getJobGraph() throws Exception { + Plan p = getTestJob(); + if (p == null) { + Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?"); + } + + PactCompiler pc = new PactCompiler(new DataStatistics()); + OptimizedPlan op = pc.compile(p); + + if (printPlan) { + System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op)); + } + + NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + return jgg.compileJobGraph(op); + } + + protected Plan getTestJob() { + return null; + } + + protected void preSubmit() throws Exception {} + + + protected void postSubmit() throws Exception {} + + public JobExecutionResult getJobExecutionResult() { + return jobExecutionResult; + } + + + // -------------------------------------------------------------------------------------------- + // Miscellaneous helper methods + // -------------------------------------------------------------------------------------------- + + protected static Collection toParameterList(Configuration ... testConfigs) { + ArrayList configs = new ArrayList(); + for (Configuration testConfig : testConfigs) { + Object[] c = { testConfig }; + configs.add(c); + } + return configs; + } + + private static void deleteRecursively (File f) throws IOException { + if (f.isDirectory()) { + FileUtils.deleteDirectory(f); + } else { + f.delete(); + } + } + + public static PrintStream getNullPrintStream() { + return new PrintStream(new OutputStream() { + @Override + public void write(int b) throws IOException {} + }); + } +}