提交 05088b4a 编写于 作者: X xccui 提交者: twalthr

[FLINK-6368] [table] Grouping keys in stream aggregations have wrong order

This closes #3768.
上级 bc6409d6
......@@ -101,7 +101,6 @@ class DataStreamAggregate(
override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val groupingKeys = grouping.indices.toArray
val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
......@@ -124,15 +123,15 @@ class DataStreamAggregate(
inputDS.getType)
// grouped / keyed aggregation
if (groupingKeys.length > 0) {
if (grouping.length > 0) {
val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
window,
groupingKeys.length,
grouping.length,
namedAggregates.size,
rowRelDataType.getFieldCount,
namedProperties)
val keyedStream = inputDS.keyBy(groupingKeys: _*)
val keyedStream = inputDS.keyBy(grouping: _*)
val windowedStream =
createKeyedWindowedStream(window, keyedStream)
.asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
......
......@@ -213,6 +213,33 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
"Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@Test
def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
val stream = env
.fromCollection(data)
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
.map(t => (t._2, t._6))
val table = stream.toTable(tEnv, 'int, 'string)
val windowedTable = table
.window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
.groupBy('w, 'string)
.select('string, 'int.count, 'w.start, 'w.end)
val results = windowedTable.toDataStream[Row]
results.addSink(new StreamITCase.StringSink)
env.execute()
val expected = Seq(
"Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
"Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
object DataStreamAggregateITCase {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册