提交 a1fef27b 编写于 作者: T twalthr

[FLINK-4180] [FLINK-4181] [table] Ensure examples consistency

上级 123c637e
......@@ -24,35 +24,25 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.table.TableEnvironment;
/**
* Simple example that shows how the Batch SQL used in Java.
* Simple example that shows how the Batch SQL API is used in Java.
*
* This example shows how to:
* - Convert DataSets to Tables
* - Register a Table under a name
* - Run a SQL query on the registered Table
*
*/
public class JavaSQLExample {
public class WordCountSQL {
public static class WC {
public String word;
public long frequency;
// Public constructor to make it a Flink POJO
public WC() {
}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
......@@ -60,13 +50,38 @@ public class JavaSQLExample {
new WC("Hello", 1));
// register the DataSet as table "WordCount"
tableEnv.registerDataSet("WordCount", input, "word, frequency");
tEnv.registerDataSet("WordCount", input, "word, frequency");
// run a SQL query on the Table and retrieve the result as a new Table
Table table = tableEnv.sql(
Table table = tEnv.sql(
"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet<WC> result = tableEnv.toDataSet(table, WC.class);
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
public static class WC {
public String word;
public long frequency;
// public constructor to make it a Flink POJO
public WC() {
}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
}
......@@ -24,48 +24,62 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.table.TableEnvironment;
/**
* Very simple example that shows how the Java Table API can be used.
*/
public class JavaTableExample {
public static class WC {
public String word;
public long count;
// Public constructor to make it a Flink POJO
public WC() {
}
* Simple example for demonstrating the use of the Table API for a Word Count in Java.
*
* This example shows how to:
* - Convert DataSets to Tables
* - Apply group, aggregate, select, and filter operations
*
*/
public class WordCountTable {
public WC(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WC " + word + " " + count;
}
}
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
new WC("Hello", 1));
Table table = tableEnv.fromDataSet(input);
Table table = tEnv.fromDataSet(input);
Table filtered = table
.groupBy("word")
.select("word.count as count, word")
.filter("count = 2");
.select("word, frequency.sum as frequency")
.filter("frequency = 2");
DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);
result.print();
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
public static class WC {
public String word;
public long frequency;
// public constructor to make it a Flink POJO
public WC() {
}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
}
......@@ -23,11 +23,19 @@ import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* Simple example for demonstrating the use of SQL on Stream Table.
* Simple example for demonstrating the use of SQL on a Stream Table.
*
* This example shows how to:
* - Convert DataStreams to Tables
* - Register a Table under a name
* - Run a StreamSQL query on the registered Table
*
*/
object StreamSQLExample {
case class Order(user: Long, product: String, amount: Int)
// *************************************************************************
// PROGRAM
// *************************************************************************
def main(args: Array[String]): Unit = {
......@@ -45,11 +53,11 @@ object StreamSQLExample {
Order(2L, "rubber", 3),
Order(4L, "beer", 1)))
// register the DataStream under the name "OrderA" and "OrderB"
// register the DataStreams under the name "OrderA" and "OrderB"
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
// Union two tables
// union the two tables
val result = tEnv.sql(
"SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT STREAM * FROM OrderB WHERE amount < 2")
......@@ -59,4 +67,10 @@ object StreamSQLExample {
env.execute()
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
case class Order(user: Long, product: String, amount: Int)
}
......@@ -23,11 +23,18 @@ import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* Simple example for demonstrating the use of Table API on Stream Table.
* Simple example for demonstrating the use of Table API on a Stream Table.
*
* This example shows how to:
* - Convert DataStreams to Tables
* - Apply union, select, and filter operations
*
*/
object StreamTableExample {
case class Order(user: Long, product: String, amount: Int)
// *************************************************************************
// PROGRAM
// *************************************************************************
def main(args: Array[String]): Unit = {
......@@ -45,6 +52,7 @@ object StreamTableExample {
Order(2L, "rubber", 3),
Order(4L, "beer", 1))).toTable(tEnv)
// union the two tables
val result: DataStream[Order] = orderA.unionAll(orderB)
.select('user, 'product, 'amount)
.where('amount > 2)
......@@ -55,4 +63,10 @@ object StreamTableExample {
env.execute()
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
case class Order(user: Long, product: String, amount: Int)
}
......@@ -54,9 +54,6 @@ import org.apache.flink.api.table.TableEnvironment
* o_orderdate;
* }}}
*
* Compared to the original TPC-H query this version does not sort the result by revenue
* and orderdate.
*
* Input files are plain text CSV files using the pipe character ('|') as field separator
* as generated by the TPC-H data generator which is available at
* [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
......@@ -73,6 +70,10 @@ import org.apache.flink.api.table.TableEnvironment
*/
object TPCHQuery3Table {
// *************************************************************************
// PROGRAM
// *************************************************************************
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
......
......@@ -22,10 +22,19 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
/**
* Simple example that shows how the Batch SQL used in Scala.
* Simple example that shows how the Batch SQL API is used in Scala.
*
* This example shows how to:
* - Convert DataSets to Tables
* - Register a Table under a name
* - Run a SQL query on the registered Table
*
*/
object WordCountSQL {
case class WC(word: String, count: Int)
// *************************************************************************
// PROGRAM
// *************************************************************************
def main(args: Array[String]): Unit = {
......@@ -34,10 +43,20 @@ object WordCountSQL {
val tEnv = TableEnvironment.getTableEnvironment(env)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
// register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
// run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
table.toDataSet[WC].print()
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
case class WC(word: String, frequency: Long)
}
......@@ -23,11 +23,18 @@ import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
/**
* Simple example for demonstrating the use of the Table API for a Word Count.
*/
* Simple example for demonstrating the use of the Table API for a Word Count in Scala.
*
* This example shows how to:
* - Convert DataSets to Tables
* - Apply group, aggregate, select, and filter operations
*
*/
object WordCountTable {
case class WC(word: String, count: Int)
// *************************************************************************
// PROGRAM
// *************************************************************************
def main(args: Array[String]): Unit = {
......@@ -39,9 +46,17 @@ object WordCountTable {
val expr = input.toTable(tEnv)
val result = expr
.groupBy('word)
.select('word, 'count.sum as 'count)
.select('word, 'frequency.sum as 'frequency)
.filter('frequency === 2)
.toDataSet[WC]
result.print()
}
// *************************************************************************
// USER DATA TYPES
// *************************************************************************
case class WC(word: String, frequency: Long)
}
......@@ -33,7 +33,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.flink.examples.java.JavaTableExample.WC;
import org.apache.flink.examples.java.WordCountTable.WC;
import java.util.List;
......@@ -196,8 +196,8 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
Table filtered = table
.groupBy("word")
.select("word.count as count, word")
.filter("count = 2");
.select("word.frequency as frequency, word")
.filter("frequency = 2");
List<String> result = tableEnv.toDataSet(filtered, WC.class)
.map(new MapFunction<WC, String>() {
......
......@@ -195,11 +195,11 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
val expr = input.toTable(tEnv)
val result = expr
.groupBy('word)
.select('word, 'count.sum as 'count)
.filter('count === 2)
.select('word, 'frequency.sum as 'frequency)
.filter('frequency === 2)
.toDataSet[MyWC]
val mappedResult = result.map(w => (w.word, w.count * 10)).collect()
val mappedResult = result.map(w => (w.word, w.frequency * 10)).collect()
val expected = "(hello,20)\n" + "(hola,20)"
TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册