提交 ea4c8828 编写于 作者: F Fabian Hueske

Added TPCH query examples for Scala API

Improved Java TPCH query examples
Removed RelationalQuery Java API example
上级 2b4d7792
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* This program implements the following relational query on the TPC-H data set.
*
* <p>
* <code><pre>
* SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
* FROM orders, lineitem
* WHERE l_orderkey = o_orderkey
* AND o_orderstatus = "X"
* AND YEAR(o_orderdate) > Y
* AND o_orderpriority LIKE "Z%"
* GROUP BY l_orderkey, o_shippriority;
* </pre></code>
*
* <p>
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
*
* <p>
* Usage: <code>RelationalQuery &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;result path&gt;</code><br>
*
* <p>
* This example shows how to use:
* <ul>
* <li> tuple data types
* <li> inline-defined functions
* <li> projection and join projection
* <li> built-in aggregation functions
* </ul>
*/
@SuppressWarnings("serial")
public class RelationalQuery {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    /** Value the order status field must equal for an order to be kept. */
    private static final String STATUS_FILTER = "F";

    /** Orders are kept only when the year of their order date is strictly greater than this. */
    private static final int YEAR_FILTER = 1993;

    /** Prefix the order priority field must start with for an order to be kept. */
    private static final String OPRIO_FILTER = "5";

