提交 d44963c3 编写于 作者: M Mathias Peters

added some tests and the hamcrest dependency

上级 797d3da2
文件已添加
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.client.nephele.api;
import java.io.File;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.internal.matchers.Any;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.client.AbstractJobResult.ReturnCode;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.mockito.Matchers.any;
import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
 * Simple test to check the {@link Client} class with all collaborators replaced by mocks.
 * The use of mocks (Mockito annotations plus PowerMock constructor interception) can be
 * copied easily from this example.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(Client.class)
public class ClientTest {

    @Mock
    Configuration configMock;

    @Mock
    PactProgram program;

    @Mock
    JobGraph jobGraphMock;

    @Mock
    File mockJarFile;

    @Mock
    JobClient jobClientMock;

    @Mock
    JobSubmissionResult jobSubmissionResultMock;

    /**
     * Initializes the annotated mocks and wires them together: the program hands out the
     * mocked job graph and jar file, every {@code new JobClient(JobGraph, Configuration)}
     * executed inside the prepared {@link Client} class yields {@link #jobClientMock}, and
     * submitting through that mock returns {@link #jobSubmissionResultMock}.
     *
     * @throws Exception declared because the stubbed calls and {@code whenNew} declare checked exceptions
     */
    @Before
    public void setUp() throws Exception {
        initMocks(this);

        when(program.getCompiledPlan()).thenReturn(jobGraphMock);
        when(program.getJarFile()).thenReturn(mockJarFile);
        when(mockJarFile.getAbsolutePath()).thenReturn("mockFilePath");

        whenNew(JobClient.class).withArguments(any(JobGraph.class), any(Configuration.class)).thenReturn(this.jobClientMock);
        when(this.jobClientMock.submitJob()).thenReturn(jobSubmissionResultMock);
    }

    /**
     * With a successful submission result, running the program must submit the job exactly once.
     */
    @Test
    public void shouldSubmitToJobClient() throws ProgramInvocationException, ErrorInPlanAssemblerException, IOException {
        when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.SUCCESS);

        Client out = new Client(configMock);
        out.run(program);

