From 123c637e804bfdd6569051cf705ec73b5cb95352 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Thu, 21 Jul 2016 00:31:01 +0800 Subject: [PATCH] [FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples This closes #2274. --- .../flink/examples/java/JavaSQLExample.java | 72 +++++++++++++++++++ .../examples/scala/StreamSQLExample.scala | 62 ++++++++++++++++ .../examples/scala/StreamTableExample.scala | 58 +++++++++++++++ .../flink/examples/scala/WordCountSQL.scala | 43 +++++++++++ 4 files changed, 235 insertions(+) create mode 100644 flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala create mode 100644 flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java new file mode 100644 index 00000000000..bbac94ad00e --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaSQLExample.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.java; + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.table.TableEnvironment; + +/** + * Simple example that shows how the Batch SQL used in Java. + */ +public class JavaSQLExample { + + public static class WC { + public String word; + public long frequency; + + // Public constructor to make it a Flink POJO + public WC() { + + } + + public WC(String word, long frequency) { + this.word = word; + this.frequency = frequency; + } + + @Override + public String toString() { + return "WC " + word + " " + frequency; + } + } + + public static void main(String[] args) throws Exception { + + // set up execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + + DataSet input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + // register the DataSet as table "WordCount" + tableEnv.registerDataSet("WordCount", input, "word, frequency"); + // run a SQL query on the Table and retrieve the result as a new Table + Table table = tableEnv.sql( + "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); + + DataSet result = tableEnv.toDataSet(table, WC.class); + + result.print(); + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala new file mode 100644 index 00000000000..8eed77d17e2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Simple example for demonstrating the use of SQL on Stream Table. + */ +object StreamSQLExample { + + case class Order(user: Long, product: String, amount: Int) + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA: DataStream[Order] = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))) + + val orderB: DataStream[Order] = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))) + + // register the DataStream under the name "OrderA" and "OrderB" + tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount) + tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount) + + // Union two tables + val result = tEnv.sql( + "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " + + "SELECT STREAM * FROM OrderB WHERE amount < 2") + + result.toDataStream[Order].print() + + env.execute() + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala new file mode 100644 index 00000000000..9081f508506 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Simple example for demonstrating the use of Table API on Stream Table. + */ +object StreamTableExample { + + case class Order(user: Long, product: String, amount: Int) + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))).toTable(tEnv) + + val orderB = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))).toTable(tEnv) + + val result: DataStream[Order] = orderA.unionAll(orderB) + .select('user, 'product, 'amount) + .where('amount > 2) + .toDataStream[Order] + + result.print() + + env.execute() + } + +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala new file mode 100644 index 00000000000..41efffc9ec6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment + +/** + * Simple example that shows how the Batch SQL used in Scala. + */ +object WordCountSQL { + case class WC(word: String, count: Int) + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + tEnv.registerDataSet("WordCount", input, 'word, 'frequency) + + val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") + + table.toDataSet[WC].print() + } +} -- GitLab