提交 ffbc7e17 编写于 作者: F Fabian Hueske

- updated contract tests

- moved pact program tests
- fixed tpch-3 program test
- fixed LocalFSProvider
上级 7ddfb16a
......@@ -159,11 +159,11 @@
<includes>
<!--
<include>**/TestNepheleMiniCluster.java</include>
-->
<include>**/ContractTests.java</include>
<!--
<include>**/JobTests.java</include>
-->
<include>**/ContractTests.java</include>
<include>**/PactProgramTests.java</include>
</includes>
</configuration>
</plugin>
......
......@@ -199,41 +199,9 @@ public class CoGroupTest extends TestBase
String tempPath = getFilesystemProvider().getTempDirPath();
// read result
InputStream is = getFilesystemProvider().getInputStream(tempPath + "/result.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(COGROUP_RESULT, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
getFilesystemProvider().delete(tempPath + "/result.txt", false);
compareResultsByLinesInMemory(COGROUP_RESULT, tempPath + "/result.txt");
getFilesystemProvider().delete(tempPath + "/result.txt", true);
getFilesystemProvider().delete(tempPath + "/cogroup_left", true);
getFilesystemProvider().delete(tempPath + "/cogroup_right", true);
}
......
......@@ -198,42 +198,9 @@ public class CrossTest extends TestBase
String tempDir = getFilesystemProvider().getTempDirPath();
// read result
InputStream is = getFilesystemProvider().getInputStream(
tempDir + "/result.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(CROSS_RESULT, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
getFilesystemProvider().delete(tempDir + "/result.txt", false);
compareResultsByLinesInMemory(CROSS_RESULT, tempDir + "/result.txt");
getFilesystemProvider().delete(tempDir + "/result.txt", true);
getFilesystemProvider().delete(tempDir + "/cross_left", true);
getFilesystemProvider().delete(tempDir + "/cross_right", true);
}
......
......@@ -156,41 +156,9 @@ public class MapTest extends TestBase
protected void postSubmit() throws Exception {
String tempDir = getFilesystemProvider().getTempDirPath();
// read result
InputStream is = getFilesystemProvider().getInputStream(tempDir+ "/result.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(MAP_RESULT, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
getFilesystemProvider().delete(tempDir+ "/result.txt", false);
compareResultsByLinesInMemory(MAP_RESULT, tempDir+ "/result.txt");
getFilesystemProvider().delete(tempDir+ "/result.txt", true);
getFilesystemProvider().delete(tempDir+ "/mapInput", true);
}
......
......@@ -198,41 +198,9 @@ public class MatchTest extends TestBase
protected void postSubmit() throws Exception {
String tempPath = getFilesystemProvider().getTempDirPath();
// read result
InputStream is = getFilesystemProvider().getInputStream(tempPath + "/result.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(MATCH_RESULT, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
getFilesystemProvider().delete(tempPath + "/result.txt", false);
compareResultsByLinesInMemory(MATCH_RESULT, tempPath + "/result.txt");
getFilesystemProvider().delete(tempPath + "/result.txt", true);
getFilesystemProvider().delete(tempPath + "/match_left", true);
getFilesystemProvider().delete(tempPath + "/match_right", true);
......
......@@ -182,41 +182,9 @@ public class ReduceTest extends TestBase
String tempDir = getFilesystemProvider().getTempDirPath();
// read result
InputStream is = getFilesystemProvider().getInputStream(tempDir + "/result.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(REDUCE_RESULT, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
getFilesystemProvider().delete(tempDir + "/result.txt", false);
compareResultsByLinesInMemory(REDUCE_RESULT, tempDir + "/result.txt");
getFilesystemProvider().delete(tempDir + "/result.txt", true);
getFilesystemProvider().delete(tempDir + "/reduceInput", true);
}
......
......@@ -13,18 +13,13 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.StringTokenizer;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
......@@ -41,42 +36,40 @@ import eu.stratosphere.pact.test.util.TestBase;
@RunWith(Parameterized.class)
public class All2AllSPTest extends TestBase {
private static final Log LOG = LogFactory.getLog(All2AllSPTest.class);
String pathsPath = null;
private String paths = "A|C|7| |\n" + "A|D|6| |\n" + "B|A|1| |\n" + "B|D|2| |\n" + "C|B|3| |\n" + "C|E|10| |\n"
+ "C|F|12| |\n" + "C|G|9| |\n" + "D|F|5| |\n" + "E|H|2| |\n" + "F|E|3| |\n" + "G|F|1| |\n" + "H|D|2| |\n"
+ "H|E|4| |\n";
+ "C|F|12| |\n" + "C|G|9| |\n" + "D|F|5| |\n" + "E|H|2| |\n" + "F|E|3| |\n" + "G|F|1| |\n" + "H|D|2| |\n"
+ "H|E|4| |\n";
private String expected = "A|C|7| |\n" + "A|B|10|C|\n" + "A|D|6| |\n" + "A|E|17|C|\n" + "A|F|11|D|\n"
+ "A|G|16|C|\n" + "B|A|1| |\n" + "B|C|8|A|\n" + "B|D|2| |\n" + "B|F|7|D|\n" + "C|A|4|B|\n" + "C|B|3| |\n"
+ "C|D|5|B|\n" + "C|E|10| |\n" + "C|F|10|G|\n" + "C|G|9| |\n" + "C|H|12|E|\n" + "D|E|8|F|\n" + "D|F|5| |\n"
+ "E|D|4|H|\n" + "E|H|2| |\n" + "F|E|3| |\n" + "F|H|5|E|\n" + "G|E|4|F|\n" + "G|F|1| |\n" + "H|D|2| |\n"
+ "H|E|4| |\n" + "H|F|7|D|\n";
+ "A|G|16|C|\n" + "B|A|1| |\n" + "B|C|8|A|\n" + "B|D|2| |\n" + "B|F|7|D|\n" + "C|A|4|B|\n" + "C|B|3| |\n"
+ "C|D|5|B|\n" + "C|E|10| |\n" + "C|F|10|G|\n" + "C|G|9| |\n" + "C|H|12|E|\n" + "D|E|8|F|\n" + "D|F|5| |\n"
+ "E|D|4|H|\n" + "E|H|2| |\n" + "F|E|3| |\n" + "F|H|5|E|\n" + "G|E|4|F|\n" + "G|F|1| |\n" + "H|D|2| |\n"
+ "H|E|4| |\n" + "H|F|7|D|\n";
public All2AllSPTest(Configuration config) {
super(config);
}
@Override
protected String getJarFilePath() {
return null;
}
@Override
protected void preSubmit() throws Exception {
pathsPath = getFilesystemProvider().getTempDirPath() + "/paths";
getFilesystemProvider().createFile(pathsPath, paths);
System.out.println("Paths:\n>" + paths + "<");
LOG.debug("Paths:\n>" + paths + "<");
}
@Override
protected JobGraph getJobGraph() throws Exception {
All2AllSP a2aSP = new All2AllSP();
Plan plan = a2aSP.getPlan(config.getString("All2AllSPTest#NoSubtasks", "4"),
pathsPath, getFilesystemProvider().getTempDirPath() + "/iter_1.txt");
Plan plan = a2aSP.getPlan(config.getString("All2AllSPTest#NoSubtasks", "4"),
getFilesystemProvider().getURIPrefix() + pathsPath,
getFilesystemProvider().getURIPrefix() + getFilesystemProvider().getTempDirPath() + "/iter_1.txt");
PactCompiler pc = new PactCompiler();
OptimizedPlan op = pc.compile(plan);
......@@ -89,40 +82,7 @@ public class All2AllSPTest extends TestBase {
protected void postSubmit() throws Exception {
// Test results
// read result
InputStream is = getFilesystemProvider().getInputStream(getFilesystemProvider().getTempDirPath() + "/iter_1.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(expected, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
System.out.println("Expected: " + expectedResult);
System.out.println("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
System.out.println("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
compareResultsByLinesInMemory(expected, getFilesystemProvider().getTempDirPath() + "/iter_1.txt");
// clean up hdfs
getFilesystemProvider().delete(pathsPath, true);
......@@ -151,7 +111,7 @@ public class All2AllSPTest extends TestBase {
// split data file and copy parts
for (int i = 0; i < noSplits - 1; i++) {
int cutPos = splitString.indexOf(splitChar, (partitionSize < splitString.length() ? partitionSize
: (splitString.length() - 1)));
: (splitString.length() - 1)));
splits[i] = splitString.substring(0, cutPos) + "\n";
splitString = splitString.substring(cutPos + 1);
}
......
......@@ -13,18 +13,13 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.StringTokenizer;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
......@@ -41,7 +36,9 @@ import eu.stratosphere.pact.test.util.TestBase;
@RunWith(Parameterized.class)
public class EnumTrianglesTest extends TestBase {
String edgesPath = getFilesystemProvider().getTempDirPath() + "/triangleEdges";
private static final Log LOG = LogFactory.getLog(EnumTrianglesTest.class);
String edgesPath = null;
private String edges = "A|B|\n" + "A|C|\n" + "B|C|\n" + "B|D|\n" + "B|E|\n" + "B|F|\n" + "B|I|\n" + "C|D|\n"
+ "E|F|\n" + "F|G|\n" + "F|I|\n" + "G|H|\n" + "G|J|\n" + "H|I|\n" + "H|J|\n" + "H|K|\n" + "I|K|\n";
......@@ -57,19 +54,16 @@ public class EnumTrianglesTest extends TestBase {
super(config);
}
@Override
protected String getJarFilePath() {
return null;
}
@Override
protected void preSubmit() throws Exception {
edgesPath = getFilesystemProvider().getTempDirPath() + "/triangleEdges";
String[] splits = splitInputString(edges, '\n', 4);
getFilesystemProvider().createDir(edgesPath);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(edgesPath + "/part_" + i + ".txt", splits[i]);
System.out.println("Part " + (i + 1) + ":\n>" + splits[i] + "<");
LOG.debug("Part " + (i + 1) + ":\n>" + splits[i] + "<");
}
}
......@@ -78,8 +72,10 @@ public class EnumTrianglesTest extends TestBase {
protected JobGraph getJobGraph() throws Exception {
EnumTriangles enumTriangles = new EnumTriangles();
Plan plan = enumTriangles.getPlan(edgesPath, getFilesystemProvider().getTempDirPath() + "/triangles.txt", config
.getString("EnumTrianglesTest#NoSubtasks", "1"));
Plan plan = enumTriangles.getPlan(
config.getString("EnumTrianglesTest#NoSubtasks", "1"),
getFilesystemProvider().getURIPrefix() + edgesPath,
getFilesystemProvider().getURIPrefix() + getFilesystemProvider().getTempDirPath() + "/triangles.txt");
PactCompiler pc = new PactCompiler();
OptimizedPlan op = pc.compile(plan);
......@@ -92,40 +88,7 @@ public class EnumTrianglesTest extends TestBase {
protected void postSubmit() throws Exception {
// Test results
// read result
InputStream is = getFilesystemProvider().getInputStream(getFilesystemProvider().getTempDirPath() + "/triangles.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(expected, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
System.out.println("Expected: " + expectedResult);
System.out.println("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
System.out.println("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
compareResultsByLinesInMemory(expected, getFilesystemProvider().getTempDirPath() + "/triangles.txt");
// clean up hdfs
getFilesystemProvider().delete(edgesPath, true);
......@@ -139,7 +102,7 @@ public class EnumTrianglesTest extends TestBase {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
Configuration config = new Configuration();
config.setInteger("EnumTrianglesTest#NoSubtasks", 4);
config.setInteger("EnumTrianglesTest#NoSubtasks", 1);
tConfigs.add(config);
return toParameterList(tConfigs);
......
......@@ -13,20 +13,15 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.StringTokenizer;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
......@@ -45,24 +40,23 @@ public class KMeansIterationTest extends TestBase {
KMeanDataGenerator kmdg = new KMeanDataGenerator(500, 10, 2);
String dataPath = getFilesystemProvider().getTempDirPath() + "/dataPoints";
String clusterPath = getFilesystemProvider().getTempDirPath() + "/clusters";
private static final Log LOG = LogFactory.getLog(KMeansIterationTest.class);
String resultPath = clusterPath + "/iter_1.txt";
String dataPath = null;
String clusterPath = null;
String resultPath = null;
public KMeansIterationTest(Configuration config) {
super(config);
}
@Override
protected String getJarFilePath() {
return "/home/fhueske/distTests.jar";
}
@Override
protected void preSubmit() throws Exception {
dataPath = getFilesystemProvider().getTempDirPath() + "/dataPoints";
clusterPath = getFilesystemProvider().getTempDirPath() + "/clusters";
resultPath = clusterPath + "/iter_1.txt";
String dataFile = kmdg.getDataPoints();
String clusterFile = kmdg.getClusterCenters();
......@@ -72,20 +66,19 @@ public class KMeansIterationTest extends TestBase {
// create data path
getFilesystemProvider().createDir(dataPath);
// TODO: check splitting!
// split data file and copy parts
for (int i = 0; i < noPartitions; i++) {
int cutPos = dataFile.indexOf('\n', (partitionSize < dataFile.length() ? partitionSize
: (dataFile.length() - 1)));
: (dataFile.length() - 1)));
getFilesystemProvider().createFile(dataPath + "/part_" + i + ".txt", dataFile.substring(0, cutPos) + "\n");
System.out.println("Points Part " + (i + 1) + ":\n>" + dataFile.substring(0, cutPos) + "\n<");
LOG.debug("Points Part " + (i + 1) + ":\n>" + dataFile.substring(0, cutPos) + "\n<");
dataFile = dataFile.substring(cutPos + 1);
}
// create cluster path and copy data
getFilesystemProvider().createDir(clusterPath);
getFilesystemProvider().createFile(clusterPath + "/iter_0.txt", clusterFile);
System.out.println("Clusters: \n>" + clusterFile + "<");
LOG.debug("Clusters: \n>" + clusterFile + "<");
}
......@@ -93,10 +86,13 @@ public class KMeansIterationTest extends TestBase {
protected JobGraph getJobGraph() throws Exception {
KMeansIteration kmi = new KMeansIteration();
// Plan plan = kmi.getPlan(dataPath,clusterPath, clusterPath+"/iter_1.txt",
// Plan plan = kmi.getPlan(dataPath,clusterPath,
// clusterPath+"/iter_1.txt",
// config.getString("KMeansIterationTest#NoSubtasks", "1"));
Plan plan = kmi.getPlan(dataPath, clusterPath, resultPath, config.getString(
"KMeansIterationTest#NoSubtasks", "1"));
Plan plan = kmi.getPlan(config.getString("KMeansIterationTest#NoSubtasks", "1"),
getFilesystemProvider().getURIPrefix()+dataPath,
getFilesystemProvider().getURIPrefix()+clusterPath,
getFilesystemProvider().getURIPrefix()+resultPath);
PactCompiler pc = new PactCompiler();
OptimizedPlan op = pc.compile(plan);
......@@ -110,43 +106,9 @@ public class KMeansIterationTest extends TestBase {
protected void postSubmit() throws Exception {
// Test results
compareResultsByLinesInMemory(kmdg.getNewClusterCenters(), clusterPath + "/iter_1.txt");
// read result
InputStream is = getFilesystemProvider().getInputStream(clusterPath + "/iter_1.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(kmdg.getNewClusterCenters(), "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
System.out.println("Expected: " + expectedResult);
System.out.println("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
System.out.println("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
// clean up hdfs
// clean up file
getFilesystemProvider().delete(dataPath, true);
getFilesystemProvider().delete(clusterPath, true);
......@@ -158,7 +120,7 @@ public class KMeansIterationTest extends TestBase {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
Configuration config = new Configuration();
config.setInteger("KMeansIterationTest#NoSubtasks", 2);
config.setInteger("KMeansIterationTest#NoSubtasks", 4);
tConfigs.add(config);
return toParameterList(tConfigs);
......@@ -167,17 +129,13 @@ public class KMeansIterationTest extends TestBase {
public static class KMeanDataGenerator {
int noPoints;
int noClusters;
int noDims;
Random rand = new Random(System.currentTimeMillis());
double[][] dataPoints;
double[][] centers;
double[][] newCenters;
public KMeanDataGenerator(int noPoints, int noClusters, int noDims) {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.pactPrograms;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
// @SuiteClasses( { All2AllSPTest.class, EnumTrianglesTest.class, KMeansIterationTest.class, TPCHQuery3Test.class, WebLogAnalysisTest.class })
@SuiteClasses( { TPCHQuery3Test.class })
public class PactProgramTests {
}
......@@ -13,18 +13,13 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.StringTokenizer;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
......@@ -41,6 +36,8 @@ import eu.stratosphere.pact.test.util.TestBase;
@RunWith(Parameterized.class)
public class TPCHQuery3Test extends TestBase {
private static final Log LOG = LogFactory.getLog(TPCHQuery3Test.class);
String ordersPath = null;
String lineitemsPath = null;
......@@ -138,11 +135,6 @@ public class TPCHQuery3Test extends TestBase {
super(config);
}
@Override
protected String getJarFilePath() {
return null;
}
@Override
protected void preSubmit() throws Exception {
ordersPath = getFilesystemProvider().getTempDirPath() + "/orders";
......@@ -152,14 +144,14 @@ public class TPCHQuery3Test extends TestBase {
getFilesystemProvider().createDir(ordersPath);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(ordersPath + "/part_" + i + ".txt", splits[i]);
System.out.println("Orders Part " + (i + 1) + ":\n>" + splits[i] + "<");
LOG.debug("Orders Part " + (i + 1) + ":\n>" + splits[i] + "<");
}
splits = splitInputString(lineitems, '\n', 4);
getFilesystemProvider().createDir(lineitemsPath);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(lineitemsPath + "/part_" + i + ".txt", splits[i]);
System.out.println("Lineitems Part " + (i + 1) + ":\n>" + splits[i] + "<");
LOG.debug("Lineitems Part " + (i + 1) + ":\n>" + splits[i] + "<");
}
}
......@@ -169,7 +161,10 @@ public class TPCHQuery3Test extends TestBase {
TPCHQuery3 tpch3 = new TPCHQuery3();
Plan plan = tpch3.getPlan(
config.getString("TPCHQuery3Test#NoSubtasks", "1"), ordersPath, lineitemsPath, (getFilesystemProvider().getTempDirPath() + "/result.txt"));
config.getString("TPCHQuery3Test#NoSubtasks", "1"),
getFilesystemProvider().getURIPrefix()+ordersPath,
getFilesystemProvider().getURIPrefix()+lineitemsPath,
getFilesystemProvider().getURIPrefix()+getFilesystemProvider().getTempDirPath() + "/result.txt");
PactCompiler pc = new PactCompiler();
OptimizedPlan op = pc.compile(plan);
......@@ -182,40 +177,7 @@ public class TPCHQuery3Test extends TestBase {
protected void postSubmit() throws Exception {
// Test results
// read result
InputStream is = getFilesystemProvider().getInputStream(getFilesystemProvider().getTempDirPath() + "/result.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
Assert.assertNotNull("No output computed", line);
// collect out lines
PriorityQueue<String> computedResult = new PriorityQueue<String>();
while (line != null) {
computedResult.add(line);
line = reader.readLine();
}
reader.close();
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(this.expectedResult, "\n");
while (st.hasMoreElements()) {
expectedResult.add(st.nextToken());
}
// print expected and computed results
System.out.println("Expected: " + expectedResult);
System.out.println("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
System.out.println("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
compareResultsByLinesInMemory(expectedResult, getFilesystemProvider().getTempDirPath() + "/result.txt");
// clean up hdfs
getFilesystemProvider().delete(ordersPath, true);
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.DataInput;
import java.io.DataOutput;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
......@@ -49,8 +49,8 @@ import eu.stratosphere.pact.common.type.KeyValuePair;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.test.jobs.WordCount.Integer;
import eu.stratosphere.pact.test.jobs.WordCount.Text;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Integer;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Text;
import eu.stratosphere.pact.test.util.TestBase;
/**
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
......@@ -42,12 +42,12 @@ import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.test.jobs.WordCount.Integer;
import eu.stratosphere.pact.test.jobs.WordCount.Mapper;
import eu.stratosphere.pact.test.jobs.WordCount.Reducer;
import eu.stratosphere.pact.test.jobs.WordCount.Text;
import eu.stratosphere.pact.test.jobs.WordCount.TextFormatIn;
import eu.stratosphere.pact.test.jobs.WordCount.TextFormatOut;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Integer;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Mapper;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Reducer;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Text;
import eu.stratosphere.pact.test.pactPrograms.WordCount.TextFormatIn;
import eu.stratosphere.pact.test.pactPrograms.WordCount.TextFormatOut;
import eu.stratosphere.pact.test.util.TestBase;
/**
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.test.jobs;
package eu.stratosphere.pact.test.pactPrograms;
import java.io.BufferedReader;
import java.io.InputStream;
......@@ -39,12 +39,12 @@ import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.test.jobs.WordCount.Integer;
import eu.stratosphere.pact.test.jobs.WordCount.Mapper;
import eu.stratosphere.pact.test.jobs.WordCount.Reducer;
import eu.stratosphere.pact.test.jobs.WordCount.Text;
import eu.stratosphere.pact.test.jobs.WordCount.TextFormatIn;
import eu.stratosphere.pact.test.jobs.WordCount.TextFormatOut;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Integer;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Mapper;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Reducer;
import eu.stratosphere.pact.test.pactPrograms.WordCount.Text;
import eu.stratosphere.pact.test.pactPrograms.WordCount.TextFormatIn;
import eu.stratosphere.pact.test.pactPrograms.WordCount.TextFormatOut;
import eu.stratosphere.pact.test.util.TestBase;
/**
......
......@@ -74,9 +74,9 @@ public abstract class TestBase extends TestCase {
private void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
Assert.assertTrue("IPv4 stack required - set JVM option: -Djava.net.preferIPv4Stack=true", "true".equals(System
.getProperty("java.net.preferIPv4Stack")));
.getProperty("java.net.preferIPv4Stack")));
}
@Before
......@@ -123,7 +123,7 @@ public abstract class TestBase extends TestCase {
* lists
*
* @param tConfigs
* list of PACT test configurations
* list of PACT test configurations
* @return list of JUnit test configurations
* @throws IOException
* @throws FileNotFoundException
......@@ -184,21 +184,19 @@ public abstract class TestBase extends TestCase {
* @param expectedResult
* @param hdfsPath
*/
protected void compareResultsByLinesInMemory(String expectedResultStr, String hdfsPath) throws Exception {
protected void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
String[] resultFiles = new String[1];
ArrayList<String> resultFiles = new ArrayList<String>();
// Determine all result files
if (getFilesystemProvider().isDir(hdfsPath)) {
List<String> files = new ArrayList<String>();
for (String file : getFilesystemProvider().listFiles(hdfsPath)) {
if (getFilesystemProvider().isDir(resultPath)) {
for (String file : getFilesystemProvider().listFiles(resultPath)) {
if (!getFilesystemProvider().isDir(file)) {
files.add(file);
resultFiles.add(resultPath+"/"+file);
}
}
resultFiles = files.toArray(resultFiles);
} else {
resultFiles[0] = hdfsPath;
resultFiles.add(resultPath);
}
// collect lines of all result files
......@@ -216,7 +214,6 @@ public abstract class TestBase extends TestCase {
}
reader.close();
}
assertEquals("Computed Result is empty", 0, computedResult.size());
PriorityQueue<String> expectedResult = new PriorityQueue<String>();
StringTokenizer st = new StringTokenizer(expectedResultStr, "\n");
......@@ -228,8 +225,7 @@ public abstract class TestBase extends TestCase {
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult.size());
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
......
......@@ -63,7 +63,7 @@ public class LocalFSProvider implements FilesystemProvider {
if(f.isDirectory() && recursive) {
for(String c : f.list()) {
this.delete(c,true);
this.delete(path+"/"+c,true);
}
f.delete();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册