    /**
     * Builds and executes the query plan: filter orders, join them with the
     * line items, sum the extended price per (orderkey, shippriority) group
     * and write the result as CSV.
     *
     * @param args orders path, lineitem path and result path, in that order
     * @throws Exception if the program execution fails
     */
    public static void main(String[] args) throws Exception {

        // nothing to do without valid paths; parseParameters already printed the usage
        if (!parseParameters(args)) {
            return;
        }

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
        DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
        // get lineitem data set: (orderkey, extendedprice)
        DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

        // orders passing the status, year and priority filters: (orderkey, shippriority)
        DataSet<Tuple2<Integer, Integer>> filteredOrders = orders
            .filter(
                new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple5<Integer, String, String, String, Integer> order) {
                        // the conditions are evaluated left to right and stop at the first failure:
                        // status, then year (first four characters of the order date), then priority
                        return order.f1.equals(STATUS_FILTER)
                            && Integer.parseInt(order.f2.substring(0, 4)) > YEAR_FILTER
                            && order.f3.startsWith(OPRIO_FILTER);
                    }
                })
            // project fields out that are no longer required
            .project(0, 4).types(Integer.class, Integer.class);

        // join orders with lineitems: (orderkey, shippriority, extendedprice)
        DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders = filteredOrders
            .joinWithHuge(lineitems)
            .where(0).equalTo(0)
            .projectFirst(0, 1).projectSecond(1)
            .types(Integer.class, Integer.class, Double.class);

        // extendedprice sums: (orderkey, shippriority, sum(extendedprice))
        DataSet<Tuple3<Integer, Integer, Double>> priceSums = lineitemsOfOrders
            // group by order key and ship priority, then sum the extended price
            .groupBy(0, 1)
            .aggregate(Aggregations.SUM, 2);

        // emit result
        priceSums.writeAsCsv(outputPath);

        // execute program
        env.execute("Relational Query Example");
    }

    // *************************************************************************
    //     UTIL METHODS
    // *************************************************************************

    /** Path of the orders CSV file; set by {@link #parseParameters(String[])}. */
    private static String ordersPath;

    /** Path of the lineitem CSV file; set by {@link #parseParameters(String[])}. */
    private static String lineitemPath;

    /** Path the result is written to; set by {@link #parseParameters(String[])}. */
    private static String outputPath;

    /**
     * Reads the three paths from the program arguments.
     *
     * <p>With no arguments a longer explanation is printed to stderr, with any
     * other wrong argument count only the usage line is printed.
     *
     * @param programArguments the command line arguments
     * @return {@code true} if exactly three arguments were given and the paths were stored,
     *         {@code false} otherwise
     */
    private static boolean parseParameters(String[] programArguments) {

        if (programArguments.length == 3) {
            ordersPath = programArguments[0];
            lineitemPath = programArguments[1];
            outputPath = programArguments[2];
            return true;
        }

        if (programArguments.length == 0) {
            System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
                    " Due to legal restrictions, we can not ship generated data.\n" +
                    " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
                    " Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
        } else {
            System.err.println("Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
        }
        return false;
    }

    /**
     * Creates the orders data set from the pipe-delimited file at {@code ordersPath}.
     *
     * @param env the execution environment used to read the file
     * @return tuples of (orderkey, orderstatus, orderdate, orderpriority, shippriority)
     */
    private static DataSet<Tuple5<Integer, String, String, String, Integer>> getOrdersDataSet(ExecutionEnvironment env) {
        return env.readCsvFile(ordersPath)
                .fieldDelimiter('|')
                // the mask selects columns 0, 2, 4, 5 and 7 of each line
                .includeFields("101011010")
                .types(Integer.class, String.class, String.class, String.class, Integer.class);
    }

    /**
     * Creates the lineitem data set from the pipe-delimited file at {@code lineitemPath}.
     *
     * @param env the execution environment used to read the file
     * @return tuples of (orderkey, extendedprice)
     */
    private static DataSet<Tuple2<Integer, Double>> getLineitemDataSet(ExecutionEnvironment env) {
        return env.readCsvFile(lineitemPath)
                .fieldDelimiter('|')
                // the mask selects columns 0 and 5 of each line
                .includeFields("1000010000000000")
                .types(Integer.class, Double.class);
    }
}
......@@ -104,13 +104,10 @@ public class TPCHQuery10 {
// get customer data set: (custkey, name, address, nationkey, acctbal)
DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
// get orders data set: (orderkey, custkey, orderdate)
DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
// get nation data set: (nationkey, name)
DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
......@@ -120,46 +117,38 @@ public class TPCHQuery10 {
orders.filter(
new FilterFunction<Tuple3<Integer,Integer, String>>() {
@Override
public boolean filter(Tuple3<Integer, Integer, String> t) {
int year = Integer.parseInt(t.f2.substring(0, 4));
return year > 1990;
public boolean filter(Tuple3<Integer, Integer, String> o) {
return Integer.parseInt(o.f2.substring(0, 4)) > 1990;
}
})
// project fields out that are no longer required
.project(0,1).types(Integer.class, Integer.class);
// lineitems filtered by flag: (orderkey, extendedprice, discount)
DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
// lineitems filtered by flag: (orderkey, revenue)
DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
// filter by flag
lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
@Override
public boolean filter(Tuple4<Integer, Double, Double, String> t)
throws Exception {
return t.f3.equals("R");
public boolean filter(Tuple4<Integer, Double, Double, String> l) {
return l.f3.equals("R");
}
})
// project fields out that are no longer required
.project(0,1,2).types(Integer.class, Double.class, Double.class);
// join orders with lineitems: (custkey, extendedprice, discount)
DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
.where(0).equalTo(0)
.projectFirst(1).projectSecond(1,2)
.types(Integer.class, Double.class, Double.class);
// aggregate for revenue: (custkey, revenue)
DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
// calculate the revenue for each item
.map(new MapFunction<Tuple3<Integer, Double, Double>, Tuple2<Integer, Double>>() {
// compute revenue and project out return flag
.map(new MapFunction<Tuple4<Integer, Double, Double, String>, Tuple2<Integer, Double>>() {
@Override
public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
public Tuple2<Integer, Double> map(Tuple4<Integer, Double, Double, String> l) {
// revenue per item = l_extendedprice * (1 - l_discount)
return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
return new Tuple2<Integer, Double>(l.f0, l.f1 * (1 - l.f2));
}
})
// aggregate the revenues per item to revenue per customer
.groupBy(0).aggregate(Aggregations.SUM, 1);
});
// join orders with lineitems: (custkey, revenue)
DataSet<Tuple2<Integer, Double>> revenueByCustomer =
ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
.where(0).equalTo(0)
.projectFirst(1).projectSecond(1)
.types(Integer.class, Double.class)
.groupBy(0).aggregate(Aggregations.SUM, 1);
// join customer with nation (custkey, name, address, nationname, acctbal)
DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
......@@ -169,14 +158,14 @@ public class TPCHQuery10 {
.types(Integer.class, String.class, String.class, String.class, Double.class);
// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
customerWithNation.join(revenueOfCustomerKey)
DataSet<Tuple6<Integer, String, String, String, Double, Double>> result =
customerWithNation.join(revenueByCustomer)
.where(0).equalTo(0)
.projectFirst(0,1,2,3,4).projectSecond(1)
.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
// emit result
customerWithRevenue.writeAsCsv(outputPath);
result.writeAsCsv(outputPath, "\n", "|");
// execute program
env.execute("TPCH Query 10 Example");
......
......@@ -22,19 +22,15 @@ package org.apache.flink.examples.java.relational;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* This program implements a modified version of the TPC-H query 3. The
......@@ -102,93 +98,72 @@ public class TPCHQuery3 {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<Lineitem> li = getLineitemDataSet(env);
DataSet<Order> or = getOrdersDataSet(env);
DataSet<Customer> cust = getCustomerDataSet(env);
DataSet<Lineitem> lineitems = getLineitemDataSet(env);
DataSet<Order> orders = getOrdersDataSet(env);
DataSet<Customer> customers = getCustomerDataSet(env);
// Filter market segment "AUTOMOBILE"
cust = cust.filter(
new FilterFunction<Customer>() {
customers = customers.filter(
new FilterFunction<Customer>() {
@Override
public boolean filter(Customer c) {
return c.getMktsegment().equals("AUTOMOBILE");
}
});
// Filter all Orders with o_orderdate < 12.03.1995
orders = orders.filter(
new FilterFunction<Order>() {
private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private final Date date = format.parse("1995-03-12");
@Override
public boolean filter(Customer value) {
return value.getMktsegment().equals("AUTOMOBILE");
public boolean filter(Order o) throws ParseException {
return format.parse(o.getOrderdate()).before(date);
}
});
// Filter all Orders with o_orderdate < 12.03.1995
or = or.filter(
new FilterFunction<Order>() {
private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private Date date;
{
Calendar cal = Calendar.getInstance();
cal.set(1995, 3, 12);
date = cal.getTime();
}
@Override
public boolean filter(Order value) throws ParseException {
Date orderDate = format.parse(value.getOrderdate());
return orderDate.before(date);
}
});
// Filter all Lineitems with l_shipdate > 12.03.1995
li = li.filter(
new FilterFunction<Lineitem>() {
private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private Date date;
{
Calendar cal = Calendar.getInstance();
cal.set(1995, 3, 12);
date = cal.getTime();
}
@Override
public boolean filter(Lineitem value) throws ParseException {
Date shipDate = format.parse(value.getShipdate());
return shipDate.after(date);
}
});
lineitems = lineitems.filter(
new FilterFunction<Lineitem>() {
private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
private final Date date = format.parse("1995-03-12");
@Override
public boolean filter(Lineitem l) throws ParseException {
return format.parse(l.getShipdate()).after(date);
}
});
// Join customers with orders and package them into a ShippingPriorityItem
DataSet<ShippingPriorityItem> customerWithOrders =
cust.join(or)
.where(0)
.equalTo(0)
.with(
new JoinFunction<Customer, Order, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(Customer first, Order second) {
return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
second.getShippriority(), second.getOrderkey());
}
});
customers.join(orders).where(0).equalTo(1)
.with(
new JoinFunction<Customer, Order, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(Customer c, Order o) {
return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
o.getShippriority());
}
});
// Join the last join result with Lineitems
DataSet<ShippingPriorityItem> joined =
customerWithOrders.join(li)
.where(4)
.equalTo(0)
DataSet<ShippingPriorityItem> result =
customerWithOrders.join(lineitems).where(0).equalTo(0)
.with(
new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
@Override
public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
first.setL_Orderkey(second.getOrderkey());
first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
return first;
public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
return i;
}
});
// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
joined = joined
.groupBy(0, 2, 3)
.aggregate(Aggregations.SUM, 1);
})
// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
.groupBy(0, 2, 3)
.aggregate(Aggregations.SUM, 1);
// emit result
joined.writeAsCsv(outputPath, "\n", "|");
result.writeAsCsv(outputPath, "\n", "|");
// execute program
env.execute("TPCH Query 3 Example");
......@@ -213,34 +188,33 @@ public class TPCHQuery3 {
public String getMktsegment() { return this.f1; }
}
public static class Order extends Tuple3<Integer, String, Integer> {
public static class Order extends Tuple4<Integer, Integer, String, Integer> {
public Integer getOrderkey() { return this.f0; }
public String getOrderdate() { return this.f1; }
public Integer getShippriority() { return this.f2; }
public Integer getOrderKey() { return this.f0; }
public Integer getCustKey() { return this.f1; }
public String getOrderdate() { return this.f2; }
public Integer getShippriority() { return this.f3; }
}
public static class ShippingPriorityItem extends Tuple5<Integer, Double, String, Integer, Integer> {
public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
public ShippingPriorityItem() { }
public ShippingPriorityItem(Integer l_orderkey, Double revenue,
String o_orderdate, Integer o_shippriority, Integer o_orderkey) {
this.f0 = l_orderkey;
public ShippingPriorityItem(Integer o_orderkey, Double revenue,
String o_orderdate, Integer o_shippriority) {
this.f0 = o_orderkey;
this.f1 = revenue;
this.f2 = o_orderdate;
this.f3 = o_shippriority;
this.f4 = o_orderkey;
}
public Integer getL_Orderkey() { return this.f0; }
public void setL_Orderkey(Integer l_orderkey) { this.f0 = l_orderkey; }
public Integer getOrderkey() { return this.f0; }
public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
public Double getRevenue() { return this.f1; }
public void setRevenue(Double revenue) { this.f1 = revenue; }
public String getOrderdate() { return this.f2; }
public Integer getShippriority() { return this.f3; }
public Integer getO_Orderkey() { return this.f4; }
}
// *************************************************************************
......@@ -291,7 +265,7 @@ public class TPCHQuery3 {
private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
return env.readCsvFile(ordersPath)
.fieldDelimiter('|')
.includeFields("100010010")
.includeFields("110010010")
.tupleType(Order.class);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import org.apache.flink.api.java.aggregation.Aggregations
/**
* This program implements a modified version of the TPC-H query 10.
*
* The original query can be found at
* [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
* (page 45).
*
* This program implements the following SQL equivalent:
*
* {{{
* SELECT
* c_custkey,
* c_name,
* c_address,
* n_name,
* c_acctbal,
* SUM(l_extendedprice * (1 - l_discount)) AS revenue
* FROM
* customer,
* orders,
* lineitem,
* nation
* WHERE
* c_custkey = o_custkey
* AND l_orderkey = o_orderkey
* AND YEAR(o_orderdate) > '1990'
* AND l_returnflag = 'R'
* AND c_nationkey = n_nationkey
* GROUP BY
* c_custkey,
* c_name,
* c_acctbal,
* n_name,
* c_address
* }}}
*
* Compared to the original TPC-H query this version does not print
* c_phone and c_comment, only filters by years greater than 1990 instead of
* a period of 3 months, and does not sort the result by revenue.
*
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at
* [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
*
* Usage:
* {{{
*TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation path> <result path>
* }}}
*
* This example shows how to use:
* - tuple data types
* - built-in aggregation functions
* - join with size hints
*
*/
object TPCHQuery10 {

  /**
   * Assembles and runs the query plan: filters orders and lineitems, sums the
   * revenue per customer key, attaches customer and nation information and
   * writes the result as pipe-delimited CSV.
   *
   * @param args customer, orders, lineitem, nation and result path, in that order
   */
  def main(args: Array[String]): Unit = {
    // parseParameters prints the usage itself when the arguments are not valid
    if (parseParameters(args)) {

      // get execution environment
      val env = ExecutionEnvironment.getExecutionEnvironment

      // customer data set: (custkey, name, address, nationkey, acctbal)
      val customerData = getCustomerDataSet(env)
      // orders data set: (orderkey, custkey, orderdate)
      val orderData = getOrdersDataSet(env)
      // lineitem data set: (orderkey, extendedprice, discount, returnflag)
      val lineitemData = getLineitemDataSet(env)
      // nation data set: (nationkey, name)
      val nationData = getNationDataSet(env)

      // keep orders whose order year is greater than 1990: (orderkey, custkey)
      val ordersAfter1990 = orderData
        .filter( order => order._3.substring(0,4).toInt > 1990 )
        .map( order => (order._1, order._2) )

      // keep lineitems with return flag "R" and compute their revenue: (orderkey, revenue)
      val returnedItems = lineitemData
        .filter( item => item._4.equals("R") )
        .map( item => (item._1, item._2 * (1 - item._3)) )

      // join orders with lineitems on the order key: (custkey, revenue)
      val revenuePerItem = ordersAfter1990.joinWithHuge(returnedItems).where(0).equalTo(0)
        .apply( (order, item) => (order._2, item._2) )

      // sum the revenue per customer key
      val revenueByCustomer = revenuePerItem
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1)

      // replace the nation key by the nation name: (custkey, name, address, nationname, acctbal)
      val customersWithNation = customerData.joinWithTiny(nationData).where(3).equalTo(0)
        .apply( (cust, nation) => (cust._1, cust._2, cust._3, nation._2, cust._5) )

      // append the revenue: (custkey, name, address, nationname, acctbal, revenue)
      val result = customersWithNation.join(revenueByCustomer).where(0).equalTo(0)
        .apply( (cust, rev) => (cust._1, cust._2, cust._3, cust._4, cust._5, rev._2) )

      // emit result
      result.writeAsCsv(outputPath, "\n", "|")

      // execute program
      env.execute("Scala TPCH Query 10 Example")
    }
  }

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************

  // input and output locations, assigned by parseParameters
  private var customerPath: String = _
  private var ordersPath: String = _
  private var lineitemPath: String = _
  private var nationPath: String = _
  private var outputPath: String = _

  /**
   * Stores the five paths given on the command line.
   *
   * @param args the program arguments
   * @return true if exactly five arguments were given; otherwise the usage is
   *         printed to stderr and false is returned
   */
  private def parseParameters(args: Array[String]): Boolean = {
    val argumentsValid = args.length == 5
    if (argumentsValid) {
      customerPath = args(0)
      ordersPath = args(1)
      lineitemPath = args(2)
      nationPath = args(3)
      outputPath = args(4)
    } else {
      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
        " Due to legal restrictions, we can not ship generated data.\n" +
        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
        " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " +
        "<lineitem-csv path> <nation-csv path> <result path>")
    }
    argumentsValid
  }

