未验证 提交 e789bb71 编写于 作者: J Jingsong Lee 提交者: GitHub

[FLINK-16857][table-planner-blink] Support partition prune by getPartitions of source


This closes #11560
上级 ee1e83ce
......@@ -234,7 +234,7 @@ public class HiveTableSource implements
@Override
public List<Map<String, String>> getPartitions() {
throw new RuntimeException("This method is not expected to be called. " +
throw new UnsupportedOperationException(
"Please use Catalog API to retrieve all partitions of a table");
}
......
......@@ -107,12 +107,20 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule(
inputFieldType.getFieldList.get(index).getType
}.map(FlinkTypeFactory.toLogicalType)
val partitionsFromSource = try {
Some(tableSource.getPartitions)
} catch {
case _: UnsupportedOperationException => None
}
def getAllPartitions: util.List[util.Map[String, String]] = {
catalogOption match {
case Some(c) =>
c.listPartitions(tableIdentifier.toObjectPath)
.map(_.getPartitionSpec).toList
case None => tableSource.getPartitions
partitionsFromSource match {
case Some(parts) => parts
case None => catalogOption match {
case Some(catalog) =>
catalog.listPartitions(tableIdentifier.toObjectPath).map(_.getPartitionSpec).toList
case None => throw new TableException(s"The $tableSource must be a catalog.")
}
}
}
......@@ -132,35 +140,39 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule(
)
}
val remainingPartitions: util.List[util.Map[String, String]] = catalogOption match {
case Some(catalog) =>
val converter = new RexNodeToExpressionConverter(
inputFields,
context.getFunctionCatalog,
context.getCatalogManager,
TimeZone.getTimeZone(config.getLocalTimeZone))
def toExpressions: Option[Seq[Expression]] = {
val expressions = new mutable.ArrayBuffer[Expression]()
for (predicate <- partitionPredicates) {
predicate.accept(converter) match {
case Some(expr) => expressions.add(expr)
case None => return None
val remainingPartitions: util.List[util.Map[String, String]] = partitionsFromSource match {
case Some(_) => internalPartitionPrune()
case None =>
catalogOption match {
case Some(catalog) =>
val converter = new RexNodeToExpressionConverter(
inputFields,
context.getFunctionCatalog,
context.getCatalogManager,
TimeZone.getTimeZone(config.getLocalTimeZone))
def toExpressions: Option[Seq[Expression]] = {
val expressions = new mutable.ArrayBuffer[Expression]()
for (predicate <- partitionPredicates) {
predicate.accept(converter) match {
case Some(expr) => expressions.add(expr)
case None => return None
}
}
Some(expressions)
}
}
Some(expressions)
}
toExpressions match {
case Some(expressions) =>
try {
catalog
.listPartitionsByFilter(tableIdentifier.toObjectPath, expressions)
.map(_.getPartitionSpec)
} catch {
case _: UnsupportedOperationException => internalPartitionPrune()
toExpressions match {
case Some(expressions) =>
try {
catalog
.listPartitionsByFilter(tableIdentifier.toObjectPath, expressions)
.map(_.getPartitionSpec)
} catch {
case _: UnsupportedOperationException => internalPartitionPrune()
}
case None => internalPartitionPrune()
}
case None => internalPartitionPrune()
}
case None => internalPartitionPrune()
}
val newTableSource = tableSource.applyPartitionPruning(remainingPartitions)
......
......@@ -25,12 +25,16 @@ import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase, Te
import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.rules.FilterProjectTransposeRule
import org.apache.calcite.tools.RuleSets
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{Before, Test}
/**
* Test for [[PushPartitionIntoTableSourceScanRule]].
*/
class PushPartitionIntoTableSourceScanRuleTest extends TableTestBase {
@RunWith(classOf[Parameterized])
class PushPartitionIntoTableSourceScanRuleTest(
sourceFetchPartitions: Boolean) extends TableTestBase {
private val util = batchTestUtil()
@Before
......@@ -63,9 +67,9 @@ class PushPartitionIntoTableSourceScanRuleTest extends TableTestBase {
.build()
TestPartitionableSourceFactory.registerTableSource(util.tableEnv, "MyTable",
tableSchema = tableSchema, isBounded = true)
tableSchema = tableSchema, isBounded = true, sourceFetchPartitions = sourceFetchPartitions)
TestPartitionableSourceFactory.registerTableSource(util.tableEnv, "VirtualTable",
tableSchema = tableSchema2, isBounded = true)
tableSchema = tableSchema2, isBounded = true, sourceFetchPartitions = sourceFetchPartitions)
}
@Test
......@@ -161,3 +165,10 @@ class PushPartitionIntoTableSourceScanRuleTest extends TableTestBase {
}
}
object PushPartitionIntoTableSourceScanRuleTest {
@Parameterized.Parameters(name = "{0}")
def parameters(): java.util.Collection[Boolean] = {
java.util.Arrays.asList(true, false)
}
}
......@@ -605,7 +605,7 @@ class TestFilterableTableSourceFactory extends StreamTableSourceFactory[Row] {
class TestPartitionableTableSource(
override val isBounded: Boolean,
remainingPartitions: JList[JMap[String, String]],
isCatalogTable: Boolean)
sourceFetchPartitions: Boolean)
extends StreamTableSource[Row]
with PartitionableTableSource {
......@@ -626,8 +626,8 @@ class TestPartitionableTableSource(
)
override def getPartitions: JList[JMap[String, String]] = {
if (isCatalogTable) {
throw new RuntimeException("Should not expected.")
if (!sourceFetchPartitions) {
throw new UnsupportedOperationException()
}
List(
Map("part1" -> "A", "part2" -> "1").asJava,
......@@ -639,7 +639,7 @@ class TestPartitionableTableSource(
override def applyPartitionPruning(
remainingPartitions: JList[JMap[String, String]]): TableSource[_] = {
new TestPartitionableTableSource(isBounded, remainingPartitions, isCatalogTable)
new TestPartitionableTableSource(isBounded, remainingPartitions, sourceFetchPartitions)
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
......@@ -787,7 +787,7 @@ class TestPartitionableSourceFactory extends TableSourceFactory[Row] {
dp.putProperties(properties)
val isBounded = dp.getBoolean("is-bounded")
val isCatalogTable = dp.getBoolean("is-catalog-table")
val sourceFetchPartitions = dp.getBoolean("source-fetch-partitions")
val remainingPartitions = dp.getOptionalArray("remaining-partition",
new function.Function[String, util.Map[String, String]] {
override def apply(t: String): util.Map[String, String] = {
......@@ -800,7 +800,7 @@ class TestPartitionableSourceFactory extends TableSourceFactory[Row] {
new TestPartitionableTableSource(
isBounded,
remainingPartitions,
isCatalogTable)
sourceFetchPartitions)
}
}
......@@ -827,11 +827,11 @@ object TestPartitionableSourceFactory {
tableName: String,
isBounded: Boolean,
tableSchema: TableSchema = tableSchema,
remainingPartitions: JList[JMap[String, String]] = null): Unit = {
remainingPartitions: JList[JMap[String, String]] = null,
sourceFetchPartitions: Boolean = false): Unit = {
val properties = new DescriptorProperties()
properties.putString("is-bounded", isBounded.toString)
val isCatalogTable = true
properties.putBoolean("is-catalog-table", isCatalogTable)
properties.putBoolean("source-fetch-partitions", sourceFetchPartitions)
properties.putString(CONNECTOR_TYPE, "TestPartitionableSource")
if (remainingPartitions != null) {
remainingPartitions.zipWithIndex.foreach { case (part, i) =>
......@@ -854,19 +854,17 @@ object TestPartitionableSourceFactory {
val path = new ObjectPath(tEnv.getCurrentDatabase, tableName)
catalog.createTable(path, table, false)
if (isCatalogTable) {
val partitions = List(
Map("part1" -> "A", "part2" -> "1").asJava,
Map("part1" -> "A", "part2" -> "2").asJava,
Map("part1" -> "B", "part2" -> "3").asJava,
Map("part1" -> "C", "part2" -> "1").asJava
)
partitions.foreach(spec => catalog.createPartition(
path,
new CatalogPartitionSpec(new java.util.LinkedHashMap(spec)),
new CatalogPartitionImpl(Map[String, String](), ""),
true))
}
val partitions = List(
Map("part1" -> "A", "part2" -> "1").asJava,
Map("part1" -> "A", "part2" -> "2").asJava,
Map("part1" -> "B", "part2" -> "3").asJava,
Map("part1" -> "C", "part2" -> "1").asJava
)
partitions.foreach(spec => catalog.createPartition(
path,
new CatalogPartitionSpec(new java.util.LinkedHashMap(spec)),
new CatalogPartitionImpl(Map[String, String](), ""),
true))
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册