Commit 0695929b authored by Xingcan Cui, committed by twalthr

[FLINK-8258] [table] Enable query configuration for batch queries

This closes #5169.
Parent e2053658
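
This commit makes a `BatchTableEnvironment` hand out a `BatchQueryConfig` and threads that config through every batch translation path, mirroring the existing streaming API. A minimal usage sketch, assuming the Flink 1.4-era Scala Table API (the input data and the sink path are hypothetical):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object BatchQueryConfigExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input; any DataSet converted to a Table works.
    val input = env.fromElements((1, "a"), (2, "b")).toTable(tEnv, 'id, 'name)

    // New in this commit: queryConfig on a batch environment returns a
    // BatchQueryConfig, which is now passed through the whole translation
    // rather than only being type-checked at the sink.
    val conf = tEnv.queryConfig
    input.select('id, 'name).writeToSink(new CsvTableSink("/tmp/out.csv"), conf)
    env.execute()
  }
}
```

`BatchQueryConfig` carries no batch-specific options yet, so the immediate gain is API symmetry with `StreamQueryConfig` and a single code path for both modes.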
@@ -68,6 +68,8 @@ abstract class BatchTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataSetTable_[0-9]+$".r
+ override def queryConfig: BatchQueryConfig = new BatchQueryConfig
/**
* Checks if the chosen table name is valid.
*
@@ -169,9 +171,9 @@ abstract class BatchTableEnvironment(
sink: TableSink[T],
queryConfig: QueryConfig): Unit = {
- // We do not pass the configuration on, because there is nothing to configure for batch queries.
- queryConfig match {
- case _: BatchQueryConfig =>
+ // Check the query configuration to be a batch one.
+ val batchQueryConfig = queryConfig match {
+ case batchConfig: BatchQueryConfig => batchConfig
case _ =>
throw new TableException("BatchQueryConfig required to configure batch query.")
}
@@ -180,7 +182,7 @@ abstract class BatchTableEnvironment(
case batchSink: BatchTableSink[T] =>
val outputType = sink.getOutputType
// translate the Table into a DataSet and provide the type that the TableSink expects.
- val result: DataSet[T] = translate(table)(outputType)
+ val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
// Give the DataSet to the TableSink to emit it.
batchSink.emitDataSet(result)
case _ =>
@@ -230,7 +232,8 @@ abstract class BatchTableEnvironment(
private[flink] def explain(table: Table, extended: Boolean): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new GenericTypeInfo(classOf[Row]))
+ val dataSet = translate[Row](optimizedPlan, ast.getRowType, queryConfig) (
+ new GenericTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
@@ -372,10 +375,12 @@ abstract class BatchTableEnvironment(
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
- protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ protected def translate[A](
+ table: Table,
+ queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
val relNode = table.getRelNode
val dataSetPlan = optimize(relNode)
- translate(dataSetPlan, relNode.getRowType)
+ translate(dataSetPlan, relNode.getRowType, queryConfig)
}
/**
@@ -390,13 +395,13 @@
*/
protected def translate[A](
logicalPlan: RelNode,
- logicalType: RelDataType)
- (implicit tpe: TypeInformation[A]): DataSet[A] = {
+ logicalType: RelDataType,
+ queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
TableEnvironment.validateType(tpe)
logicalPlan match {
case node: DataSetRel =>
- val plan = node.translateToPlan(this)
+ val plan = node.translateToPlan(this, queryConfig)
val conversion =
getConversionMapper(
plan.getType,
......
@@ -143,7 +143,8 @@ class BatchTableEnvironment(
* @return The converted [[DataSet]].
*/
def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
- translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+ // Use the default query config.
+ translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
}
/**
@@ -160,7 +161,50 @@ class BatchTableEnvironment(
* @return The converted [[DataSet]].
*/
def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
- translate[T](table)(typeInfo)
+ // Use the default batch query config.
+ translate[T](table, queryConfig)(typeInfo)
}
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the type of the resulting [[DataSet]].
+ * @param queryConfig The configuration for the query to generate.
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
+ def toDataSet[T](
+ table: Table,
+ clazz: Class[T],
+ queryConfig: BatchQueryConfig): DataSet[T] = {
+ translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+ }
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
+ * @param queryConfig The configuration for the query to generate.
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
+ def toDataSet[T](
+ table: Table,
+ typeInfo: TypeInformation[T],
+ queryConfig: BatchQueryConfig): DataSet[T] = {
+ translate[T](table, queryConfig)(typeInfo)
+ }
/**
......
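
The two overloads added above mirror the existing Java-facing `toDataSet` methods but take an explicit `BatchQueryConfig`. A short sketch of the class-based variant, assuming hypothetical input values (a `Row`-typed conversion maps fields by position, as the Javadoc states):

```scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

object JavaToDataSetExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical two-column input table.
    val input = env.fromElements(Tuple2.of(Int.box(1), "a"), Tuple2.of(Int.box(2), "b"))
    val table = tEnv.fromDataSet(input, "id, name")

    // Same conversion as toDataSet(table, classOf[Row]), but with an
    // explicitly supplied BatchQueryConfig instead of the default.
    val rows = tEnv.toDataSet(table, classOf[Row], tEnv.queryConfig)
    rows.print()
  }
}
```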
@@ -137,7 +137,25 @@ class BatchTableEnvironment(
* @return The converted [[DataSet]].
*/
def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
- wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+ // Use the default batch query config.
+ wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
}
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
+ def toDataSet[T: TypeInformation](table: Table, queryConfig: BatchQueryConfig): DataSet[T] = {
+ wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+ }
/**
......
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
- import org.apache.flink.table.api.{StreamQueryConfig, Table, TableException}
+ import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
@@ -54,6 +54,29 @@ class TableConversions(table: Table) {
}
}
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param queryConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
+ def toDataSet[T: TypeInformation](queryConfig: BatchQueryConfig): DataSet[T] = {
+ table.tableEnv match {
+ case tEnv: ScalaBatchTableEnv =>
+ tEnv.toDataSet(table, queryConfig)
+ case _ =>
+ throw new TableException(
+ "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
+ }
+ }
/**
* Converts the given [[Table]] into an append [[DataStream]] of a specified type.
*
......
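
Through the implicit `TableConversions` wrapper above, the new overload is also reachable directly on a `Table` in the Scala API. A minimal sketch (table contents hypothetical):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TableConversionsExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val orders = env.fromElements((1, 10.0), (2, 20.5)).toTable(tEnv, 'id, 'amount)

    // Sugar for tEnv.toDataSet[Row](orders, tEnv.queryConfig).
    val rows: DataSet[Row] = orders.toDataSet[Row](tEnv.queryConfig)
    rows.print()
  }
}
```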
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types}
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException, Types}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.RowSchema
@@ -78,7 +78,9 @@ class BatchTableSourceScan(
)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val fieldIndexes = TableSourceUtil.computeIndexMapping(
tableSource,
......
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -86,10 +86,12 @@ class DataSetAggregate(
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val input = inputNode.asInstanceOf[DataSetRel]
- val inputDS = input.translateToPlan(tableEnv)
+ val inputDS = input.translateToPlan(tableEnv, queryConfig)
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
......
@@ -27,7 +27,7 @@ import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.plan.nodes.CommonCalc
@@ -82,11 +82,13 @@ class DataSetCalc(
estimateRowCount(calcProgram, rowCnt)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val config = tableEnv.getConfig
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val generator = new FunctionCodeGenerator(config, false, inputDS.getType)
......
@@ -26,9 +26,7 @@ import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
- import org.apache.flink.api.java.typeutils.RowTypeInfo
- import org.apache.flink.table.api.BatchTableEnvironment
- import org.apache.flink.table.calcite.FlinkTypeFactory
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -94,12 +92,14 @@ class DataSetCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val config = tableEnv.getConfig
// we do not need to specify input type
- val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
......
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.runtime.aggregate.DistinctReduce
import org.apache.flink.types.Row
@@ -76,9 +76,11 @@ class DataSetDistinct(
rowType.getFieldList.asScala.map(_.getName).mkString(", ")
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val groupKeys = (0 until rowRelDataType.getFieldCount).toArray // group on all fields
inputDS
......
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.runtime.IntersectCoGroupFunction
import org.apache.flink.types.Row
@@ -74,10 +74,12 @@ class DataSetIntersect(
}
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val coGroupedDs = leftDataSet.coGroup(rightDataSet)
......
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
- import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableException, Types}
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableConfig, TableException, Types}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.nodes.CommonJoin
@@ -115,7 +115,9 @@ class DataSetJoin(
planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val config = tableEnv.getConfig
@@ -160,8 +162,8 @@
})
}
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
joinType match {
case JoinRelType.INNER =>
......
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.runtime.MinusCoGroupFunction
import org.apache.flink.types.Row
@@ -85,10 +85,12 @@ class DataSetMinus(
rowCnt
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val coGroupedDs = leftDataSet.coGroup(rightDataSet)
......
@@ -19,7 +19,7 @@
package org.apache.flink.table.plan.nodes.dataset
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.types.Row
@@ -29,8 +29,9 @@ trait DataSetRel extends FlinkRelNode {
* Translates the [[DataSetRel]] node into a [[DataSet]] operator.
*
* @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
+ * @param queryConfig The configuration for the query to generate.
* @return DataSet of type [[Row]]
*/
- def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row]
+ def translateToPlan(tableEnv: BatchTableEnvironment, queryConfig: BatchQueryConfig): DataSet[Row]
}
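
Every `DataSetRel` implementation now receives the config in `translateToPlan` and forwards it unchanged when translating its inputs, so the config reaches every node of the plan tree. A standalone Scala model of that recursion (not Flink code; all names here are made up):

```scala
// Made-up stand-in for BatchQueryConfig.
final case class Conf(label: String)

sealed trait Node {
  def translate(conf: Conf): String
}

final case class Scan(table: String) extends Node {
  override def translate(conf: Conf): String = s"scan($table)"
}

final case class Filter(input: Node) extends Node {
  // Forward the same conf to the child, exactly as the DataSet* nodes
  // call input.translateToPlan(tableEnv, queryConfig).
  override def translate(conf: Conf): String = s"filter(${input.translate(conf)})"
}

object ThreadingDemo extends App {
  // Prints: filter(scan(orders))
  println(Filter(Scan("orders")).translate(Conf("defaults")))
}
```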
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema}
import org.apache.flink.types.Row
@@ -59,7 +59,9 @@ class DataSetScan(
)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val schema = new RowSchema(rowRelDataType)
val inputDataSet: DataSet[Any] = dataSetTable.dataSet
val fieldIdxs = dataSetTable.fieldIndexes
......
@@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexNode
import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableConfig}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
@@ -91,10 +91,12 @@ class DataSetSingleRowJoin(
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val broadcastSetName = "joinSet"
val mapSideJoin = generateMapFunction(
tableEnv.getConfig,
@@ -127,8 +129,8 @@
val isOuterJoin = joinType match {
case JoinRelType.LEFT | JoinRelType.RIGHT => true
case _ => false
- }
+ }
val codeGenerator = new FunctionCodeGenerator(
config,
isOuterJoin,
......
@@ -21,14 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
import java.util
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelFieldCollation.Direction
- import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException}
import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
import org.apache.flink.types.Row
import org.apache.flink.table.plan.nodes.CommonSort
@@ -80,7 +78,9 @@ class DataSetSort(
}
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
if (fieldCollations.isEmpty) {
throw TableException("Limiting the result without sorting is not allowed " +
@@ -89,7 +89,7 @@
val config = tableEnv.getConfig
- val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
var partitionedDs = if (currentParallelism == 1) {
@@ -132,17 +132,17 @@
private val fieldCollations = collations.getFieldCollations.asScala
.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
override def toString: String = {
sortToString(getRowType, collations, offset, fetch)
}
- override def explainTerms(pw: RelWriter) : RelWriter = {
- sortExplainTerms(
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ sortExplainTerms(
super.explainTerms(pw),
- getRowType,
- collations,
- offset,
+ getRowType,
+ collations,
+ offset,
fetch)
}
}
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
@@ -77,10 +77,12 @@ class DataSetUnion(
getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
leftDataSet.union(rightDataSet)
}
......
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.api.java.DataSet
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.InputFormatCodeGenerator
import org.apache.flink.table.runtime.io.ValuesInputFormat
@@ -66,7 +66,9 @@ class DataSetValues(
super.explainTerms(pw).item("values", valuesFieldsToString)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
val config = tableEnv.getConfig
......
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
- import org.apache.flink.table.api.BatchTableEnvironment
+ import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.AggregationCodeGenerator
@@ -105,9 +105,11 @@ class DataSetWindowAggregate(
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
- override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ queryConfig: BatchQueryConfig): DataSet[Row] = {
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
val generator = new AggregationCodeGenerator(
tableEnv.getConfig,
......