  /** Reads columns 0, 1, 2, 3 and 5 of the pipe-delimited customer file. */
  private def getCustomerDataSet(env: ExecutionEnvironment):
    DataSet[(Int, String, String, Int, Double)] = {
    env.readCsvFile[(Int, String, String, Int, Double)](
      customerPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 1, 2, 3, 5) )
  }

  /** Reads columns 0, 1 and 4 of the pipe-delimited orders file. */
  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[(Int, Int, String)] = {
    env.readCsvFile[(Int, Int, String)](
      ordersPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 1, 4) )
  }

  /** Reads columns 0, 5, 6 and 8 of the pipe-delimited lineitem file. */
  private def getLineitemDataSet(env: ExecutionEnvironment):
    DataSet[(Int, Double, Double, String)] = {
    env.readCsvFile[(Int, Double, Double, String)](
      lineitemPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 5, 6, 8) )
  }

  /** Reads columns 0 and 1 of the pipe-delimited nation file. */
  private def getNationDataSet(env: ExecutionEnvironment): DataSet[(Int, String)] = {
    env.readCsvFile[(Int, String)](
      nationPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 1) )
  }

}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import org.apache.flink.api.java.aggregation.Aggregations
/**
* This program implements a modified version of the TPC-H query 3. The
* example demonstrates how to use case classes to give names to the fields of a data type.
* The original query can be found at
* [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
* (page 29).
*
* This program implements the following SQL equivalent:
*
* {{{
* SELECT
* l_orderkey,
* SUM(l_extendedprice*(1-l_discount)) AS revenue,
* o_orderdate,
* o_shippriority
* FROM customer,
* orders,
* lineitem
* WHERE
* c_mktsegment = '[SEGMENT]'
* AND c_custkey = o_custkey
* AND l_orderkey = o_orderkey
* AND o_orderdate < date '[DATE]'
* AND l_shipdate > date '[DATE]'
* GROUP BY
* l_orderkey,
* o_orderdate,
* o_shippriority;
* }}}
*
* Compared to the original TPC-H query this version does not sort the result by revenue
* and orderdate.
*
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at
* [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
*
* Usage:
* {{{
* TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
* }}}
*
* This example shows how to use:
* - case classes and case class field addressing
* - built-in aggregation functions
*
*/
object TPCHQuery3 {

