提交 62ebda3f 编写于 作者: X Xpray 提交者: twalthr

[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

This closes #4658.
上级 2d393e88
......@@ -18,6 +18,9 @@
package org.apache.flink.table.calcite
import java.util
import java.util.List
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`._
......@@ -244,6 +247,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
canonize(newType)
}
private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
val nullable = types.asScala.exists(
sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
)
if (hasAny) {
if (types.get(0).isInstanceOf[GenericRelDataType] &&
types.get(1).isInstanceOf[GenericRelDataType]) {
createTypeWithNullability(types.get(0), nullable)
} else {
throw new RuntimeException("only GenericRelDataType of ANY is supported")
}
} else {
null
}
}
override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
assert(types != null)
assert(types.size >= 1)
val type0 = types.get(0)
if (type0.getSqlTypeName != null) {
val resultType = resolveAnySqlType(types)
if (resultType != null) {
resultType
} else {
super.leastRestrictive(types)
}
} else {
super.leastRestrictive(types)
}
}
}
object FlinkTypeFactory {
......
......@@ -70,4 +70,21 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
val expected = mutable.MutableList("Hi", "Hallo")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@Test
def testUnionWithAnyType(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
}
class NODE {
val x = new java.util.HashMap[String, String]()
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册