提交 3333f84f 编写于 作者: F Fabian Hueske

Remove Record API program tests

上级 5558e768
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.CollectionDataSource;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
/**
* test the collection and iterator data input using join operator
*/
@SuppressWarnings("deprecation")
public class CollectionSourceTest extends RecordAPITestBase {
private static final int parallelism = 4;
protected String resultPath;
public CollectionSourceTest(){
setTaskManagerNumSlots(parallelism);
}
public static class Join extends JoinFunction {
private static final long serialVersionUID = 1L;
@Override
public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
out.collect(new Record(value1.getField(1, StringValue.class), value2.getField(1, IntValue.class)));
}
}
public static class SerializableIteratorTest implements Iterator<List<Object>>, Serializable {
private static final long serialVersionUID = 1L;
private final String[] s = WordCountData.COUNTS.split("\n");
private int pos = 0;
public void remove() {
throw new UnsupportedOperationException();
}
public List<Object> next() {
List<Object> tmp = new ArrayList<Object>();
tmp.add(pos);
tmp.add(s[pos++].split(" ")[0]);
return tmp;
}
public boolean hasNext() {
return pos < s.length;
}
}
public Plan getPlan(int numSubTasks, String output) {
List<Object> tmp = new ArrayList<Object>();
int pos = 0;
for (String s : WordCountData.COUNTS.split("\n")) {
List<Object> tmpInner = new ArrayList<Object>();
tmpInner.add(pos++);
tmpInner.add(Integer.parseInt(s.split(" ")[1]));
tmp.add(tmpInner);
}
// test serializable iterator input, the input record is {id, word}
CollectionDataSource source = new CollectionDataSource(new SerializableIteratorTest(), "test_iterator");
// test collection input, the input record is {id, count}
CollectionDataSource source2 = new CollectionDataSource(tmp, "test_collection");
JoinOperator join = JoinOperator.builder(Join.class, IntValue.class, 0, 0)
.input1(source).input2(source2).build();
FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, join, "Collection Join");
CsvOutputFormat.configureRecordFormat(out)
.recordDelimiter('\n')
.fieldDelimiter(' ')
.field(StringValue.class, 0)
.field(IntValue.class, 1);
Plan plan = new Plan(out, "CollectionDataSource");
plan.setExecutionConfig(new ExecutionConfig());
plan.setDefaultParallelism(numSubTasks);
return plan;
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
return getPlan(parallelism, resultPath);
}
@Override
protected void postSubmit() throws Exception {
// Test results
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.java.record.operators.CollectionDataSource;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* Test the input field validation of CollectionDataSource
*/
@SuppressWarnings("deprecation")
public class CollectionValidationTest {
@Test
public void TestArrayInputValidation() throws Exception {
/*
* valid array input
*/
try {
new CollectionDataSource("test_1d_valid_array", "a", "b", "c");
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
try {
new CollectionDataSource("test_2d_valid_array", new Object[][] { { 1, "a" },
{ 2, "b" }, { 3, "c" } });
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
/*
* invalid array input
*/
try {
new CollectionDataSource("test_1d_invalid_array", 1, "b", "c");
Assert.fail("input type is different");
} catch (Exception e) {
}
try {
new CollectionDataSource("test_2d_invalid_array", new Object[][] {
{ 1, "a" }, { 2, "b" }, { 3, 4 } });
Assert.fail("input type is different");
} catch (Exception e) {
}
}
@Test
public void TestCollectionInputValidation() throws Exception {
/*
* valid collection input
*/
try {
List<Object> tmp = new ArrayList<Object>();
for (int i = 0; i < 100; i++) {
tmp.add(i);
}
new CollectionDataSource(tmp, "test_valid_collection");
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
try {
List<Object> tmp = new ArrayList<Object>();
for (int i = 0; i < 100; i++) {
List<Object> inner = new ArrayList<Object>();
inner.add(i);
inner.add('a' + i);
tmp.add(inner);
}
new CollectionDataSource(tmp, "test_valid_double_collection");
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
/*
* invalid collection input
*/
try {
List<Object> tmp = new ArrayList<Object>();
for (int i = 0; i < 100; i++) {
tmp.add(i);
}
tmp.add("a");
new CollectionDataSource(tmp, "test_invalid_collection");
Assert.fail("input type is different");
} catch (Exception e) {
}
try {
List<Object> tmp = new ArrayList<Object>();
for (int i = 0; i < 100; i++) {
List<Object> inner = new ArrayList<Object>();
inner.add(i);
inner.add('a' + i);
tmp.add(inner);
}
List<Object> inner = new ArrayList<Object>();
inner.add('a');
inner.add('a');
tmp.add(inner);
new CollectionDataSource(tmp, "test_invalid_double_collection");
Assert.fail("input type is different");
} catch (Exception e) {
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.ComputeEdgeDegrees;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class ComputeEdgeDegreesITCase extends RecordAPITestBase {
protected String edgesPath = null;
protected String resultPath = null;
private static final String EDGES = "1,2\n1,3\n1,4\n1,5\n2,3\n2,5\n3,4\n3,7\n4,3\n6,5\n8,3\n7,8\n5,6\n";
private static final String EXPECTED = "1,4|2,3\n1,4|3,5\n1,4|4,2\n1,4|5,3\n2,3|3,5\n2,3|5,3\n3,5|4,2\n3,5|7,2\n5,3|6,1\n3,5|8,2\n7,2|8,2\n";
public ComputeEdgeDegreesITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
edgesPath = createTempFile("edges.txt", EDGES);
resultPath = getTempDirPath("edgesWithDegrees");
}
@Override
protected Plan getTestJob() {
ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees();
return computeDegrees.getPlan(String.valueOf(config.getInteger("NumSubtasks", 4)),
edgesPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("NumSubtasks", parallelism);
return toParameterList(config);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class EnumTrianglesOnEdgesWithDegreesITCase extends RecordAPITestBase {
private static final String EDGES_WITH_DEGREES = "1,4|2,3\n1,4|3,5\n1,4|4,2\n1,4|5,3\n2,3|3,5\n2,3|5,3\n3,5|4,2\n3,5|7,2\n5,3|6,1\n3,5|8,2\n7,2|8,2\n";
private static final String EXPECTED = "2,1,3\n4,1,3\n2,1,5\n7,3,8\n";
protected String edgesPath;
protected String resultPath;
public EnumTrianglesOnEdgesWithDegreesITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
edgesPath = createTempFile("edgesWithDegrees.txt", EDGES_WITH_DEGREES);
resultPath = getTempDirPath("triangles");
}
@Override
protected Plan getTestJob() {
EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees();
return enumTriangles.getPlan(
String.valueOf(config.getInteger("NumSubtasks", 4)),
edgesPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("NumSubtasks", parallelism);
return toParameterList(config);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.EnumTrianglesRdfFoaf;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.Collection;
@RunWith(Parameterized.class)
public class EnumTrianglesRDFITCase extends RecordAPITestBase {
String edgesPath = null;
String resultPath = null;
private static final String EDGES = "<a> <http://xmlns.com/foaf/0.1/knows> <b>\n" + "<a> <http://xmlns.com/foaf/0.1/knows> <c>\n" +
"<a> <http://xmlns.com/foaf/0.1/knows> <d>\n" + "<b> <http://xmlns.com/foaf/0.1/knows> <c>\n" +
"<b> <http://xmlns.com/foaf/0.1/knows> <e>\n" + "<b> <http://xmlns.com/foaf/0.1/knows> <f>\n" +
"<c> <http://xmlns.com/foaf/0.1/knows> <d>\n" + "<d> <http://xmlns.com/foaf/0.1/knows> <b>\n" +
"<f> <http://xmlns.com/foaf/0.1/knows> <g>\n" + "<f> <http://xmlns.com/foaf/0.1/knows> <h>\n" +
"<f> <http://xmlns.com/foaf/0.1/knows> <i>\n" + "<g> <http://xmlns.com/foaf/0.1/knows> <i>\n" +
"<g> <http://willNotWork> <h>\n";
private static final String EXPECTED = "<a> <b> <c>\n" + "<a> <b> <d>\n" + "<a> <c> <d>\n" +
"<b> <c> <d>\n" + "<f> <g> <i>\n";
public EnumTrianglesRDFITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
edgesPath = createTempFile("edges.txt", EDGES);
resultPath = getTempDirPath("triangles");
}
@Override
protected Plan getTestJob() {
EnumTrianglesRdfFoaf enumTriangles = new EnumTrianglesRdfFoaf();
return enumTriangles.getPlan(
String.valueOf(config.getInteger("NumSubtasks", parallelism)), edgesPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("NumSubtasks", parallelism);
return toParameterList(config);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.distributions.UniformIntegerDistribution;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
@SuppressWarnings("deprecation")
public class GlobalSortingITCase extends RecordAPITestBase {
private static final int NUM_RECORDS = 100000;
private String recordsPath;
private String resultPath;
private String sortedRecords;
public GlobalSortingITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
ArrayList<Integer> records = new ArrayList<Integer>();
//Generate records
Random rnd = new Random(1988);
StringBuilder sb = new StringBuilder(NUM_RECORDS * 7);
for (int i = 0; i < NUM_RECORDS; i++) {
int number = rnd.nextInt();
records.add(number);
sb.append(number);
sb.append('\n');
}
recordsPath = createTempFile("records", sb.toString());
resultPath = getTempDirPath("result");
// create the expected sorted result
Collections.sort(records);
sb.setLength(0);
for (Integer i : records) {
sb.append(i.intValue());
sb.append('\n');
}
this.sortedRecords = sb.toString();
}
@Override
protected Plan getTestJob() {
GlobalSort globalSort = new GlobalSort();
return globalSort.getPlan(Integer.valueOf(parallelism).toString(), recordsPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
// Test results
compareResultsByLinesInMemoryWithStrictOrder(this.sortedRecords, this.resultPath);
}
private static class GlobalSort implements Program {
private static final long serialVersionUID = 1L;
@Override
public Plan getPlan(String... args) throws IllegalArgumentException {
// parse program parameters
int numSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
String recordsPath = (args.length > 1 ? args[1] : "");
String output = (args.length > 2 ? args[2] : "");
FileDataSource source = new FileDataSource(CsvInputFormat.class, recordsPath);
source.setParallelism(numSubtasks);
CsvInputFormat.configureRecordFormat(source)
.recordDelimiter('\n')
.fieldDelimiter('|')
.field(IntValue.class, 0);
FileDataSink sink =
new FileDataSink(CsvOutputFormat.class, output);
sink.setParallelism(numSubtasks);
CsvOutputFormat.configureRecordFormat(sink)
.recordDelimiter('\n')
.fieldDelimiter('|')
.lenient(true)
.field(IntValue.class, 0);
sink.setGlobalOrder(new Ordering(0, IntValue.class, Order.ASCENDING), new UniformIntegerDistribution(Integer.MIN_VALUE, Integer.MAX_VALUE));
sink.setInput(source);
return new Plan(sink);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Key;
@SuppressWarnings("deprecation")
public class GlobalSortingMixedOrderITCase extends RecordAPITestBase {
private static final int NUM_RECORDS = 100000;
private static final int RANGE_I1 = 100;
private static final int RANGE_I2 = 20;
private static final int RANGE_I3 = 20;
private String recordsPath;
private String resultPath;
private String sortedRecords;
public GlobalSortingMixedOrderITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
ArrayList<TripleInt> records = new ArrayList<TripleInt>();
//Generate records
final Random rnd = new Random(1988);
final StringBuilder sb = new StringBuilder(NUM_RECORDS * 7);
for (int j = 0; j < NUM_RECORDS; j++) {
TripleInt val = new TripleInt(rnd.nextInt(RANGE_I1), rnd.nextInt(RANGE_I2), rnd.nextInt(RANGE_I3));
records.add(val);
sb.append(val);
sb.append('\n');
}
this.recordsPath = createTempFile("records", sb.toString());
this.resultPath = getTempDirPath("result");
// create the sorted result;
Collections.sort(records);
sb.setLength(0);
for (TripleInt val : records) {
sb.append(val);
sb.append('\n');
}
this.sortedRecords = sb.toString();
}
@Override
protected Plan getTestJob() {
GlobalSort globalSort = new GlobalSort();
return globalSort.getPlan(Integer.valueOf(parallelism).toString(), recordsPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
// Test results
compareResultsByLinesInMemoryWithStrictOrder(this.sortedRecords, this.resultPath);
}
public static class TripleIntDistribution implements DataDistribution {
private static final long serialVersionUID = 1L;
private boolean ascendingI1, ascendingI2, ascendingI3;
public TripleIntDistribution(Order orderI1, Order orderI2, Order orderI3) {
this.ascendingI1 = orderI1 != Order.DESCENDING;
this.ascendingI2 = orderI2 != Order.DESCENDING;
this.ascendingI3 = orderI3 != Order.DESCENDING;
}
public TripleIntDistribution() {}
@Override
public void write(DataOutputView out) throws IOException {
out.writeBoolean(this.ascendingI1);
out.writeBoolean(this.ascendingI2);
out.writeBoolean(this.ascendingI3);
}
@Override
public void read(DataInputView in) throws IOException {
this.ascendingI1 = in.readBoolean();
this.ascendingI2 = in.readBoolean();
this.ascendingI3 = in.readBoolean();
}
@Override
public Key<?>[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
final float bucketWidth = ((float) RANGE_I1) / totalNumBuckets;
int boundVal = (int) ((bucketNum + 1) * bucketWidth);
if (!this.ascendingI1) {
boundVal = RANGE_I1 - boundVal;
}
return new Key[] { new IntValue(boundVal), new IntValue(RANGE_I2), new IntValue(RANGE_I3) };
}
@Override
public int getNumberOfFields() {
return 3;
}
}
private static class GlobalSort implements Program {
private static final long serialVersionUID = 1L;
@Override
public Plan getPlan(String... args) throws IllegalArgumentException {
// parse program parameters
final int numSubtasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
final String recordsPath = (args.length > 1 ? args[1] : "");
final String output = (args.length > 2 ? args[2] : "");
@SuppressWarnings("unchecked")
FileDataSource source = new FileDataSource(new CsvInputFormat(',', IntValue.class, IntValue.class, IntValue.class), recordsPath);
FileDataSink sink = new FileDataSink(CsvOutputFormat.class, output);
CsvOutputFormat.configureRecordFormat(sink)
.recordDelimiter('\n')
.fieldDelimiter(',')
.lenient(true)
.field(IntValue.class, 0)
.field(IntValue.class, 1)
.field(IntValue.class, 2);
sink.setGlobalOrder(
new Ordering(0, IntValue.class, Order.DESCENDING)
.appendOrdering(1, IntValue.class, Order.ASCENDING)
.appendOrdering(2, IntValue.class, Order.DESCENDING),
new TripleIntDistribution(Order.DESCENDING, Order.ASCENDING, Order.DESCENDING));
sink.setInput(source);
Plan p = new Plan(sink);
p.setDefaultParallelism(numSubtasks);
return p;
}
}
/**
* Three integers sorting descending, ascending, descending.
*/
private static final class TripleInt implements Comparable<TripleInt> {
private final int i1, i2, i3;
private TripleInt(int i1, int i2, int i3) {
this.i1 = i1;
this.i2 = i2;
this.i3 = i3;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder(32);
bld.append(this.i1);
bld.append(',');
bld.append(this.i2);
bld.append(',');
bld.append(this.i3);
return bld.toString();
}
@Override
public int compareTo(TripleInt o) {
return this.i1 < o.i1 ? 1 : this.i1 > o.i1 ? -1 :
this.i2 < o.i2 ? -1 : this.i2 > o.i2 ? 1 :
this.i3 < o.i3 ? 1 : this.i3 > o.i3 ? -1 : 0;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@SuppressWarnings("deprecation")
public class GroupOrderReduceITCase extends RecordAPITestBase {
private static final String INPUT = "1,3\n" + "2,1\n" + "5,1\n" + "3,1\n" + "1,8\n" + "1,9\n" +
"1,2\n" + "2,3\n" + "7,1\n" + "4,2\n" + "2,7\n" + "2,8\n" +
"1,1\n" + "2,7\n" + "5,4\n" + "4,3\n" + "3,6\n" + "3,7\n" +
"1,3\n" + "2,4\n" + "7,1\n" + "5,3\n" + "4,5\n" + "4,6\n" +
"1,4\n" + "3,9\n" + "8,5\n" + "5,3\n" + "5,4\n" + "5,5\n" +
"1,7\n" + "3,9\n" + "9,3\n" + "6,2\n" + "6,3\n" + "6,4\n" +
"1,8\n" + "3,8\n" + "8,7\n" + "6,2\n" + "7,2\n" + "7,3\n" +
"1,1\n" + "3,7\n" + "9,2\n" + "7,1\n" + "8,1\n" + "8,2\n" +
"1,2\n" + "2,6\n" + "8,7\n" + "7,1\n" + "9,1\n" + "9,1\n" +
"1,1\n" + "2,5\n" + "9,5\n" + "8,2\n" + "10,2\n" + "10,1\n" +
"1,1\n" + "2,6\n" + "2,7\n" + "8,3\n" + "11,3\n" + "11,2\n" +
"1,2\n" + "2,7\n" + "4,2\n" + "9,4\n" + "12,8\n" + "12,3\n" +
"1,2\n" + "4,8\n" + "1,7\n" + "9,5\n" + "13,9\n" + "13,4\n" +
"1,3\n" + "4,2\n" + "3,2\n" + "9,6\n" + "14,7\n" + "14,5\n";
protected String textPath;
protected String resultPath;
public GroupOrderReduceITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("pairs.csv", INPUT);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
int parallelism = this.config.getInteger("GroupOrderTest#NumSubtasks", 1);
@SuppressWarnings("unchecked")
CsvInputFormat format = new CsvInputFormat(',', IntValue.class, IntValue.class);
FileDataSource source = new FileDataSource(format, this.textPath, "Source");
ReduceOperator reducer = ReduceOperator.builder(CheckingReducer.class)
.keyField(IntValue.class, 0)
.input(source)
.name("Ordered Reducer")
.build();
reducer.setGroupOrder(new Ordering(1, IntValue.class, Order.ASCENDING));
FileDataSink sink = new FileDataSink(CsvOutputFormat.class, this.resultPath, reducer, "Sink");
CsvOutputFormat.configureRecordFormat(sink)
.recordDelimiter('\n')
.fieldDelimiter(',')
.field(IntValue.class, 0)
.field(IntValue.class, 1);
Plan p = new Plan(sink);
p.setDefaultParallelism(parallelism);
return p;
}
@Override
protected void postSubmit() throws Exception {
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("GroupOrderTest#NumSubtasks", parallelism);
return toParameterList(config);
}
public static final class CheckingReducer extends ReduceFunction implements Serializable {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
int lastValue = records.next().getField(1, IntValue.class).getValue();
while (records.hasNext()) {
int nextValue = records.next().getField(1, IntValue.class).getValue();
if (nextValue < lastValue) {
throw new Exception("Group Order is violated!");
}
lastValue = nextValue;
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.relational.MergeOnlyJoin;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList;
import java.util.Collection;
@RunWith(Parameterized.class)
public class MergeOnlyJoinITCase extends RecordAPITestBase {
private String input1Path = null;
private String input2Path = null;
private String resultPath = null;
private final String INPUT1 = "1|9|\n"
+ "2|8\n"
+ "3|7\n"
+ "5|5\n"
+ "6|4\n"
+ "7|3\n"
+ "4|6\n"
+ "8|2\n"
+ "2|1\n";
private final String INPUT2 = "2|2|\n"
+ "2|6|\n"
+ "2|1|\n"
+ "4|1|\n"
+ "5|1|\n"
+ "2|1|\n";
private final String EXPECTED_RESULT = "2|8|2\n"
+ "2|8|6\n"
+ "2|8|1\n"
+ "2|8|1\n"
+ "2|1|2\n"
+ "2|1|6\n"
+ "2|1|1\n"
+ "2|1|1\n"
+ "4|6|1\n"
+ "5|5|1\n";
public MergeOnlyJoinITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(4);
}
@Override
protected void preSubmit() throws Exception {
input1Path = createTempFile("input1.txt", INPUT1);
input2Path = createTempFile("input2.txt", INPUT2);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
MergeOnlyJoin mergeOnlyJoin = new MergeOnlyJoin();
return mergeOnlyJoin.getPlan(
String.valueOf(config.getInteger("MergeOnlyJoinTest#NoSubtasks", 1)),
input1Path,
input2Path,
resultPath,
String.valueOf(config.getInteger("MergeOnlyJoinTest#NoSubtasksInput2", 1)));
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
ArrayList<Configuration> tConfigs = new ArrayList<Configuration>();
Configuration config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 3);
tConfigs.add(config);
config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 4);
tConfigs.add(config);
config = new Configuration();
config.setInteger("MergeOnlyJoinTest#NoSubtasks", 3);
config.setInteger("MergeOnlyJoinTest#NoSubtasksInput2", 2);
tConfigs.add(config);
return toParameterList(tConfigs);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.PairwiseSP;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PairwiseSPITCase extends RecordAPITestBase {
String rdfDataPath = null;
String resultPath = null;
/*
private String paths = "A|C|7| |\n" + "A|D|6| |\n" + "B|A|1| |\n" + "B|D|2| |\n" + "C|B|3| |\n" + "C|E|10| |\n"
+ "C|F|12| |\n" + "C|G|9| |\n" + "D|F|5| |\n" + "E|H|2| |\n" + "F|E|3| |\n" + "G|F|1| |\n" + "H|D|2| |\n"
+ "H|E|4| |\n";
*/
private static final String RDF_DATA = "<A> <http://xmlns.com/foaf/0.1/knows> <C>\n" + "<A> <http://xmlns.com/foaf/0.1/knows> <D>\n" +
"<B> <http://xmlns.com/foaf/0.1/knows> <A>\n" + "<B> <http://xmlns.com/foaf/0.1/knows> <D>\n" +
"<C> <http://xmlns.com/foaf/0.1/knows> <B>\n" + "<C> <http://xmlns.com/foaf/0.1/knows> <E>\n" +
"<C> <http://xmlns.com/foaf/0.1/knows> <F>\n" + "<C> <http://xmlns.com/foaf/0.1/knows> <G>\n" +
"<D> <http://xmlns.com/foaf/0.1/knows> <F>\n" + "<E> <http://xmlns.com/foaf/0.1/knows> <H>\n" +
"<F> <http://xmlns.com/foaf/0.1/knows> <E>\n" + "<G> <http://xmlns.com/foaf/0.1/knows> <F>\n" +
"<H> <http://xmlns.com/foaf/0.1/knows> <D>\n" + "<H> <http://xmlns.com/foaf/0.1/knows> <E>\n";
private static final String EXPECTED = "<A>|<C>|1|0| |\n" + "<A>|<D>|1|0| |\n" + "<B>|<A>|1|0| |\n" + "<B>|<D>|1|0| |\n" +
"<C>|<B>|1|0| |\n" + "<C>|<E>|1|0| |\n" + "<C>|<F>|1|0| |\n" + "<C>|<G>|1|0| |\n" +
"<D>|<F>|1|0| |\n" + "<E>|<H>|1|0| |\n" + "<F>|<E>|1|0| |\n" + "<G>|<F>|1|0| |\n" +
"<H>|<D>|1|0| |\n" + "<H>|<E>|1|0| |\n" + "<A>|<B>|2|1|<C>|\n" + "<A>|<E>|2|1|<C>|\n" +
"<A>|<F>|2|1|<C>|\n" + "<A>|<G>|2|1|<C>|\n" + "<A>|<F>|2|1|<D>|\n" + "<B>|<C>|2|1|<A>|\n" +
"<B>|<F>|2|1|<D>|\n" + "<C>|<A>|2|1|<B>|\n" + "<C>|<D>|2|1|<B>|\n" + "<C>|<H>|2|1|<E>|\n" +
"<D>|<E>|2|1|<F>|\n" + "<E>|<D>|2|1|<H>|\n" + "<F>|<H>|2|1|<E>|\n" + "<G>|<E>|2|1|<F>|\n" +
"<H>|<F>|2|1|<D>|\n";
public PairwiseSPITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
rdfDataPath = createTempFile("rdf_data.txt", RDF_DATA);
resultPath = getTempDirPath("ITER_1");
}
@Override
protected Plan getTestJob() {
PairwiseSP a2aSP = new PairwiseSP();
return a2aSP.getPlan(
String.valueOf(config.getInteger("All2AllSPTest#NoSubtasks", parallelism)),
rdfDataPath,
resultPath,
"true");
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("All2AllSPTest#NoSubtasks", parallelism);
return toParameterList(config);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.relational.TPCHQuery10;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.Collection;
/**
*/
@RunWith(Parameterized.class)
public class TPCHQuery10ITCase extends RecordAPITestBase {
private final String CUSTOMERS = "36900|Customer#000036900|ppktIUalnJ quTLD1fWZTEMBQwoEUpmI|8|18-347-285-7152|2667.45|MACHINERY|ts. slyly special packages are al|\n"
+ "36901|Customer#000036901|TBb1yDZcf 8Zepk7apFJ|13|23-644-998-4944|4809.84|AUTOMOBILE|nstructions sleep final, regular deposits. quick accounts sleep furiously after the final accounts; instructions wa|\n"
+ "36902|Customer#000036902|nCUCadobbPGA0pzd1yEX3RE|3|13-301-654-8016|8905.80|AUTOMOBILE|le blithely final packages. pending, pending foxes impress qu|\n"
+ "16252|Customer#000016252|Ha0SZbzPcuno,WTyMl1ipU0YtpeuR1|15|25-830-891-9338|7140.55|BUILDING|furiously unusual packages! theodolites haggle along the quickly speci|\n"
+ "130057|Customer#000130057|jQDBlCU2IlHmzkDfcqgIHg2eLsN|9|19-938-862-4157|5009.55|FURNITURE| blithely regular packages. carefully bold accounts sle|\n"
+ "78002|Customer#000078002|v7Jkg5XIqM|10|20-715-308-7926|4128.41|AUTOMOBILE|ly after the special deposits. careful packages|\n"
+ "81763|Customer#000081763|mZtn4M5r0KIw4aooP BXF3ReR RUlPJcAb|8|18-425-613-5972|8368.23|MACHINERY|ronic frays. slyly pending pinto beans are furiously grouches. permanen|\n"
+ "86116|Customer#000086116|63BSp8bODm1dImPJEPTRmsSa4GqNA1SeRqFgx|0|10-356-493-3518|3205.60|AUTOMOBILE| ironic ideas. quickly pending ideas sleep blith|\n";
private final String ORDERS = "1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously among |\n"
+ "2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0| foxes. pending accounts at the pending, silent asymptot|\n"
+ "3|123314|F|193846.25|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular ideas cajole carefully. depos|\n"
+ "4|136777|O|32151.78|1995-10-11|5-LOW|Clerk#000000124|0|sits. slyly regular warthogs cajole. regular, regular theodolites acro|\n"
+ "5|44485|F|144659.20|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages use slyly|\n"
+ "6|55624|F|58749.59|1992-02-21|4-NOT SPECIFIED|Clerk#000000058|0|ggle. special, final requests are against the furiously specia|\n"
+ "7|39136|O|252004.18|1996-01-10|2-HIGH|Clerk#000000470|0|ly special requests |\n"
+ "32|130057|O|208660.75|1995-07-16|2-HIGH|Clerk#000000616|0|ise blithely bold, regular requests. quickly unusual dep|\n"
+ "33|66958|F|163243.98|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request|\n"
+ "34|61001|O|58949.67|1998-07-21|3-MEDIUM|Clerk#000000223|0|ly final packages. fluffily final deposits wake blithely ideas. spe|\n"
+ "35|127588|O|253724.56|1995-10-23|4-NOT SPECIFIED|Clerk#000000259|0|zzle. carefully enticing deposits nag furio|\n"
+ "36|115252|O|68289.96|1995-11-03|1-URGENT|Clerk#000000358|0| quick packages are blithely. slyly silent accounts wake qu|\n"
+ "37|86116|F|206680.66|1992-06-03|3-MEDIUM|Clerk#000000456|0|kly regular pinto beans. carefully unusual waters cajole never|\n"
+ "38|124828|O|82500.05|1996-08-21|4-NOT SPECIFIED|Clerk#000000604|0|haggle blithely. furiously express ideas haggle blithely furiously regular re|\n"
+ "39|81763|O|341734.47|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir|\n"
+ "64|32113|F|39414.99|1994-07-16|3-MEDIUM|Clerk#000000661|0|wake fluffily. sometimes ironic pinto beans about the dolphin|\n"
+ "65|16252|P|110643.60|1995-03-18|1-URGENT|Clerk#000000632|0|ular requests are blithely pending orbits-- even requests against the deposit|\n"
+ "66|129200|F|103740.67|1994-01-20|5-LOW|Clerk#000000743|0|y pending requests integrate|\n";
private final String LINEITEMS = "1|155190|7706|1|17|21168.23|0.04|0.02|R|R|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|\n"
+ "1|67310|7311|2|36|45983.16|0.09|0.06|R|R|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold |\n"
+ "1|63700|3701|3|8|13309.60|0.10|0.02|R|R|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep|\n"
+ "1|2132|4633|4|28|28955.64|0.09|0.06|R|R|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de|\n"
+ "1|24027|1534|5|24|22824.48|0.10|0.04|R|R|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re|\n"
+ "1|15635|638|6|32|49620.16|0.07|0.02|R|R|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex|\n"
+ "2|106170|1191|1|38|44694.46|0.00|0.05|R|R|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a|\n"
+ "3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco|\n"
+ "3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve|\n"
+ "3|128449|3474|3|27|39890.88|0.06|0.07|R|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |\n"
+ "3|29380|1883|4|2|2618.76|0.01|0.06|R|F|1993-12-04|1994-01-07|1994-01-01|NONE|TRUCK|y. fluffily pending d|\n"
+ "3|183095|650|5|28|32986.52|0.04|0.00|R|F|1993-12-14|1994-01-10|1994-01-01|TAKE BACK RETURN|FOB|ages nag slyly pending|\n"
+ "3|62143|9662|6|26|28733.64|0.10|0.02|R|F|1993-10-29|1993-12-18|1993-11-04|TAKE BACK RETURN|RAIL|ges sleep after the caref|\n"
+ "4|88035|5560|1|30|30690.90|0.03|0.08|R|R|1996-01-10|1995-12-14|1996-01-18|DELIVER IN PERSON|REG AIR|- quickly regular packages sleep. idly|\n"
+ "5|108570|8571|1|15|23678.55|0.02|0.04|R|F|1994-10-31|1994-08-31|1994-11-20|NONE|AIR|ts wake furiously |\n"
+ "5|123927|3928|2|26|50723.92|0.07|0.08|R|F|1994-10-16|1994-09-25|1994-10-19|NONE|FOB|sts use slyly quickly special instruc|\n"
+ "5|37531|35|3|50|73426.50|0.08|0.03|R|F|1994-08-08|1994-10-13|1994-08-26|DELIVER IN PERSON|AIR|eodolites. fluffily unusual|\n"
+ "6|139636|2150|1|37|61998.31|0.08|0.03|R|F|1992-04-27|1992-05-15|1992-05-02|TAKE BACK RETURN|TRUCK|p furiously special foxes|\n"
+ "7|182052|9607|1|12|13608.60|0.07|0.03|R|R|1996-05-07|1996-03-13|1996-06-03|TAKE BACK RETURN|FOB|ss pinto beans wake against th|\n"
+ "7|145243|7758|2|9|11594.16|0.08|0.08|R|R|1996-02-01|1996-03-02|1996-02-19|TAKE BACK RETURN|SHIP|es. instructions|\n"
+ "7|94780|9799|3|46|81639.88|0.10|0.07|R|R|1996-01-15|1996-03-27|1996-02-03|COLLECT COD|MAIL| unusual reques|\n"
+ "7|163073|3074|4|28|31809.96|0.03|0.04|R|R|1996-03-21|1996-04-08|1996-04-20|NONE|FOB|. slyly special requests haggl|\n"
+ "7|151894|9440|5|38|73943.82|0.08|0.01|R|R|1996-02-11|1996-02-24|1996-02-18|DELIVER IN PERSON|TRUCK|ns haggle carefully ironic deposits. bl|\n"
+ "7|79251|1759|6|35|43058.75|0.06|0.03|R|R|1996-01-16|1996-02-23|1996-01-22|TAKE BACK RETURN|FOB|jole. excuses wake carefully alongside of |\n"
+ "7|157238|2269|7|5|6476.15|0.04|0.02|R|R|1996-02-10|1996-03-26|1996-02-13|NONE|FOB|ithely regula|\n"
+ "32|82704|7721|1|28|47227.60|0.05|0.08|R|R|1995-10-23|1995-08-27|1995-10-26|TAKE BACK RETURN|TRUCK|sleep quickly. req|\n"
+ "32|197921|441|2|32|64605.44|0.02|0.00|R|R|1995-08-14|1995-10-07|1995-08-27|COLLECT COD|AIR|lithely regular deposits. fluffily |\n"
+ "32|44161|6666|3|2|2210.32|0.09|0.02|R|R|1995-08-07|1995-10-07|1995-08-23|DELIVER IN PERSON|AIR| express accounts wake according to the|\n"
+ "32|2743|7744|4|4|6582.96|0.09|0.03|R|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
+ "32|85811|8320|5|44|79059.64|0.05|0.06|R|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
+ "32|11615|4117|6|6|9159.66|0.04|0.03|R|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
+ "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
+ "33|60519|5532|2|32|47344.32|0.02|0.05|R|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
+ "33|137469|9983|3|5|7532.30|0.05|0.03|R|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
+ "34|88362|871|1|13|17554.68|0.00|0.07|R|O|1998-10-23|1998-09-14|1998-11-06|NONE|REG AIR|nic accounts. deposits are alon|\n"
+ "34|89414|1923|2|22|30875.02|0.08|0.06|R|O|1998-10-09|1998-10-16|1998-10-12|NONE|FOB|thely slyly p|\n"
+ "34|169544|4577|3|6|9681.24|0.02|0.06|R|O|1998-10-30|1998-09-20|1998-11-05|NONE|FOB|ar foxes sleep |\n"
+ "35|450|2951|1|24|32410.80|0.02|0.00|R|O|1996-02-21|1996-01-03|1996-03-18|TAKE BACK RETURN|FOB|, regular tithe|\n"
+ "35|161940|4457|2|34|68065.96|0.06|0.08|R|O|1996-01-22|1996-01-06|1996-01-27|DELIVER IN PERSON|RAIL|s are carefully against the f|\n"
+ "35|120896|8433|3|7|13418.23|0.06|0.04|R|O|1996-01-19|1995-12-22|1996-01-29|NONE|MAIL| the carefully regular |\n"
+ "35|85175|7684|4|25|29004.25|0.06|0.05|R|O|1995-11-26|1995-12-25|1995-12-21|DELIVER IN PERSON|SHIP| quickly unti|\n"
+ "35|119917|4940|5|34|65854.94|0.08|0.06|R|O|1995-11-08|1996-01-15|1995-11-26|COLLECT COD|MAIL|. silent, unusual deposits boost|\n"
+ "35|30762|3266|6|28|47397.28|0.03|0.02|R|O|1996-02-01|1995-12-24|1996-02-28|COLLECT COD|RAIL|ly alongside of |\n"
+ "36|119767|9768|1|42|75043.92|0.09|0.00|R|O|1996-02-03|1996-01-21|1996-02-23|COLLECT COD|SHIP| careful courts. special |\n"
+ "37|22630|5133|1|40|62105.20|0.09|0.03|R|F|1992-07-21|1992-08-01|1992-08-15|NONE|REG AIR|luffily regular requests. slyly final acco|\n"
+ "37|126782|1807|2|39|70542.42|0.05|0.02|R|F|1992-07-02|1992-08-18|1992-07-28|TAKE BACK RETURN|RAIL|the final requests. ca|\n"
+ "37|12903|5405|3|43|78083.70|0.05|0.08|R|F|1992-07-10|1992-07-06|1992-08-02|DELIVER IN PERSON|TRUCK|iously ste|\n"
+ "38|175839|874|1|44|84252.52|0.04|0.02|R|O|1996-09-29|1996-11-17|1996-09-30|COLLECT COD|MAIL|s. blithely unusual theodolites am|\n"
+ "39|2320|9821|1|44|53782.08|0.09|0.06|R|O|1996-11-14|1996-12-15|1996-12-12|COLLECT COD|RAIL|eodolites. careful|\n"
+ "39|186582|4137|2|26|43383.08|0.08|0.04|R|O|1996-11-04|1996-10-20|1996-11-20|NONE|FOB|ckages across the slyly silent|\n"
+ "39|67831|5350|3|46|82746.18|0.06|0.08|R|O|1996-09-26|1996-12-19|1996-10-26|DELIVER IN PERSON|AIR|he carefully e|\n"
+ "39|20590|3093|4|32|48338.88|0.07|0.05|R|O|1996-10-02|1996-12-19|1996-10-14|COLLECT COD|MAIL|heodolites sleep silently pending foxes. ac|\n"
+ "39|54519|9530|5|43|63360.93|0.01|0.01|R|O|1996-10-17|1996-11-14|1996-10-26|COLLECT COD|MAIL|yly regular i|\n"
+ "39|94368|6878|6|40|54494.40|0.06|0.05|R|O|1996-12-08|1996-10-22|1997-01-01|COLLECT COD|AIR|quickly ironic fox|\n"
+ "64|85951|5952|1|21|40675.95|0.05|0.02|R|F|1994-09-30|1994-09-18|1994-10-26|DELIVER IN PERSON|REG AIR|ch slyly final, thin platelets.|\n"
+ "65|59694|4705|1|26|42995.94|0.03|0.03|R|F|1995-04-20|1995-04-25|1995-05-13|NONE|TRUCK|pending deposits nag even packages. ca|\n"
+ "65|73815|8830|2|22|39353.82|0.00|0.05|R|O|1995-07-17|1995-06-04|1995-07-19|COLLECT COD|FOB| ideas. special, r|\n"
+ "65|1388|3889|3|21|27076.98|0.09|0.07|R|O|1995-07-06|1995-05-14|1995-07-31|DELIVER IN PERSON|RAIL|bove the even packages. accounts nag carefu|\n"
+ "66|115118|7630|1|31|35126.41|0.00|0.08|R|F|1994-02-19|1994-03-11|1994-02-20|TAKE BACK RETURN|RAIL|ut the unusual accounts sleep at the bo|\n"
+ "66|173489|3490|2|41|64061.68|0.04|0.07|R|F|1994-02-21|1994-03-01|1994-03-18|COLLECT COD|AIR| regular de|\n"
+ "67|21636|9143|1|4|6230.52|0.09|0.04|R|O|1997-04-17|1997-01-31|1997-04-20|NONE|SHIP| cajole thinly expres|\n"
+ "67|20193|5198|2|12|13358.28|0.09|0.05|R|O|1997-01-27|1997-02-21|1997-02-22|NONE|REG AIR| even packages cajole|\n"
+ "67|173600|6118|3|5|8368.00|0.03|0.07|R|O|1997-02-20|1997-02-12|1997-02-21|DELIVER IN PERSON|TRUCK|y unusual packages thrash pinto |\n"
+ "67|87514|7515|4|44|66066.44|0.08|0.06|R|O|1997-03-18|1997-01-29|1997-04-13|DELIVER IN PERSON|RAIL|se quickly above the even, express reques|\n"
+ "67|40613|8126|5|23|35733.03|0.05|0.07|R|O|1997-04-19|1997-02-14|1997-05-06|DELIVER IN PERSON|REG AIR|ly regular deposit|\n"
+ "67|178306|824|6|29|40144.70|0.02|0.05|R|O|1997-01-25|1997-01-27|1997-01-27|DELIVER IN PERSON|FOB|ultipliers |\n";
private final String NATIONS = "0|ALGERIA|0| haggle. carefully final deposits detect slyly agai|\n"
+ "1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon|\n"
+ "2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special |\n"
+ "3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold|\n"
+ "4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d|\n"
+ "5|ETHIOPIA|0|ven packages wake quickly. regu|\n"
+ "6|FRANCE|3|refully final requests. regular, ironi|\n"
+ "7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco|\n"
+ "8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun|\n"
+ "9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull|\n"
+ "10|IRAN|4|efully alongside of the slyly final dependencies. |\n"
+ "11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula|\n"
+ "12|JAPAN|2|ously. final, express gifts cajole a|\n"
+ "13|JORDAN|4|ic deposits are blithely about the carefully regular pa|\n"
+ "14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t|\n"
+ "15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets?|\n"
+ "16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r|\n"
+ "17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun|\n"
+ "18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos|\n"
+ "19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account|\n"
+ "20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely|\n"
+ "21|VIETNAM|2|hely enticingly express accounts. even, final |\n"
+ "22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint|\n"
+ "23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull|\n"
+ "24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be|\n";
private final String EXPECTED_RESULT = "36901|Customer#000036901|167183.2296|4809.84|JORDAN|TBb1yDZcf 8Zepk7apFJ|23-644-998-4944|nstructions sleep final, regular deposits. quick accounts sleep furiously after the final accounts; instructions wa|\n"
+ "16252|Customer#000016252|105699.9336|7140.55|MOROCCO|Ha0SZbzPcuno,WTyMl1ipU0YtpeuR1|25-830-891-9338|furiously unusual packages! theodolites haggle along the quickly speci|\n"
+ "130057|Customer#000130057|200081.3676|5009.55|INDONESIA|jQDBlCU2IlHmzkDfcqgIHg2eLsN|19-938-862-4157| blithely regular packages. carefully bold accounts sle|\n"
+ "78002|Customer#000078002|44694.46|4128.41|IRAN|v7Jkg5XIqM|20-715-308-7926|ly after the special deposits. careful packages|\n"
+ "81763|Customer#000081763|325542.7507|8368.23|INDIA|mZtn4M5r0KIw4aooP BXF3ReR RUlPJcAb|18-425-613-5972|ronic frays. slyly pending pinto beans are furiously grouches. permanen|\n"
+ "86116|Customer#000086116|197710.546|3205.60|ALGERIA|63BSp8bODm1dImPJEPTRmsSa4GqNA1SeRqFgx|10-356-493-3518| ironic ideas. quickly pending ideas sleep blith|\n";
private String ordersPath;
private String lineitemsPath;
private String customersPath;
private String nationsPath;
private String resultPath;
public TPCHQuery10ITCase(Configuration testConfig) {
super(testConfig);
}
@Override
protected Plan getTestJob() {
TPCHQuery10 tpchq10 = new TPCHQuery10();
return tpchq10.getPlan(
String.valueOf(config.getInteger("TPCHQuery10Test#NoSubtasks", 1)),
ordersPath,
lineitemsPath,
customersPath,
nationsPath,
resultPath);
}
@Override
protected void preSubmit() throws Exception {
ordersPath = createTempFile("orders.txt", ORDERS);
lineitemsPath = createTempFile("line_items.txt", LINEITEMS);
customersPath = createTempFile("customers.txt", CUSTOMERS);
nationsPath = createTempFile("nations.txt", NATIONS);
resultPath = getTempDirPath("result");
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("TPCHQuery10Test#NoSubtasks", parallelism);
return toParameterList(config);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
// -----------------------------------------------------------------------------
// --- NOTE ---
//
// This class contains test data generated by tools from by the
// Transaction Processing Council (TPC), specifically the TPC-H benchmark's
// data generator.
//
// Any form of use and redistribution must happen in accordance with the TPC-H
// Software License Agreement.
//
// For details, see http://www.tpc.org/tpch/dbgen/tpc-h%20license%20agreement.pdf
// -----------------------------------------------------------------------------
@RunWith(Parameterized.class)
public class TPCHQuery3ITCase extends RecordAPITestBase {
protected String ordersPath = null;
protected String lineitemsPath = null;
protected String resultPath = null;
public static final String ORDERS = "1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously among |\n"
+ "2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0| foxes. pending accounts at the pending, silent asymptot|\n"
+ "3|123314|F|193846.25|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular ideas cajole carefully. depos|\n"
+ "4|136777|O|32151.78|1995-10-11|5-LOW|Clerk#000000124|0|sits. slyly regular warthogs cajole. regular, regular theodolites acro|\n"
+ "5|44485|F|144659.20|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages use slyly|\n"
+ "6|55624|F|58749.59|1992-02-21|4-NOT SPECIFIED|Clerk#000000058|0|ggle. special, final requests are against the furiously specia|\n"
+ "7|39136|O|252004.18|1996-01-10|2-HIGH|Clerk#000000470|0|ly special requests |\n"
+ "32|130057|O|208660.75|1995-07-16|2-HIGH|Clerk#000000616|0|ise blithely bold, regular requests. quickly unusual dep|\n"
+ "33|66958|F|163243.98|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request|\n"
+ "34|61001|O|58949.67|1998-07-21|3-MEDIUM|Clerk#000000223|0|ly final packages. fluffily final deposits wake blithely ideas. spe|\n"
+ "35|127588|O|253724.56|1995-10-23|4-NOT SPECIFIED|Clerk#000000259|0|zzle. carefully enticing deposits nag furio|\n"
+ "36|115252|O|68289.96|1995-11-03|1-URGENT|Clerk#000000358|0| quick packages are blithely. slyly silent accounts wake qu|\n"
+ "37|86116|F|206680.66|1992-06-03|3-MEDIUM|Clerk#000000456|0|kly regular pinto beans. carefully unusual waters cajole never|\n"
+ "38|124828|O|82500.05|1996-08-21|4-NOT SPECIFIED|Clerk#000000604|0|haggle blithely. furiously express ideas haggle blithely furiously regular re|\n"
+ "39|81763|O|341734.47|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir|\n"
+ "64|32113|F|39414.99|1994-07-16|3-MEDIUM|Clerk#000000661|0|wake fluffily. sometimes ironic pinto beans about the dolphin|\n"
+ "65|16252|P|110643.60|1995-03-18|1-URGENT|Clerk#000000632|0|ular requests are blithely pending orbits-- even requests against the deposit|\n"
+ "66|129200|F|103740.67|1994-01-20|5-LOW|Clerk#000000743|0|y pending requests integrate|\n";
public static final String LINEITEMS = "1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|\n"
+ "1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold |\n"
+ "1|63700|3701|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep|\n"
+ "1|2132|4633|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de|\n"
+ "1|24027|1534|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re|\n"
+ "1|15635|638|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex|\n"
+ "2|106170|1191|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a|\n"
+ "3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco|\n"
+ "3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve|\n"
+ "3|128449|3474|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |\n"
+ "3|29380|1883|4|2|2618.76|0.01|0.06|A|F|1993-12-04|1994-01-07|1994-01-01|NONE|TRUCK|y. fluffily pending d|\n"
+ "3|183095|650|5|28|32986.52|0.04|0.00|R|F|1993-12-14|1994-01-10|1994-01-01|TAKE BACK RETURN|FOB|ages nag slyly pending|\n"
+ "3|62143|9662|6|26|28733.64|0.10|0.02|A|F|1993-10-29|1993-12-18|1993-11-04|TAKE BACK RETURN|RAIL|ges sleep after the caref|\n"
+ "4|88035|5560|1|30|30690.90|0.03|0.08|N|O|1996-01-10|1995-12-14|1996-01-18|DELIVER IN PERSON|REG AIR|- quickly regular packages sleep. idly|\n"
+ "5|108570|8571|1|15|23678.55|0.02|0.04|R|F|1994-10-31|1994-08-31|1994-11-20|NONE|AIR|ts wake furiously |\n"
+ "5|123927|3928|2|26|50723.92|0.07|0.08|R|F|1994-10-16|1994-09-25|1994-10-19|NONE|FOB|sts use slyly quickly special instruc|\n"
+ "5|37531|35|3|50|73426.50|0.08|0.03|A|F|1994-08-08|1994-10-13|1994-08-26|DELIVER IN PERSON|AIR|eodolites. fluffily unusual|\n"
+ "6|139636|2150|1|37|61998.31|0.08|0.03|A|F|1992-04-27|1992-05-15|1992-05-02|TAKE BACK RETURN|TRUCK|p furiously special foxes|\n"
+ "7|182052|9607|1|12|13608.60|0.07|0.03|N|O|1996-05-07|1996-03-13|1996-06-03|TAKE BACK RETURN|FOB|ss pinto beans wake against th|\n"
+ "7|145243|7758|2|9|11594.16|0.08|0.08|N|O|1996-02-01|1996-03-02|1996-02-19|TAKE BACK RETURN|SHIP|es. instructions|\n"
+ "7|94780|9799|3|46|81639.88|0.10|0.07|N|O|1996-01-15|1996-03-27|1996-02-03|COLLECT COD|MAIL| unusual reques|\n"
+ "7|163073|3074|4|28|31809.96|0.03|0.04|N|O|1996-03-21|1996-04-08|1996-04-20|NONE|FOB|. slyly special requests haggl|\n"
+ "7|151894|9440|5|38|73943.82|0.08|0.01|N|O|1996-02-11|1996-02-24|1996-02-18|DELIVER IN PERSON|TRUCK|ns haggle carefully ironic deposits. bl|\n"
+ "7|79251|1759|6|35|43058.75|0.06|0.03|N|O|1996-01-16|1996-02-23|1996-01-22|TAKE BACK RETURN|FOB|jole. excuses wake carefully alongside of |\n"
+ "7|157238|2269|7|5|6476.15|0.04|0.02|N|O|1996-02-10|1996-03-26|1996-02-13|NONE|FOB|ithely regula|\n"
+ "32|82704|7721|1|28|47227.60|0.05|0.08|N|O|1995-10-23|1995-08-27|1995-10-26|TAKE BACK RETURN|TRUCK|sleep quickly. req|\n"
+ "32|197921|441|2|32|64605.44|0.02|0.00|N|O|1995-08-14|1995-10-07|1995-08-27|COLLECT COD|AIR|lithely regular deposits. fluffily |\n"
+ "32|44161|6666|3|2|2210.32|0.09|0.02|N|O|1995-08-07|1995-10-07|1995-08-23|DELIVER IN PERSON|AIR| express accounts wake according to the|\n"
+ "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
+ "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
+ "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
+ "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
+ "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
+ "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
+ "34|88362|871|1|13|17554.68|0.00|0.07|N|O|1998-10-23|1998-09-14|1998-11-06|NONE|REG AIR|nic accounts. deposits are alon|\n"
+ "34|89414|1923|2|22|30875.02|0.08|0.06|N|O|1998-10-09|1998-10-16|1998-10-12|NONE|FOB|thely slyly p|\n"
+ "34|169544|4577|3|6|9681.24|0.02|0.06|N|O|1998-10-30|1998-09-20|1998-11-05|NONE|FOB|ar foxes sleep |\n"
+ "35|450|2951|1|24|32410.80|0.02|0.00|N|O|1996-02-21|1996-01-03|1996-03-18|TAKE BACK RETURN|FOB|, regular tithe|\n"
+ "35|161940|4457|2|34|68065.96|0.06|0.08|N|O|1996-01-22|1996-01-06|1996-01-27|DELIVER IN PERSON|RAIL|s are carefully against the f|\n"
+ "35|120896|8433|3|7|13418.23|0.06|0.04|N|O|1996-01-19|1995-12-22|1996-01-29|NONE|MAIL| the carefully regular |\n"
+ "35|85175|7684|4|25|29004.25|0.06|0.05|N|O|1995-11-26|1995-12-25|1995-12-21|DELIVER IN PERSON|SHIP| quickly unti|\n"
+ "35|119917|4940|5|34|65854.94|0.08|0.06|N|O|1995-11-08|1996-01-15|1995-11-26|COLLECT COD|MAIL|. silent, unusual deposits boost|\n"
+ "35|30762|3266|6|28|47397.28|0.03|0.02|N|O|1996-02-01|1995-12-24|1996-02-28|COLLECT COD|RAIL|ly alongside of |\n"
+ "36|119767|9768|1|42|75043.92|0.09|0.00|N|O|1996-02-03|1996-01-21|1996-02-23|COLLECT COD|SHIP| careful courts. special |\n"
+ "37|22630|5133|1|40|62105.20|0.09|0.03|A|F|1992-07-21|1992-08-01|1992-08-15|NONE|REG AIR|luffily regular requests. slyly final acco|\n"
+ "37|126782|1807|2|39|70542.42|0.05|0.02|A|F|1992-07-02|1992-08-18|1992-07-28|TAKE BACK RETURN|RAIL|the final requests. ca|\n"
+ "37|12903|5405|3|43|78083.70|0.05|0.08|A|F|1992-07-10|1992-07-06|1992-08-02|DELIVER IN PERSON|TRUCK|iously ste|\n"
+ "38|175839|874|1|44|84252.52|0.04|0.02|N|O|1996-09-29|1996-11-17|1996-09-30|COLLECT COD|MAIL|s. blithely unusual theodolites am|\n"
+ "39|2320|9821|1|44|53782.08|0.09|0.06|N|O|1996-11-14|1996-12-15|1996-12-12|COLLECT COD|RAIL|eodolites. careful|\n"
+ "39|186582|4137|2|26|43383.08|0.08|0.04|N|O|1996-11-04|1996-10-20|1996-11-20|NONE|FOB|ckages across the slyly silent|\n"
+ "39|67831|5350|3|46|82746.18|0.06|0.08|N|O|1996-09-26|1996-12-19|1996-10-26|DELIVER IN PERSON|AIR|he carefully e|\n"
+ "39|20590|3093|4|32|48338.88|0.07|0.05|N|O|1996-10-02|1996-12-19|1996-10-14|COLLECT COD|MAIL|heodolites sleep silently pending foxes. ac|\n"
+ "39|54519|9530|5|43|63360.93|0.01|0.01|N|O|1996-10-17|1996-11-14|1996-10-26|COLLECT COD|MAIL|yly regular i|\n"
+ "39|94368|6878|6|40|54494.40|0.06|0.05|N|O|1996-12-08|1996-10-22|1997-01-01|COLLECT COD|AIR|quickly ironic fox|\n"
+ "64|85951|5952|1|21|40675.95|0.05|0.02|R|F|1994-09-30|1994-09-18|1994-10-26|DELIVER IN PERSON|REG AIR|ch slyly final, thin platelets.|\n"
+ "65|59694|4705|1|26|42995.94|0.03|0.03|A|F|1995-04-20|1995-04-25|1995-05-13|NONE|TRUCK|pending deposits nag even packages. ca|\n"
+ "65|73815|8830|2|22|39353.82|0.00|0.05|N|O|1995-07-17|1995-06-04|1995-07-19|COLLECT COD|FOB| ideas. special, r|\n"
+ "65|1388|3889|3|21|27076.98|0.09|0.07|N|O|1995-07-06|1995-05-14|1995-07-31|DELIVER IN PERSON|RAIL|bove the even packages. accounts nag carefu|\n"
+ "66|115118|7630|1|31|35126.41|0.00|0.08|R|F|1994-02-19|1994-03-11|1994-02-20|TAKE BACK RETURN|RAIL|ut the unusual accounts sleep at the bo|\n"
+ "66|173489|3490|2|41|64061.68|0.04|0.07|A|F|1994-02-21|1994-03-01|1994-03-18|COLLECT COD|AIR| regular de|\n"
+ "67|21636|9143|1|4|6230.52|0.09|0.04|N|O|1997-04-17|1997-01-31|1997-04-20|NONE|SHIP| cajole thinly expres|\n"
+ "67|20193|5198|2|12|13358.28|0.09|0.05|N|O|1997-01-27|1997-02-21|1997-02-22|NONE|REG AIR| even packages cajole|\n"
+ "67|173600|6118|3|5|8368.00|0.03|0.07|N|O|1997-02-20|1997-02-12|1997-02-21|DELIVER IN PERSON|TRUCK|y unusual packages thrash pinto |\n"
+ "67|87514|7515|4|44|66066.44|0.08|0.06|N|O|1997-03-18|1997-01-29|1997-04-13|DELIVER IN PERSON|RAIL|se quickly above the even, express reques|\n"
+ "67|40613|8126|5|23|35733.03|0.05|0.07|N|O|1997-04-19|1997-02-14|1997-05-06|DELIVER IN PERSON|REG AIR|ly regular deposit|\n"
+ "67|178306|824|6|29|40144.70|0.02|0.05|N|O|1997-01-25|1997-01-27|1997-01-27|DELIVER IN PERSON|FOB|ultipliers |\n";
public static final String EXPECTED_RESULT = "5|0|147828.97\n" + "66|0|99188.09\n";
public TPCHQuery3ITCase(Configuration config) {
super(config);
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
ordersPath = createTempFile("orders", ORDERS);
lineitemsPath = createTempFile("lineitems", LINEITEMS);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
TPCHQuery3 tpch3 = new TPCHQuery3();
return tpch3.getPlan(
String.valueOf(config.getInteger("parallelism", 1)),
ordersPath,
lineitemsPath,
resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() {
Configuration config = new Configuration();
config.setInteger("parallelism", parallelism);
return toParameterList(config);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.test.recordJobs.relational.TPCHQuery3Unioned;
import org.apache.flink.test.util.RecordAPITestBase;
// -----------------------------------------------------------------------------
// --- NOTE ---
//
// This class contains test data generated by tools from by the
// Transaction Processing Council (TPC), specifically the TPC-H benchmark's
// data generator.
//
// Any form of use and redistribution must happen in accordance with the TPC-H
// Software License Agreement.
//
// For details, see http://www.tpc.org/tpch/dbgen/tpc-h%20license%20agreement.pdf
// -----------------------------------------------------------------------------
public class TPCHQuery3WithUnionITCase extends RecordAPITestBase {
private String orders1Path = null;
private String orders2Path = null;
private String partJoin1Path = null;
private String partJoin2Path = null;
private String lineitemsPath = null;
private String resultPath = null;
private static final String ORDERS1 = "1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously among |\n"
+ "2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0| foxes. pending accounts at the pending, silent asymptot|\n"
+ "3|123314|F|193846.25|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular ideas cajole carefully. depos|\n"
+ "4|136777|O|32151.78|1995-10-11|5-LOW|Clerk#000000124|0|sits. slyly regular warthogs cajole. regular, regular theodolites acro|\n"
+ "5|44485|F|144659.20|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages use slyly|\n"
+ "6|55624|F|58749.59|1992-02-21|4-NOT SPECIFIED|Clerk#000000058|0|ggle. special, final requests are against the furiously specia|\n"
+ "7|39136|O|252004.18|1996-01-10|2-HIGH|Clerk#000000470|0|ly special requests |\n"
+ "32|130057|O|208660.75|1995-07-16|2-HIGH|Clerk#000000616|0|ise blithely bold, regular requests. quickly unusual dep|\n";
private static final String ORDERS2 = "33|66958|F|163243.98|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request|\n"
+ "34|61001|O|58949.67|1998-07-21|3-MEDIUM|Clerk#000000223|0|ly final packages. fluffily final deposits wake blithely ideas. spe|\n"
+ "35|127588|O|253724.56|1995-10-23|4-NOT SPECIFIED|Clerk#000000259|0|zzle. carefully enticing deposits nag furio|\n"
+ "36|115252|O|68289.96|1995-11-03|1-URGENT|Clerk#000000358|0| quick packages are blithely. slyly silent accounts wake qu|\n"
+ "37|86116|F|206680.66|1992-06-03|3-MEDIUM|Clerk#000000456|0|kly regular pinto beans. carefully unusual waters cajole never|\n"
+ "38|124828|O|82500.05|1996-08-21|4-NOT SPECIFIED|Clerk#000000604|0|haggle blithely. furiously express ideas haggle blithely furiously regular re|\n"
+ "39|81763|O|341734.47|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir|\n"
+ "64|32113|F|39414.99|1994-07-16|3-MEDIUM|Clerk#000000661|0|wake fluffily. sometimes ironic pinto beans about the dolphin|\n"
+ "65|16252|P|110643.60|1995-03-18|1-URGENT|Clerk#000000632|0|ular requests are blithely pending orbits-- even requests against the deposit|\n"
+ "66|129200|F|103740.67|1994-01-20|5-LOW|Clerk#000000743|0|y pending requests integrate|\n";
private static final String LINEITEMS = "1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|\n"
+ "1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold |\n"
+ "1|63700|3701|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep|\n"
+ "1|2132|4633|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de|\n"
+ "1|24027|1534|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re|\n"
+ "1|15635|638|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex|\n"
+ "2|106170|1191|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a|\n"
+ "3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco|\n"
+ "3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve|\n"
+ "3|128449|3474|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |\n"
+ "3|29380|1883|4|2|2618.76|0.01|0.06|A|F|1993-12-04|1994-01-07|1994-01-01|NONE|TRUCK|y. fluffily pending d|\n"
+ "3|183095|650|5|28|32986.52|0.04|0.00|R|F|1993-12-14|1994-01-10|1994-01-01|TAKE BACK RETURN|FOB|ages nag slyly pending|\n"
+ "3|62143|9662|6|26|28733.64|0.10|0.02|A|F|1993-10-29|1993-12-18|1993-11-04|TAKE BACK RETURN|RAIL|ges sleep after the caref|\n"
+ "4|88035|5560|1|30|30690.90|0.03|0.08|N|O|1996-01-10|1995-12-14|1996-01-18|DELIVER IN PERSON|REG AIR|- quickly regular packages sleep. idly|\n"
+ "5|37531|35|3|50|73426.50|0.08|0.03|A|F|1994-08-08|1994-10-13|1994-08-26|DELIVER IN PERSON|AIR|eodolites. fluffily unusual|\n"
+ "6|139636|2150|1|37|61998.31|0.08|0.03|A|F|1992-04-27|1992-05-15|1992-05-02|TAKE BACK RETURN|TRUCK|p furiously special foxes|\n"
+ "7|182052|9607|1|12|13608.60|0.07|0.03|N|O|1996-05-07|1996-03-13|1996-06-03|TAKE BACK RETURN|FOB|ss pinto beans wake against th|\n"
+ "7|145243|7758|2|9|11594.16|0.08|0.08|N|O|1996-02-01|1996-03-02|1996-02-19|TAKE BACK RETURN|SHIP|es. instructions|\n"
+ "7|94780|9799|3|46|81639.88|0.10|0.07|N|O|1996-01-15|1996-03-27|1996-02-03|COLLECT COD|MAIL| unusual reques|\n"
+ "7|163073|3074|4|28|31809.96|0.03|0.04|N|O|1996-03-21|1996-04-08|1996-04-20|NONE|FOB|. slyly special requests haggl|\n"
+ "7|151894|9440|5|38|73943.82|0.08|0.01|N|O|1996-02-11|1996-02-24|1996-02-18|DELIVER IN PERSON|TRUCK|ns haggle carefully ironic deposits. bl|\n"
+ "7|79251|1759|6|35|43058.75|0.06|0.03|N|O|1996-01-16|1996-02-23|1996-01-22|TAKE BACK RETURN|FOB|jole. excuses wake carefully alongside of |\n"
+ "7|157238|2269|7|5|6476.15|0.04|0.02|N|O|1996-02-10|1996-03-26|1996-02-13|NONE|FOB|ithely regula|\n"
+ "32|82704|7721|1|28|47227.60|0.05|0.08|N|O|1995-10-23|1995-08-27|1995-10-26|TAKE BACK RETURN|TRUCK|sleep quickly. req|\n"
+ "32|197921|441|2|32|64605.44|0.02|0.00|N|O|1995-08-14|1995-10-07|1995-08-27|COLLECT COD|AIR|lithely regular deposits. fluffily |\n"
+ "32|44161|6666|3|2|2210.32|0.09|0.02|N|O|1995-08-07|1995-10-07|1995-08-23|DELIVER IN PERSON|AIR| express accounts wake according to the|\n"
+ "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
+ "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
+ "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
+ "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
+ "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
+ "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
+ "34|88362|871|1|13|17554.68|0.00|0.07|N|O|1998-10-23|1998-09-14|1998-11-06|NONE|REG AIR|nic accounts. deposits are alon|\n"
+ "34|89414|1923|2|22|30875.02|0.08|0.06|N|O|1998-10-09|1998-10-16|1998-10-12|NONE|FOB|thely slyly p|\n"
+ "34|169544|4577|3|6|9681.24|0.02|0.06|N|O|1998-10-30|1998-09-20|1998-11-05|NONE|FOB|ar foxes sleep |\n"
+ "35|450|2951|1|24|32410.80|0.02|0.00|N|O|1996-02-21|1996-01-03|1996-03-18|TAKE BACK RETURN|FOB|, regular tithe|\n"
+ "35|161940|4457|2|34|68065.96|0.06|0.08|N|O|1996-01-22|1996-01-06|1996-01-27|DELIVER IN PERSON|RAIL|s are carefully against the f|\n"
+ "35|120896|8433|3|7|13418.23|0.06|0.04|N|O|1996-01-19|1995-12-22|1996-01-29|NONE|MAIL| the carefully regular |\n"
+ "35|85175|7684|4|25|29004.25|0.06|0.05|N|O|1995-11-26|1995-12-25|1995-12-21|DELIVER IN PERSON|SHIP| quickly unti|\n"
+ "35|119917|4940|5|34|65854.94|0.08|0.06|N|O|1995-11-08|1996-01-15|1995-11-26|COLLECT COD|MAIL|. silent, unusual deposits boost|\n"
+ "35|30762|3266|6|28|47397.28|0.03|0.02|N|O|1996-02-01|1995-12-24|1996-02-28|COLLECT COD|RAIL|ly alongside of |\n"
+ "36|119767|9768|1|42|75043.92|0.09|0.00|N|O|1996-02-03|1996-01-21|1996-02-23|COLLECT COD|SHIP| careful courts. special |\n"
+ "37|22630|5133|1|40|62105.20|0.09|0.03|A|F|1992-07-21|1992-08-01|1992-08-15|NONE|REG AIR|luffily regular requests. slyly final acco|\n"
+ "37|126782|1807|2|39|70542.42|0.05|0.02|A|F|1992-07-02|1992-08-18|1992-07-28|TAKE BACK RETURN|RAIL|the final requests. ca|\n"
+ "37|12903|5405|3|43|78083.70|0.05|0.08|A|F|1992-07-10|1992-07-06|1992-08-02|DELIVER IN PERSON|TRUCK|iously ste|\n"
+ "38|175839|874|1|44|84252.52|0.04|0.02|N|O|1996-09-29|1996-11-17|1996-09-30|COLLECT COD|MAIL|s. blithely unusual theodolites am|\n"
+ "39|2320|9821|1|44|53782.08|0.09|0.06|N|O|1996-11-14|1996-12-15|1996-12-12|COLLECT COD|RAIL|eodolites. careful|\n"
+ "39|186582|4137|2|26|43383.08|0.08|0.04|N|O|1996-11-04|1996-10-20|1996-11-20|NONE|FOB|ckages across the slyly silent|\n"
+ "39|67831|5350|3|46|82746.18|0.06|0.08|N|O|1996-09-26|1996-12-19|1996-10-26|DELIVER IN PERSON|AIR|he carefully e|\n"
+ "39|20590|3093|4|32|48338.88|0.07|0.05|N|O|1996-10-02|1996-12-19|1996-10-14|COLLECT COD|MAIL|heodolites sleep silently pending foxes. ac|\n"
+ "39|54519|9530|5|43|63360.93|0.01|0.01|N|O|1996-10-17|1996-11-14|1996-10-26|COLLECT COD|MAIL|yly regular i|\n"
+ "39|94368|6878|6|40|54494.40|0.06|0.05|N|O|1996-12-08|1996-10-22|1997-01-01|COLLECT COD|AIR|quickly ironic fox|\n"
+ "64|85951|5952|1|21|40675.95|0.05|0.02|R|F|1994-09-30|1994-09-18|1994-10-26|DELIVER IN PERSON|REG AIR|ch slyly final, thin platelets.|\n"
+ "65|59694|4705|1|26|42995.94|0.03|0.03|A|F|1995-04-20|1995-04-25|1995-05-13|NONE|TRUCK|pending deposits nag even packages. ca|\n"
+ "65|73815|8830|2|22|39353.82|0.00|0.05|N|O|1995-07-17|1995-06-04|1995-07-19|COLLECT COD|FOB| ideas. special, r|\n"
+ "65|1388|3889|3|21|27076.98|0.09|0.07|N|O|1995-07-06|1995-05-14|1995-07-31|DELIVER IN PERSON|RAIL|bove the even packages. accounts nag carefu|\n"
+ "67|21636|9143|1|4|6230.52|0.09|0.04|N|O|1997-04-17|1997-01-31|1997-04-20|NONE|SHIP| cajole thinly expres|\n"
+ "67|20193|5198|2|12|13358.28|0.09|0.05|N|O|1997-01-27|1997-02-21|1997-02-22|NONE|REG AIR| even packages cajole|\n"
+ "67|173600|6118|3|5|8368.00|0.03|0.07|N|O|1997-02-20|1997-02-12|1997-02-21|DELIVER IN PERSON|TRUCK|y unusual packages thrash pinto |\n"
+ "67|87514|7515|4|44|66066.44|0.08|0.06|N|O|1997-03-18|1997-01-29|1997-04-13|DELIVER IN PERSON|RAIL|se quickly above the even, express reques|\n"
+ "67|40613|8126|5|23|35733.03|0.05|0.07|N|O|1997-04-19|1997-02-14|1997-05-06|DELIVER IN PERSON|REG AIR|ly regular deposit|\n"
+ "67|178306|824|6|29|40144.70|0.02|0.05|N|O|1997-01-25|1997-01-27|1997-01-27|DELIVER IN PERSON|FOB|ultipliers |\n";
private static final String PART_JOIN_1 = "5|0|50723.92|\n";
private static final String PART_JOIN_2 = "5|0|23678.55|\n" +
"66|0|64061.68|\n" +
"66|0|35126.41|\n";
private static final String EXPECTED_RESULT = "5|0|147828.97\n" + "66|0|99188.09\n";
public TPCHQuery3WithUnionITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
orders1Path = createTempFile("orders1",ORDERS1);
orders2Path = createTempFile("orders2", ORDERS2);
partJoin1Path = createTempFile("partJoin1", PART_JOIN_1);
partJoin2Path = createTempFile("partJoin2", PART_JOIN_2);
lineitemsPath = createTempFile("lineitems", LINEITEMS);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
TPCHQuery3Unioned tpch3 = new TPCHQuery3Unioned();
return tpch3.getPlan(
Integer.valueOf(parallelism).toString(),
orders1Path,
orders2Path,
partJoin1Path,
partJoin2Path,
lineitemsPath,
resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.test.recordJobs.relational.TPCHQuery4;
import org.apache.flink.test.util.RecordAPITestBase;
public class TPCHQuery4ITCase extends RecordAPITestBase {
private String ordersPath;
private String lineitemsPath;
private String resultPath ;
private final String ORDERS = "1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously among |\n"
+ "2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0| foxes. pending accounts at the pending, silent asymptot|\n"
+ "3|123314|F|193846.25|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular ideas cajole carefully. depos|\n"
+ "4|136777|O|32151.78|1995-10-11|5-LOW|Clerk#000000124|0|sits. slyly regular warthogs cajole. regular, regular theodolites acro|\n"
+ "5|44485|F|144659.20|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages use slyly|\n"
+ "6|55624|F|58749.59|1995-02-21|4-NOT SPECIFIED|Clerk#000000058|0|ggle. special, final requests are against the furiously specia|\n"
+ "7|39136|O|252004.18|1996-01-10|2-HIGH|Clerk#000000470|0|ly special requests |\n"
+ "32|130057|O|208660.75|1995-07-16|2-HIGH|Clerk#000000616|0|ise blithely bold, regular requests. quickly unusual dep|\n"
+ "33|66958|F|163243.98|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request|\n"
+ "34|61001|O|58949.67|1998-07-21|3-MEDIUM|Clerk#000000223|0|ly final packages. fluffily final deposits wake blithely ideas. spe|\n"
+ "35|127588|O|253724.56|1995-01-23|4-NOT SPECIFIED|Clerk#000000259|0|zzle. carefully enticing deposits nag furio|\n"
+ "36|115252|O|68289.96|1995-11-03|1-URGENT|Clerk#000000358|0| quick packages are blithely. slyly silent accounts wake qu|\n"
+ "37|86116|F|206680.66|1995-02-03|3-MEDIUM|Clerk#000000456|0|kly regular pinto beans. carefully unusual waters cajole never|\n"
+ "38|124828|O|82500.05|1996-08-21|4-NOT SPECIFIED|Clerk#000000604|0|haggle blithely. furiously express ideas haggle blithely furiously regular re|\n"
+ "39|81763|O|341734.47|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir|\n"
+ "64|32113|F|39414.99|1994-07-16|3-MEDIUM|Clerk#000000661|0|wake fluffily. sometimes ironic pinto beans about the dolphin|\n"
+ "65|16252|P|110643.60|1995-03-18|1-URGENT|Clerk#000000632|0|ular requests are blithely pending orbits-- even requests against the deposit|\n"
+ "66|129200|F|103740.67|1994-01-20|5-LOW|Clerk#000000743|0|y pending requests integrate|\n";
private static final String LINEITEMS = "1|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|\n"
+ "1|67310|7311|2|36|45983.16|0.09|0.06|N|O|1996-04-12|1996-02-28|1996-04-20|TAKE BACK RETURN|MAIL|ly final dependencies: slyly bold |\n"
+ "1|63700|3701|3|8|13309.60|0.10|0.02|N|O|1996-01-29|1996-03-05|1996-01-31|TAKE BACK RETURN|REG AIR|riously. regular, express dep|\n"
+ "1|2132|4633|4|28|28955.64|0.09|0.06|N|O|1996-04-21|1996-03-30|1996-05-16|NONE|AIR|lites. fluffily even de|\n"
+ "1|24027|1534|5|24|22824.48|0.10|0.04|N|O|1996-03-30|1996-03-14|1996-04-01|NONE|FOB| pending foxes. slyly re|\n"
+ "1|15635|638|6|32|49620.16|0.07|0.02|N|O|1996-01-30|1996-02-07|1996-02-03|DELIVER IN PERSON|MAIL|arefully slyly ex|\n"
+ "2|106170|1191|1|38|44694.46|0.00|0.05|N|O|1997-01-28|1997-01-14|1997-02-02|TAKE BACK RETURN|RAIL|ven requests. deposits breach a|\n"
+ "3|4297|1798|1|45|54058.05|0.06|0.00|R|F|1994-02-02|1994-01-04|1994-02-23|NONE|AIR|ongside of the furiously brave acco|\n"
+ "3|19036|6540|2|49|46796.47|0.10|0.00|R|F|1993-11-09|1993-12-20|1993-11-24|TAKE BACK RETURN|RAIL| unusual accounts. eve|\n"
+ "3|128449|3474|3|27|39890.88|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |\n"
+ "3|29380|1883|4|2|2618.76|0.01|0.06|A|F|1993-12-04|1994-01-07|1994-01-01|NONE|TRUCK|y. fluffily pending d|\n"
+ "3|183095|650|5|28|32986.52|0.04|0.00|R|F|1993-12-14|1994-01-10|1994-01-01|TAKE BACK RETURN|FOB|ages nag slyly pending|\n"
+ "3|62143|9662|6|26|28733.64|0.10|0.02|A|F|1993-10-29|1993-12-18|1993-11-04|TAKE BACK RETURN|RAIL|ges sleep after the caref|\n"
+ "4|88035|5560|1|30|30690.90|0.03|0.08|N|O|1996-01-10|1995-12-14|1996-01-18|DELIVER IN PERSON|REG AIR|- quickly regular packages sleep. idly|\n"
+ "5|108570|8571|1|15|23678.55|0.02|0.04|R|F|1994-10-31|1994-08-31|1994-11-20|NONE|AIR|ts wake furiously |\n"
+ "5|123927|3928|2|26|50723.92|0.07|0.08|R|F|1994-10-16|1994-09-25|1994-10-19|NONE|FOB|sts use slyly quickly special instruc|\n"
+ "5|37531|35|3|50|73426.50|0.08|0.03|A|F|1994-08-08|1994-10-13|1994-08-26|DELIVER IN PERSON|AIR|eodolites. fluffily unusual|\n"
+ "6|139636|2150|1|37|61998.31|0.08|0.03|A|F|1995-04-27|1995-05-15|1995-05-20|TAKE BACK RETURN|TRUCK|p furiously special foxes|\n"
+ "7|182052|9607|1|12|13608.60|0.07|0.03|N|O|1996-05-07|1996-03-13|1996-06-03|TAKE BACK RETURN|FOB|ss pinto beans wake against th|\n"
+ "7|145243|7758|2|9|11594.16|0.08|0.08|N|O|1996-02-01|1996-03-02|1996-02-19|TAKE BACK RETURN|SHIP|es. instructions|\n"
+ "7|94780|9799|3|46|81639.88|0.10|0.07|N|O|1996-01-15|1996-03-27|1996-02-03|COLLECT COD|MAIL| unusual reques|\n"
+ "7|163073|3074|4|28|31809.96|0.03|0.04|N|O|1996-03-21|1996-04-08|1996-04-20|NONE|FOB|. slyly special requests haggl|\n"
+ "7|151894|9440|5|38|73943.82|0.08|0.01|N|O|1996-02-11|1996-02-24|1996-02-18|DELIVER IN PERSON|TRUCK|ns haggle carefully ironic deposits. bl|\n"
+ "7|79251|1759|6|35|43058.75|0.06|0.03|N|O|1996-01-16|1996-02-23|1996-01-22|TAKE BACK RETURN|FOB|jole. excuses wake carefully alongside of |\n"
+ "7|157238|2269|7|5|6476.15|0.04|0.02|N|O|1996-02-10|1996-03-26|1996-02-13|NONE|FOB|ithely regula|\n"
+ "32|82704|7721|1|28|47227.60|0.05|0.08|N|O|1995-10-23|1995-08-27|1995-10-26|TAKE BACK RETURN|TRUCK|sleep quickly. req|\n"
+ "32|197921|441|2|32|64605.44|0.02|0.00|N|O|1995-08-14|1995-10-07|1995-08-27|COLLECT COD|AIR|lithely regular deposits. fluffily |\n"
+ "32|44161|6666|3|2|2210.32|0.09|0.02|N|O|1995-08-07|1995-10-07|1995-08-23|DELIVER IN PERSON|AIR| express accounts wake according to the|\n"
+ "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
+ "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
+ "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
+ "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
+ "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
+ "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
+ "34|88362|871|1|13|17554.68|0.00|0.07|N|O|1998-10-23|1998-09-14|1998-11-06|NONE|REG AIR|nic accounts. deposits are alon|\n"
+ "34|89414|1923|2|22|30875.02|0.08|0.06|N|O|1998-10-09|1998-10-16|1998-10-12|NONE|FOB|thely slyly p|\n"
+ "34|169544|4577|3|6|9681.24|0.02|0.06|N|O|1998-10-30|1998-09-20|1998-11-05|NONE|FOB|ar foxes sleep |\n"
+ "35|450|2951|1|24|32410.80|0.02|0.00|N|O|1995-02-21|1995-01-03|1995-03-18|TAKE BACK RETURN|FOB|, regular tithe|\n"
+ "35|161940|4457|2|34|68065.96|0.06|0.08|N|O|1995-01-22|1995-01-06|1995-01-27|DELIVER IN PERSON|RAIL|s are carefully against the f|\n"
+ "35|120896|8433|3|7|13418.23|0.06|0.04|N|O|1995-01-19|1995-12-22|1995-01-29|NONE|MAIL| the carefully regular |\n"
+ "35|85175|7684|4|25|29004.25|0.06|0.05|N|O|1995-11-26|1995-12-25|1995-12-21|DELIVER IN PERSON|SHIP| quickly unti|\n"
+ "35|119917|4940|5|34|65854.94|0.08|0.06|N|O|1995-11-08|1995-01-15|1995-11-26|COLLECT COD|MAIL|. silent, unusual deposits boost|\n"
+ "35|30762|3266|6|28|47397.28|0.03|0.02|N|O|1995-02-01|1995-12-24|1995-02-28|COLLECT COD|RAIL|ly alongside of |\n"
+ "36|119767|9768|1|42|75043.92|0.09|0.00|N|O|1996-02-03|1996-01-21|1996-02-23|COLLECT COD|SHIP| careful courts. special |\n"
+ "37|22630|5133|1|40|62105.20|0.09|0.03|A|F|1995-07-21|1995-08-01|1995-08-15|NONE|REG AIR|luffily regular requests. slyly final acco|\n"
+ "37|126782|1807|2|39|70542.42|0.05|0.02|A|F|1995-07-02|1995-08-18|1995-07-28|TAKE BACK RETURN|RAIL|the final requests. ca|\n"
+ "37|12903|5405|3|43|78083.70|0.05|0.08|A|F|1995-07-10|1995-07-06|1995-08-02|DELIVER IN PERSON|TRUCK|iously ste|\n"
+ "38|175839|874|1|44|84252.52|0.04|0.02|N|O|1996-09-29|1996-11-17|1996-09-30|COLLECT COD|MAIL|s. blithely unusual theodolites am|\n"
+ "39|2320|9821|1|44|53782.08|0.09|0.06|N|O|1996-11-14|1996-12-15|1996-12-12|COLLECT COD|RAIL|eodolites. careful|\n"
+ "39|186582|4137|2|26|43383.08|0.08|0.04|N|O|1996-11-04|1996-10-20|1996-11-20|NONE|FOB|ckages across the slyly silent|\n"
+ "39|67831|5350|3|46|82746.18|0.06|0.08|N|O|1996-09-26|1996-12-19|1996-10-26|DELIVER IN PERSON|AIR|he carefully e|\n"
+ "39|20590|3093|4|32|48338.88|0.07|0.05|N|O|1996-10-02|1996-12-19|1996-10-14|COLLECT COD|MAIL|heodolites sleep silently pending foxes. ac|\n"
+ "39|54519|9530|5|43|63360.93|0.01|0.01|N|O|1996-10-17|1996-11-14|1996-10-26|COLLECT COD|MAIL|yly regular i|\n"
+ "39|94368|6878|6|40|54494.40|0.06|0.05|N|O|1996-12-08|1996-10-22|1997-01-01|COLLECT COD|AIR|quickly ironic fox|\n"
+ "64|85951|5952|1|21|40675.95|0.05|0.02|R|F|1994-09-30|1994-09-18|1994-10-26|DELIVER IN PERSON|REG AIR|ch slyly final, thin platelets.|\n"
+ "65|59694|4705|1|26|42995.94|0.03|0.03|A|F|1995-04-20|1995-04-25|1995-05-13|NONE|TRUCK|pending deposits nag even packages. ca|\n"
+ "65|73815|8830|2|22|39353.82|0.00|0.05|N|O|1995-07-17|1995-06-04|1995-05-19|COLLECT COD|FOB| ideas. special, r|\n"
+ "65|1388|3889|3|21|27076.98|0.09|0.07|N|O|1995-07-06|1995-05-14|1995-07-31|DELIVER IN PERSON|RAIL|bove the even packages. accounts nag carefu|\n"
+ "66|115118|7630|1|31|35126.41|0.00|0.08|R|F|1994-02-19|1994-03-11|1994-02-20|TAKE BACK RETURN|RAIL|ut the unusual accounts sleep at the bo|\n"
+ "66|173489|3490|2|41|64061.68|0.04|0.07|A|F|1994-02-21|1994-03-01|1994-03-18|COLLECT COD|AIR| regular de|\n"
+ "67|21636|9143|1|4|6230.52|0.09|0.04|N|O|1997-04-17|1997-01-31|1997-04-20|NONE|SHIP| cajole thinly expres|\n"
+ "67|20193|5198|2|12|13358.28|0.09|0.05|N|O|1997-01-27|1997-02-21|1997-02-22|NONE|REG AIR| even packages cajole|\n"
+ "67|173600|6118|3|5|8368.00|0.03|0.07|N|O|1997-02-20|1997-02-12|1997-02-21|DELIVER IN PERSON|TRUCK|y unusual packages thrash pinto |\n"
+ "67|87514|7515|4|44|66066.44|0.08|0.06|N|O|1997-03-18|1997-01-29|1997-04-13|DELIVER IN PERSON|RAIL|se quickly above the even, express reques|\n"
+ "67|40613|8126|5|23|35733.03|0.05|0.07|N|O|1997-04-19|1997-02-14|1997-05-06|DELIVER IN PERSON|REG AIR|ly regular deposit|\n"
+ "67|178306|824|6|29|40144.70|0.02|0.05|N|O|1997-01-25|1997-01-27|1997-01-27|DELIVER IN PERSON|FOB|ultipliers |\n";
private static final String EXPECTED_RESULT = "1-URGENT|2|\n" + "3-MEDIUM|2|\n" + "4-NOT SPECIFIED|4|";
public TPCHQuery4ITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
ordersPath = createTempFile("orders", ORDERS);
lineitemsPath = createTempFile("lineitems", LINEITEMS);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
TPCHQuery4 tpch4 = new TPCHQuery4();
return tpch4.getPlan(Integer.valueOf(parallelism).toString(), ordersPath, lineitemsPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.test.recordJobs.relational.TPCHQueryAsterix;
import org.apache.flink.test.util.RecordAPITestBase;
// -----------------------------------------------------------------------------
// --- NOTE ---
//
// This class contains test data generated by tools from by the
// Transaction Processing Council (TPC), specifically the TPC-H benchmark's
// data generator.
//
// Any form of use and redistribution must happen in accordance with the TPC-H
// Software License Agreement.
//
// For details, see http://www.tpc.org/tpch/dbgen/tpc-h%20license%20agreement.pdf
// -----------------------------------------------------------------------------
public class TPCHQueryAsterixITCase extends RecordAPITestBase {
private String ordersPath;
private String custPath;
private String resultPath;
private static final String ORDERS =
"1|1|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously among |\n"
+ "2|6|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0| foxes. pending accounts at the pending, silent asymptot|\n"
+ "3|2|F|193846.25|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular ideas cajole carefully. depos|\n"
+ "4|8|O|32151.78|1995-10-11|5-LOW|Clerk#000000124|0|sits. slyly regular warthogs cajole. regular, regular theodolites acro|\n"
+ "5|8|F|144659.20|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages use slyly|\n"
+ "6|1|F|58749.59|1992-02-21|4-NOT SPECIFIED|Clerk#000000058|0|ggle. special, final requests are against the furiously specia|\n"
+ "7|4|O|252004.18|1996-01-10|2-HIGH|Clerk#000000470|0|ly special requests |\n"
+ "32|9|O|208660.75|1995-07-16|2-HIGH|Clerk#000000616|0|ise blithely bold, regular requests. quickly unusual dep|\n"
+ "33|8|F|163243.98|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request|\n"
+ "34|10|O|58949.67|1998-07-21|3-MEDIUM|Clerk#000000223|0|ly final packages. fluffily final deposits wake blithely ideas. spe|\n"
+ "35|2|O|253724.56|1995-10-23|4-NOT SPECIFIED|Clerk#000000259|0|zzle. carefully enticing deposits nag furio|\n"
+ "36|9|O|68289.96|1995-11-03|1-URGENT|Clerk#000000358|0| quick packages are blithely. slyly silent accounts wake qu|\n"
+ "37|1|F|206680.66|1992-06-03|3-MEDIUM|Clerk#000000456|0|kly regular pinto beans. carefully unusual waters cajole never|\n"
+ "38|7|O|82500.05|1996-08-21|4-NOT SPECIFIED|Clerk#000000604|0|haggle blithely. furiously express ideas haggle blithely furiously regular re|\n"
+ "39|1|O|341734.47|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir|\n"
+ "64|4|F|39414.99|1994-07-16|3-MEDIUM|Clerk#000000661|0|wake fluffily. sometimes ironic pinto beans about the dolphin|\n"
+ "65|2|P|110643.60|1995-03-18|1-URGENT|Clerk#000000632|0|ular requests are blithely pending orbits-- even requests against the deposit|\n"
+ "66|3|F|103740.67|1994-01-20|5-LOW|Clerk#000000743|0|y pending requests integrate|\n";
private static final String CUSTOMERS =
"1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e|\n"+
"2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref|\n"+
"3|Customer#000000003|MG9kdTD2WBHm|1|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov|\n"+
"4|Customer#000000004|XxVSJsLAGtn|4|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou|\n"+
"5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|3|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor|\n"+
"6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|20|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious|\n"+
"7|Customer#000000007|TcGe5gaZNgVePxU5kRrvXBfkasDTea|18|28-190-982-9759|9561.95|AUTOMOBILE|ainst the ironic, express theodolites. express, even pinto beans among the exp|\n"+
"8|Customer#000000008|I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5|17|27-147-574-9335|6819.74|BUILDING|among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly along the ide|\n"+
"9|Customer#000000009|xKiAFTjUsCuxfeleNqefumTrjS|8|18-338-906-3675|8324.07|FURNITURE|r theodolites according to the requests wake thinly excuses: pending requests haggle furiousl|\n"+
"10|Customer#000000010|6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2|5|15-741-346-9870|2753.54|HOUSEHOLD|es regular deposits haggle. fur|\n";
private static final String EXPECTED_RESULT =
"7|BUILDING\n" +
"1|HOUSEHOLD\n" +
"6|AUTOMOBILE\n" +
"2|MACHINERY\n" +
"2|FURNITURE\n";
public TPCHQueryAsterixITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
ordersPath = createTempFile("orders", ORDERS);
custPath = createTempFile("customers", CUSTOMERS);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
TPCHQueryAsterix tpchBench = new TPCHQueryAsterix();
return tpchBench.getPlan(Integer.valueOf(parallelism).toString(), ordersPath, custPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(EXPECTED_RESULT, resultPath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import org.apache.flink.api.common.Plan;
import org.apache.flink.test.recordJobs.sort.TeraSort;
import org.apache.flink.test.util.RecordAPITestBase;
import org.junit.Assert;
public class TeraSortITCase extends RecordAPITestBase {
private static final String INPUT_DATA_FILE = "/testdata/terainput.txt";
private String resultPath;
public TeraSortITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
String testDataPath = getClass().getResource(INPUT_DATA_FILE).toString();
TeraSort ts = new TeraSort();
return ts.getPlan(Integer.valueOf(parallelism).toString(), testDataPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
final byte[] line = new byte[100];
final byte[] previous = new byte[10];
for (int i = 0; i < previous.length; i++) {
previous[i] = -128;
}
File parent = new File(new URI(resultPath).getPath());
int num = 1;
while (true) {
File next = new File(parent, String.valueOf(num));
if (!next.exists()) {
break;
}
FileInputStream inStream = new FileInputStream(next);
int read;
while ((read = inStream.read(line)) == 100) {
// check against the previous
for (int i = 0; i < previous.length; i++) {
if (line[i] > previous[i]) {
break;
} else if (line[i] < previous[i]) {
Assert.fail("Next record is smaller than previous record.");
}
}
System.arraycopy(line, 0, previous, 0, 10);
}
if (read != -1) {
Assert.fail("Inclomplete last record in result file.");
}
inStream.close();
num++;
}
if (num == 1) {
Assert.fail("Empty result, nothing checked for Job!");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import org.apache.flink.api.common.Plan;
import org.apache.flink.test.recordJobs.wordcount.WordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.RecordAPITestBase;
public class WordCountITCase extends RecordAPITestBase {
protected String textPath;
protected String resultPath;
public WordCountITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
}
@Override
protected Plan getTestJob() {
WordCount wc = new WordCount();
return wc.getPlan(Integer.valueOf(parallelism).toString(), textPath, resultPath);
}
@Override
protected void postSubmit() throws Exception {
// Test results
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recordJobTests;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.test.recordJobs.wordcount.WordCount.CountWords;
import org.apache.flink.test.recordJobs.wordcount.WordCount.TokenizeLine;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
/**
* WordCount with multiple inputs to the reducer.
* <p>
* This test case is an adaption of issue #192 (and #124), which revealed problems with the union readers in Nephele.
* The problems have been fixed with commit 1228a5e. Without this commit the test will deadlock.
*/
@SuppressWarnings("deprecation")
public class WordCountUnionReduceITCase extends RecordAPITestBase {
private static final int MULTIPLY = 1000;
private String inputPath;
private String outputPath;
public WordCountUnionReduceITCase(){
setTaskManagerNumSlots(parallelism);
}
@Override
protected void preSubmit() throws Exception {
// the fixed input is repeated this many times and the expected counts
// are multiplied by this factor, because the problem only occurs with
// inputs of a certain size
String input = repeatString(WordCountData.TEXT, MULTIPLY);
this.inputPath = createTempFile("input.txt", input);
this.outputPath = getTempDirPath("output");
}
@Override
protected Plan getTestJob() {
WordCountUnionReduce wc = new WordCountUnionReduce();
return wc.getPlan(this.inputPath, this.outputPath, parallelism);
}
@Override
protected void postSubmit() throws Exception {
String expectedCounts =
multiplyIntegersInString(WordCountData.COUNTS,
// adjust counts to string repetition (InputSizeFactor) and two mappers (*2)
MULTIPLY * 2);
compareResultsByLinesInMemory(expectedCounts, this.outputPath);
}
/**
* This is the adapted plan from issue #192.
*/
private class WordCountUnionReduce {
/**
* <pre>
* +-------------+
* //=> | MapOperator | =\\
* +--------+ // +-------------+ \\ +----------------+ +------+
* | Source | =| |=> | ReduceOperator | => | Sink |
* +--------+ \\ +-------------+ // +----------------+ +------+
* \\=> | MapOperator | =//
* +-------------+
* </pre>
*/
public Plan getPlan(String inputPath, String outputPath, int numSubtasks) {
FileDataSource source = new FileDataSource(TextInputFormat.class, inputPath, "First Input");
MapOperator wordsFirstInput = MapOperator.builder(TokenizeLine.class)
.input(source)
.name("Words (First Input)")
.build();
MapOperator wordsSecondInput = MapOperator.builder(TokenizeLine.class)
.input(source)
.name("Words (Second Input)")
.build();
@SuppressWarnings("unchecked")
ReduceOperator counts = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
.input(wordsFirstInput, wordsSecondInput)
.name("Word Counts")
.build();
FileDataSink sink = new FileDataSink(CsvOutputFormat.class, outputPath, counts);
CsvOutputFormat.configureRecordFormat(sink)
.recordDelimiter('\n')
.fieldDelimiter(' ')
.field(StringValue.class, 0)
.field(IntValue.class, 1);
Plan plan = new Plan(sink, "WordCount Union Reduce");
plan.setDefaultParallelism(numSubtasks);
return plan;
}
}
/**
* Repeats the given String and returns the resulting String.
*
* @param str
* the string to repeat
* @param n
* the number of times to repeat the string
* @return repeated string if n > 1, otherwise the input string
*/
private String repeatString(String str, int n) {
if (n <= 1) {
return str;
}
StringBuilder sb = new StringBuilder(str.length() * n + 1);
for (int i = 0; i < n; i++) {
sb.append(str);
}
return sb.toString();
}
/**
* Returns a new String with all occurring integers multiplied.
*
* @param str
* the string which contains integers to multiply
* @param n
* the factor to multiply each integer with
* @return new string with multiplied integers
*/
private String multiplyIntegersInString(String str, int n) {
Pattern counts = Pattern.compile("(\\d+)");
Matcher matcher = counts.matcher(str);
StringBuffer sb = new StringBuffer(str.length());
boolean hasMatch = false;
while (matcher.find()) {
hasMatch = true;
matcher.appendReplacement(sb, String.valueOf(n * Integer.parseInt(matcher.group(1))));
}
return hasMatch ? sb.toString() : str;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册