  /**
   * Assembles and runs the query plan: filters lineitems, customers and
   * orders, joins them, sums the revenue per (orderId, orderDate, shipPrio)
   * group and writes the result as pipe-delimited CSV.
   *
   * @param args lineitem, customer, orders and result path, in that order
   */
  def main(args: Array[String]): Unit = {
    // parseParameters prints the usage itself when the arguments are not valid
    if (!parseParameters(args)) {
      return
    }

    // set filter date
    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val date = dateFormat.parse("1995-03-12")

    // get execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read lineitems and keep those with a ship date strictly after the filter date
    val lineitems = getLineitemDataSet(env)
      .filter( l => dateFormat.parse(l.shipDate).after(date) )
    // read customers and keep those of market segment "AUTOMOBILE"
    val customers = getCustomerDataSet(env)
      .filter( c => c.mktSegment.equals("AUTOMOBILE") )
    // read orders
    val orders = getOrdersDataSet(env)

    // keep orders with an order date strictly before the filter date
    val items = orders.filter( o => dateFormat.parse(o.orderDate).before(date) )
      // keep only orders that have a matching (filtered) customer
      .join(customers).where("custId").equalTo("custId").apply( (o,c) => o )
      // join with lineitems and compute the revenue of each item
      .join(lineitems).where("orderId").equalTo("orderId")
      .apply( (o,l) =>
        new ShippedItem( o.orderId,
          l.extdPrice * (1.0 - l.discount),
          o.orderDate,
          o.shipPrio ) )

    // group by order and aggregate revenue
    val result = items.groupBy("orderId", "orderDate", "shipPrio")
      .aggregate(Aggregations.SUM, "revenue")

    // emit result
    result.writeAsCsv(outputPath, "\n", "|")

    // execute program
    env.execute("Scala TPCH Query 3 Example")
  }

