提交 fa7907ab 编写于 作者: H Haohui Mai 提交者: Fabian Hueske

[FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.

This closes #3693.
上级 63539475
......@@ -1439,7 +1439,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
<tbody>
<tr>
<td><code>TUMBLE(time_attr, interval)</code></td>
<td>Defines are tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
<td>Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
</tr>
<tr>
<td><code>HOP(time_attr, interval, interval)</code></td>
......@@ -1454,6 +1454,40 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event or processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
#### Selecting Group Window Start and End Timestamps
The start and end timestamps of group windows can be selected with the following auxiliary functions:
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 40%">Auxiliary Function</th>
<th class="text-left">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>TUMBLE_START(time_attr, interval)</code><br/>
<code>HOP_START(time_attr, interval, interval)</code><br/>
<code>SESSION_START(time_attr, interval)</code><br/>
</td>
<td>Returns the start timestamp of the corresponding tumbling, hopping, and session window.</td>
</tr>
<tr>
<td>
<code>TUMBLE_END(time_attr, interval)</code><br/>
<code>HOP_END(time_attr, interval, interval)</code><br/>
<code>SESSION_END(time_attr, interval)</code><br/>
</td>
<td>Returns the end timestamp of the corresponding tumbling, hopping, and session window.</td>
</tr>
</tbody>
</table>
Note that the auxiliary functions must be called with exactly same arguments as the group window function in the `GROUP BY` clause.
The following examples show how to specify SQL queries with group windows on streaming tables.
<div class="codetabs" markdown="1">
......@@ -1469,7 +1503,10 @@ tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// compute SUM(amount) per day (in event-time)
Table result1 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
"SELECT user, " +
" TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart, " +
" SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sql(
......@@ -1481,7 +1518,12 @@ Table result3 = tableEnv.sql(
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
Table result4 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
"SELECT user, " +
" SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart, " +
" SESSION_END(rowtime(), INTERVAL '12' HOUR) AS snd, " +
" SUM(amount) " +
"FROM Orders " +
"GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
{% endhighlight %}
</div>
......@@ -1498,7 +1540,14 @@ tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user")
"""
|SELECT
| user,
| TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart,
| SUM(amount)
| FROM Orders
| GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user
""".stripMargin)
// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sql(
......@@ -1510,7 +1559,15 @@ val result3 = tableEnv.sql(
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user")
"""
|SELECT
| user,
| SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart,
| SESSION_END(rowtime(), INTERVAL '12' HOUR) AS sEnd,
| SUM(amount)
| FROM Orders
| GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
""".stripMargin)
{% endhighlight %}
</div>
......
......@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.rules
import org.apache.calcite.rel.rules._
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.calcite.rules.FlinkAggregateExpandDistinctAggregatesRule
import org.apache.flink.table.plan.rules.common.WindowStartEndPropertiesRule
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
......@@ -38,7 +39,8 @@ object FlinkRuleSets {
ProjectToWindowRule.PROJECT,
// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE
DataSetLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE
)
/**
......@@ -136,6 +138,7 @@ object FlinkRuleSets {
val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
// Transform window to LogicalWindowAggregate
DataStreamLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE,
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.common
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.logical.LogicalProject
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import scala.collection.JavaConversions._
class WindowStartEndPropertiesRule
extends RelOptRule(
WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
"WindowStartEndPropertiesRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val project = call.rel(0).asInstanceOf[LogicalProject]
// project includes at least on group auxiliary function
project.getProjects.exists {
case c: RexCall => c.getOperator.isGroupAuxiliary
case _ => false
}
}
override def onMatch(call: RelOptRuleCall): Unit = {
val project = call.rel(0).asInstanceOf[LogicalProject]
val innerProject = call.rel(1).asInstanceOf[LogicalProject]
val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
// Retrieve window start and end properties
val transformed = call.builder()
val rexBuilder = transformed.getRexBuilder
transformed.push(LogicalWindowAggregate.create(
agg.getWindow,
Seq(
NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias.get)),
NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias.get))
), agg)
)
// forward window start and end properties
transformed.project(
innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
// replace window auxiliary function by access to window properties
transformed.project(
project.getProjects.map{ x =>
if (WindowStartEndPropertiesRule.isWindowStart(x)) {
// replace expression by access to window start
rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
// replace expression by access to window end
rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
} else {
// preserve expression
x
}
}
)
val res = transformed.build()
call.transformTo(res)
}
}
object WindowStartEndPropertiesRule {
private val WINDOW_EXPRESSION_RULE_PREDICATE =
RelOptRule.operand(classOf[LogicalProject],
RelOptRule.operand(classOf[LogicalProject],
RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
val INSTANCE = new WindowStartEndPropertiesRule
/** Checks if a RexNode is a window start auxiliary function. */
private def isWindowStart(node: RexNode): Boolean = {
node match {
case n: RexCall if n.getOperator.isGroupAuxiliary =>
n.getOperator match {
case SqlStdOperatorTable.TUMBLE_START |
SqlStdOperatorTable.HOP_START |
SqlStdOperatorTable.SESSION_START
=> true
case _ => false
}
case _ => false
}
}
/** Checks if a RexNode is a window end auxiliary function. */
private def isWindowEnd(node: RexNode): Boolean = {
node match {
case n: RexCall if n.getOperator.isGroupAuxiliary =>
n.getOperator match {
case SqlStdOperatorTable.TUMBLE_END |
SqlStdOperatorTable.HOP_END |
SqlStdOperatorTable.SESSION_END
=> true
case _ => false
}
case _ => false
}
}
}
......@@ -352,8 +352,14 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
EventTimeExtractor,
ProcTimeExtractor,
SqlStdOperatorTable.TUMBLE,
SqlStdOperatorTable.TUMBLE_START,
SqlStdOperatorTable.TUMBLE_END,
SqlStdOperatorTable.HOP,
SqlStdOperatorTable.SESSION
SqlStdOperatorTable.HOP_START,
SqlStdOperatorTable.HOP_END,
SqlStdOperatorTable.SESSION,
SqlStdOperatorTable.SESSION_START,
SqlStdOperatorTable.SESSION_END
)
builtInSqlOperators.foreach(register)
......
......@@ -63,7 +63,14 @@ class WindowAggregateTest extends TableTestBase {
util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
val sqlQuery =
"SELECT c, SUM(a) AS sumA, MIN(b) AS minB FROM T GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
"SELECT " +
" TUMBLE_START(ts, INTERVAL '4' MINUTE), " +
" TUMBLE_END(ts, INTERVAL '4' MINUTE), " +
" c, " +
" SUM(a) AS sumA, " +
" MIN(b) AS minB " +
"FROM T " +
"GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
val expected =
unaryNode(
......@@ -73,9 +80,10 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "c"),
term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 240000.millis)),
term("select", "c, SUM(a) AS sumA, MIN(b) AS minB")
term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
"start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "c, sumA, minB")
term("select", "CAST(w$start) AS w$start, CAST(w$end) AS w$end, c, sumA, minB")
)
util.verifySql(sqlQuery, expected)
......@@ -117,7 +125,12 @@ class WindowAggregateTest extends TableTestBase {
util.addTable[(Int, Long, String, Long, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
val sqlQuery =
"SELECT c, SUM(a) AS sumA, AVG(b) AS avgB " +
"SELECT " +
" c, " +
" HOP_END(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
" HOP_START(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), " +
" SUM(a) AS sumA, " +
" AVG(b) AS avgB " +
"FROM T " +
"GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), d, c"
......@@ -130,9 +143,10 @@ class WindowAggregateTest extends TableTestBase {
term("groupBy", "c, d"),
term("window",
EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 3600000.millis)),
term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB")
term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
"start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "c, sumA, avgB")
term("select", "c, CAST(w$end) AS w$end, CAST(w$start) AS w$start, sumA, avgB")
)
util.verifySql(sqlQuery, expected)
......@@ -171,7 +185,12 @@ class WindowAggregateTest extends TableTestBase {
util.addTable[(Int, Long, String, Int, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
val sqlQuery =
"SELECT c, d, SUM(a) AS sumA, MIN(b) AS minB " +
"SELECT " +
" c, d, " +
" SESSION_START(ts, INTERVAL '12' HOUR), " +
" SESSION_END(ts, INTERVAL '12' HOUR), " +
" SUM(a) AS sumA, " +
" MIN(b) AS minB " +
"FROM T " +
"GROUP BY SESSION(ts, INTERVAL '12' HOUR), c, d"
......@@ -183,9 +202,10 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "c, d"),
term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 43200000.millis)),
term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB")
term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
"start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "c, d, sumA, minB")
term("select", "c, d, CAST(w$start) AS w$start, CAST(w$end) AS w$end, sumA, minB")
)
util.verifySql(sqlQuery, expected)
......
......@@ -17,12 +17,10 @@
*/
package org.apache.flink.table.api.scala.stream.sql
import java.sql.Timestamp
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow, ProcessingTimeTumblingGroupWindow}
import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow}
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.junit.Test
......@@ -86,7 +84,14 @@ class WindowAggregateTest extends TableTestBase {
@Test
def testTumbleFunction() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
val sql =
"SELECT " +
" COUNT(*), " +
" TUMBLE_START(rowtime(), INTERVAL '15' MINUTE), " +
" TUMBLE_END(rowtime(), INTERVAL '15' MINUTE)" +
"FROM MyTable " +
"GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
val expected =
unaryNode(
"DataStreamCalc",
......@@ -98,17 +103,21 @@ class WindowAggregateTest extends TableTestBase {
term("select", "1970-01-01 00:00:00 AS $f0")
),
term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 900000.millis)),
term("select", "COUNT(*) AS EXPR$0")
term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "EXPR$0")
term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
@Test
def testHoppingFunction() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
"HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
val sql =
"SELECT COUNT(*), " +
" HOP_START(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
" HOP_END(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
"FROM MyTable " +
"GROUP BY HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
val expected =
unaryNode(
"DataStreamCalc",
......@@ -121,17 +130,22 @@ class WindowAggregateTest extends TableTestBase {
),
term("window", ProcessingTimeSlidingGroupWindow(Some('w$),
3600000.millis, 900000.millis)),
term("select", "COUNT(*) AS EXPR$0")
term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "EXPR$0")
term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
@Test
def testSessionFunction() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
"SESSION(proctime(), INTERVAL '15' MINUTE)"
val sql =
"SELECT " +
" COUNT(*), " +
" SESSION_START(proctime(), INTERVAL '15' MINUTE), " +
" SESSION_END(proctime(), INTERVAL '15' MINUTE) " +
"FROM MyTable " +
"GROUP BY SESSION(proctime(), INTERVAL '15' MINUTE)"
val expected =
unaryNode(
"DataStreamCalc",
......@@ -143,9 +157,9 @@ class WindowAggregateTest extends TableTestBase {
term("select", "1970-01-01 00:00:00 AS $f0")
),
term("window", ProcessingTimeSessionGroupWindow(Some('w$), 900000.millis)),
term("select", "COUNT(*) AS EXPR$0")
term("select", "COUNT(*) AS EXPR$0, start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "EXPR$0")
term("select", "EXPR$0, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册