提交 b843f185 编写于 作者: F Fabian Hueske 提交者: unknown

- pact modules are builed with tests

- removed println statements from test
- improved pact examples (K-Means and WordCount)
上级 81636942
...@@ -696,7 +696,7 @@ public class KMeansIteration implements PlanAssembler, PlanAssemblerDescription ...@@ -696,7 +696,7 @@ public class KMeansIteration implements PlanAssembler, PlanAssemblerDescription
@Override @Override
public String getDescription() { public String getDescription() {
return "Parameters: dop, data-points, cluster-centers, output"; return "Parameters: [dop] [data-points] [cluster-centers] [output]";
} }
} }
...@@ -13,9 +13,6 @@ ...@@ -13,9 +13,6 @@
* *
**********************************************************************************************************************/ **********************************************************************************************************************/
/**
*
*/
package eu.stratosphere.pact.example.wordcount; package eu.stratosphere.pact.example.wordcount;
import java.util.Iterator; import java.util.Iterator;
...@@ -48,7 +45,8 @@ import eu.stratosphere.pact.common.type.base.PactString; ...@@ -48,7 +45,8 @@ import eu.stratosphere.pact.common.type.base.PactString;
public class WordCount implements PlanAssembler, PlanAssemblerDescription { public class WordCount implements PlanAssembler, PlanAssemblerDescription {
/** /**
* {@inheritDoc} * Converts a input string (a line) into a KeyValuePair with the string
* being the key and the value being a zero Integer.
*/ */
public static class LineInFormat extends TextInputFormat<PactString, PactInteger> { public static class LineInFormat extends TextInputFormat<PactString, PactInteger> {
...@@ -65,7 +63,8 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription { ...@@ -65,7 +63,8 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
} }
/** /**
* {@inheritDoc} * Writes a (String,Integer)-KeyValuePair to a string. The output format is:
* "&lt;key&gt;&nbsp;&lt;value&gt;\nl"
*/ */
public static class WordCountOutFormat extends TextOutputFormat<PactString, PactInteger> { public static class WordCountOutFormat extends TextOutputFormat<PactString, PactInteger> {
...@@ -83,7 +82,10 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription { ...@@ -83,7 +82,10 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
} }
/** /**
* {@inheritDoc} * Converts a (String,Integer)-KeyValuePair into multiple KeyValuePairs. The
* key string is tokenized by spaces. For each token a new
* (String,Integer)-KeyValuePair is emitted where the Token is the key and
* an Integer(1) is the value.
*/ */
public static class TokenizeLine extends MapStub<PactString, PactInteger, PactString, PactInteger> { public static class TokenizeLine extends MapStub<PactString, PactInteger, PactString, PactInteger> {
...@@ -103,8 +105,11 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription { ...@@ -103,8 +105,11 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
} }
/** /**
* {@inheritDoc} * Counts the number of values for a given key. Hence, the number of
* occurences of a given token (word) is computed and emitted. The key is
* not modified, hence a SameKey OutputContract is attached to this class.
*/ */
@SameKey
@Combinable @Combinable
public static class CountWords extends ReduceStub<PactString, PactInteger, PactString, PactInteger> { public static class CountWords extends ReduceStub<PactString, PactInteger, PactString, PactInteger> {
...@@ -138,6 +143,7 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription { ...@@ -138,6 +143,7 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
*/ */
@Override @Override
public Plan getPlan(String... args) { public Plan getPlan(String... args) {
if (args == null) { if (args == null) {
args = new String[0]; args = new String[0];
} }
...@@ -146,21 +152,20 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription { ...@@ -146,21 +152,20 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
String dataInput = (args.length > 1 && args[1] != null ? args[1] : "hdfs://localhost:9000/countwords/data"); String dataInput = (args.length > 1 && args[1] != null ? args[1] : "hdfs://localhost:9000/countwords/data");
String output = (args.length > 2 && args[2] != null ? args[2] : "hdfs://localhost:9000/countwords/result"); String output = (args.length > 2 && args[2] != null ? args[2] : "hdfs://localhost:9000/countwords/result");
DataSourceContract<PactString, PactInteger> data = new DataSourceContract<PactString, PactInteger>(LineInFormat.class, DataSourceContract<PactString, PactInteger> data = new DataSourceContract<PactString, PactInteger>(
dataInput, "Lines"); LineInFormat.class, dataInput, "Input Lines");
data.setDegreeOfParallelism(noSubTasks); data.setDegreeOfParallelism(noSubTasks);
MapContract<PactString, PactInteger, PactString, PactInteger> mapper = new MapContract<PactString, PactInteger, PactString, PactInteger>( MapContract<PactString, PactInteger, PactString, PactInteger> mapper = new MapContract<PactString, PactInteger, PactString, PactInteger>(
TokenizeLine.class, "Tokenize Lines"); TokenizeLine.class, "Tokenize Lines");
mapper.setDegreeOfParallelism(noSubTasks); mapper.setDegreeOfParallelism(noSubTasks);
mapper.setOutputContract(SameKey.class);
ReduceContract<PactString, PactInteger, PactString, PactInteger> reducer = new ReduceContract<PactString, PactInteger, PactString, PactInteger>( ReduceContract<PactString, PactInteger, PactString, PactInteger> reducer = new ReduceContract<PactString, PactInteger, PactString, PactInteger>(
CountWords.class, "Count Words"); CountWords.class, "Count Words");
reducer.setDegreeOfParallelism(noSubTasks); reducer.setDegreeOfParallelism(noSubTasks);
DataSinkContract<PactString, PactInteger> out = new DataSinkContract<PactString, PactInteger>(WordCountOutFormat.class, DataSinkContract<PactString, PactInteger> out = new DataSinkContract<PactString, PactInteger>(
output, "Output"); WordCountOutFormat.class, output, "Output");
out.setDegreeOfParallelism(noSubTasks); out.setDegreeOfParallelism(noSubTasks);
out.setInput(reducer); out.setInput(reducer);
...@@ -175,9 +180,7 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription { ...@@ -175,9 +180,7 @@ public class WordCount implements PlanAssembler, PlanAssemblerDescription {
*/ */
@Override @Override
public String getDescription() { public String getDescription() {
return "WordCount: [noSubStasks] [input] [output] <br />" return "Parameters: [noSubStasks] [input] [output]";
+ "\t noSubTasks: defines the degree of parallelism <br />" + "\t input: Location of the input file <br />"
+ "\t output: Location of the output file <br />";
} }
} }
...@@ -68,6 +68,19 @@ ...@@ -68,6 +68,19 @@
<target>1.6</target> <target>1.6</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<excludes>
<exclude>**/TestData.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
...@@ -22,6 +22,7 @@ import java.util.Set; ...@@ -22,6 +22,7 @@ import java.util.Set;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -109,8 +110,8 @@ public class SerializingHashMapTest { ...@@ -109,8 +110,8 @@ public class SerializingHashMapTest {
Assert.assertTrue("Number of values in map is below lower bound", pactHashMap.numberOfValues() >= SEGMENT_SIZE Assert.assertTrue("Number of values in map is below lower bound", pactHashMap.numberOfValues() >= SEGMENT_SIZE
/ (2 * VALUE_LENGTH + 16)); / (2 * VALUE_LENGTH + 16));
System.out.println("Inserted " + pactHashMap.numberOfKeys() + " keys"); Logger.getRootLogger().debug("Inserted " + pactHashMap.numberOfKeys() + " keys");
System.out.println("Inserted " + pactHashMap.numberOfValues() + " values"); Logger.getRootLogger().debug("Inserted " + pactHashMap.numberOfValues() + " values");
// test value iterators // test value iterators
for (Key key : javaHashMap.keySet()) { for (Key key : javaHashMap.keySet()) {
......
...@@ -99,7 +99,7 @@ public class TestMergeIterator { ...@@ -99,7 +99,7 @@ public class TestMergeIterator {
KeyValuePair<TestData.Key, TestData.Value> pair1 = iterator.next(); KeyValuePair<TestData.Key, TestData.Value> pair1 = iterator.next();
while (iterator.hasNext()) { while (iterator.hasNext()) {
KeyValuePair<TestData.Key, TestData.Value> pair2 = iterator.next(); KeyValuePair<TestData.Key, TestData.Value> pair2 = iterator.next();
System.out.println("1 -> " + pair1.getKey() + " | 2 -> " + pair2.getKey()); Logger.getRootLogger().debug("1 -> " + pair1.getKey() + " | 2 -> " + pair2.getKey());
Assert.assertTrue(comparator.compare(pair1.getKey(), pair2.getKey()) <= 0); Assert.assertTrue(comparator.compare(pair1.getKey(), pair2.getKey()) <= 0);
pair1 = pair2; pair1 = pair2;
} }
......
...@@ -133,6 +133,28 @@ ...@@ -133,6 +133,28 @@
<target>1.6</target> <target>1.6</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<configuration>
<systemProperties>
<property>
<name>java.net.preferIPv4Stack</name>
<value>true</value>
</property>
</systemProperties>
<includes>
<include>**/ContractTests.java</include>
<!--
<include>**/JobTests.java</include>
-->
</includes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
...@@ -21,6 +21,6 @@ import org.junit.runners.Suite.SuiteClasses; ...@@ -21,6 +21,6 @@ import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class) @RunWith(Suite.class)
@SuiteClasses( { MapTest.class, ReduceTest.class, MatchTest.class, CrossTest.class, CoGroupTest.class }) @SuiteClasses( { MapTest.class, ReduceTest.class, MatchTest.class, CrossTest.class, CoGroupTest.class })
public class ContractsTests { public class ContractTests {
} }
...@@ -56,9 +56,6 @@ import eu.stratosphere.pact.test.util.TestBase; ...@@ -56,9 +56,6 @@ import eu.stratosphere.pact.test.util.TestBase;
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class MapTest extends TestBase public class MapTest extends TestBase
/*
* TODO: - Allow multiple data sinks
*/
{ {
public MapTest(String clusterConfig, Configuration testConfig) { public MapTest(String clusterConfig, Configuration testConfig) {
...@@ -82,7 +79,6 @@ public class MapTest extends TestBase ...@@ -82,7 +79,6 @@ public class MapTest extends TestBase
getHDFSProvider().writeFileToHDFS("mapTest_3.txt", MAP_IN_3); getHDFSProvider().writeFileToHDFS("mapTest_3.txt", MAP_IN_3);
getHDFSProvider().writeFileToHDFS("mapTest_4.txt", MAP_IN_4); getHDFSProvider().writeFileToHDFS("mapTest_4.txt", MAP_IN_4);
// getHDFSProvider().createDir(getHDFSProvider().getHdfsHome()+"/result/");
} }
public static class MapTestInFormat extends TextInputFormat<PactString, PactString> { public static class MapTestInFormat extends TextInputFormat<PactString, PactString> {
...@@ -100,28 +96,12 @@ public class MapTest extends TestBase ...@@ -100,28 +96,12 @@ public class MapTest extends TestBase
return true; return true;
} }
// @Override
// public byte[] writeLine(KeyValuePair<N_String, N_String> pair)
// {
// return (pair.getKey().toString() + " " + pair.getValue().toString() + "\n").getBytes();
// }
} }
public static class MapTestOutFormat extends TextOutputFormat<PactString, PactInteger> { public static class MapTestOutFormat extends TextOutputFormat<PactString, PactInteger> {
private static final Log LOG = LogFactory.getLog(MapTestOutFormat.class); private static final Log LOG = LogFactory.getLog(MapTestOutFormat.class);
// @Override
// public void readLine(KeyValuePair<N_String, N_Integer> pair, byte[] line)
// {
//
// String[] tokens = line.toString().split(" ");
//
// pair.setKey(new N_String(tokens[0]));
// pair.setValue(new N_Integer(Integer.parseInt(tokens[1])));
//
// }
@Override @Override
public byte[] writeLine(KeyValuePair<PactString, PactInteger> pair) { public byte[] writeLine(KeyValuePair<PactString, PactInteger> pair) {
LOG.info("Writing out: [" + pair.getKey() + "," + pair.getValue() + "]"); LOG.info("Writing out: [" + pair.getKey() + "," + pair.getValue() + "]");
......
...@@ -225,9 +225,9 @@ public abstract class TestBase extends TestCase { ...@@ -225,9 +225,9 @@ public abstract class TestBase extends TestCase {
expectedResult.add(st.nextToken()); expectedResult.add(st.nextToken());
} }
// print expected and computed results // log expected and computed results
System.out.println("Expected: " + expectedResult); LOG.debug("Expected: " + expectedResult);
System.out.println("Computed: " + computedResult); LOG.debug("Computed: " + computedResult);
Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult Assert.assertEquals("Computed and expected results have different size", expectedResult.size(), computedResult
.size()); .size());
...@@ -235,7 +235,7 @@ public abstract class TestBase extends TestCase { ...@@ -235,7 +235,7 @@ public abstract class TestBase extends TestCase {
while (!expectedResult.isEmpty()) { while (!expectedResult.isEmpty()) {
String expectedLine = expectedResult.poll(); String expectedLine = expectedResult.poll();
String computedLine = computedResult.poll(); String computedLine = computedResult.poll();
System.out.println("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">"); LOG.debug("expLine: <" + expectedLine + ">\t\t: compLine: <" + computedLine + ">");
Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine); Assert.assertEquals("Computed and expected lines differ", expectedLine, computedLine);
} }
} }
......
...@@ -238,7 +238,7 @@ ...@@ -238,7 +238,7 @@
<!-- execution of Unit Tests --> <!-- execution of Unit Tests -->
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId> <artifactId>maven-surefire-report-plugin</artifactId>
<!-- <version>2.4.2</version> --> <version>2.7</version>
</plugin> </plugin>
<plugin> <plugin>
<!-- check coverage of tests --> <!-- check coverage of tests -->
......
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
<id>stratosphere-bin</id> <id>stratosphere-bin</id>
<formats> <formats>
<format>dir</format> <format>dir</format>
<!--
<format>tar.gz</format> <format>tar.gz</format>
<!--
<format>zip</format> <format>zip</format>
--> -->
</formats> </formats>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册