  // *************************************************************************
  //     USER DATA TYPES
  // *************************************************************************

  /** A lineitem record reduced to the fields used by the query. */
  case class Lineitem(orderId: Integer, extdPrice: Double, discount: Double, shipDate: String)

  /** A customer record reduced to its key and market segment. */
  case class Customer(custId: Integer, mktSegment: String)

  /** An order record reduced to the fields used by the query. */
  case class Order(orderId: Integer, custId: Integer, orderDate: String, shipPrio: Integer)

  /** Result record: the revenue of an item together with the grouping fields of its order. */
  case class ShippedItem(orderId: Integer, revenue: Double, orderDate: String, shipPrio: Integer)

  // *************************************************************************
  //     UTIL METHODS
  // *************************************************************************

  // input and output locations, assigned by parseParameters
  private var lineitemPath: String = null
  private var customerPath: String = null
  private var ordersPath: String = null
  private var outputPath: String = null

  /**
   * Stores the four paths given on the command line.
   *
   * @param args the program arguments
   * @return true if exactly four arguments were given; otherwise the usage is
   *         printed to stderr and false is returned
   */
  private def parseParameters(args: Array[String]): Boolean = {
    if (args.length == 4) {
      lineitemPath = args(0)
      customerPath = args(1)
      ordersPath = args(2)
      outputPath = args(3)
      true
    } else {
      // the trailing space after "<customer-csv path>" keeps the two
      // concatenated parts of the usage line separated
      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
        " Due to legal restrictions, we can not ship generated data.\n" +
        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
        " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
        "<orders-csv path> <result path>")
      false
    }
  }

  /** Reads columns 0, 5, 6 and 10 of the pipe-delimited lineitem file. */
  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
    env.readCsvFile[Lineitem](
      lineitemPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 5, 6, 10) )
  }

  /** Reads columns 0 and 6 of the pipe-delimited customer file. */
  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
    env.readCsvFile[Customer](
      customerPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 6) )
  }

  /** Reads columns 0, 1, 4 and 7 of the pipe-delimited orders file. */
  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
    env.readCsvFile[Order](
      ordersPath,
      fieldDelimiter = '|',
      includedFields = Array(0, 1, 4, 7) )
  }

}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册