未验证 提交 2eb84aaa 编写于 作者: G godfrey he 提交者: GitHub

[FLINK-20708][table-planner-blink] Introduce StreamPhysicalDropUpdateBefore,...

[FLINK-20708][table-planner-blink] Introduce StreamPhysicalDropUpdateBefore, and make StreamExecDropUpdateBefore only extended from ExecNode

This closes #14458
上级 4fbffb6d
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.runtime.operators.misc.DropUpdateBeforeFunction;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
/**
 * Stream {@link ExecNode} which drops the UPDATE_BEFORE messages of its input.
 *
 * <p>This is usually used as an optimization for downstream operators that don't need the
 * UPDATE_BEFORE messages, when the upstream operator can't drop them by itself (e.g. the source).
 */
public class StreamExecDropUpdateBefore extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData> {

    /**
     * Creates the node with a single input.
     *
     * @param inputEdge the only input edge; it is wrapped into a singleton list for the base class
     * @param outputType the output row type, forwarded to the base class
     * @param description the node description, forwarded to the base class
     */
    public StreamExecDropUpdateBefore(ExecEdge inputEdge, RowType outputType, String description) {
        super(Collections.singletonList(inputEdge), outputType, description);
    }

    /**
     * Translates the single input node and appends a {@link StreamFilter} built from
     * {@link DropUpdateBeforeFunction} on top of it.
     *
     * <p>The resulting transformation reuses the output type and the parallelism of the
     * translated input.
     *
     * @param planner the planner handed to the input node for its own translation
     * @return a one-input transformation applying the filter to the translated input
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        // This node has exactly one input (see the constructor), hence index 0.
        final Transformation<RowData> upstream =
                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
        final StreamFilter<RowData> dropUpdateBeforeFilter =
                new StreamFilter<>(new DropUpdateBeforeFunction());
        final int parallelism = upstream.getParallelism();

        return new OneInputTransformation<>(
                upstream,
                getDesc(),
                dropUpdateBeforeFilter,
                upstream.getOutputType(),
                parallelism);
    }
}
......@@ -313,7 +313,7 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
}
def areColumnsUnique(
rel: StreamExecDropUpdateBefore,
rel: StreamPhysicalDropUpdateBefore,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = {
......
......@@ -227,7 +227,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
}
def getRelModifiedMonotonicity(
rel: StreamExecDropUpdateBefore,
rel: StreamPhysicalDropUpdateBefore,
mq: RelMetadataQuery): RelModifiedMonotonicity = {
getMonotonicity(rel.getInput, mq, rel.getRowType.getFieldCount)
}
......
......@@ -320,7 +320,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
}
def getUniqueKeys(
rel: StreamExecDropUpdateBefore,
rel: StreamPhysicalDropUpdateBefore,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
mq.getUniqueKeys(rel.getInput, ignoreNulls)
......
......@@ -18,15 +18,10 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.operators.StreamFilter
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.delegation.StreamPlanner
import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore
import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils
import org.apache.flink.table.runtime.operators.misc.DropUpdateBeforeFunction
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
......@@ -39,46 +34,35 @@ import java.util
* This is usually used as an optimization for the downstream operators that don't need
* the UPDATE_BEFORE messages, but the upstream operator can't drop it by itself (e.g. the source).
*/
class StreamExecDropUpdateBefore(
class StreamPhysicalDropUpdateBefore(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode)
extends SingleRel(cluster, traitSet, input)
with StreamPhysicalRel
with LegacyStreamExecNode[RowData] {
with StreamPhysicalRel {
override def requireWatermark: Boolean = false
override def deriveRowType(): RelDataType = getInput.getRowType
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new StreamExecDropUpdateBefore(
new StreamPhysicalDropUpdateBefore(
cluster,
traitSet,
inputs.get(0))
}
//~ ExecNode methods -----------------------------------------------------------
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[RowData] = {
override def translateToExecNode(): ExecNode[_] = {
// sanity check
if (ChangelogPlanUtils.generateUpdateBefore(this)) {
throw new IllegalStateException(s"${this.getClass.getSimpleName} is required to emit " +
s"UPDATE_BEFORE messages. This should never happen." )
s"UPDATE_BEFORE messages. This should never happen.")
}
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[RowData]]
val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
val operator = new StreamFilter[RowData](new DropUpdateBeforeFunction)
new OneInputTransformation(
inputTransform,
getRelDetailedDescription,
operator,
rowTypeInfo,
inputTransform.getParallelism)
new StreamExecDropUpdateBefore(
ExecEdge.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
)
}
}
......@@ -635,7 +635,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
if (providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
requiredTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)) {
// requiring only-after, but the source is CDC source, then drop update_before manually
val dropUB = new StreamExecDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
createNewNode(dropUB, newSource.map(s => List(s)), requiredTrait)
} else {
newSource
......
......@@ -722,7 +722,7 @@ class FlinkRelMdHandlerTestBase {
}
protected lazy val streamDropUpdateBefore = {
new StreamExecDropUpdateBefore(
new StreamPhysicalDropUpdateBefore(
cluster,
streamPhysicalTraits,
studentStreamScan
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册