提交 7330dc70 编写于 作者: A Aljoscha Krettek

Move TPCH3 IT case from TestBase to TestBase2

上级 bed28cb2
......@@ -32,10 +32,10 @@ import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.pact.example.relational.TPCHQuery3;
import eu.stratosphere.pact.test.util.TestBase;
import eu.stratosphere.pact.test.util.TestBase2;
@RunWith(Parameterized.class)
public class TPCHQuery3ITCase extends TestBase {
public class TPCHQuery3ITCase extends TestBase2 {
private static final Log LOG = LogFactory.getLog(TPCHQuery3ITCase.class);
......@@ -138,84 +138,31 @@ public class TPCHQuery3ITCase extends TestBase {
@Override
protected void preSubmit() throws Exception {
ordersPath = getFilesystemProvider().getTempDirPath() + "/orders";
lineitemsPath = getFilesystemProvider().getTempDirPath() + "/lineitems";
resultPath = getFilesystemProvider().getTempDirPath() + "/result";
String[] splits = splitInputString(ORDERS, '\n', 4);
getFilesystemProvider().createDir(ordersPath);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(ordersPath + "/part_" + i + ".txt", splits[i]);
if (LOG.isDebugEnabled())
LOG.debug("Orders Part " + (i + 1) + ":\n>" + splits[i] + "<");
}
splits = splitInputString(LINEITEMS, '\n', 4);
getFilesystemProvider().createDir(lineitemsPath);
for (int i = 0; i < splits.length; i++) {
getFilesystemProvider().createFile(lineitemsPath + "/part_" + i + ".txt", splits[i]);
if (LOG.isDebugEnabled())
LOG.debug("Lineitems Part " + (i + 1) + ":\n>" + splits[i] + "<");
}
ordersPath = createTempFile("orders", ORDERS);
lineitemsPath = createTempFile("lineitems", LINEITEMS);
resultPath = getTempDirPath("result");
}
@Override
protected JobGraph getJobGraph() throws Exception {
protected Plan getPactPlan() {
TPCHQuery3 tpch3 = new TPCHQuery3();
Plan plan = tpch3.getPlan(
return tpch3.getPlan(
config.getString("TPCHQuery3Test#NoSubtasks", "1"),
getFilesystemProvider().getURIPrefix()+ordersPath,
getFilesystemProvider().getURIPrefix()+lineitemsPath,
getFilesystemProvider().getURIPrefix()+resultPath);
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
return jgg.compileJobGraph(op);
ordersPath,
lineitemsPath,
resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
@Override
public void stopCluster() throws Exception {
getFilesystemProvider().delete(ordersPath, true);
getFilesystemProvider().delete(lineitemsPath, true);
getFilesystemProvider().delete(resultPath, true);
super.stopCluster();
}
@Parameters
public static Collection<Object[]> getConfigurations() {
ArrayList<Configuration> tConfigs = new ArrayList<Configuration>();
Configuration config = new Configuration();
config.setInteger("TPCHQuery3Test#NoSubtasks", 4);
tConfigs.add(config);
return toParameterList(tConfigs);
}
private String[] splitInputString(String inputString, char splitChar, int noSplits) {
String splitString = inputString.toString();
String[] splits = new String[noSplits];
int partitionSize = (splitString.length() / noSplits) - 2;
// split data file and copy parts
for (int i = 0; i < noSplits - 1; i++) {
int cutPos = splitString.indexOf(splitChar, (partitionSize < splitString.length() ? partitionSize
: (splitString.length() - 1)));
splits[i] = splitString.substring(0, cutPos) + "\n";
splitString = splitString.substring(cutPos + 1);
}
splits[noSplits - 1] = splitString;
return splits;
return toParameterList(config);
}
}
......@@ -35,20 +35,14 @@ public class TPCHQuery3ITCase extends eu.stratosphere.pact.test.pactPrograms.TPC
}
@Override
protected JobGraph getJobGraph() throws Exception {
protected Plan getPactPlan() {
TPCHQuery3 tpch3 = new TPCHQuery3();
Plan plan = tpch3.getScalaPlan(
return tpch3.getScalaPlan(
config.getInteger("TPCHQuery3Test#NoSubtasks", 1),
getFilesystemProvider().getURIPrefix()+ordersPath,
getFilesystemProvider().getURIPrefix()+lineitemsPath,
getFilesystemProvider().getURIPrefix()+resultPath,
ordersPath,
lineitemsPath,
resultPath,
'F', 1993, "5");
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
return jgg.compileJobGraph(op);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册