提交 1dfad89e 编写于 作者: F Fabian Hueske

- pact modules are built with tests

- removed println statements from test
- improved pact examples (K-Means and WordCount)
上级 62f30afa
......@@ -696,7 +696,7 @@ public class KMeansIteration implements PlanAssembler, PlanAssemblerDescription
@Override
public String getDescription() {
return "Parameters: dop, data-points, cluster-centers, output";
return "Parameters: [dop] [data-points] [cluster-centers] [output]";
}
}
......@@ -13,9 +13,6 @@
*
**********************************************************************************************************************/
/**
*
*/
package eu.stratosphere.pact.example.wordcount;
import java.util.Iterator;
......@@ -48,7 +45,8 @@ import eu.stratosphere.pact.common.type.base.PactString;
public class WordCount implements PlanAssembler, PlanAssemblerDescription {
/**
* {@inheritDoc}
* Converts an input string (a line) into a KeyValuePair with the string
* being the key and the value being a zero Integer.
*/
public static class LineInFormat extends TextInputFormat<PactString, PactInteger> {
......@@ -65,7 +63,8 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
}
/**
* {@inheritDoc}
* Writes a (String,Integer)-KeyValuePair to a string. The output format is:
* "&lt;key&gt;&nbsp;&lt;value&gt;\n"
*/
public static class WordCountOutFormat extends TextOutputFormat<PactString, PactInteger> {
......@@ -83,7 +82,10 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
}
/**
* {@inheritDoc}
* Converts a (String,Integer)-KeyValuePair into multiple KeyValuePairs. The
* key string is tokenized by spaces. For each token a new
* (String,Integer)-KeyValuePair is emitted where the Token is the key and
* an Integer(1) is the value.
*/
public static class TokenizeLine extends MapStub<PactString, PactInteger, PactString, PactInteger> {
......@@ -103,8 +105,11 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
}
/**
* {@inheritDoc}
* Counts the number of values for a given key. Hence, the number of
occurrences of a given token (word) is computed and emitted. The key is
* not modified, hence a SameKey OutputContract is attached to this class.
*/
@SameKey
@Combinable
public static class CountWords extends ReduceStub<PactString, PactInteger, PactString, PactInteger> {
......@@ -138,6 +143,7 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
*/
@Override
public Plan getPlan(String... args) {
if (args == null) {
args = new String[0];
}
......@@ -146,21 +152,20 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
String dataInput = (args.length > 1 && args[1] != null ? args[1] : "hdfs://localhost:9000/countwords/data");
String output = (args.length > 2 && args[2] != null ? args[2] : "hdfs://localhost:9000/countwords/result");
DataSourceContract<PactString, PactInteger> data = new DataSourceContract<PactString, PactInteger>(LineInFormat.class,
dataInput, "Lines");
DataSourceContract<PactString, PactInteger> data = new DataSourceContract<PactString, PactInteger>(
LineInFormat.class, dataInput, "Input Lines");
data.setDegreeOfParallelism(noSubTasks);
MapContract<PactString, PactInteger, PactString, PactInteger> mapper = new MapContract<PactString, PactInteger, PactString, PactInteger>(
TokenizeLine.class, "Tokenize Lines");
TokenizeLine.class, "Tokenize Lines");
mapper.setDegreeOfParallelism(noSubTasks);
mapper.setOutputContract(SameKey.class);
ReduceContract<PactString, PactInteger, PactString, PactInteger> reducer = new ReduceContract<PactString, PactInteger, PactString, PactInteger>(
CountWords.class, "Count Words");
CountWords.class, "Count Words");
reducer.setDegreeOfParallelism(noSubTasks);
DataSinkContract<PactString, PactInteger> out = new DataSinkContract<PactString, PactInteger>(WordCountOutFormat.class,
output, "Output");
DataSinkContract<PactString, PactInteger> out = new DataSinkContract<PactString, PactInteger>(
WordCountOutFormat.class, output, "Output");
out.setDegreeOfParallelism(noSubTasks);
out.setInput(reducer);
......@@ -175,9 +180,7 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
*/
@Override
public String getDescription() {
return "WordCount: [noSubStasks] [input] [output] <br />"
+ "\t noSubTasks: defines the degree of parallelism <br />" + "\t input: Location of the input file <br />"
+ "\t output: Location of the output file <br />";
return "Parameters: [noSubStasks] [input] [output]";
}
}
......@@ -68,6 +68,19 @@
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<excludes>
<exclude>**/TestData.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -22,6 +22,7 @@ import java.util.Set;
import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -109,8 +110,8 @@ public class SerializingHashMapTest {
Assert.assertTrue("Number of values in map is below lower bound", pactHashMap.numberOfValues() >= SEGMENT_SIZE
/ (2 * VALUE_LENGTH + 16));
System.out.println("Inserted " + pactHashMap.numberOfKeys() + " keys");
System.out.println("Inserted " + pactHashMap.numberOfValues() + " values");
Logger.getRootLogger().debug("Inserted " + pactHashMap.numberOfKeys() + " keys");
Logger.getRootLogger().debug("Inserted " + pactHashMap.numberOfValues() + " values");
// test value iterators
for (Key key : javaHashMap.keySet()) {
......
......@@ -99,7 +99,7 @@ public class TestMergeIterator {
KeyValuePair<TestData.Key, TestData.Value> pair1 = iterator.next();
while (iterator.hasNext()) {
KeyValuePair<TestData.Key, TestData.Value> pair2 = iterator.next();
System.out.println("1 -> " + pair1.getKey() + " | 2 -> " + pair2.getKey());
Logger.getRootLogger().debug("1 -> " + pair1.getKey() + " | 2 -> " + pair2.getKey());
Assert.assertTrue(comparator.compare(pair1.getKey(), pair2.getKey()) <= 0);
pair1 = pair2;
}
......
......@@ -133,6 +133,28 @@
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<systemProperties>
<property>
<name>java.net.preferIPv4Stack</name>
<value>true</value>
</property>
</systemProperties>
<includes>
<include>**/ContractTests.java</include>
<!--
<include>**/JobTests.java</include>
-->
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -21,6 +21,6 @@ import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
@SuiteClasses( { MapTest.class, ReduceTest.class, MatchTest.class, CrossTest.class, CoGroupTest.class })
public class ContractsTests {
public class ContractTests {
}
......@@ -56,9 +56,6 @@ import eu.stratosphere.pact.test.util.TestBase;
*/
@RunWith(Parameterized.class)
public class MapTest extends TestBase
/*
* TODO: - Allow multiple data sinks
*/
{
public MapTest(String clusterConfig, Configuration testConfig) {
......@@ -82,7 +79,6 @@ public class MapTest extends TestBase
getHDFSProvider().writeFileToHDFS("mapTest_3.txt", MAP_IN_3);
getHDFSProvider().writeFileToHDFS("mapTest_4.txt", MAP_IN_4);
// getHDFSProvider().createDir(getHDFSProvider().getHdfsHome()+"/result/");
}
public static class MapTestInFormat extends TextInputFormat<PactString, PactString> {
......@@ -100,28 +96,12 @@ public class MapTest extends TestBase
return true;
}
// @Override
// public byte[] writeLine(KeyValuePair<N_String, N_String> pair)
// {
// return (pair.getKey().toString() + " " + pair.getValue().toString() + "\n").getBytes();
// }
}
public static class MapTestOutFormat extends TextOutputFormat<PactString, PactInteger> {
private static final Log LOG = LogFactory.getLog(MapTestOutFormat.class);
// @Override
// public void readLine(KeyValuePair<N_String, N_Integer> pair, byte[] line)
// {
//
// String[] tokens = line.toString().split(" ");
//
// pair.setKey(new N_String(tokens[0]));
// pair.setValue(new N_Integer(Integer.parseInt(tokens[1])));
//
// }
@Override
public byte[] writeLine(KeyValuePair<PactString, PactInteger> pair) {
LOG.info("Writing out: [" + pair.getKey() + "," + pair.getValue() + "]");
......
......@@ -225,9 +225,9 @@ public abstract class TestBase extends TestCase {
expectedResult.add(st.nextToken());
}
// print expected and computed results
System.out.println("Expected: " + expectedResult);
System.out.println("Computed: " + computedResult);
// log expected and computed results
LOG.debug("Expected: " + expectedResult);
LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size());
......@@ -235,7 +235,7 @@ public abstract class TestBase extends TestCase {
while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll();
System.out.println("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
}
}
......
......@@ -238,7 +238,7 @@
<!-- execution of Unit Tests -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
<!-- <version>2.4.2</version> -->
<version>2.7</version>
</plugin>
<plugin>
<!-- check coverage of tests -->
......
......@@ -4,8 +4,8 @@
<id>stratosphere-bin</id>
<formats>
<format>dir</format>
<!--
<format>tar.gz</format>
<!--
<format>zip</format>
-->
</formats>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册