提交 cac9fa02 编写于 作者: F Fabian Hueske

[hotfix] [table] Disable event-time OVER RANGE UNBOUNDED PRECEDING window.

上级 fe2c61a2
...@@ -113,23 +113,28 @@ class DataStreamOverAggregate( ...@@ -113,23 +113,28 @@ class DataStreamOverAggregate(
if (overWindow.isRows) { if (overWindow.isRows) {
// ROWS clause bounded OVER window // ROWS clause bounded OVER window
throw new TableException( throw new TableException(
"ROWS clause bounded proc-time OVER window no supported yet.") "processing-time OVER ROWS PRECEDING window is not supported yet.")
} else { } else {
// RANGE clause bounded OVER window // RANGE clause bounded OVER window
throw new TableException( throw new TableException(
"RANGE clause bounded proc-time OVER window no supported yet.") "processing-time OVER RANGE PRECEDING window is not supported yet.")
} }
} else { } else {
throw new TableException( throw new TableException(
"OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " + "processing-time OVER RANGE FOLLOWING window is not supported yet.")
"condition.")
} }
case _: RowTimeType => case _: RowTimeType =>
// row-time OVER window // row-time OVER window
if (overWindow.lowerBound.isPreceding && if (overWindow.lowerBound.isPreceding &&
overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
// unbounded preceding OVER window if (overWindow.isRows) {
createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) // unbounded preceding OVER ROWS window
createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
} else {
// unbounded preceding OVER RANGE window
throw new TableException(
"row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.")
}
} else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) { } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window // bounded OVER window
if (overWindow.isRows) { if (overWindow.isRows) {
...@@ -138,11 +143,11 @@ class DataStreamOverAggregate( ...@@ -138,11 +143,11 @@ class DataStreamOverAggregate(
} else { } else {
// RANGE clause bounded OVER window // RANGE clause bounded OVER window
throw new TableException( throw new TableException(
"RANGE clause bounded row-time OVER window no supported yet.") "row-time OVER RANGE PRECEDING window is not supported yet.")
} }
} else { } else {
throw new TableException( throw new TableException(
"row-time OVER window only support CURRENT ROW condition.") "row-time OVER RANGE FOLLOWING window is not supported yet.")
} }
case _ => case _ =>
throw new TableException(s"Unsupported time type {$timeType}") throw new TableException(s"Unsupported time type {$timeType}")
......
...@@ -448,15 +448,15 @@ class SqlITCase extends StreamingWithStateTestBase { ...@@ -448,15 +448,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, c, " + val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (" + "SUM(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"count(b) over (" + "count(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"avg(b) over (" + "avg(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"max(b) over (" + "max(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"min(b) over (" + "min(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row) " + "partition by a order by rowtime() rows between unbounded preceding and current row) " +
"from T1" "from T1"
val data = Seq( val data = Seq(
...@@ -526,15 +526,15 @@ class SqlITCase extends StreamingWithStateTestBase { ...@@ -526,15 +526,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, c, " + val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (" + "SUM(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"count(b) over (" + "count(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"avg(b) over (" + "avg(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"max(b) over (" + "max(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row), " + "partition by a order by rowtime() rows between unbounded preceding and current row), " +
"min(b) over (" + "min(b) over (" +
"partition by a order by rowtime() range between unbounded preceding and current row) " + "partition by a order by rowtime() rows between unbounded preceding and current row) " +
"from T1" "from T1"
val data = Seq( val data = Seq(
...@@ -596,11 +596,11 @@ class SqlITCase extends StreamingWithStateTestBase { ...@@ -596,11 +596,11 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1) env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " + val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (order by rowtime() range between unbounded preceding and current row), " + "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"count(b) over (order by rowtime() range between unbounded preceding and current row), " + "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"avg(b) over (order by rowtime() range between unbounded preceding and current row), " + "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"max(b) over (order by rowtime() range between unbounded preceding and current row), " + "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"min(b) over (order by rowtime() range between unbounded preceding and current row) " + "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
"from T1" "from T1"
val data = Seq( val data = Seq(
...@@ -651,11 +651,11 @@ class SqlITCase extends StreamingWithStateTestBase { ...@@ -651,11 +651,11 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1) env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " + val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (order by rowtime() range between unbounded preceding and current row), " + "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"count(b) over (order by rowtime() range between unbounded preceding and current row), " + "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"avg(b) over (order by rowtime() range between unbounded preceding and current row), " + "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"max(b) over (order by rowtime() range between unbounded preceding and current row), " + "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
"min(b) over (order by rowtime() range between unbounded preceding and current row) " + "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
"from T1" "from T1"
val data = Seq( val data = Seq(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册