未验证 提交 c33bab4a 编写于 作者: L Leonard Xu 提交者: Jark Wu

[hotfix][table-planner-blink] Planner should pass query's changelog mode to...

[hotfix][table-planner-blink] Planner should pass query's changelog mode to DynamicTableSink#getChangelogMode
上级 adfd45fb
......@@ -18,6 +18,9 @@
package org.apache.flink.table.planner.plan.trait;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
......@@ -112,6 +115,24 @@ public class ModifyKindSet {
return union(this, other);
}
/**
* Returns the default {@link ChangelogMode} from this {@link ModifyKindSet}.
*/
public ChangelogMode toChangelogMode() {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
if (this.contains(ModifyKind.INSERT)) {
builder.addContainedKind(RowKind.INSERT);
}
if (this.contains(ModifyKind.UPDATE)) {
builder.addContainedKind(RowKind.UPDATE_BEFORE);
builder.addContainedKind(RowKind.UPDATE_AFTER);
}
if (this.contains(ModifyKind.DELETE)) {
builder.addContainedKind(RowKind.DELETE);
}
return builder.build();
}
@Override
public boolean equals(Object o) {
if (this == o) {
......
......@@ -19,10 +19,10 @@
package org.apache.flink.table.planner.plan.optimize.program
import org.apache.flink.table.api.TableException
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{BEFORE_AND_AFTER, ONLY_UPDATE_AFTER, beforeAfterOrNone, onlyAfterOrNone}
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils.FULL_CHANGELOG_MODE
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.runtime.operators.join.FlinkJoinType
......@@ -115,8 +115,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
requester: String): StreamPhysicalRel = rel match {
case sink: StreamExecSink =>
val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"
val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(
sink.tableSink.getChangelogMode(FULL_CHANGELOG_MODE))
sink.tableSink.getChangelogMode(queryModifyKindSet))
val children = visitChildren(sink, sinkRequiredTrait, name)
val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
// ignore required trait from context, because sink is the true root
......@@ -323,6 +324,18 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
}
}
/**
* Derives the [[ModifyKindSetTrait]] of query plan without required ModifyKindSet validation.
*/
private def deriveQueryDefaultChangelogMode(
queryNode: RelNode, name: String): ChangelogMode = {
val newNode = visit(
queryNode.asInstanceOf[StreamPhysicalRel],
ModifyKindSetTrait.ALL_CHANGES,
name)
getModifyKindSet(newNode).toChangelogMode
}
private def createNewNode(
node: StreamPhysicalRel,
children: List[StreamPhysicalRel],
......@@ -385,7 +398,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val onlyAfter = onlyAfterOrNone(childModifyKindSet)
val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
val sinkTrait = UpdateKindTrait.fromChangelogMode(
sink.tableSink.getChangelogMode(FULL_CHANGELOG_MODE))
sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
Seq(onlyAfter, beforeAndAfter)
} else if (sinkTrait.equals(BEFORE_AND_AFTER)){
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册