doujutun3207 / flink
Unverified commit 5fef3c12
Authored on Mar 17, 2020 by yuzhao.cyz; committed by Jark Wu on Mar 31, 2020

[FLINK-14338][table-planner-blink] Update files due to CALCITE-1824

* GROUP_ID translation was fixed

Parent: 8a6877d1
Showing 7 changed files with 132 additions and 123 deletions (+132 −123)
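For context on what changed: after CALCITE-1824 ("GROUP_ID returns wrong result"), GROUP_ID() numbers the duplicate occurrences of each grouping set, and queries with duplicated grouping sets are planned as a union over de-duplicated aggregates (visible as the LogicalUnion plans in DecomposeGroupingSetsRuleTest.xml). A minimal sketch of the numbering rule — illustrative Python, not code from this patch:

```python
# Sketch (not from the patch): GROUP_ID() numbers duplicate occurrences
# of a grouping set; distinct sets all get id 0, the i-th duplicate gets i.
from collections import Counter

def assign_group_ids(grouping_sets):
    """Map each occurrence of a grouping set to its GROUP_ID value."""
    seen = Counter()
    ids = []
    for gs in grouping_sets:
        key = frozenset(gs)
        ids.append((tuple(sorted(gs)), seen[key]))
        seen[key] += 1
    return ids

# GROUP BY GROUPING SETS (deptno, (), ()) — the empty set occurs twice,
# so its second occurrence gets GROUP_ID() = 1.
print(assign_group_ids([("deptno",), (), ()]))
# → [(('deptno',), 0), ((), 0), ((), 1)]
```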
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml  +38 −23
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala  +3 −0
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala  +7 −4
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java  +48 −35
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala  +2 −17
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala  +12 −35
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala  +22 −9
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml
@@ -24,16 +24,27 @@ SELECT a, GROUP_ID() AS g, COUNT(*) as c FROM MyTable GROUP BY GROUPING SETS (a,
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], groups=[[{0}, {}]], g=[GROUP_ID()], c=[COUNT()])
-+- LogicalProject(a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0], g=[0:BIGINT], c=[$1])
+:  +- LogicalAggregate(group=[{0}], groups=[[{0}, {}]], c=[COUNT()])
+:     +- LogicalProject(a=[$0])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], g=[1:BIGINT], c=[$1])
+   +- LogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()])
+      +- LogicalProject(a=[$0])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c])
-+- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()])
-   +- FlinkLogicalExpand(projects=[{a=[$0], $e=[0]}, {a=[null], $e=[1]}])
+FlinkLogicalUnion(all=[true])
+:- FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c])
+:  +- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()])
+:     +- FlinkLogicalExpand(projects=[a, $e])
+:        +- FlinkLogicalCalc(select=[a])
+:           +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- FlinkLogicalCalc(select=[a, 1:BIGINT AS g, c])
+   +- FlinkLogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()])
+      +- FlinkLogicalCalc(select=[a])
+         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
@@ -56,16 +67,17 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT) AS gid])
 +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
     </Resource>
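In the planAfter trees, the CASE expressions over the $e marker column produced by FlinkLogicalExpand are how GROUPING/GROUPING_ID survive the expansion: $e identifies which grouping set a row belongs to, and the indicator values are decoded from it. A sketch of that decoding for the CUBE(b, c) case — illustrative Python with hypothetical helper names, not code from this patch:

```python
# Decode GROUPING/GROUPING_ID from the Expand marker $e, assuming $e
# enumerates the grouping sets of CUBE(b, c) as 0={b,c}, 1={b}, 2={c}, 3={}.
GROUPING_SETS = {0: {"b", "c"}, 1: {"b"}, 2: {"c"}, 3: set()}
ALL_COLS = ["b", "c"]  # column order fixes the GROUPING_ID bit positions

def grouping(e, col):
    """GROUPING(col): 1 if col is rolled up (nulled) in grouping set e."""
    return 0 if col in GROUPING_SETS[e] else 1

def grouping_id(e, cols):
    """GROUPING_ID(cols...): GROUPING() bits per column, first column is MSB."""
    gid = 0
    for col in cols:
        gid = (gid << 1) | grouping(e, col)
    return gid

# Matches the CASE in the plan: gid is 0,1,2,3 for $e = 0,1,2,3.
print([grouping_id(e, ALL_COLS) for e in range(4)])
# → [0, 1, 2, 3]
```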
@@ -84,9 +96,10 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], a=[AVG($1)], g=[GROUP_ID()], gb=[GROUPING($0)], gib=[GROUPING_ID($0)])
-+- LogicalProject(b=[$1], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], a=[$1], g=[0:BIGINT], gb=[$2], gib=[$3])
++- LogicalAggregate(group=[{0}], a=[AVG($1)], gb=[GROUPING($0)], gib=[GROUPING_ID($0)])
+   +- LogicalProject(b=[$1], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
     </Resource>
     <Resource name="planAfter">
@@ -107,16 +120,17 @@ GROUP BY GROUPING SETS (b, c)
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)], g=[GROUP_ID()])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g])
 +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
     </Resource>
@@ -139,7 +153,7 @@ LogicalProject(a=[$3], b=[$4], c=[$5])
       <![CDATA[
 FlinkLogicalCalc(select=[a_0 AS a, b_0 AS b, c_0 AS c])
 +- FlinkLogicalAggregate(group=[{0, 1, 2, 3}], a=[COUNT($0)], b=[COUNT($4)], c=[COUNT($5)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1], b_0=[$1], c_0=[$2]}, {a=[$0], b=[null], c=[$2], $e=[2], b_0=[$1], c_0=[$2]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e, b_0, c_0])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
     </Resource>
@@ -161,16 +175,17 @@ FROM MyTable
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
-+- LogicalProject(b=[$1], c=[$2], a=[$0])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT) AS gid])
 +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)])
-   +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[null], $e=[3]}])
+   +- FlinkLogicalExpand(projects=[a, b, c, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
     </Resource>
@@ -193,7 +208,7 @@ LogicalProject(b=[$2], c=[$3])
       <![CDATA[
 FlinkLogicalCalc(select=[b_0 AS b, c_0 AS c])
 +- FlinkLogicalAggregate(group=[{0, 1, 2}], b=[COUNT($3)], c=[COUNT($4)])
-   +- FlinkLogicalExpand(projects=[{b=[$0], c=[null], $e=[1], b_0=[$0], c_0=[$1]}, {b=[null], c=[$1], $e=[2], b_0=[$0], c_0=[$1]}])
+   +- FlinkLogicalExpand(projects=[b, c, $e, b_0, c_0])
       +- FlinkLogicalCalc(select=[b, c])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.stream.sql.agg

 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
@@ -354,6 +355,8 @@ class GroupingSetsTest extends TableTestBase {
   @Test
   def testCALCITE1824(): Unit = {
+    expectedException.expect(classOf[TableException])
+    expectedException.expectMessage("GROUPING SETS are currently not supported")
     val sqlQuery =
       """
         |SELECT deptno, GROUP_ID() AS g, COUNT(*) AS c
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala
@@ -395,13 +395,16 @@ class GroupingSetsITCase extends BatchTestBase {
   @Test
   def testCALCITE1824(): Unit = {
-    // TODO:
-    // When "[CALCITE-1824] GROUP_ID returns wrong result" is fixed,
-    // there will be an extra row (null, 1, 14).
     checkResult(
       "select deptno, group_id() as g, count(*) as c " +
         "from scott_emp group by grouping sets (deptno, (), ())",
-      Seq(row(10, 0, 3), row(20, 0, 5), row(30, 0, 6), row(null, 0, 14))
+      Seq(
+        row(10, 0, 3), row(10, 1, 3),
+        row(20, 0, 5), row(20, 1, 5),
+        row(30, 0, 6), row(30, 1, 6),
+        row(null, 0, 14))
     )
   }
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -89,33 +89,33 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 			" GROUP BY GROUPING SETS (f1, f2, ())";

 		String expected =
-			"1,null,1,1,1,0,1,0,2,1\n" +
-			"6,null,18,1,1,0,1,0,2,6\n" +
-			"2,null,2,1,1,0,1,0,2,2\n" +
-			"4,null,8,1,1,0,1,0,2,4\n" +
-			"5,null,13,1,1,0,1,0,2,5\n" +
-			"3,null,5,1,1,0,1,0,2,3\n" +
-			"null,Comment#11,17,2,0,1,0,1,1,1\n" +
-			"null,Comment#8,14,2,0,1,0,1,1,1\n" +
-			"null,Comment#2,8,2,0,1,0,1,1,1\n" +
-			"null,Comment#1,7,2,0,1,0,1,1,1\n" +
-			"null,Comment#14,20,2,0,1,0,1,1,1\n" +
-			"null,Comment#7,13,2,0,1,0,1,1,1\n" +
-			"null,Comment#6,12,2,0,1,0,1,1,1\n" +
-			"null,Comment#3,9,2,0,1,0,1,1,1\n" +
-			"null,Comment#12,18,2,0,1,0,1,1,1\n" +
-			"null,Comment#5,11,2,0,1,0,1,1,1\n" +
-			"null,Comment#15,21,2,0,1,0,1,1,1\n" +
-			"null,Comment#4,10,2,0,1,0,1,1,1\n" +
-			"null,Hi,1,2,0,1,0,1,1,1\n" +
-			"null,Comment#10,16,2,0,1,0,1,1,1\n" +
-			"null,Hello world,3,2,0,1,0,1,1,1\n" +
-			"null,I am fine.,5,2,0,1,0,1,1,1\n" +
-			"null,Hello world, how are you?,4,2,0,1,0,1,1,1\n" +
-			"null,Comment#9,15,2,0,1,0,1,1,1\n" +
-			"null,Comment#13,19,2,0,1,0,1,1,1\n" +
-			"null,Luke Skywalker,6,2,0,1,0,1,1,1\n" +
-			"null,Hello,2,2,0,1,0,1,1,1";
+			"1,null,1,0,1,0,1,0,2,1\n" +
+			"2,null,2,0,1,0,1,0,2,2\n" +
+			"3,null,5,0,1,0,1,0,2,3\n" +
+			"4,null,8,0,1,0,1,0,2,4\n" +
+			"5,null,13,0,1,0,1,0,2,5\n" +
+			"6,null,18,0,1,0,1,0,2,6\n" +
+			"null,Comment#1,7,0,0,1,0,1,1,1\n" +
+			"null,Comment#10,16,0,0,1,0,1,1,1\n" +
+			"null,Comment#11,17,0,0,1,0,1,1,1\n" +
+			"null,Comment#12,18,0,0,1,0,1,1,1\n" +
+			"null,Comment#13,19,0,0,1,0,1,1,1\n" +
+			"null,Comment#14,20,0,0,1,0,1,1,1\n" +
+			"null,Comment#15,21,0,0,1,0,1,1,1\n" +
+			"null,Comment#2,8,0,0,1,0,1,1,1\n" +
+			"null,Comment#3,9,0,0,1,0,1,1,1\n" +
+			"null,Comment#4,10,0,0,1,0,1,1,1\n" +
+			"null,Comment#5,11,0,0,1,0,1,1,1\n" +
+			"null,Comment#6,12,0,0,1,0,1,1,1\n" +
+			"null,Comment#7,13,0,0,1,0,1,1,1\n" +
+			"null,Comment#8,14,0,0,1,0,1,1,1\n" +
+			"null,Comment#9,15,0,0,1,0,1,1,1\n" +
+			"null,Hello world, how are you?,4,0,0,1,0,1,1,1\n" +
+			"null,Hello world,3,0,0,1,0,1,1,1\n" +
+			"null,Hello,2,0,0,1,0,1,1,1\n" +
+			"null,Hi,1,0,0,1,0,1,1,1\n" +
+			"null,I am fine.,5,0,0,1,0,1,1,1\n" +
+			"null,Luke Skywalker,6,0,0,1,0,1,1,1\n" +
+			"null,null,11,0,0,0,0,0,0,21";

 		checkSql(query, expected);
@@ -128,14 +128,27 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 			" GROUP BY GROUPING SETS (f1, f2)";

 		String expected =
-			"6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
-			"null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
-			"null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" +
-			"null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" +
-			"null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" +
-			"null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" +
-			"null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" +
-			"null,Comment#1,7,2";
+			"1,Hi,1,0\n" +
+			"2,Hello,2,0\n" +
+			"2,null,3,0\n" +
+			"3,I am fine.,5,0\n" +
+			"3,Luke Skywalker,6,0\n" +
+			"3,null,4,0\n" +
+			"4,Comment#1,7,0\n" +
+			"4,Comment#2,8,0\n" +
+			"4,Comment#3,9,0\n" +
+			"4,Comment#4,10,0\n" +
+			"5,Comment#5,11,0\n" +
+			"5,Comment#6,12,0\n" +
+			"5,Comment#7,13,0\n" +
+			"5,Comment#8,14,0\n" +
+			"5,Comment#9,15,0\n" +
+			"6,Comment#10,16,0\n" +
+			"6,Comment#11,17,0\n" +
+			"6,Comment#12,18,0\n" +
+			"6,Comment#13,19,0\n" +
+			"6,Comment#14,20,0\n" +
+			"6,Comment#15,21,0";

 		checkSql(query, expected);
 	}
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala
@@ -57,13 +57,7 @@ class DistinctAggregateTest extends TableTestBase {
     val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable"

-    val left = unaryNode(
-      "DataSetAggregate",
-      unaryNode(
-        "DataSetCalc",
-        batchTableNode(table),
-        term("select", "a")),
-      term("select", "MAX(a) AS EXPR$2"))
-
-    val right = unaryNode(
+    val expected = unaryNode(
       "DataSetAggregate",
       unaryNode(
         "DataSetDistinct",
@@ -74,18 +68,9 @@ class DistinctAggregateTest extends TableTestBase {
         ),
         term("distinct", "a")
       ),
-      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1")
+      term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2")
     )

-    val expected = unaryNode(
-      "DataSetCalc",
-      binaryNode(
-        "DataSetSingleRowJoin",
-        left,
-        right,
-        term("where", "true"),
-        term("join", "EXPR$2", "EXPR$0", "EXPR$1"),
-        term("joinType", "NestedLoopInnerJoin")),
-      term("select", "EXPR$0", "EXPR$1", "EXPR$2"))
-
     util.verifySql(sqlQuery, expected)
   }
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
@@ -34,38 +34,15 @@ class GroupingSetsTest extends TableTestBase {
     val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " +
       "GROUP BY GROUPING SETS (b, c)"

-    val aggregate = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(table),
-            term("select", "b", "a")
-          ),
-          term("groupBy", "b"),
-          term("select", "b", "AVG(a) AS a")
-        ),
-        term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g")
-      ),
-      unaryNode(
-        "DataSetCalc",
-        unaryNode(
-          "DataSetAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(table),
-            term("select", "c", "a")
-          ),
-          term("groupBy", "c"),
-          term("select", "c", "AVG(a) AS a")
-        ),
-        term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g")
-      ),
-      term("all", "true"),
-      term("union", "b", "c", "a", "g")
-    )
+    val aggregate = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        batchTableNode(table),
+        term("groupBy", "b", "c"),
+        term("select", "b", "c", "AVG(a) AS a")
+      ),
+      term("select", "b", "c", "a", "0:BIGINT AS g")
+    )

     util.verifySql(sqlQuery, aggregate)
@@ -91,7 +68,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b", "c"),
         term("select", "b", "c", "AVG(a) AS a")
       ),
-      term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
+      term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
         "1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid")
     )
@@ -107,7 +84,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b"),
         term("select", "b", "AVG(a) AS a")
       ),
-      term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb",
+      term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb",
         "0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid")
     )
@@ -123,7 +100,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "c"),
         term("select", "c", "AVG(a) AS a")
       ),
-      term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g", "0:BIGINT AS gb",
+      term("select", "null:BIGINT AS b", "c", "a", "0:BIGINT AS g", "0:BIGINT AS gb",
         "1:BIGINT AS gc", "0:BIGINT AS gib", "1:BIGINT AS gic", "1:BIGINT AS gid")
     )
@@ -185,7 +162,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b", "c"),
         term("select", "b", "c", "AVG(a) AS a")
       ),
-      term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
+      term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc",
         "1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid")
     )
@@ -201,7 +178,7 @@ class GroupingSetsTest extends TableTestBase {
         term("groupBy", "b"),
         term("select", "b", "AVG(a) AS a")
       ),
-      term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb",
+      term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb",
         "0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid")
     )
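The legacy-planner plan updates above all follow one pattern: every grouping set in these tests occurs exactly once, so under the fixed GROUP_ID semantics the function is the constant 0, and the expected plans now project "0:BIGINT AS g" instead of the old per-set constants 1/2/3. A sketch of why — illustrative Python, not code from this patch:

```python
# Sketch (not from the patch): GROUP_ID per grouping-set occurrence under
# the CALCITE-1824 fix. The i-th duplicate of a set gets id i, so when all
# sets are distinct, GROUP_ID() is 0 for every result row.
from collections import Counter

def group_id_values(grouping_sets):
    """Return the GROUP_ID value produced for each grouping-set occurrence."""
    seen = Counter()
    out = []
    for gs in grouping_sets:
        key = frozenset(gs)
        out.append(seen[key])
        seen[key] += 1
    return out

# GROUPING SETS (b, c): both sets distinct, so g == 0 everywhere.
print(group_id_values([{"b"}, {"c"}]))  # → [0, 0]
```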
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.utils.NonMergableCount
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -232,7 +233,6 @@ class AggregateITCase(
   @Test
   def testGroupingSetAggregate(): Unit = {

     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env, config)
@@ -245,14 +245,27 @@ class AggregateITCase(
     val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()

     val expected =
-      "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
-      "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
-      "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" +
-      "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" +
-      "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" +
-      "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" +
-      "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" +
-      "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2"
+      "1,Hi,1,0\n" +
+      "2,Hello world,3,0\n" +
+      "2,Hello,2,0\n" +
+      "3,Hello world, how are you?,4,0\n" +
+      "3,I am fine.,5,0\n" +
+      "3,Luke Skywalker,6,0\n" +
+      "4,Comment#1,7,0\n" +
+      "4,Comment#2,8,0\n" +
+      "4,Comment#3,9,0\n" +
+      "4,Comment#4,10,0\n" +
+      "5,Comment#5,11,0\n" +
+      "5,Comment#6,12,0\n" +
+      "5,Comment#7,13,0\n" +
+      "5,Comment#8,14,0\n" +
+      "5,Comment#9,15,0\n" +
+      "6,Comment#10,16,0\n" +
+      "6,Comment#11,17,0\n" +
+      "6,Comment#12,18,0\n" +
+      "6,Comment#13,19,0\n" +
+      "6,Comment#14,20,0\n" +
+      "6,Comment#15,21,0"

     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }