未验证 提交 8da60525 编写于 作者: G godfrey he 提交者: GitHub

[FLINK-20709][table-planner-blink] Introduce StreamPhysicalMiniBatchAssigner,...

[FLINK-20709][table-planner-blink] Introduce StreamPhysicalMiniBatchAssigner, and make StreamExecMiniBatchAssigner only extended from ExecNode

This closes #14459
上级 2eb84aaa
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.trait.MiniBatchInterval;
import org.apache.flink.table.planner.plan.trait.MiniBatchMode;
import org.apache.flink.table.runtime.operators.wmassigners.ProcTimeMiniBatchAssignerOperator;
import org.apache.flink.table.runtime.operators.wmassigners.RowTimeMiniBatchAssginerOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Stream {@link ExecNode} which injects a mini-batch event in the streaming data. The mini-batch
* event will be recognized as a boundary between two mini-batches. The following operators will
* keep track of the mini-batch events and trigger mini-batch once the mini-batch id is advanced.
*
* <p>NOTE: currently, we leverage the runtime watermark mechanism to achieve the mini-batch,
* because runtime doesn't support customized events and the watermark mechanism fully meets
* mini-batch needs.
*/
public class StreamExecMiniBatchAssigner extends ExecNodeBase<RowData> implements StreamExecNode<RowData> {
private final MiniBatchInterval miniBatchInterval;
public StreamExecMiniBatchAssigner(
MiniBatchInterval miniBatchInterval,
ExecEdge inputEdge,
RowType outputType,
String description) {
super(Collections.singletonList(inputEdge), outputType, description);
this.miniBatchInterval = checkNotNull(miniBatchInterval);
}
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
final Transformation<RowData> inputTransform =
(Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
final OneInputStreamOperator<RowData, RowData> operator;
if (miniBatchInterval.mode() == MiniBatchMode.ProcTime()) {
operator = new ProcTimeMiniBatchAssignerOperator(miniBatchInterval.interval());
} else if (miniBatchInterval.mode() == MiniBatchMode.RowTime()) {
operator = new RowTimeMiniBatchAssginerOperator(miniBatchInterval.interval());
} else {
throw new TableException(
String.format("MiniBatchAssigner shouldn't be in %s mode this is a bug, please file an issue.",
miniBatchInterval.mode()));
}
return new OneInputTransformation<>(
inputTransform,
getDesc(),
operator,
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism());
}
}
......@@ -239,7 +239,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
}
def getRelModifiedMonotonicity(
rel: StreamExecMiniBatchAssigner,
rel: StreamPhysicalMiniBatchAssigner,
mq: RelMetadataQuery): RelModifiedMonotonicity = {
getMonotonicity(rel.getInput, mq, rel.getRowType.getFieldCount)
}
......
......@@ -18,16 +18,10 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.api.TableException
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchIntervalTraitDef, MiniBatchMode}
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.runtime.operators.wmassigners.{ProcTimeMiniBatchAssignerOperator, RowTimeMiniBatchAssginerOperator}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.planner.plan.`trait`.MiniBatchIntervalTraitDef
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
......@@ -35,26 +29,25 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import java.util
/**
* Stream physical RelNode for injecting a mini-batch event in the streaming data. The mini-batch
* event will be recognized as a boundary between two mini-batches. The following operators will
* keep track of the mini-batch events and trigger mini-batch once the mini-batch id is advanced.
*
* NOTE: currently, we leverage the runtime watermark mechanism to achieve the mini-batch, because
* runtime doesn't support customized events and the watermark mechanism fully meets mini-batch
* needs.
*/
class StreamExecMiniBatchAssigner(
* Stream physical RelNode for injecting a mini-batch event in the streaming data. The mini-batch
* event will be recognized as a boundary between two mini-batches. The following operators will
* keep track of the mini-batch events and trigger mini-batch once the mini-batch id is advanced.
*
* NOTE: currently, we leverage the runtime watermark mechanism to achieve the mini-batch, because
* runtime doesn't support customized events and the watermark mechanism fully meets mini-batch
* needs.
*/
class StreamPhysicalMiniBatchAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputRel: RelNode)
extends SingleRel(cluster, traits, inputRel)
with StreamPhysicalRel
with LegacyStreamExecNode[RowData] {
with StreamPhysicalRel {
override def requireWatermark: Boolean = false
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new StreamExecMiniBatchAssigner(
new StreamPhysicalMiniBatchAssigner(
cluster,
traitSet,
inputs.get(0))
......@@ -67,35 +60,13 @@ class StreamExecMiniBatchAssigner(
.item("mode", miniBatchInterval.mode.toString)
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val inferredInterval = getTraitSet.getTrait(
MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval
val operator = if (inferredInterval.mode == MiniBatchMode.ProcTime) {
new ProcTimeMiniBatchAssignerOperator(inferredInterval.interval)
} else if (inferredInterval.mode == MiniBatchMode.RowTime) {
new RowTimeMiniBatchAssginerOperator(inferredInterval.interval)
} else {
throw new TableException(s"MiniBatchAssigner shouldn't be in ${inferredInterval.mode} " +
s"mode, this is a bug, please file an issue.")
}
val outputRowTypeInfo = InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
val transformation = new OneInputTransformation[RowData, RowData](
inputTransformation,
getRelDetailedDescription,
operator,
outputRowTypeInfo,
inputTransformation.getParallelism)
transformation
override def translateToExecNode(): ExecNode[_] = {
val miniBatchInterval = traits.getTrait(MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval
new StreamExecMiniBatchAssigner(
miniBatchInterval,
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -273,7 +273,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
_: StreamExecLookupJoin | _: StreamPhysicalExchange |
_: StreamPhysicalExpand | _: StreamExecMiniBatchAssigner |
_: StreamPhysicalExpand | _: StreamPhysicalMiniBatchAssigner |
_: StreamPhysicalWatermarkAssigner =>
// transparent forward requiredTrait to children
val children = visitChildren(rel, requiredTrait, requester)
......@@ -572,8 +572,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
case _: StreamPhysicalCorrelateBase | _: StreamExecLookupJoin |
_: StreamPhysicalExchange | _: StreamPhysicalExpand | _: StreamExecMiniBatchAssigner |
_: StreamPhysicalWatermarkAssigner =>
_: StreamPhysicalExchange | _: StreamPhysicalExpand |
_: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner =>
// transparent forward requiredTrait to children
visitChildren(rel, requiredTrait) match {
case None => None
......
......@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecGroupWindowAggregate, StreamExecMiniBatchAssigner, StreamPhysicalDataStreamScan, StreamPhysicalLegacyTableSourceScan, StreamPhysicalRel, StreamPhysicalTableSourceScan, StreamPhysicalWatermarkAssigner}
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecGroupWindowAggregate, StreamPhysicalDataStreamScan, StreamPhysicalLegacyTableSourceScan, StreamPhysicalMiniBatchAssigner, StreamPhysicalRel, StreamPhysicalTableSourceScan, StreamPhysicalWatermarkAssigner}
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.calcite.plan.RelOptRule._
......@@ -35,7 +35,7 @@ import scala.collection.JavaConversions._
*
* This rule could handle the following two kinds of operator:
* 1. supports operators which supports mini-batch and does not require watermark, e.g.
* group aggregate. In this case, [[StreamExecMiniBatchAssigner]] with Protime mode will be
* group aggregate. In this case, [[StreamPhysicalMiniBatchAssigner]] with Protime mode will be
* created if not exist, and the interval value will be set as
* [[ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY]].
* 2. supports operators which requires watermark, e.g. window join, window aggregate.
......@@ -74,7 +74,7 @@ class MiniBatchIntervalInferRule extends RelOptRule(
case _: StreamPhysicalWatermarkAssigner => MiniBatchIntervalTrait.NONE
case _: StreamExecMiniBatchAssigner => MiniBatchIntervalTrait.NONE
case _: StreamPhysicalMiniBatchAssigner => MiniBatchIntervalTrait.NONE
case _ => if (rel.requireWatermark && miniBatchEnabled) {
val mergedInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
......@@ -89,7 +89,7 @@ class MiniBatchIntervalInferRule extends RelOptRule(
val updatedInputs = inputs.map { input =>
// add mini-batch watermark assigner node.
if (shouldAppendMiniBatchAssignerNode(input)) {
new StreamExecMiniBatchAssigner(
new StreamPhysicalMiniBatchAssigner(
input.getCluster,
input.getTraitSet,
// attach NONE trait for all of the inputs of MiniBatchAssigner,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册