        verify(this.jobClientMock, times(1)).submitJob();
    }

    /**
     * With an error return code, running the program must fail with a
     * {@link ProgramInvocationException}, and the job must still have been submitted.
     *
     * @throws Exception the expected {@link ProgramInvocationException}
     */
    @Test(expected = ProgramInvocationException.class)
    public void shouldThrowException() throws Exception {
        when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.ERROR);

        Client out = new Client(configMock);
        try {
            out.run(program);
        } finally {
            // A verification placed after run() would never execute because run() is expected
            // to throw; the finally block makes sure the submission is really checked.
            verify(this.jobClientMock).submitJob();
        }
    }
}
package eu.stratosphere.pact.common.type.stub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.MatchStub;
import eu.stratosphere.pact.common.type.Pair;
import eu.stratosphere.pact.common.type.base.PactInteger;
/**
*
* @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
*
*/
public class MatchStubTest {
private class MockMatcher extends MatchStub<PactInteger, TupleMock, TupleMock, PactInteger, TupleMock>
{
private int matchCount = 0;
//this could be done a lot more elegant with a true Pair type
private List<List<TupleMock>> matchPairs = new ArrayList<List<TupleMock>>();
/**
* Just count the number of calls and the tuple combinations
*/
@Override
public void match(PactInteger key, TupleMock value1, TupleMock value2, Collector<PactInteger, TupleMock> out) {
this.matchCount++;
List<TupleMock> pair = new ArrayList<TupleMock>();
pair.add(value1);
pair.add(value2);
this.matchPairs.add(pair);
}
}
@Mock
private Iterator<TupleMock> valueIteratorMock1;
@Mock
private Iterator<TupleMock> valueIteratorMock2;
@Mock
private Collector<PactInteger, TupleMock> collectorMock;
private TupleMock leftValue;
private TupleMock leftValue2;
private TupleMock leftValue3;
private TupleMock leftValue4;
private TupleMock rightValue;
private TupleMock rightValue2;
private TupleMock rightValue3;
private TupleMock rightValue4;
private MockMatcher matcher;
@Before
public void setUp()
{
initMocks(this);
this.matcher = new MockMatcher();
this.leftValue = new TupleMock("l1");
this.leftValue2 = new TupleMock("l2");
this.leftValue3 = new TupleMock("l3");
this.leftValue4 = new TupleMock("l4");
this.rightValue = new TupleMock("r1");
this.rightValue2 = new TupleMock("r2");
this.rightValue3 = new TupleMock("r3");
this.rightValue4 = new TupleMock("r4");
}
@Test
public void shouldNotRunMatch()
{
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(0)));
when(valueIteratorMock1.next()).thenReturn(leftValue);
when(valueIteratorMock2.next()).thenReturn(null);
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(0)));
when(valueIteratorMock1.next()).thenReturn(null);
when(valueIteratorMock2.next()).thenReturn(rightValue);
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(0)));
}
@Test
public void shouldRunMatchOnce()
{
when(valueIteratorMock1.next()).thenReturn(leftValue);
when(valueIteratorMock2.next()).thenReturn(rightValue);
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(1)));
assertThat(this.matcher.matchPairs.size(), is(equalTo(1)));
List<TupleMock> onlyPair = this.matcher.matchPairs.get(0);
assertThat(onlyPair.get(0), is(equalTo(leftValue)));
assertThat(onlyPair.get(1), is(equalTo(rightValue)));
}
@Test
public void shouldMatchOneLeftValueWithMultipleRightValues()
{
when(valueIteratorMock1.next()).thenReturn(leftValue);
when(valueIteratorMock2.next()).thenReturn(rightValue, rightValue2, rightValue3, rightValue4);
when(valueIteratorMock2.hasNext()).thenReturn(true, true, true, false);
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(4)));
assertThat(this.matcher.matchPairs.size(), is(equalTo(4)));
List<TupleMock> firstPair = this.matcher.matchPairs.get(0);
assertThat(firstPair.get(0), is(equalTo(leftValue)));
assertThat(firstPair.get(1), is(equalTo(rightValue)));
List<TupleMock> secondPair = this.matcher.matchPairs.get(1);
assertThat(secondPair.get(0), is(equalTo(leftValue)));
assertThat(secondPair.get(1), is(equalTo(rightValue2)));
List<TupleMock> thirdPair = this.matcher.matchPairs.get(2);
assertThat(thirdPair.get(0), is(equalTo(leftValue)));
assertThat(thirdPair.get(1), is(equalTo(rightValue3)));
List<TupleMock> fourthPair = this.matcher.matchPairs.get(3);
assertThat(fourthPair.get(0), is(equalTo(leftValue)));
assertThat(fourthPair.get(1), is(equalTo(rightValue4)));
}
@Test
public void shouldMatchOneRighValueWithMultipleLeftValues()
{
when(valueIteratorMock1.hasNext()).thenReturn(true, true, true, false);
when(valueIteratorMock1.next()).thenReturn(leftValue, leftValue2, leftValue3, leftValue4);
when(valueIteratorMock2.next()).thenReturn(rightValue);
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(4)));
assertThat(this.matcher.matchPairs.size(), is(equalTo(4)));
List<TupleMock> firstPair = this.matcher.matchPairs.get(0);
assertThat(firstPair.get(0), is(equalTo(leftValue)));
assertThat(firstPair.get(1), is(equalTo(rightValue)));
List<TupleMock> secondPair = this.matcher.matchPairs.get(1);
assertThat(secondPair.get(0), is(equalTo(leftValue2)));
assertThat(secondPair.get(1), is(equalTo(rightValue)));
List<TupleMock> thirdPair = this.matcher.matchPairs.get(2);
assertThat(thirdPair.get(0), is(equalTo(leftValue3)));
assertThat(thirdPair.get(1), is(equalTo(rightValue)));
List<TupleMock> fourthPair = this.matcher.matchPairs.get(3);
assertThat(fourthPair.get(0), is(equalTo(leftValue4)));
assertThat(fourthPair.get(1), is(equalTo(rightValue)));
}
@Test
public void shouldMatchMultipleRighValuesWithMultipleLeftValues()
{
when(valueIteratorMock1.hasNext()).thenReturn(true, true, true, true, false);
when(valueIteratorMock1.next()).thenReturn(leftValue, leftValue2, leftValue3, leftValue4);
when(valueIteratorMock2.hasNext()).thenReturn(true, true, true, true, false);
when(valueIteratorMock2.next()).thenReturn(rightValue, rightValue2, rightValue3, rightValue4);
this.matcher.run(null, valueIteratorMock1, valueIteratorMock2, collectorMock);
assertThat(this.matcher.matchCount, is(equalTo(16)));
assertThat(this.matcher.matchPairs.size(), is(equalTo(16)));
List<TupleMock> firstPair = this.matcher.matchPairs.get(0);
assertThat(firstPair.get(0), is(equalTo(leftValue)));
assertThat(firstPair.get(1), is(equalTo(rightValue)));
List<TupleMock> secondPair = this.matcher.matchPairs.get(1);
assertThat(secondPair.get(0), is(equalTo(leftValue2)));
assertThat(secondPair.get(1), is(equalTo(rightValue)));
List<TupleMock> thirdPair = this.matcher.matchPairs.get(2);
assertThat(thirdPair.get(0), is(equalTo(leftValue3)));
assertThat(thirdPair.get(1), is(equalTo(rightValue)));
List<TupleMock> fourthPair = this.matcher.matchPairs.get(3);
assertThat(fourthPair.get(0), is(equalTo(leftValue4)));
assertThat(fourthPair.get(1), is(equalTo(rightValue)));
List<TupleMock> fifthPair = this.matcher.matchPairs.get(4);
assertThat(fifthPair.get(0), is(equalTo(leftValue)));
assertThat(fifthPair.get(1), is(equalTo(rightValue2)));
List<TupleMock> sixthPair = this.matcher.matchPairs.get(5);
assertThat(sixthPair.get(0), is(equalTo(leftValue2)));
assertThat(sixthPair.get(1), is(equalTo(rightValue2)));
List<TupleMock> seventhPair = this.matcher.matchPairs.get(6);
assertThat(seventhPair.get(0), is(equalTo(leftValue3)));
assertThat(seventhPair.get(1), is(equalTo(rightValue2)));
List<TupleMock> eighthPair = this.matcher.matchPairs.get(7);
assertThat(eighthPair.get(0), is(equalTo(leftValue4)));
assertThat(eighthPair.get(1), is(equalTo(rightValue2)));
List<TupleMock> ninethPair = this.matcher.matchPairs.get(8);
assertThat(ninethPair.get(0), is(equalTo(leftValue)));
assertThat(ninethPair.get(1), is(equalTo(rightValue3)));
List<TupleMock> tenthPair = this.matcher.matchPairs.get(9);
assertThat(tenthPair.get(0), is(equalTo(leftValue2)));
assertThat(tenthPair.get(1), is(equalTo(rightValue3)));
List<TupleMock> eleventhPair = this.matcher.matchPairs.get(10);
assertThat(eleventhPair.get(0), is(equalTo(leftValue3)));
assertThat(eleventhPair.get(1), is(equalTo(rightValue3)));
List<TupleMock> twelvethPair = this.matcher.matchPairs.get(11);
assertThat(twelvethPair.get(0), is(equalTo(leftValue4)));
assertThat(twelvethPair.get(1), is(equalTo(rightValue3)));
List<TupleMock> thirteenthPair = this.matcher.matchPairs.get(12);
assertThat(thirteenthPair.get(0), is(equalTo(leftValue)));
assertThat(thirteenthPair.get(1), is(equalTo(rightValue4)));
List<TupleMock> fourteenthPair = this.matcher.matchPairs.get(13);
assertThat(fourteenthPair.get(0), is(equalTo(leftValue2)));
assertThat(fourteenthPair.get(1), is(equalTo(rightValue4)));
List<TupleMock> fifteenthPair = this.matcher.matchPairs.get(14);
assertThat(fifteenthPair.get(0), is(equalTo(leftValue3)));
assertThat(fifteenthPair.get(1), is(equalTo(rightValue4)));
List<TupleMock> sixteenthPair = this.matcher.matchPairs.get(15);
assertThat(sixteenthPair.get(0), is(equalTo(leftValue4)));
assertThat(sixteenthPair.get(1), is(equalTo(rightValue4)));
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.common.type.stub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;
import static org.mockito.Matchers.*;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.whenNew;
import java.util.Iterator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.ReduceStub;
import eu.stratosphere.pact.common.type.Pair;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.util.KeyGroupedIterator;
/**
 * A few small, simple tests to get into testing the stubs: they check how often
 * {@code reduce} is invoked when {@code run} is called on a {@link ReduceStub}.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(ReduceStub.class)
public class ReduceStubTest {

    /** Stub implementation that emits nothing and only counts the calls to {@code reduce}. */
    private class MockReducer extends ReduceStub<PactInteger, TupleMock, PactInteger, TupleMock> {

        int reduceCount = 0;

        public int getReduceCount() {
            return this.reduceCount;
        }

        @Override
        public void reduce(PactInteger key, Iterator<TupleMock> values, Collector<PactInteger, TupleMock> out) {
            this.reduceCount += 1;
        }
    }

    private MockReducer reducer;

    @Mock
    private Iterator<Pair<PactInteger, TupleMock>> iteratorMock;

    @Mock
    private Collector<PactInteger, TupleMock> collectorMock;

    @Mock
    private KeyGroupedIterator<PactInteger, TupleMock> groupedIteratorMock;

    /** Creates fresh mocks and a fresh counting reducer before every test. */
    @Before
    public void setUp() {
        initMocks(this);
        this.reducer = new MockReducer();
    }

    /** Asserts that {@code reduce} has been called exactly {@code expected} times so far. */
    private void assertReduceCalls(int expected) {
        assertThat(this.reducer.getReduceCount(), is(equalTo(expected)));
    }

    /** An unstubbed (and therefore empty) iterator mock must not trigger any reduce call. */
    @Test
    public void shouldNotIterateInRunWithEmptyIterator() {
        this.reducer.run(this.iteratorMock, collectorMock);
        assertReduceCalls(0);
    }

    /** Passing a null collector together with an empty iterator mock must not trigger a reduce call either. */
    @Test
    public void shouldIterateWithNullCollector() {
        this.reducer.run(this.iteratorMock, null);
        assertReduceCalls(0);
    }

    /**
     * When the key-grouped iterator created inside {@link ReduceStub} reports exactly one key,
     * {@code reduce} must be called exactly once.
     */
    @Test
    public void shouldCallReduceOnes() throws Exception {
        // intercept the constructor call in the prepared class and hand out the mock instead
        whenNew(KeyGroupedIterator.class).withArguments(any()).thenReturn(this.groupedIteratorMock);
        when(groupedIteratorMock.nextKey()).thenReturn(true, false);

        this.reducer.run(this.iteratorMock, this.collectorMock);

        assertReduceCalls(1);
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.common.type.stub;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.pact.common.stub.MatchStub;
import eu.stratosphere.pact.common.stub.ReduceStub;
import eu.stratosphere.pact.common.type.Value;
/**
 * Minimal {@link Value} implementation carrying only a name. It is needed to use the
 * mock implementations of {@link ReduceStub} and {@link MatchStub} in the tests.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
class TupleMock implements Value {

    // label returned by toString(), which makes assertion failures readable
    private final String name;

    /** Creates a tuple with an empty name. */
    public TupleMock() {
        this("");
    }

    /**
     * Creates a tuple with the given name.
     *
     * @param name the label returned by {@link #toString()}
     */
    public TupleMock(String name) {
        this.name = name;
    }

    /** Does nothing; nothing is read from the input. */
    @Override
    public void read(DataInput in) throws IOException {
        // intentionally empty
    }

    /** Does nothing; nothing is written to the output. */
    @Override
    public void write(DataOutput out) throws IOException {
        // intentionally empty
    }

    /** @return the name this tuple was created with */
    @Override
    public String toString() {
        return name;
    }
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.example.relational;
/**
* Small base class for the TPC-H queries.
* <p>
* The class currently declares no fields or methods.
*
* @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
*
*/
public class TPCHBase {
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.example.relational;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.DataSinkContract;
import eu.stratosphere.pact.common.contract.DataSourceContract;
import eu.stratosphere.pact.common.contract.MapContract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.PlanAssembler;
import eu.stratosphere.pact.common.plan.PlanAssemblerDescription;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.contracts.tpch1.GroupByReturnFlag;
import eu.stratosphere.pact.example.relational.contracts.tpch1.LineItemFilter;
import eu.stratosphere.pact.example.relational.util.IntTupleDataInFormat;
import eu.stratosphere.pact.example.relational.util.StringTupleDataOutFormat;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
* @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
*
*/
public class TPCHQuery1 implements PlanAssembler, PlanAssemblerDescription {
private static Logger LOGGER = Logger.getLogger(TPCHQuery1.class);
private int degreeOfParallelism = 1;
private String lineItemInputPath;
private String outputPath;
/* (non-Javadoc)
* @see eu.stratosphere.pact.common.plan.PlanAssembler#getPlan(java.lang.String[])
*/
@Override
public Plan getPlan(String... args) throws IllegalArgumentException {
if(args.length != 3)
{
this.degreeOfParallelism = 1;
this.lineItemInputPath = "";
this.outputPath = "";
}else{
this.degreeOfParallelism = Integer.parseInt(args[0]);
this.lineItemInputPath = args[1];
this.outputPath = args[2];
}
DataSourceContract<PactInteger, Tuple> lineItems =
new DataSourceContract<PactInteger, Tuple>(IntTupleDataInFormat.class, this.lineItemInputPath, "LineItems");
lineItems.setDegreeOfParallelism(this.degreeOfParallelism);
DataSinkContract<PactString, Tuple> result = new DataSinkContract<PactString, Tuple>(
StringTupleDataOutFormat.class, this.outputPath, "Output");
result.setDegreeOfParallelism(this.degreeOfParallelism);
MapContract<PactInteger , Tuple, PactString, Tuple> lineItemFilter =
new MapContract<PactInteger, Tuple, PactString, Tuple>(LineItemFilter.class, "LineItem Filter");
lineItemFilter.setDegreeOfParallelism(this.degreeOfParallelism);
ReduceContract<PactString, Tuple, PactString, Tuple> groupByReturnFlag =
new ReduceContract<PactString, Tuple, PactString, Tuple>(GroupByReturnFlag.class, "groupyBy");
lineItemFilter.setInput(lineItems);
groupByReturnFlag.setInput(lineItemFilter);
result.setInput(groupByReturnFlag);
return new Plan(result, "TPC-H 1");
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.common.plan.PlanAssemblerDescription#getDescription()
*/
@Override
public String getDescription() {
return "Parameters: [dop] [lineitem-input] [output]";
}
}
......@@ -17,6 +17,8 @@ package eu.stratosphere.pact.example.relational;
import java.util.Iterator;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.DataSinkContract;
import eu.stratosphere.pact.common.contract.DataSourceContract;
import eu.stratosphere.pact.common.contract.MapContract;
......@@ -42,6 +44,8 @@ import eu.stratosphere.pact.example.relational.util.Tuple;
public class TPCHQuery3 implements PlanAssembler, PlanAssemblerDescription {
private static Logger LOGGER = Logger.getLogger(TPCHQuery3.class);
/**
* Implements a modified query 3 of the TPC-H benchmark.
* SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
......@@ -58,7 +62,7 @@ public class TPCHQuery3 implements PlanAssembler, PlanAssemblerDescription {
super();
}
public N_IntStringPair(PactInteger first, PactString second) {
public N_IntStringPair(final PactInteger first, final PactString second) {
super(first, second);
}
}
......@@ -71,20 +75,26 @@ public class TPCHQuery3 implements PlanAssembler, PlanAssemblerDescription {
private final String PRIO_FILTER = "5";
@Override
public void map(PactInteger oKey, Tuple value, Collector<PactInteger, Tuple> out) {
public void map(final PactInteger oKey, final Tuple value, final Collector<PactInteger, Tuple> out) {
if ((Integer.parseInt(value.getStringValueAt(4).substring(0, 4)) > YEAR_FILTER)
&& (value.getStringValueAt(2).equals("F")) && (value.getStringValueAt(5).startsWith(PRIO_FILTER))) {
try {
if (Integer.parseInt(value.getStringValueAt(4).substring(0, 4)) > this.YEAR_FILTER
&& value.getStringValueAt(2).equals("F") && value.getStringValueAt(5).startsWith(this.PRIO_FILTER)) {
// project
value.project(129);
// project
value.project(129);
out.collect(oKey, value);
out.collect(oKey, value);
// Output Schema:
// KEY: ORDERKEY
// VALUE: 0:ORDERKEY, 1:SHIPPRIORITY
// Output Schema:
// KEY: ORDERKEY
// VALUE: 0:ORDERKEY, 1:SHIPPRIORITY
}
} catch (final StringIndexOutOfBoundsException e) {
LOGGER.error(e);
} catch (final Exception ex) {
LOGGER.error(ex);
}
}
}
......@@ -173,20 +183,23 @@ public class TPCHQuery3 implements PlanAssembler, PlanAssemblerDescription {
}
@Override
public Plan getPlan(String... args) {
// check for the correct number of job parameters
if (args.length != 4) {
throw new IllegalArgumentException(
"Must provide four arguments: <parallelism> <orders_input> <lineitem_input> <result_directory>");
public Plan getPlan(final String... args) {
int degreeOfParallelism = 1;
String ordersPath = "";
String lineitemsPath = "";
String resultPath = "";
if (args.length != 4)
LOGGER.warn("number of arguments do not match!");
else {
degreeOfParallelism = Integer.parseInt(args[0]);
ordersPath = args[1];
lineitemsPath = args[2];
resultPath = args[3];
}
int degreeOfParallelism = Integer.parseInt(args[0]);
String ordersPath = args[1];
String lineitemsPath = args[2];
String resultPath = args[3];
DataSourceContract<PactInteger, Tuple> orders = new DataSourceContract<PactInteger, Tuple>(
final DataSourceContract<PactInteger, Tuple> orders = new DataSourceContract<PactInteger, Tuple>(
IntTupleDataInFormat.class, ordersPath, "Orders");
orders.setFormatParameter("delimiter", "\n");
orders.setDegreeOfParallelism(degreeOfParallelism);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.example.relational;
import java.util.Iterator;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.DataSinkContract;
import eu.stratosphere.pact.common.contract.DataSourceContract;
import eu.stratosphere.pact.common.contract.MapContract;
import eu.stratosphere.pact.common.contract.MatchContract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.contract.OutputContract.SameKey;
import eu.stratosphere.pact.common.contract.OutputContract.SuperKey;
import eu.stratosphere.pact.common.contract.OutputContract.UniqueKey;
import eu.stratosphere.pact.common.contract.ReduceContract.Combinable;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.PlanAssembler;
import eu.stratosphere.pact.common.plan.PlanAssemblerDescription;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.MapStub;
import eu.stratosphere.pact.common.stub.MatchStub;
import eu.stratosphere.pact.common.stub.ReduceStub;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactPair;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.util.IntTupleDataInFormat;
import eu.stratosphere.pact.example.relational.util.StringTupleDataOutFormat;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
* Simple copy of {@link TPCHQuery3} to check the different behaviour of this
* plan without output contracts.
* @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
*
*/
public class TPCHQuery3WithoutOCs implements PlanAssembler, PlanAssemblerDescription {
// Logger of this class; it previously used the TPCHQuery3 category by copy-paste mistake.
private static final Logger LOGGER = Logger.getLogger(TPCHQuery3WithoutOCs.class);
/**
 * Pair of a {@link PactInteger} and a {@link PactString}, used as the composite
 * (order key, ship priority) key of the plan assembled by the enclosing class.
 * <p>
 * The enclosing class implements a modified query 3 of the TPC-H benchmark:
 * <pre>
 * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
 *   FROM orders, lineitem
 *  WHERE l_orderkey = o_orderkey
 *    AND o_custkey IN [X]
 *    AND o_orderdate > [Y]
 *  GROUP BY l_orderkey, o_shippriority;
 * </pre>
 */
public static class N_IntStringPair extends PactPair<PactInteger, PactString> {

    /** Creates a pair through the no-argument constructor of {@link PactPair}. */
    public N_IntStringPair() {
    }

    /**
     * Creates a pair from the given components.
     *
     * @param first the integer component
     * @param second the string component
     */
    public N_IntStringPair(final PactInteger first, final PactString second) {
        super(first, second);
    }
}
@SameKey
public static class FilterO extends MapStub<PactInteger, Tuple, PactInteger, Tuple> {
private final int YEAR_FILTER = 1993;
private final String PRIO_FILTER = "5";
@Override
public void map(PactInteger oKey, Tuple value, Collector<PactInteger, Tuple> out) {
try{
if ((Integer.parseInt(value.getStringValueAt(4).substring(0, 4)) > YEAR_FILTER)
&& (value.getStringValueAt(2).equals("F")) && (value.getStringValueAt(5).startsWith(PRIO_FILTER))) {
// project
value.project(129);
out.collect(oKey, value);
// Output Schema:
// KEY: ORDERKEY
// VALUE: 0:ORDERKEY, 1:SHIPPRIORITY
}
}catch (StringIndexOutOfBoundsException e) {
LOGGER.error(e);
}catch(Exception ex)
{
LOGGER.error(ex);
}
}
}
/**
 * Map stub that projects every line item tuple with mask 33 and forwards it under its
 * unchanged key.
 * <p>
 * Output schema: Key: ORDERKEY, Value: 0:ORDERKEY, 1:EXTENDEDPRICE
 */
@SameKey
public static class ProjectLi extends MapStub<PactInteger, Tuple, PactInteger, Tuple> {

    /**
     * Projects the tuple in place and emits it.
     *
     * @param oKey the order key, forwarded unchanged
     * @param value the line item tuple; projected in place
     * @param out the collector receiving the projected tuple
     */
    @Override
    public void map(PactInteger oKey, Tuple value, Collector<PactInteger, Tuple> out) {
        // NOTE(review): 33 presumably is a bit mask selecting the fields to keep; confirm against Tuple.project
        final int projectionMask = 33;
        value.project(projectionMask);

        out.collect(oKey, value);
    }
}
/**
 * Match stub joining an order tuple with a line item tuple that share the same order key.
 * <p>
 * Output schema: KEY: ORDERKEY, SHIPPRIORITY, VALUE: 0:ORDERKEY, 1:SHIPPRIORITY, 2:EXTENDEDPRICE
 */
@SuperKey
public static class JoinLiO extends MatchStub<PactInteger, Tuple, Tuple, N_IntStringPair, Tuple> {

    /**
     * Concatenates the line item tuple onto the order tuple, projects the result with mask 11
     * and emits it under a composite key built from the order key and field 1 of the
     * projected tuple.
     *
     * @param oKey the common order key
     * @param oVal the order tuple; modified in place and emitted as the value
     * @param liVal the line item tuple
     * @param out the collector receiving the joined tuple
     */
    @Override
    public void match(PactInteger oKey, Tuple oVal, Tuple liVal, Collector<N_IntStringPair, Tuple> out) {
        oVal.concatenate(liVal);
        oVal.project(11);

        // field 1 of the projected tuple becomes the second key component
        PactString shipPriority = new PactString(oVal.getStringValueAt(1));
        out.collect(new N_IntStringPair(oKey, shipPriority), oVal);
    }
}
/**
 * Combinable reduce stub that sums field 2 of all tuples of one (order key, ship priority)
 * group and emits a single tuple per group carrying that sum.
 */
@Combinable
public static class AggLiO extends ReduceStub<N_IntStringPair, Tuple, PactInteger, Tuple> {

    /**
     * Emits the aggregated tuple of the group under the first component of the composite key.
     * <p>
     * Output schema: KEY: ORDERKEY, VALUE: 0:ORDERKEY, 1:SHIPPRIORITY, 2:EXTENDEDPRICESUM
     */
    @Override
    public void reduce(N_IntStringPair oKeyShipPrio, Iterator<Tuple> values, Collector<PactInteger, Tuple> out) {
        Tuple aggregated = aggregate(values);
        if (aggregated != null) {
            out.collect(oKeyShipPrio.getFirst(), aggregated);
        }
    }

    /**
     * Emits the partially aggregated tuple of the group under the unchanged composite key.
     * <p>
     * Output schema: KEY: ORDERKEY, SHIPPRIORITY, VALUE: 0:ORDERKEY, 1:SHIPPRIORITY, 2:EXTENDEDPRICESUM
     */
    @Override
    public void combine(N_IntStringPair oKeyShipPrio, Iterator<Tuple> values, Collector<N_IntStringPair, Tuple> out) {
        Tuple aggregated = aggregate(values);
        if (aggregated != null) {
            out.collect(oKeyShipPrio, aggregated);
        }
    }

    /**
     * Consumes the iterator and sums field 2 of every tuple; each value is parsed as a double
     * and cast to long before it is added. The last tuple seen is projected with mask 3 and
     * gets the sum appended as an additional attribute.
     *
     * @param values the tuples of one group
     * @return the last tuple of the group carrying the sum, or {@code null} if the iterator was empty
     */
    private static Tuple aggregate(Iterator<Tuple> values) {
        long extendedPriceSum = 0;
        Tuple last = null;

        while (values.hasNext()) {
            last = values.next();
            extendedPriceSum += (long) Double.parseDouble(last.getStringValueAt(2));
        }

        if (last == null) {
            return null;
        }

        last.project(3);
        last.addAttribute(Long.toString(extendedPriceSum));
        return last;
    }
}
/**
 * Assembles the PACT plan of this query.
 * <p>
 * Exactly four arguments are expected: degree of parallelism, orders input path, line item input path
 * and result path. If {@code args} is {@code null} or has a different length, a warning is logged and
 * the plan is built with a degree of parallelism of 1 and empty paths.
 *
 * @param args dop, orders-input, lineitem-input, result
 * @return the plan named "TPCH Q3" rooted at the "Output" data sink
 * @throws NumberFormatException if four arguments are given but the first one is not an integer
 */
@Override
public Plan getPlan(String... args) {
    // check for the correct number of job parameters
    int degreeOfParallelism = 1;
    String ordersPath = "";
    String lineitemsPath = "";
    String resultPath = "";
    // a varargs method can be invoked with an explicit null array, so guard before reading the length
    if (args == null || args.length != 4) {
        LOGGER.warn("number of arguments do not match!");
    } else {
        degreeOfParallelism = Integer.parseInt(args[0]);
        ordersPath = args[1];
        lineitemsPath = args[2];
        resultPath = args[3];
    }

    // data sources
    DataSourceContract<PactInteger, Tuple> orders = new DataSourceContract<PactInteger, Tuple>(
        IntTupleDataInFormat.class, ordersPath, "Orders");
    orders.setFormatParameter("delimiter", "\n");
    orders.setDegreeOfParallelism(degreeOfParallelism);
    orders.setOutputContract(UniqueKey.class);
    orders.getCompilerHints().setAvgNumValuesPerKey(1);

    DataSourceContract<PactInteger, Tuple> lineitems = new DataSourceContract<PactInteger, Tuple>(
        IntTupleDataInFormat.class, lineitemsPath, "LineItems");
    lineitems.setFormatParameter("delimiter", "\n");
    lineitems.setDegreeOfParallelism(degreeOfParallelism);
    lineitems.getCompilerHints().setAvgNumValuesPerKey(4);

    // per-input map steps
    MapContract<PactInteger, Tuple, PactInteger, Tuple> filterO = new MapContract<PactInteger, Tuple, PactInteger, Tuple>(
        FilterO.class, "FilterO");
    filterO.setDegreeOfParallelism(degreeOfParallelism);
    filterO.getCompilerHints().setAvgBytesPerRecord(32);
    filterO.getCompilerHints().setSelectivity(0.05f);
    filterO.getCompilerHints().setAvgNumValuesPerKey(1);

    MapContract<PactInteger, Tuple, PactInteger, Tuple> projectLi = new MapContract<PactInteger, Tuple, PactInteger, Tuple>(
        ProjectLi.class, "ProjectLi");
    projectLi.setDegreeOfParallelism(degreeOfParallelism);
    projectLi.getCompilerHints().setAvgBytesPerRecord(48);
    projectLi.getCompilerHints().setSelectivity(1.0f);
    projectLi.getCompilerHints().setAvgNumValuesPerKey(4);

    // join of both inputs
    MatchContract<PactInteger, Tuple, Tuple, N_IntStringPair, Tuple> joinLiO = new MatchContract<PactInteger, Tuple, Tuple, N_IntStringPair, Tuple>(
        JoinLiO.class, "JoinLiO");
    joinLiO.setDegreeOfParallelism(degreeOfParallelism);
    joinLiO.getCompilerHints().setSelectivity(0.05f);
    joinLiO.getCompilerHints().setAvgBytesPerRecord(64);
    joinLiO.getCompilerHints().setAvgNumValuesPerKey(4);

    // aggregation of the join result
    ReduceContract<N_IntStringPair, Tuple, PactInteger, Tuple> aggLiO = new ReduceContract<N_IntStringPair, Tuple, PactInteger, Tuple>(
        AggLiO.class, "AggLio");
    aggLiO.setDegreeOfParallelism(degreeOfParallelism);
    aggLiO.getCompilerHints().setAvgBytesPerRecord(64);
    aggLiO.getCompilerHints().setSelectivity(0.25f);
    aggLiO.getCompilerHints().setAvgNumValuesPerKey(1);

    // data sink
    DataSinkContract<PactString, Tuple> result = new DataSinkContract<PactString, Tuple>(
        StringTupleDataOutFormat.class, resultPath, "Output");
    result.setDegreeOfParallelism(degreeOfParallelism);

    // wire the contracts together, from the sink back to the sources
    result.setInput(aggLiO);
    aggLiO.setInput(joinLiO);
    joinLiO.setFirstInput(filterO);
    filterO.setInput(orders);
    joinLiO.setSecondInput(projectLi);
    projectLi.setInput(lineitems);

    return new Plan(result, "TPCH Q3");
}
/**
 * Describes the job parameters in the order in which {@code getPlan} reads them.
 *
 * @return a one-line usage string
 */
@Override
public String getDescription() {
    final String usage = "Parameters: dop, orders-input, lineitem-input, result";
    return usage;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.example.relational;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.DataSinkContract;
import eu.stratosphere.pact.common.contract.DataSourceContract;
import eu.stratosphere.pact.common.contract.MapContract;
import eu.stratosphere.pact.common.contract.MatchContract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.contract.OutputContract.UniqueKey;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.PlanAssembler;
import eu.stratosphere.pact.common.plan.PlanAssemblerDescription;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.contracts.tpch4.Aggregation;
import eu.stratosphere.pact.example.relational.contracts.tpch4.Join;
import eu.stratosphere.pact.example.relational.contracts.tpch4.LineItemFilter;
import eu.stratosphere.pact.example.relational.contracts.tpch4.OrdersFilter;
import eu.stratosphere.pact.example.relational.util.IntTupleDataInFormat;
import eu.stratosphere.pact.example.relational.util.StringTupleDataOutFormat;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Implementation of the TPC-H Query 4 as a PACT program.
 * <p>
 * The plan reads orders and line items, filters each input with its own map contract, joins the
 * filtered inputs, aggregates the join result and writes it to the output path.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
public class TPCHQuery4 implements PlanAssembler, PlanAssemblerDescription {

    private static final Logger LOGGER = Logger.getLogger(TPCHQuery4.class);

    private int degreeOfParallelism = 1;
    private String ordersInputPath;
    private String lineItemInputPath;
    private String outputPath;

    /**
     * {@inheritDoc}
     * <p>
     * Exactly four arguments are expected (see {@link #getDescription()}). If {@code args} is
     * {@code null} or has a different length, a warning is logged and the plan is built with a degree
     * of parallelism of 1 and empty paths.
     *
     * @throws NumberFormatException if four arguments are given but the first one is not an integer
     */
    @Override
    public Plan getPlan(String... args) throws IllegalArgumentException {
        if (args == null || args.length != 4) {
            LOGGER.warn("number of arguments do not match!");
            // reset every setting, so that values of an earlier successful call are not reused
            this.degreeOfParallelism = 1;
            this.ordersInputPath = "";
            this.lineItemInputPath = "";
            this.outputPath = "";
        } else {
            setArgs(args);
        }

        // data sources
        DataSourceContract<PactInteger, Tuple> orders =
            new DataSourceContract<PactInteger, Tuple>(IntTupleDataInFormat.class, this.ordersInputPath, "Orders");
        orders.setDegreeOfParallelism(this.degreeOfParallelism);
        orders.setOutputContract(UniqueKey.class);

        DataSourceContract<PactInteger, Tuple> lineItems =
            new DataSourceContract<PactInteger, Tuple>(IntTupleDataInFormat.class, this.lineItemInputPath, "LineItems");
        lineItems.setDegreeOfParallelism(this.degreeOfParallelism);

        // join
        MatchContract<PactInteger, Tuple, Tuple, PactString, Tuple> join =
            new MatchContract<PactInteger, Tuple, Tuple, PactString, Tuple>(
                Join.class, "OrdersLineitemsJoin");
        join.setDegreeOfParallelism(this.degreeOfParallelism);

        // data sink
        DataSinkContract<PactString, Tuple> result = new DataSinkContract<PactString, Tuple>(
            StringTupleDataOutFormat.class, this.outputPath, "Output");
        result.setDegreeOfParallelism(this.degreeOfParallelism);

        // filters
        MapContract<PactInteger, Tuple, PactInteger, Tuple> lineFilter = new MapContract<PactInteger, Tuple, PactInteger, Tuple>(
            LineItemFilter.class, "LineItemFilter");
        lineFilter.setDegreeOfParallelism(this.degreeOfParallelism);

        MapContract<PactInteger, Tuple, PactInteger, Tuple> ordersFilter = new MapContract<PactInteger, Tuple, PactInteger, Tuple>(
            OrdersFilter.class, "OrdersFilter");
        ordersFilter.setDegreeOfParallelism(this.degreeOfParallelism);

        // aggregation
        ReduceContract<PactString, Tuple, PactString, Tuple> aggregation = new ReduceContract<PactString, Tuple, PactString, Tuple>(
            Aggregation.class, "AggregateGroupBy");
        aggregation.setDegreeOfParallelism(this.degreeOfParallelism);

        // wire the contracts together, from the sources to the sink
        lineFilter.setInput(lineItems);
        ordersFilter.setInput(orders);
        join.setFirstInput(ordersFilter);
        join.setSecondInput(lineFilter);
        aggregation.setInput(join);
        result.setInput(aggregation);

        return new Plan(result, "TPC-H 4");
    }

    /**
     * Copies the four job arguments into the members.
     *
     * @param args dop, orders input path, line item input path, output path (in that order)
     * @throws NumberFormatException if the first argument is not an integer
     */
    private void setArgs(String[] args) {
        this.degreeOfParallelism = Integer.parseInt(args[0]);
        this.ordersInputPath = args[1];
        this.lineItemInputPath = args[2];
        this.outputPath = args[3];
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getDescription() {
        return "Parameters: [dop] [orders-input] [lineitem-input] [output]";
    }
}
/**
*
*/
package eu.stratosphere.pact.example.relational;
/**
 * Placeholder for a command line launcher of the TPC-H Query 4 job. The entry point does not do
 * anything yet.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
public class TPCHQuery4Launcher {

    /**
     * Program entry point; currently a no-op.
     *
     * @param args the command line arguments (ignored)
     */
    public static void main(String[] args) {
        // TODO: not implemented yet
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.example.relational.contracts.tpch1;
import java.util.Iterator;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.ReduceStub;
import eu.stratosphere.pact.common.type.base.PactLong;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Reduce stub that condenses all tuples of one key into a single summary tuple.
 * <p>
 * The emitted tuple has three attributes: the key, the sum of column 4 (parsed as long) and the sum of
 * column 5 (parsed as double) over all tuples of the group.
 * NOTE(review): judging by the class name the key is presumably the return flag and the columns are
 * quantity and extended price - confirm against the upstream map step.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
public class GroupByReturnFlag extends ReduceStub<PactString, Tuple, PactString, Tuple> {

    /**
     * Sums up the quantity and extended price columns of the group and emits one result tuple.
     *
     * @param key    the group key; also the first attribute of the result tuple
     * @param values the tuples of the group
     * @param out    collector receiving the summary pair
     * @throws NumberFormatException if column 4 or 5 of a tuple cannot be parsed
     */
    @Override
    public void reduce(PactString key, Iterator<Tuple> values, Collector<PactString, Tuple> out) {
        final Tuple summary = new Tuple();
        summary.addAttribute(key.toString());

        long quantityTotal = 0;
        double extendedPriceTotal = 0.0;
        for (Iterator<Tuple> cursor = values; cursor.hasNext();) {
            final Tuple current = cursor.next();
            quantityTotal += Long.parseLong(current.getStringValueAt(4));
            extendedPriceTotal += Double.parseDouble(current.getStringValueAt(5));
        }

        // the quantity is rendered through PactLong's string representation
        summary.addAttribute(String.valueOf(new PactLong(quantityTotal)));
        summary.addAttribute(Double.toString(extendedPriceTotal));
        out.collect(key, summary);
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.example.relational.contracts.tpch1;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.OutputContract.SameKey;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.MapStub;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Filters the line item tuples according to the filter condition
 * l_shipdate &lt;= date '1998-12-01' - interval '[DELTA]' day (3)
 * TODO: add parametrisation; first version uses a static interval = 90
 * <p>
 * With the static interval the bound is 1998-09-02, which is implemented as
 * "ship date strictly before {@value #DATE_CONSTANT}".
 * <p>
 * In preparation of the following reduce step (see {@link GroupByReturnFlag}) the key has to be set to
 * &quot;return flag&quot;
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 */
public class LineItemFilter extends MapStub<PactInteger, Tuple, PactString, Tuple> {

    private static final Logger LOGGER = Logger.getLogger(LineItemFilter.class);

    /** Pattern of the dates found in the input tuples and of {@link #DATE_CONSTANT}. */
    private static final String DATE_PATTERN = "yyyy-MM-dd";

    /** Exclusive upper bound of the ship date. */
    private static final String DATE_CONSTANT = "1998-09-03";

    /** Index of the return flag column; its value becomes the output key. */
    private static final int RETURN_FLAG_COLUMN = 8;

    /** Index of the ship date column. */
    private static final int SHIP_DATE_COLUMN = 10;

    /** {@link #DATE_CONSTANT} in epoch milliseconds, parsed once instead of once per record. */
    private static final long CUTOFF_MILLIS = parseCutoff();

    /**
     * Parses {@link #DATE_CONSTANT}.
     *
     * @return the cut-off date in epoch milliseconds
     * @throws IllegalStateException if the constant does not match {@link #DATE_PATTERN}
     */
    private static long parseCutoff() {
        try {
            return new SimpleDateFormat(DATE_PATTERN).parse(DATE_CONSTANT).getTime();
        } catch (ParseException e) {
            throw new IllegalStateException("Invalid cut-off date constant: " + DATE_CONSTANT, e);
        }
    }

    /**
     * Emits the tuple under its return flag if its ship date lies before the cut-off date. Tuples that
     * are {@code null}, have no ship date column or carry an unparsable ship date are dropped; the
     * latter case is logged as an error.
     *
     * @param key   the incoming key (not used)
     * @param value the line item tuple
     * @param out   collector receiving the (return flag, tuple) pairs that pass the filter
     */
    @Override
    protected void map(PactInteger key, Tuple value, Collector<PactString, Tuple> out) {
        if (value == null || value.getNumberOfColumns() <= SHIP_DATE_COLUMN) {
            return;
        }

        String shipDateString = value.getStringValueAt(SHIP_DATE_COLUMN);
        // SimpleDateFormat is not thread-safe, hence a fresh instance per call
        DateFormat format = new SimpleDateFormat(DATE_PATTERN);
        try {
            Date shipDate = format.parse(shipDateString);
            if (shipDate.getTime() < CUTOFF_MILLIS) {
                String returnFlag = value.getStringValueAt(RETURN_FLAG_COLUMN);
                out.collect(new PactString(returnFlag), value);
            }
        } catch (ParseException e) {
            // pass the exception as throwable, so that log4j also records the stack trace
            LOGGER.error("Dropping line item with unparsable ship date '" + shipDateString + "'", e);
        }
    }
}
/**
*
*/
package eu.stratosphere.pact.example.relational.contracts.tpch4;
import java.util.Iterator;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.OutputContract.SameKey;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.ReduceStub;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Implements the count(*) part: every group is reduced to its last tuple, extended by the number of
 * tuples in the group.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
@SameKey
public class Aggregation extends ReduceStub<PactString, Tuple, PactString, Tuple> {

    private static final Logger LOGGER = Logger.getLogger(Aggregation.class);

    /**
     * Counts the tuples of the group and emits the last one with the count appended.
     *
     * @param key    the group key; emitted unchanged
     * @param values the tuples of the group
     * @param out    collector receiving the result pair
     */
    @Override
    public void reduce(PactString key, Iterator<Tuple> values, Collector<PactString, Tuple> out) {
        logic(key, values, out, "reduce");
    }

    /**
     * Applies the same counting logic as {@link #reduce}.
     * NOTE(review): if the runtime ever ran this as a combiner, the subsequent reduce would count the
     * partial results instead of adding up their counts - confirm before enabling combining for this stub.
     *
     * @param key    the group key; emitted unchanged
     * @param values the tuples of the (partial) group
     * @param out    collector receiving the result pair
     */
    @Override
    public void combine(PactString key, Iterator<Tuple> values,
            Collector<PactString, Tuple> out) {
        logic(key, values, out, "combine");
    }

    /**
     * Shared implementation of {@link #reduce} and {@link #combine}.
     *
     * @param key     the group key
     * @param values  the tuples to count
     * @param out     collector receiving the result pair
     * @param context name of the calling method; only used for the debug message
     */
    private void logic(PactString key, Iterator<Tuple> values, Collector<PactString, Tuple> out, String context) {
        long count = 0;
        Tuple t = null;
        while (values.hasNext()) {
            t = values.next();
            count++;
        }

        if (t == null) {
            // empty group: there is no tuple to carry the count, and neither logging nor collecting
            // a null tuple is safe
            return;
        }

        t.addAttribute("" + count);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Constructed tuple " + t.toString() + " in " + context + "() as result");
        }
        out.collect(key, t);
    }
}
/**
*
*/
package eu.stratosphere.pact.example.relational.contracts.tpch4;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.MatchStub;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Implements the equijoin on the orderkey and performs the projection on
 * the order priority as well.
 * <p>
 * Only the order tuple is forwarded; the matching line item tuple merely triggers the emission.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
public class Join extends MatchStub<PactInteger, Tuple, Tuple, PactString, Tuple> {

    private static final Logger LOGGER = Logger.getLogger(Join.class);

    /**
     * Projects the order tuple and emits it, keyed by column 0 of the projected tuple. Any exception
     * raised while doing so is logged and the pair is dropped.
     *
     * @param key         the key both inputs were matched on
     * @param orderValue  the order tuple; projected in place and emitted
     * @param lineValue   the matching line item tuple (not read)
     * @param outputTuple collector receiving the projected pair
     */
    @Override
    public void match(PactInteger key, Tuple orderValue, Tuple lineValue,
            Collector<PactString, Tuple> outputTuple) {
        try {
            // this runs once per matching pair: only build the log message if it is actually written
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("before projection: " + orderValue + " on key " + key);
            }

            // NOTE(review): 32 is presumably a column bit mask (0b100000) that keeps only the order
            // priority column - confirm against Tuple.project
            orderValue.project(32);

            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("after projection: " + orderValue + " on key " + key);
            }

            // column 0 of the projected tuple becomes the new key
            String outputKey = orderValue.getStringValueAt(0);
            outputTuple.collect(new PactString(outputKey), orderValue);
        } catch (Exception ex) {
            // deliberately best effort: a broken pair must not abort the task, but keep the stack trace
            LOGGER.error("Could not join the order tuple for key " + key, ex);
        }
    }
}
/**
*
*/
package eu.stratosphere.pact.example.relational.contracts.tpch4;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.OutputContract.SameKey;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.MapStub;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Simple filter for the line item selection. It filters all the tuples that do
 * not satisfy the &quot;l_commitdate &lt; l_receiptdate&quot; condition.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
@SameKey
public class LineItemFilter extends
        MapStub<PactInteger, Tuple, PactInteger, Tuple> {

    private static final Logger LOGGER = Logger.getLogger(LineItemFilter.class);

    /** Index of the commit date column. */
    private static final int COMMIT_DATE_COLUMN = 11;

    /** Index of the receipt date column. */
    private static final int RECEIPT_DATE_COLUMN = 12;

    /**
     * Emits the pair unchanged if the commit date lies before the receipt date. Tuples that are
     * {@code null}, lack one of the two date columns or carry an unparsable date are dropped; the
     * latter case is logged as an error.
     *
     * @param key   the incoming key; emitted unchanged
     * @param value the line item tuple
     * @param out   collector receiving the pairs that pass the filter
     */
    @Override
    protected void map(PactInteger key, Tuple value,
            Collector<PactInteger, Tuple> out) {
        // same guard as the TPC-H 1 line item filter: skip tuples that cannot be evaluated
        if (value == null || value.getNumberOfColumns() <= RECEIPT_DATE_COLUMN) {
            return;
        }

        String commitString = value.getStringValueAt(COMMIT_DATE_COLUMN);
        String receiptString = value.getStringValueAt(RECEIPT_DATE_COLUMN);
        // SimpleDateFormat is not thread-safe, hence a fresh instance per call
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        try {
            Date commitDate = sdf.parse(commitString);
            Date receiptDate = sdf.parse(receiptString);
            if (commitDate.before(receiptDate)) {
                //TODO: do projection and filter everything except the orderkey
                out.collect(key, value);
            }
        } catch (ParseException ex) {
            // pass the exception as throwable, so that log4j also records the stack trace
            LOGGER.error("Dropping line item with unparsable commit date '" + commitString
                + "' or receipt date '" + receiptString + "'", ex);
        }
    }
}
/**
*
*/
package eu.stratosphere.pact.example.relational.contracts.tpch4;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import org.apache.log4j.Logger;
import eu.stratosphere.pact.common.contract.OutputContract.SameKey;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.stub.MapStub;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.example.relational.util.Tuple;
/**
 * Small {@link MapStub} to filter out the irrelevant orders: only orders whose order date lies in the
 * three months starting at the date parameter (start inclusive, end exclusive) are kept.
 *
 * @author Mathias Peters <mathias.peters@informatik.hu-berlin.de>
 *
 */
@SameKey
public class OrdersFilter extends MapStub<PactInteger, Tuple, PactInteger, Tuple> {

    private static final Logger LOGGER = Logger.getLogger(OrdersFilter.class);

    /** Index of the order date column. */
    private static final int ORDER_DATE_COLUMN = 4;

    /** First day (inclusive) of the three month window. */
    private String dateParamString = "1995-01-01";

    /**
     * Emits the pair unchanged if the order date satisfies
     * {@code date <= o_orderdate < date + 3 months}. Tuples that are {@code null}, lack the order date
     * column or carry an unparsable date are dropped; the latter case is logged as an error.
     *
     * @param key   the incoming key; emitted unchanged
     * @param value the order tuple
     * @param out   collector receiving the pairs that pass the filter
     */
    @Override
    protected void map(PactInteger key, Tuple value,
            Collector<PactInteger, Tuple> out) {
        if (value == null || value.getNumberOfColumns() <= ORDER_DATE_COLUMN) {
            return;
        }

        String orderStringDate = value.getStringValueAt(ORDER_DATE_COLUMN);
        // SimpleDateFormat is not thread-safe, hence a fresh instance per call
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        try {
            Date orderDate = sdf.parse(orderStringDate);
            Date paramDate = sdf.parse(this.dateParamString);
            Date plusThreeMonths = getPlusThreeMonths(paramDate);

            // the lower bound is inclusive (o_orderdate >= date), the upper bound exclusive
            if (!orderDate.before(paramDate) && orderDate.before(plusThreeMonths)) {
                //TODO: make projection
                out.collect(key, value);
            }
        } catch (ParseException e) {
            // pass the exception as throwable, so that log4j also records the stack trace
            LOGGER.error("Dropping order with unparsable order date '" + orderStringDate + "'", e);
        }
    }

    /**
     * Calculates the {@link Date} which is three months after the given one.
     *
     * @param paramDate of type {@link Date}.
     * @return a {@link Date} three months later.
     */
    private Date getPlusThreeMonths(Date paramDate) {
        GregorianCalendar gregCal = new GregorianCalendar();
        gregCal.setTime(paramDate);
        gregCal.add(Calendar.MONTH, 3);
        return gregCal.getTime();
    }
}
package eu.stratosphere.pact.example.relational.contracts.tpch1;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.MockitoAnnotations.initMocks;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.common.stub.Collector;
import eu.stratosphere.pact.common.type.KeyValuePair;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.example.relational.util.Tuple;
import eu.stratosphere.pact.runtime.task.util.OutputCollector;
/**
 * Unit tests of the TPC-H 1 {@link LineItemFilter}. The filter writes into an {@link OutputCollector}
 * that is backed by a single mocked {@link RecordWriter}; the tests check what reaches that writer.
 */
@SuppressWarnings("unchecked")
public class LineItemFilterTest {

    private static final String RETURN_FLAG = "N";

    /** The writer the collector under test emits to. */
    @Mock
    RecordWriter<KeyValuePair<PactString, Tuple>> recordStringWriterMock;

    private List<RecordWriter> writerList = new ArrayList<RecordWriter>();

    @Before
    public void setUp()
    {
        initMocks(this);
        writerList.add(recordStringWriterMock);
    }

    /** A ship date before the cut-off date must be emitted under the return flag. */
    @Test
    public void shouldNotFilterTuple() throws IOException, InterruptedException
    {
        LineItemFilter out = new LineItemFilter();
        String shipDate = "1996-03-13";
        Tuple input = createInputTuple(shipDate);
        PactInteger inputKey = new PactInteger();
        Collector<PactString, Tuple> collector = new OutputCollector(writerList);
        PactString returnFlag = new PactString(RETURN_FLAG);

        out.map(inputKey, input, collector);

        verify(recordStringWriterMock).emit(new KeyValuePair<PactString, Tuple>(returnFlag, input));
    }

    /** A ship date after the cut-off date must not be emitted. */
    @Test
    public void shouldFilterTuple() throws IOException, InterruptedException
    {
        LineItemFilter out = new LineItemFilter();
        String shipDate = "1999-03-13";
        Tuple input = createInputTuple(shipDate);
        PactInteger inputKey = new PactInteger();
        Collector<PactString, Tuple> collector = new OutputCollector(writerList);

        out.map(inputKey, input, collector);

        verifyNothingEmitted();
    }

    /** A null tuple must be ignored silently. */
    @Test
    public void shouldNotThrowExceptionWhenNullTuple() throws IOException, InterruptedException
    {
        LineItemFilter out = new LineItemFilter();
        Tuple input = null;
        PactInteger inputKey = new PactInteger();
        Collector<PactString, Tuple> collector = new OutputCollector(writerList);

        out.map(inputKey, input, collector);

        verifyNothingEmitted();
    }

    /** An unparsable ship date must neither throw nor emit. */
    @Test
    public void shouldNoThrowExceptionOnMalformedDate() throws IOException, InterruptedException
    {
        LineItemFilter out = new LineItemFilter();
        String shipDate = "foobarDate";
        Tuple input = createInputTuple(shipDate);
        PactInteger inputKey = new PactInteger();
        Collector<PactString, Tuple> collector = new OutputCollector(writerList);

        out.map(inputKey, input, collector);

        verifyNothingEmitted();
    }

    /** A tuple without the ship date column must neither throw nor emit. */
    @Test
    public void shouldNoThrowExceptionOnTooShortTuple() throws IOException, InterruptedException
    {
        LineItemFilter out = new LineItemFilter();
        //the relevant column is missing now
        Tuple input = createTupleWithoutShipDate();
        PactInteger inputKey = new PactInteger();
        Collector<PactString, Tuple> collector = new OutputCollector(writerList);

        out.map(inputKey, input, collector);

        verifyNothingEmitted();
    }

    /**
     * Asserts that the filter did not emit anything. The check targets the writer that actually backs
     * the collector; verifying an unrelated mock would pass regardless of what the filter does.
     */
    private void verifyNothingEmitted() throws IOException, InterruptedException {
        verify(recordStringWriterMock, org.mockito.Mockito.never())
            .emit(org.mockito.Matchers.any(KeyValuePair.class));
    }

    /**
     * Creates a subtuple of the lineitem relation.
     *
     * 1155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|
     * @param shipDate the date the {@link LineItemFilter} filters for.
     * @return the ten leading columns followed by the given ship date
     */
    private Tuple createInputTuple(String shipDate) {
        Tuple input = createTupleWithoutShipDate();
        input.addAttribute(shipDate);
        return input;
    }

    /**
     * Creates the ten leading columns of the sample line item, i.e. everything up to, but excluding,
     * the ship date.
     *
     * @return a tuple that is one column too short for the {@link LineItemFilter}
     */
    private Tuple createTupleWithoutShipDate() {
        Tuple input = new Tuple();
        input.addAttribute("1");
        input.addAttribute("155190");
        input.addAttribute("7706");
        input.addAttribute("1");
        input.addAttribute("17");
        input.addAttribute("21168.23");
        input.addAttribute("0.04");
        input.addAttribute("0.02");
        input.addAttribute(RETURN_FLAG);
        input.addAttribute("0");
        return input;
    }
}
......@@ -128,7 +128,12 @@
<type>jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Hamcrest matchers. NOTE(review): presumably used by test code only, hence the test scope
     (matching the preceding dependency) - confirm that no main code depends on it. -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<!--
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册