提交 cd1fbc07 编写于 作者: X Xingcan Cui 提交者: Fabian Hueske

[FLINK-8069] [table] Add preserving WatermarkStrategy.

This closes #5016.
上级 c697bc14
......@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner}
import org.apache.flink.table.sources.wmstrategies.{PeriodicWatermarkAssigner, PunctuatedWatermarkAssigner, PreserveWatermarks}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
......@@ -134,6 +134,9 @@ class StreamTableSourceScan(
case p: PunctuatedWatermarkAssigner =>
val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
case _: PreserveWatermarks =>
// The watermarks have already been provided by the underlying DataStream.
ingestedTable
}
} else {
// No need to generate watermarks if no rowtime attribute is specified.
......
......@@ -60,3 +60,9 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
*/
def getWatermark(row: Row, timestamp: Long): Watermark
}
/**
  * A strategy which indicates that the watermarks should be preserved from the underlying
  * DataStream, i.e., no additional watermark assigner is applied for the rowtime attribute.
  *
  * The strategy carries no state, so all instances are interchangeable. Equality and hash code
  * are therefore defined by type, such that a separately constructed instance compares equal to
  * the shared `PreserveWatermarks.INSTANCE`.
  */
class PreserveWatermarks extends WatermarkStrategy {

  /** Any other [[PreserveWatermarks]] is considered equal because the strategy is stateless. */
  override def equals(other: Any): Boolean = other match {
    case _: PreserveWatermarks => true
    case _ => false
  }

  /** Constant per type, consistent with the type-based [[equals]]. */
  override def hashCode(): Int = classOf[PreserveWatermarks].getName.hashCode
}

/** Companion object providing a shared instance of the stateless [[PreserveWatermarks]]. */
object PreserveWatermarks {

  /** Shared instance; prefer this over constructing new instances. */
  val INSTANCE: PreserveWatermarks = new PreserveWatermarks
}
......@@ -27,6 +27,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
......@@ -35,6 +36,7 @@ import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.table.utils._
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.junit.Assert._
import org.junit.Test
......@@ -690,4 +692,53 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
"3,Mike,30000,true,3000")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
/**
  * Verifies that a table source using the PreserveWatermarks strategy forwards the watermarks
  * of its underlying DataStream: every result row is paired with the watermark that is current
  * when the row is processed and the pairs are compared against the expected values.
  */
@Test
def testRowtimeTableSourcePreserveWatermarks(): Unit = {
  StreamITCase.testResults = mutable.MutableList()

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val tableName = "MyTable"

  // input elements: Left = (timestamp, row), Right = watermark
  val input: Seq[Either[(Long, Row), Long]] = Seq(
    Right(1L),
    Left((5L, Row.of(new JInt(1), new JLong(5), "A"))),
    Left((2L, Row.of(new JInt(2), new JLong(1), "B"))),
    Right(10L),
    Left((8L, Row.of(new JInt(6), new JLong(8), "C"))),
    Right(20L),
    Left((21L, Row.of(new JInt(6), new JLong(21), "D"))),
    Right(30L)
  )

  // the table schema exposes "rtime" as a timestamp, the produced rows carry it as a long
  val fieldNames = Array("id", "rtime", "name")
  val schema = new TableSchema(
    fieldNames,
    Array(Types.INT, Types.SQL_TIMESTAMP, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  tEnv.registerTableSource(
    tableName,
    new TestPreserveWMTableSource(schema, rowType, input, "rtime"))

  // pairs each row with the current watermark to verify that the original watermarks
  // were preserved
  val attachWatermark: ProcessFunction[Row, (Row, Long)] =
    new ProcessFunction[Row, (Row, Long)] {
      override def processElement(
          value: Row,
          ctx: ProcessFunction[Row, (Row, Long)]#Context,
          out: Collector[(Row, Long)]): Unit = {
        val currentWatermark = ctx.timerService().currentWatermark()
        out.collect((value, currentWatermark))
      }
    }

  val filtered = tEnv.scan(tableName)
    .where('rtime.cast(Types.LONG) > 3L)
    .select('id, 'name)

  filtered
    .toAppendStream[Row]
    .process(attachWatermark)
    .addSink(new StreamITCase.StringSink[(Row, Long)])

  env.execute()

  val expected = Seq("(1,A,1)", "(6,C,10)", "(6,D,20)")
  assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
......@@ -27,9 +27,10 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
......@@ -199,3 +200,28 @@ class TestNestedProjectableTableSource(
s"read nested fields: ${readNestedFields.mkString(", ")})"
}
}
/**
  * Test [[StreamTableSource]] that declares a single rowtime attribute with the
  * [[PreserveWatermarks]] strategy and reads its records from an [[EventTimeSourceFunction]].
  *
  * @param tableSchema schema returned by [[getTableSchema]]
  * @param returnType  type information of the produced records
  * @param values      elements handed to the [[EventTimeSourceFunction]]; presumably
  *                    Left = (timestamp, record) and Right = watermark (confirm against
  *                    the source function)
  * @param rowtime     name of the existing field that is used as rowtime attribute
  * @tparam T type of the produced records
  */
class TestPreserveWMTableSource[T](
    tableSchema: TableSchema,
    returnType: TypeInformation[T],
    values: Seq[Either[(Long, T), Long]],
    rowtime: String)
  extends StreamTableSource[T]
  with DefinedRowtimeAttributes {

  /** Declares `rowtime` as the only rowtime attribute, read from the field of the same name. */
  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    val descriptor = new RowtimeAttributeDescriptor(
      rowtime,
      new ExistingField(rowtime),
      PreserveWatermarks.INSTANCE)
    Collections.singletonList(descriptor)
  }

  /** Creates the source stream with parallelism 1 and the configured return type. */
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
    val sourceFunction = new EventTimeSourceFunction[T](values)
    val source = execEnv.addSource(sourceFunction)
    source.setParallelism(1).returns(returnType)
  }

  override def getReturnType: TypeInformation[T] = returnType

  override def getTableSchema: TableSchema = tableSchema
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册