提交 123c637e 编写于 作者: J Jark Wu 提交者: twalthr

[FLINK-4180] [FLINK-4181] [table] add Batch SQL and Stream SQL and Stream Table API examples

This closes #2274.
上级 45df1b25
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.java;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.table.TableEnvironment;
/**
* Simple example that shows how the Batch SQL used in Java.
*/
public class JavaSQLExample {
public static class WC {
public String word;
public long frequency;
// Public constructor to make it a Flink POJO
public WC() {
}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
public static void main(String[] args) throws Exception {
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
new WC("Hello", 1));
// register the DataSet as table "WordCount"
tableEnv.registerDataSet("WordCount", input, "word, frequency");
// run a SQL query on the Table and retrieve the result as a new Table
Table table = tableEnv.sql(
"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet<WC> result = tableEnv.toDataSet(table, WC.class);
result.print();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* Simple example for demonstrating the use of SQL on Stream Table.
*/
object StreamSQLExample {
case class Order(user: Long, product: String, amount: Int)
def main(args: Array[String]): Unit = {
// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "beer", 3),
Order(1L, "diaper", 4),
Order(3L, "rubber", 2)))
val orderB: DataStream[Order] = env.fromCollection(Seq(
Order(2L, "pen", 3),
Order(2L, "rubber", 3),
Order(4L, "beer", 1)))
// register the DataStream under the name "OrderA" and "OrderB"
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
// Union two tables
val result = tEnv.sql(
"SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT STREAM * FROM OrderB WHERE amount < 2")
result.toDataStream[Order].print()
env.execute()
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* Simple example for demonstrating the use of Table API on Stream Table.
*/
object StreamTableExample {
case class Order(user: Long, product: String, amount: Int)
def main(args: Array[String]): Unit = {
// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val orderA = env.fromCollection(Seq(
Order(1L, "beer", 3),
Order(1L, "diaper", 4),
Order(3L, "rubber", 2))).toTable(tEnv)
val orderB = env.fromCollection(Seq(
Order(2L, "pen", 3),
Order(2L, "rubber", 3),
Order(4L, "beer", 1))).toTable(tEnv)
val result: DataStream[Order] = orderA.unionAll(orderB)
.select('user, 'product, 'amount)
.where('amount > 2)
.toDataStream[Order]
result.print()
env.execute()
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
/**
* Simple example that shows how the Batch SQL used in Scala.
*/
object WordCountSQL {
case class WC(word: String, count: Int)
def main(args: Array[String]): Unit = {
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
table.toDataSet[WC].print()
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册