提交 fa256b2a 编写于 作者: T Timo Walther

[FLINK-8897] [table] Reintroduce materialization of time attributes in filters

上级 e1d975d2
......@@ -23,6 +23,7 @@ import org.apache.calcite.rel.core._
import org.apache.calcite.rel.logical._
import org.apache.calcite.rel.{RelNode, RelShuttle}
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
import org.apache.flink.table.api.{TableException, ValidationException}
......@@ -100,13 +101,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
override def visit(matchRel: LogicalMatch): RelNode = {
// visit children and update inputs
val input = matchRel.getInput.accept(this)
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
val materializer = new RexTimeIndicatorMaterializer(
rexBuilder,
input.getRowType.getFieldList.map(_.getType))
val materializer = createMaterializer(input)
// update input expressions
val patternDefs = matchRel.getPatternDefinitions.mapValues(_.accept(materializer))
......@@ -180,23 +175,16 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
override def visit(filter: LogicalFilter): RelNode = {
// visit children and update inputs
val input = filter.getInput.accept(this)
val materializer = createMaterializer(input)
// We do not materialize time indicators in conditions because they can be locally evaluated.
// Some conditions are evaluated by special operators (e.g., time window joins).
// Time indicators in remaining conditions are materialized by Calc before the code generation.
LogicalFilter.create(input, filter.getCondition)
val condition = filter.getCondition.accept(materializer)
LogicalFilter.create(input, condition)
}
override def visit(project: LogicalProject): RelNode = {
// visit children and update inputs
val input = project.getInput.accept(this)
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
val materializer = new RexTimeIndicatorMaterializer(
rexBuilder,
input.getRowType.getFieldList.map(_.getType))
val materializer = createMaterializer(input)
val projects = project.getProjects.map(_.accept(materializer))
val fieldNames = project.getRowType.getFieldNames
......@@ -206,8 +194,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
override def visit(join: LogicalJoin): RelNode = {
val left = join.getLeft.accept(this)
val right = join.getRight.accept(this)
val materializer = createMaterializer(left, right)
LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, join.getJoinType)
LogicalJoin.create(
left,
right,
join.getCondition.accept(materializer),
join.getVariablesSet,
join.getJoinType)
}
def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
......@@ -229,19 +223,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
case scan: LogicalTableFunctionScan =>
// visit children and update inputs
val scanInputs = scan.getInputs.map(_.accept(this))
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
val materializer = new RexTimeIndicatorMaterializer(
rexBuilder,
inputs.head.getRowType.getFieldList.map(_.getType))
val call = scan.getCall.accept(materializer)
val materializer = createMaterializer(inputs.head)
LogicalTableFunctionScan.create(
scan.getCluster,
scanInputs,
call,
scan.getCall.accept(materializer),
scan.getElementType,
scan.getRowType,
scan.getColumnMappings)
......@@ -369,6 +355,15 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
indicesToMaterialize.toSet
}
private def createMaterializer(inputs: RelNode*): RexTimeIndicatorMaterializer = {
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
new RexTimeIndicatorMaterializer(
rexBuilder,
inputs.flatMap(_.getRowType.getFieldList.map(_.getType)))
}
}
object RelTimeIndicatorConverter {
......@@ -412,11 +407,34 @@ object RelTimeIndicatorConverter {
* @return The expression with materialized time indicators.
*/
def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: RexBuilder): RexNode = {
// check if input field contains time indicator type
// materialize field if no time indicator is present anymore
// if input field is already materialized, change to timestamp type
val materializer = new RexTimeIndicatorMaterializer(
rexBuilder,
rowType.getFieldList.map(_.getType))
rexBuilder,
rowType.getFieldList.map(_.getType))
expr.accept(materializer)
}
/**
* Checks if the given call is a materialization call for either proctime or rowtime.
*/
def isMaterializationCall(call: RexCall): Boolean = {
val isProctimeCall: Boolean = {
call.getOperator == ProctimeSqlFunction &&
call.getOperands.size() == 1 &&
isProctimeIndicatorType(call.getOperands.get(0).getType)
}
val isRowtimeCall: Boolean = {
call.getOperator == SqlStdOperatorTable.CAST &&
call.getOperands.size() == 1 &&
isRowtimeIndicatorType(call.getOperands.get(0).getType) &&
call.getType.getSqlTypeName == SqlTypeName.TIMESTAMP
}
expr.accept(materializer)
isProctimeCall || isRowtimeCall
}
}
......
......@@ -73,7 +73,7 @@ class DataStreamJoinRule
val remainingPredsAccessTime = remainingPreds.isDefined &&
accessesTimeAttribute(remainingPreds.get, join.getRowType)
// Check that no event-time attributes are in the input because non-window join is unbounded
// Check that no event-time attributes are in the output because non-window join is unbounded
// and we don't know how much to hold back watermarks.
val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
......
......@@ -52,7 +52,7 @@ class DataStreamWindowJoinRule
if (windowBounds.get.isEventTime) {
true
} else {
// Check that no event-time attributes are in the input because the processing time window
// Check that no event-time attributes are in the output because the processing time window
// join does not correctly hold back watermarks.
// We rely on projection pushdown to remove unused attributes before the join.
!join.getRowType.getFieldList.asScala
......
......@@ -23,12 +23,14 @@ import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.{SqlKind, SqlOperatorTable}
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.functions.sql.ProctimeSqlFunction
import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
import org.apache.flink.types.Row
......@@ -380,13 +382,13 @@ object WindowJoinUtil {
*/
def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
expr match {
case c: RexCall if RelTimeIndicatorConverter.isMaterializationCall(c) =>
// replace with timestamp
rexBuilder.makeZeroLiteral(expr.getType)
case c: RexCall =>
// replace in call operands
val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava
rexBuilder.makeCall(c.getType, c.getOperator, newOps)
case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) =>
// replace with timestamp
rexBuilder.makeZeroLiteral(expr.getType)
case _ => expr
}
}
......
......@@ -21,6 +21,7 @@ import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.expressions.Null
import org.apache.flink.table.plan.logical.TumblingGroupWindow
import org.apache.flink.table.runtime.join.WindowJoinUtil
......@@ -29,6 +30,9 @@ import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.junit.Assert._
import org.junit.Test
/**
* Tests for both windowed and non-windowed joins.
*/
class JoinTest extends TableTestBase {
private val streamUtil: StreamTableTestUtil = streamTestUtil()
streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
......@@ -62,8 +66,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "proctime")
),
term("where",
"AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
"<=(proctime, +(proctime0, 3600000)))"),
"AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " +
"<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
term("join", "a, proctime, a0, b, proctime0"),
term("joinType", "InnerJoin")
),
......@@ -100,8 +104,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 10000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, c, a0, b, c0"),
term("joinType", "InnerJoin")
),
......@@ -138,8 +142,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "proctime")
),
term("where",
"AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
"<=(proctime, +(proctime0, 3600000)))"),
"AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " +
"<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
term("join", "a, proctime, a0, b, proctime0"),
term("joinType", "InnerJoin")
),
......@@ -176,8 +180,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 600000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, c, a0, b, c0"),
term("joinType", "InnerJoin")
),
......@@ -208,7 +212,7 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "a", "b", "proctime")
),
term("where", "AND(=(a, a0), =(proctime, proctime0))"),
term("where", "AND(=(a, a0), =(PROCTIME(proctime), PROCTIME(proctime0)))"),
term("join", "a", "proctime", "a0", "b", "proctime0"),
term("joinType", "InnerJoin")
),
......@@ -238,7 +242,7 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "a", "b", "c")
),
term("where", "AND(=(a, a0), =(c, c0))"),
term("where", "AND(=(a, a0), =(CAST(c), CAST(c0)))"),
term("join", "a", "c", "a0", "b", "c0"),
term("joinType", "InnerJoin")
),
......@@ -280,8 +284,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "a", "c", "proctime", "12 AS nullField")
),
term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " +
"-(proctime0, 5000)), <=(proctime, +(proctime0, 5000)))"),
term("where", "AND(=(a, a0), =(nullField, nullField0), >=(PROCTIME(proctime), " +
"-(PROCTIME(proctime0), 5000)), <=(PROCTIME(proctime), +(PROCTIME(proctime0), 5000)))"),
term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"),
term("joinType", "InnerJoin")
),
......@@ -320,8 +324,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 600000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, b, c, a0, b0, c0"),
term("joinType", "InnerJoin")
),
......@@ -365,8 +369,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 600000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, b, c, a0, b0, c0"),
term("joinType", "InnerJoin")
),
......@@ -408,8 +412,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "proctime")
),
term("where",
"AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
"<=(proctime, +(proctime0, 3600000)))"),
"AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " +
"<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
term("join", "a, proctime, a0, b, proctime0"),
term("joinType", "LeftOuterJoin")
),
......@@ -446,8 +450,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 10000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, c, a0, b, c0"),
term("joinType", "LeftOuterJoin")
),
......@@ -485,8 +489,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "proctime")
),
term("where",
"AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
"<=(proctime, +(proctime0, 3600000)))"),
"AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " +
"<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
term("join", "a, proctime, a0, b, proctime0"),
term("joinType", "RightOuterJoin")
),
......@@ -523,8 +527,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 10000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, c, a0, b, c0"),
term("joinType", "RightOuterJoin")
),
......@@ -562,8 +566,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "proctime")
),
term("where",
"AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
"<=(proctime, +(proctime0, 3600000)))"),
"AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " +
"<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
term("join", "a, proctime, a0, b, proctime0"),
term("joinType", "FullOuterJoin")
),
......@@ -600,8 +604,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 10000)), " +
"<=(c, +(c0, 3600000)))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)))"),
term("join", "a, c, a0, b, c0"),
term("joinType", "FullOuterJoin")
),
......@@ -640,8 +644,8 @@ class JoinTest extends TableTestBase {
term("select", "a", "b", "c")
),
term("where",
"AND(=(a, a0), >=(c, -(c0, 10000)), " +
"<=(c, +(c0, 3600000)), LIKE(b, b0))"),
"AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
"<=(CAST(c), +(CAST(c0), 3600000)), LIKE(b, b0))"),
term("join", "a, b, c, a0, b0, c0"),
// Since we filter on attributes b and b0 after the join, the full outer join
// will be automatically optimized to inner join.
......@@ -795,7 +799,9 @@ class JoinTest extends TableTestBase {
"SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
val resultTable = streamUtil.tableEnv.sqlQuery(query)
val relNode = resultTable.getRelNode
val relNode = RelTimeIndicatorConverter.convert(
resultTable.getRelNode,
streamUtil.tableEnv.getRelBuilder.getRexBuilder)
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val (windowBounds, _) =
WindowJoinUtil.extractWindowBoundsFromPredicate(
......@@ -1008,7 +1014,9 @@ class JoinTest extends TableTestBase {
expectCondStr: String): Unit = {
val resultTable = streamUtil.tableEnv.sqlQuery(query)
val relNode = resultTable.getRelNode
val relNode = RelTimeIndicatorConverter.convert(
resultTable.getRelNode,
streamUtil.tableEnv.getRelBuilder.getRexBuilder)
val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
val (_, remainCondition) =
WindowJoinUtil.extractWindowBoundsFromPredicate(
......
......@@ -27,7 +27,7 @@ import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
/**
* Currently only time-windowed joins can be processed in a streaming fashion.
* Tests for both windowed and non-windowed joins.
*/
class JoinTest extends TableTestBase {
......@@ -57,8 +57,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rrtime")
),
term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
" <(lrtime, +(rrtime, 3000)))"),
term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," +
" <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
term("join", "a", "lrtime", "d", "e", "rrtime"),
term("joinType", "InnerJoin")
),
......@@ -92,7 +92,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rptime")
),
term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"),
term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " +
"<(PROCTIME(lptime), PROCTIME(rptime)))"),
term("join", "a", "lptime", "d", "e", "rptime"),
term("joinType", "InnerJoin")
),
......@@ -126,7 +127,7 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rptime")
),
term("where", "AND(=(a, d), =(lptime, rptime))"),
term("where", "AND(=(a, d), =(PROCTIME(lptime), PROCTIME(rptime)))"),
term("join", "a", "lptime", "d", "e", "rptime"),
term("joinType", "InnerJoin")
),
......@@ -153,7 +154,8 @@ class JoinTest extends TableTestBase {
streamTableNode(0),
streamTableNode(1),
term("where",
"AND(=(a, d), >=(lrtime, -(rrtime, 300000)), <(lrtime, rrtime), >(lrtime, " + "f))"),
"AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000)), " +
"<(CAST(lrtime), CAST(rrtime)), >(CAST(lrtime), f))"),
term("join", "a", "b", "c", "lrtime", "d", "e", "f", "rrtime"),
term("joinType", "InnerJoin")
)
......@@ -188,8 +190,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rrtime")
),
term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
" <(lrtime, +(rrtime, 3000)))"),
term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," +
" <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
term("join", "a", "lrtime", "d", "e", "rrtime"),
term("joinType", "LeftOuterJoin")
),
......@@ -223,7 +225,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rptime")
),
term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"),
term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " +
"<(PROCTIME(lptime), PROCTIME(rptime)))"),
term("join", "a", "lptime", "d", "e", "rptime"),
term("joinType", "LeftOuterJoin")
),
......@@ -260,8 +263,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rrtime")
),
term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
" <(lrtime, +(rrtime, 3000)))"),
term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," +
" <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
term("join", "a", "lrtime", "d", "e", "rrtime"),
term("joinType", "RightOuterJoin")
),
......@@ -295,7 +298,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rptime")
),
term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"),
term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " +
"<(PROCTIME(lptime), PROCTIME(rptime)))"),
term("join", "a", "lptime", "d", "e", "rptime"),
term("joinType", "RightOuterJoin")
),
......@@ -332,8 +336,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rrtime")
),
term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
" <(lrtime, +(rrtime, 3000)))"),
term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," +
" <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
term("join", "a", "lrtime", "d", "e", "rrtime"),
term("joinType", "FullOuterJoin")
),
......@@ -367,7 +371,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rptime")
),
term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"),
term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " +
"<(PROCTIME(lptime), PROCTIME(rptime)))"),
term("join", "a", "lptime", "d", "e", "rptime"),
term("joinType", "FullOuterJoin")
),
......@@ -402,8 +407,8 @@ class JoinTest extends TableTestBase {
streamTableNode(1),
term("select", "d", "e", "rrtime")
),
term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
" <(lrtime, +(rrtime, 3000)))"),
term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," +
" <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
term("join", "a", "lrtime", "d", "e", "rrtime"),
// Since we filter on attributes of the left table after the join, the left outer join
// will be automatically optimized to inner join.
......
......@@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
"DataStreamCalc",
streamTableNode(0),
term("select", "rowtime"),
term("where", ">(rowtime, 1990-12-02 12:11:11)")
term("where", ">(CAST(rowtime), 1990-12-02 12:11:11)")
)
util.verifyTable(result, expected)
......
......@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.stream
import java.lang.{Integer => JInt, Long => JLong}
import java.math.BigDecimal
import java.sql.Timestamp
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
......@@ -661,6 +662,51 @@ class TimeAttributesITCase extends AbstractTestBase {
)
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@Test
def testMaterializedRowtimeFilter(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.clear
val data = new mutable.MutableList[(String, Timestamp, Int)]
data.+=(("ACME", new Timestamp(1000L), 12))
data.+=(("ACME", new Timestamp(2000L), 17))
data.+=(("ACME", new Timestamp(3000L), 13))
data.+=(("ACME", new Timestamp(4000L), 11))
val t = env.fromCollection(data)
.assignAscendingTimestamps(e => e._2.toInstant.toEpochMilli)
.toTable(tEnv, 'symbol, 'tstamp.rowtime, 'price)
tEnv.registerTable("Ticker", t)
val sqlQuery =
s"""
|SELECT *
|FROM (
| SELECT symbol, SUM(price) as price,
| TUMBLE_ROWTIME(tstamp, interval '1' second) as rowTime,
| TUMBLE_START(tstamp, interval '1' second) as startTime,
| TUMBLE_END(tstamp, interval '1' second) as endTime
| FROM Ticker
| GROUP BY symbol, TUMBLE(tstamp, interval '1' second)
|)
|WHERE startTime < endTime
|""".stripMargin
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
val expected = List(
"ACME,12,1970-01-01 00:00:01.999,1970-01-01 00:00:01.0,1970-01-01 00:00:02.0",
"ACME,17,1970-01-01 00:00:02.999,1970-01-01 00:00:02.0,1970-01-01 00:00:03.0",
"ACME,13,1970-01-01 00:00:03.999,1970-01-01 00:00:03.0,1970-01-01 00:00:04.0",
"ACME,11,1970-01-01 00:00:04.999,1970-01-01 00:00:04.0,1970-01-01 00:00:05.0")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
object TimeAttributesITCase {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册