提交 78f41e9c 编写于 作者: S sebastian kunert 提交者: Fabian Hueske

[FLINK-1328] Integrated forwarded Fields into the optimizer (incomplete)

This closes #83
上级 82c42002
......@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.costs.CostEstimator;
import org.apache.flink.compiler.plan.PlanNode;
......@@ -86,8 +87,8 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
public SemanticProperties getSemanticProperties() {
return null;
}
@Override
......
......@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.Union;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
......@@ -252,8 +254,10 @@ public class BinaryUnionNode extends TwoInputNode {
// No-op: this implementation has an empty body and reads nothing.
protected void readStubAnnotations() {}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return true;
public SemanticProperties getSemanticProperties() {
DualInputSemanticProperties sprops = new DualInputSemanticProperties();
sprops.setAllFieldsConstant(true);
return sprops;
}
@Override
......
......@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.compiler.CompilerException;
......@@ -182,10 +183,10 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// Returns the fixed name string of this node type.
public String getName() {
return "Bulk Iteration";
}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
public SemanticProperties getSemanticProperties() {
return null;
}
// No-op: this implementation has an empty body and reads nothing.
protected void readStubAnnotations() {}
......
......@@ -28,6 +28,7 @@ import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.costs.CostEstimator;
......@@ -230,9 +231,10 @@ public class DataSinkNode extends OptimizerNode {
// --------------------------------------------------------------------------------------------
// Function Annotation Handling
// --------------------------------------------------------------------------------------------
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
@Override
public SemanticProperties getSemanticProperties() {
return null;
}
// --------------------------------------------------------------------------------------------
......
......@@ -30,6 +30,7 @@ import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.costs.CostEstimator;
......@@ -193,8 +194,8 @@ public class DataSourceNode extends OptimizerNode {
}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
public SemanticProperties getSemanticProperties() {
return null;
}
@Override
......
......@@ -21,6 +21,8 @@ package org.apache.flink.compiler.dag;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.FilterOperatorBase;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.operators.FilterDescriptor;
......@@ -47,10 +49,14 @@ public class FilterNode extends SingleInputNode {
// Returns the fixed name string of this node type.
public String getName() {
return "Filter";
}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return true;
public SemanticProperties getSemanticProperties() {
SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
sprops.setAllFieldsConstant(true);
return sprops;
}
@Override
......
......@@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.flink.api.common.operators.AbstractUdfOperator;
import org.apache.flink.api.common.operators.CompilerHints;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
......@@ -268,16 +269,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
*/
@Override
public abstract void accept(Visitor<OptimizerNode> visitor);
/**
* Checks whether a field is modified by the user code or whether it is kept unchanged.
*
* @param input The input number.
* @param fieldNumber The position of the field.
*
* @return True if the field is not changed by the user function, false otherwise.
*/
public abstract boolean isFieldConstant(int input, int fieldNumber);
public abstract SemanticProperties getSemanticProperties();
// ------------------------------------------------------------------------
// Getters / Setters
......@@ -688,8 +681,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
return null;
}
for (int keyColumn : keyColumns) {
if (!isFieldConstant(input, keyColumn)) {
return null;
FieldSet fs = getSemanticProperties() == null ? null : getSemanticProperties().getForwardFields(input, keyColumn);
if (fs == null) {
return null;
} else if (!fs.contains(keyColumn)) {
return null;
}
}
return keyColumns;
......
......@@ -31,8 +31,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.PactCompiler;
......@@ -144,22 +144,8 @@ public abstract class SingleInputNode extends OptimizerNode {
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
if (input != 0) {
throw new IndexOutOfBoundsException();
}
SingleInputOperator<?, ?, ?> c = getPactContract();
SingleInputSemanticProperties semanticProperties = c.getSemanticProperties();
if (semanticProperties != null) {
FieldSet fs;
if ((fs = semanticProperties.getForwardedField(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}
}
return false;
public SemanticProperties getSemanticProperties() {
return ((SingleInputOperator<?,?,?>) getPactContract()).getSemanticProperties();
}
......@@ -444,10 +430,11 @@ public abstract class SingleInputNode extends OptimizerNode {
LocalProperties lProps = in.getLocalProperties().clone();
gProps = dps.computeGlobalProperties(gProps);
lProps = dps.computeLocalProperties(lProps);
SemanticProperties props = this.getSemanticProperties();
// filter by the user code field copies
gProps = gProps.filterByNodesConstantSet(this, 0);
lProps = lProps.filterByNodesConstantSet(this, 0);
gProps = gProps.filterBySemanticProperties(props, 0);
lProps = lProps.filterBySemanticProperties(props, 0);
// apply
node.initProperties(gProps, lProps);
......
......@@ -31,10 +31,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.costs.CostEstimator;
......@@ -554,13 +553,14 @@ public abstract class TwoInputNode extends OptimizerNode {
DualInputPlanNode node = operator.instantiate(in1, in2, this);
node.setBroadcastInputs(broadcastChannelsCombination);
GlobalProperties gp1 = in1.getGlobalProperties().clone().filterByNodesConstantSet(this, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone().filterByNodesConstantSet(this, 1);
SemanticProperties props = this.getSemanticProperties();
GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
LocalProperties lp1 = in1.getLocalProperties().clone().filterByNodesConstantSet(this, 0);
LocalProperties lp2 = in2.getLocalProperties().clone().filterByNodesConstantSet(this, 1);
LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
node.initProperties(combined, locals);
......@@ -690,37 +690,11 @@ public abstract class TwoInputNode extends OptimizerNode {
}
}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
DualInputOperator<?, ?, ?, ?> c = getPactContract();
DualInputSemanticProperties semanticProperties = c.getSemanticProperties();
switch(input) {
case 0:
if (semanticProperties != null) {
FieldSet fs;
if ((fs = semanticProperties.getForwardedField1(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}
}
break;
case 1:
if(semanticProperties != null) {
FieldSet fs;
if ((fs = semanticProperties.getForwardedField2(fieldNumber)) != null) {
return fs.contains(fieldNumber);
}
}
break;
default:
throw new IndexOutOfBoundsException();
}
return false;
public SemanticProperties getSemanticProperties() {
return ((DualInputOperator<?, ?, ?, ?>) getPactContract()).getSemanticProperties();
}
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
......
......@@ -21,6 +21,8 @@ package org.apache.flink.compiler.dag;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
......@@ -54,12 +56,12 @@ public class UnaryOperatorNode extends SingleInputNode {
public String getName() {
    // The name is held per instance rather than being a fixed literal.
    return name;
}
public boolean isFieldConstant(int input, int fieldNumber) {
if (input != 0) {
throw new IndexOutOfBoundsException();
}
return true;
@Override
public SemanticProperties getSemanticProperties() {
SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
sprops.setAllFieldsConstant(true);
return sprops;
}
@Override
......
......@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
......@@ -218,12 +219,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// Returns the fixed name string of this node type.
public String getName() {
return "Workset Iteration";
}
@Override
public boolean isFieldConstant(int input, int fieldNumber) {
return false;
public SemanticProperties getSemanticProperties() {
return null;
}
// No-op: this implementation has an empty body and reads nothing.
protected void readStubAnnotations() {}
@Override
......
......@@ -25,10 +25,10 @@ import java.util.Set;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.util.Utils;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
......@@ -212,7 +212,19 @@ public class GlobalProperties implements Cloneable {
return false;
}
}
/**
 * Gets the ordering currently stored in these properties.
 *
 * @return The stored ordering; may be {@code null} (callers in this class null-check it).
 */
public Ordering getOrdering() {
    return ordering;
}
/**
 * Replaces the ordering stored in these properties.
 * Only the ordering field is written; no other field is adjusted here.
 *
 * @param ordering The ordering to store.
 */
public void setOrdering(Ordering ordering) {
this.ordering = ordering;
}
/**
 * Replaces the partitioning fields stored in these properties.
 * Only the field list is written; the partitioning kind is left as it is.
 *
 * @param partitioningFields The field list to store.
 */
public void setPartitioningFields(FieldList partitioningFields) {
this.partitioningFields = partitioningFields;
}
/**
 * Tells whether the partitioning of these properties is full replication.
 *
 * @return {@code true} if the partitioning equals
 *         {@code PartitioningProperty.FULL_REPLICATION}, {@code false} otherwise.
 */
public boolean isFullyReplicated() {
    return PartitioningProperty.FULL_REPLICATION == this.partitioning;
}
......@@ -234,56 +246,86 @@ public class GlobalProperties implements Cloneable {
}
/**
* Filters these properties by what can be preserved through the given output contract.
*
* @param node The optimizer node.
* @param input The input of the node to filter against.
* @return The adjusted global properties.
* Filters these GlobalProperties by the fields that are constant or forwarded to another output field.
*
* @param props The semantic properties describing which input fields are forwarded; if null, empty properties are returned.
* @param input The index of the input.
* @return The filtered GlobalProperties
*/
public GlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
// check if partitioning survives
FieldList forwardFields = null;
GlobalProperties returnProps = this;
if (props == null) {
return new GlobalProperties();
}
if (this.ordering != null) {
for (int col : this.ordering.getInvolvedIndexes()) {
if (!node.isFieldConstant(input, col)) {
return new GlobalProperties();
Ordering no = new Ordering();
for (int index : this.ordering.getInvolvedIndexes()) {
forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList();
if (forwardFields == null) {
returnProps = new GlobalProperties();
no = null;
break;
} else {
returnProps = returnProps == this ? this.clone() : returnProps;
for (int i = 0; i < forwardFields.size(); i++) {
no.appendOrdering(forwardFields.get(i), this.ordering.getType(index), this.ordering.getOrder(index));
}
}
returnProps.setOrdering(no);
}
}
if (this.partitioningFields != null) {
for (int colIndex : this.partitioningFields) {
if (!node.isFieldConstant(input, colIndex)) {
return new GlobalProperties();
returnProps = returnProps == this ? this.clone() : returnProps;
returnProps.setPartitioningFields(new FieldList());
for (int index : this.partitioningFields) {
forwardFields = props.getForwardFields(input, index) == null ? null: props.getForwardFields(input, index).toFieldList();
if (forwardFields == null) {
returnProps = new GlobalProperties();
break;
} else {
returnProps.setPartitioningFields(returnProps.getPartitioningFields().addFields(forwardFields));
}
}
}
if (this.uniqueFieldCombinations != null) {
HashSet<FieldSet> newSet = new HashSet<FieldSet>();
newSet.addAll(this.uniqueFieldCombinations);
for (Iterator<FieldSet> combos = newSet.iterator(); combos.hasNext(); ){
for (Iterator<FieldSet> combos = this.uniqueFieldCombinations.iterator(); combos.hasNext(); ){
FieldSet current = combos.next();
FieldSet nfs = new FieldSet();
for (Integer field : current) {
if (!node.isFieldConstant(input, field)) {
combos.remove();
if (props.getForwardFields(input, field) == null) {
newSet.remove(current);
nfs = null;
break;
} else {
nfs = nfs.addFields(props.getForwardFields(input, field));
}
}
if (nfs != null) {
newSet.remove(current);
newSet.add(nfs);
}
}
if (newSet.size() != this.uniqueFieldCombinations.size()) {
GlobalProperties gp = clone();
gp.uniqueFieldCombinations = newSet.isEmpty() ? null : newSet;
return gp;
}
GlobalProperties gp = returnProps.clone();
gp.uniqueFieldCombinations = newSet.isEmpty() ? null : newSet;
return gp;
}
if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
return new GlobalProperties();
}
return this;
return returnProps;
}
public void parameterizeChannel(Channel channel, boolean globalDopChange) {
switch (this.partitioning) {
case RANDOM:
......
......@@ -23,7 +23,10 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.dag.SingleInputNode;
import org.apache.flink.compiler.dag.TwoInputNode;
/**
* The interesting properties that a node in the optimizer plan hands to its predecessors. It has the
......@@ -87,19 +90,25 @@ public class InterestingProperties implements Cloneable
public Set<RequestedGlobalProperties> getGlobalProperties() {
    // Hands out the stored set itself, not a copy.
    return globalProps;
}
public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input)
{
InterestingProperties iProps = new InterestingProperties();
SemanticProperties props = null;
if (node instanceof SingleInputNode) {
props = node.getSemanticProperties();
} else if (node instanceof TwoInputNode) {
props = node.getSemanticProperties();
}
for (RequestedGlobalProperties rgp : this.globalProps) {
RequestedGlobalProperties filtered = rgp.filterByNodesConstantSet(node, input);
RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input);
if (filtered != null && !filtered.isTrivial()) {
iProps.addGlobalProperties(filtered);
}
}
for (RequestedLocalProperties rlp : this.localProps) {
RequestedLocalProperties filtered = rlp.filterByNodesConstantSet(node, input);
RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input);
if (filtered != null && !filtered.isTrivial()) {
iProps.addLocalProperties(filtered);
}
......
......@@ -20,12 +20,13 @@
package org.apache.flink.compiler.dataproperties;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.dag.OptimizerNode;
/**
* This class represents local properties of the data. A local property is a property that exists
......@@ -126,57 +127,91 @@ public class LocalProperties implements Cloneable {
/**
* Filters these properties by what can be preserved through a user function's constant fields set.
*
* @param node The optimizer node that potentially modifies the properties.
*
* @param props The semantic properties describing which input fields are forwarded; if null, empty properties are returned.
* @param input The input of the node which is relevant.
*
* @return True, if the resulting properties are non trivial.
*
* @return The filtered LocalProperties
*/
public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) {
// check, whether the local order is preserved
Ordering no = this.ordering;
FieldList ngf = this.groupedFields;
Set<FieldSet> nuf = this.uniqueFields;
FieldList forwardList = null;
if (props == null) {
return new LocalProperties();
}
if (this.ordering != null) {
no = new Ordering();
final FieldList involvedIndexes = this.ordering.getInvolvedIndexes();
for (int i = 0; i < involvedIndexes.size(); i++) {
if (!node.isFieldConstant(input, involvedIndexes.get(i))) {
if (i == 0) {
forwardList = props.getForwardFields(input, involvedIndexes.get(i)) == null ? null : props.getForwardFields(input, involvedIndexes.get(i)).toFieldList();
if (forwardList == null) {
no = null;
ngf = null;
/*if (i == 0) {
no = null;
ngf = null;
} else {
no = this.ordering.createNewOrderingUpToIndex(i);
ngf = no.getInvolvedIndexes();
}
}*/
break;
} else {
no.appendOrdering(forwardList.get(0), this.ordering.getType(i), this.ordering.getOrder(i));
ngf = no.getInvolvedIndexes();
}
}
}
else if (this.groupedFields != null) {
// check, whether the local key grouping is preserved
for (Integer index : this.groupedFields) {
if (!node.isFieldConstant(input, index)) {
forwardList = props.getForwardFields(input, index) == null ? null : props.getForwardFields(input, index).toFieldList();
if (forwardList == null) {
ngf = null;
break;
} else if (!forwardList.contains(index)) {
FieldList grouped = new FieldList();
for (Integer value : ngf.toFieldList()) {
if (value.intValue() == index) {
grouped = grouped.addFields(forwardList);
} else {
grouped = grouped.addField(value);
}
}
ngf = grouped;
}
}
}
if (this.uniqueFields != null && this.uniqueFields.size() > 0) {
Set<FieldSet> s = new HashSet<FieldSet>(this.uniqueFields);
for (FieldSet fields : this.uniqueFields) {
for (Integer index : fields) {
if (!node.isFieldConstant(input, index)) {
s.remove(fields);
if (this.uniqueFields != null) {
HashSet<FieldSet> newSet = new HashSet<FieldSet>();
newSet.addAll(this.uniqueFields);
for (Iterator<FieldSet> combos = this.uniqueFields.iterator(); combos.hasNext(); ){
FieldSet current = combos.next();
FieldSet nfs = new FieldSet();
for (Integer field : current) {
if (props.getForwardFields(input, field) == null) {
newSet.remove(current);
nfs = null;
break;
} else {
nfs = nfs.addFields(props.getForwardFields(input, field));
}
}
if (nfs != null) {
newSet.remove(current);
newSet.add(nfs);
}
}
if (s.size() != this.uniqueFields.size()) {
nuf = s;
}
nuf = newSet.isEmpty() ? null : newSet;
}
if (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) {
return this;
} else {
......@@ -187,7 +222,6 @@ public class LocalProperties implements Cloneable {
return lp;
}
}
// --------------------------------------------------------------------------------------------
@Override
......
......@@ -21,9 +21,10 @@ package org.apache.flink.compiler.dataproperties;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.util.Utils;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
......@@ -188,30 +189,63 @@ public final class RequestedGlobalProperties implements Cloneable {
this.customPartitioner = null;
}
/**
 * Replaces the requested partitioning fields.
 * Only the field set is written; the partitioning kind is left as it is.
 *
 * @param partitioned The field set to store.
 */
public void setPartitioningFields(FieldSet partitioned) {
this.partitioningFields = partitioned;
}
/**
 * Replaces both the requested partitioning kind and the fields it applies to.
 *
 * @param fields The field set to store.
 * @param partitioning The partitioning kind to store.
 */
public void setPartitioningFields(FieldSet fields, PartitioningProperty partitioning) {
    // The two assignments are independent of each other.
    this.partitioning = partitioning;
    this.partitioningFields = fields;
}
/**
 * Replaces the requested ordering.
 * Only the ordering field is written; partitioning state is not touched here.
 *
 * @param newOrdering The ordering to store.
 */
public void setOrdering(Ordering newOrdering) {
this.ordering = newOrdering;
}
/**
* Filters these properties by what can be preserved by the given node when propagated down
* to the given input.
*
* @param node The node representing the contract.
*
* @param props The semantic properties used to map each requested field back to its source field; if null, null is returned.
* @param input The index of the input.
* @return True, if any non-default value is preserved, false otherwise.
* @return The filtered RequestedGlobalProperties
*/
public RequestedGlobalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
FieldList sourceList;
RequestedGlobalProperties returnProps = null;
if (props == null) {
return null;
}
// check if partitioning survives
if (this.ordering != null) {
for (int col : this.ordering.getInvolvedIndexes()) {
if (!node.isFieldConstant(input, col)) {
Ordering no = new Ordering();
returnProps = new RequestedGlobalProperties();
returnProps.setPartitioningFields(new FieldSet(), this.partitioning);
for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) {
int value = this.ordering.getInvolvedIndexes().get(index);
sourceList = props.getSourceField(input, value) == null ? null : props.getSourceField(input, value).toFieldList();
if (sourceList != null) {
no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index));
} else {
return null;
}
}
returnProps.setOrdering(no);
} else if (this.partitioningFields != null) {
for (int colIndex : this.partitioningFields) {
if (!node.isFieldConstant(input, colIndex)) {
returnProps = new RequestedGlobalProperties();
returnProps.setPartitioningFields(new FieldSet(), this.partitioning);
for (Integer index : this.partitioningFields) {
sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList();
if (sourceList != null) {
returnProps.setPartitioningFields(returnProps.getPartitionedFields().addFields(sourceList), this.partitioning);
} else {
return null;
}
}
}
// make sure that certain properties are not pushed down
final PartitioningProperty partitioning = this.partitioning;
if (partitioning == PartitioningProperty.FULL_REPLICATION ||
......@@ -220,8 +254,8 @@ public final class RequestedGlobalProperties implements Cloneable {
{
return null;
}
return this;
return returnProps;
}
/**
......
......@@ -22,9 +22,9 @@ package org.apache.flink.compiler.dataproperties;
import java.util.Arrays;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.util.Utils;
import org.apache.flink.runtime.operators.util.LocalStrategy;
......@@ -138,29 +138,47 @@ public class RequestedLocalProperties implements Cloneable {
* Filters these properties by what can be preserved through a user function's constant fields set.
* Since interesting properties are filtered top-down, anything that partially destroys the ordering
* makes the properties uninteresting.
*
* @param node The optimizer node that potentially modifies the properties.
*
* @param props The semantic properties used to map each requested field back to its source field; if null, null is returned.
* @param input The input of the node which is relevant.
*
* @return True, if the resulting properties are non trivial.
*
* @return The filtered RequestedLocalProperties
*/
public RequestedLocalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
public RequestedLocalProperties filterBySemanticProperties(SemanticProperties props, int input) {
FieldList sourceList;
RequestedLocalProperties returnProps = this;
if (props == null) {
return null;
}
if (this.ordering != null) {
final FieldList involvedIndexes = this.ordering.getInvolvedIndexes();
for (int i = 0; i < involvedIndexes.size(); i++) {
if (!node.isFieldConstant(input, involvedIndexes.get(i))) {
Ordering no = new Ordering();
returnProps = this.clone();
for (int index = 0; index < this.ordering.getInvolvedIndexes().size(); index++) {
int value = this.ordering.getInvolvedIndexes().get(index);
sourceList = props.getSourceField(input, value) == null ? null : props.getSourceField(input, value).toFieldList();
if (sourceList != null) {
no.appendOrdering(sourceList.get(0), this.ordering.getType(index), this.ordering.getOrder(index));
} else {
return null;
}
}
returnProps.setOrdering(no);
} else if (this.groupedFields != null) {
returnProps = this.clone();
returnProps.setGroupedFields(new FieldList());
// check, whether the local key grouping is preserved
for (Integer index : this.groupedFields) {
if (!node.isFieldConstant(input, index)) {
sourceList = props.getSourceField(input, index) == null ? null : props.getSourceField(input, index).toFieldList();
if (sourceList != null) {
returnProps.setGroupedFields(returnProps.getGroupedFields().addFields(sourceList));
} else {
return null;
}
}
}
return this;
return returnProps;
}
/**
......
......@@ -21,6 +21,7 @@ package org.apache.flink.compiler.postpass;
import java.util.Map;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
......@@ -522,7 +523,9 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
try {
for (Map.Entry<Integer, X> entry : sourceSchema) {
Integer pos = entry.getKey();
if (optNode.isFieldConstant(input, pos)) {
SemanticProperties sprops = optNode.getSemanticProperties();
if (sprops != null && sprops.getForwardFields(input, pos) != null && sprops.getForwardFields(input,pos).contains(pos)) {
targetSchema.addType(pos, entry.getValue());
}
}
......
......@@ -257,8 +257,11 @@ public class DOPChangeTest extends CompilerTestBase {
// mapper respectively reducer
SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy());
SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.NONE, red2Node.getInput().getLocalStrategy());
Assert.assertEquals("The Map 2 Node has an invalid local strategy.", LocalStrategy.SORT, map2Node.getInput().getLocalStrategy());
}
/**
......
......@@ -58,7 +58,40 @@ public class DualInputSemanticProperties extends SemanticProperties {
public DualInputSemanticProperties() {
init();
}
/**
 * Collects the fields of the first input that are forwarded to the given output field.
 *
 * @param dest The destination field in the output data.
 * @return The source fields whose forwarded-field set contains {@code dest},
 *         or {@code null} if no entry does.
 */
public FieldSet forwardedFrom1(int dest) {
    FieldSet sources = null;
    for (Map.Entry<Integer, FieldSet> mapping : forwardedFields1.entrySet()) {
        if (!mapping.getValue().contains(dest)) {
            continue;
        }
        // The result set is created on the first hit, so "no hit" stays null.
        FieldSet base = (sources == null) ? new FieldSet() : sources;
        sources = base.addField(mapping.getKey());
    }
    return sources;
}
/**
 * Collects the fields of the second input that are forwarded to the given output field.
 *
 * @param dest The destination field in the output data.
 * @return The source fields whose forwarded-field set contains {@code dest},
 *         or {@code null} if no entry does.
 */
public FieldSet forwardedFrom2(int dest) {
    FieldSet sources = null;
    for (Map.Entry<Integer, FieldSet> mapping : forwardedFields2.entrySet()) {
        if (!mapping.getValue().contains(dest)) {
            continue;
        }
        // The result set is created on the first hit, so "no hit" stays null.
        FieldSet base = (sources == null) ? new FieldSet() : sources;
        sources = base.addField(mapping.getKey());
    }
    return sources;
}
/**
* Adds, to the existing information, a field that is forwarded directly
* from the source record(s) in the first input to the destination
......@@ -115,6 +148,10 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @return the destination fields, or null if they do not exist
*/
public FieldSet getForwardedField1(int sourceField) {
    // With all fields flagged constant, every field maps onto itself;
    // otherwise the explicit mapping of the first input is consulted.
    return isAllFieldsConstant()
            ? new FieldSet(sourceField)
            : this.forwardedFields1.get(sourceField);
}
......@@ -174,9 +211,43 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @return the destination fields, or null if they do not exist
*/
public FieldSet getForwardedField2(int sourceField) {
	// With all fields constant, every field is forwarded to its own position;
	// otherwise the explicitly registered targets (possibly null) are returned.
	return isAllFieldsConstant()
			? new FieldSet(sourceField)
			: this.forwardedFields2.get(sourceField);
}
/**
 * Returns the fields of the given input from which the given output field
 * was forwarded.
 *
 * @param input The index of the input, either 0 (first input) or 1 (second input).
 * @param field The destination field in the output data.
 * @return The source fields of the given input, or <code>null</code> if the
 *         output field is not forwarded from that input.
 * @throws IndexOutOfBoundsException Thrown, if the input index is neither 0 nor 1.
 */
@Override
public FieldSet getSourceField(int input, int field) {
	// Validate the input index up front, so that an invalid index is rejected
	// regardless of whether all fields are flagged constant.
	if (input != 0 && input != 1) {
		throw new IndexOutOfBoundsException("Invalid input index: " + input);
	}

	if (isAllFieldsConstant()) {
		// every field is forwarded to its own position
		return new FieldSet(field);
	}

	return (input == 0) ? this.forwardedFrom1(field) : this.forwardedFrom2(field);
}
/**
 * Returns the output fields to which the given field of the given input
 * is forwarded.
 *
 * @param input The index of the input, either 0 (first input) or 1 (second input).
 * @param field The source field in the input data.
 * @return The destination fields, or <code>null</code> if the field is not forwarded.
 * @throws IndexOutOfBoundsException Thrown, if the input index is neither 0 nor 1.
 */
@Override
public FieldSet getForwardFields(int input, int field) {
	// An invalid input index is a programming error. Reject it in the same way as
	// getSourceField(int, int) does, instead of silently returning null.
	if (input != 0 && input != 1) {
		throw new IndexOutOfBoundsException("Invalid input index: " + input);
	}

	if (isAllFieldsConstant()) {
		// every field is forwarded to its own position
		return new FieldSet(field);
	}

	// Dispatch through the per-input accessors so that subclasses overriding them are honored.
	return (input == 0) ? this.getForwardedField1(field) : this.getForwardedField2(field);
}
/**
* Adds, to the existing information, field(s) that are read in
* the source record(s) from the first input.
......@@ -253,7 +324,7 @@ public class DualInputSemanticProperties extends SemanticProperties {
super.clearProperties();
init();
}
@Override
public boolean isEmpty() {
return super.isEmpty() &&
......@@ -263,7 +334,11 @@ public class DualInputSemanticProperties extends SemanticProperties {
(readFields2 == null || readFields2.size() == 0);
}
/**
 * Renders the forwarding information of both inputs as
 * <code>DISP(&lt;first input&gt;; &lt;second input&gt;)</code>.
 */
@Override
public String toString() {
	StringBuilder text = new StringBuilder("DISP(");
	text.append(this.forwardedFields1);
	text.append("; ");
	text.append(this.forwardedFields2);
	text.append(")");
	return text.toString();
}
private void init() {
this.forwardedFields1 = new HashMap<Integer,FieldSet>();
this.forwardedFields2 = new HashMap<Integer,FieldSet>();
......
......@@ -144,7 +144,7 @@ public class Ordering {
}
for (int i = 0; i < this.indexes.size(); i++) {
if (this.indexes.get(i) != otherOrdering.indexes.get(i)) {
if (this.indexes.get(i).intValue() != otherOrdering.indexes.get(i).intValue()) {
return false;
}
......
......@@ -27,7 +27,8 @@ import org.apache.flink.api.common.operators.util.FieldSet;
* Container for the semantic properties associated to an operator.
*/
public abstract class SemanticProperties implements Serializable {
private boolean allFieldsConstant;
private static final long serialVersionUID = 1L;
/** Set of fields that are written in the destination record(s).*/
......@@ -47,7 +48,19 @@ public abstract class SemanticProperties implements Serializable {
this.writtenFields = this.writtenFields.addFields(writtenFields);
}
}
/**
 * Sets the flag that declares all fields to be constant.
 *
 * @param constant <code>true</code> to flag all fields as constant, <code>false</code> otherwise.
 */
public void setAllFieldsConstant(boolean constant) {
	allFieldsConstant = constant;
}
/**
 * Checks whether all fields have been flagged as constant.
 *
 * @return <code>true</code> if all fields are flagged as constant, <code>false</code> otherwise.
 */
public boolean isAllFieldsConstant() {
	return allFieldsConstant;
}
/**
 * Returns the destination field(s) in the output to which the given field
 * of the given input is forwarded.
 *
 * @param input The zero-based index of the input.
 * @param field The source field in the input data.
 * @return The destination fields; the implementations visible in this file
 *         return <code>null</code> when the field is not forwarded.
 */
public abstract FieldSet getForwardFields(int input, int field);
/**
 * Returns the field(s) of the given input from which the given field of
 * the output was forwarded.
 *
 * @param input The zero-based index of the input.
 * @param field The destination field in the output data.
 * @return The source fields; the implementations visible in this file
 *         return <code>null</code> when the field was not forwarded from that input.
 */
public abstract FieldSet getSourceField(int input, int field);
/**
* Sets the field(s) that are written in the destination record(s).
*
......
......@@ -36,7 +36,41 @@ public class SingleInputSemanticProperties extends SemanticProperties {
/** Set of fields that are read in the source record(s).*/
private FieldSet readFields;
/**
 * Returns the output fields to which the given field of the single input is forwarded.
 *
 * @param input The index of the input; must be 0.
 * @param field The source field in the input data.
 * @return The destination fields, as returned by {@link #getForwardedField(int)}.
 * @throws IndexOutOfBoundsException Thrown, if the input index is not 0.
 */
@Override
public FieldSet getForwardFields(int input, int field) {
	if (input == 0) {
		return this.getForwardedField(field);
	}
	throw new IndexOutOfBoundsException();
}
/**
 * Returns the fields of the single input from which the given output field was forwarded.
 *
 * @param input The index of the input; must be 0.
 * @param field The destination field in the output data.
 * @return The source fields, or <code>null</code> if the output field is not forwarded.
 * @throws IndexOutOfBoundsException Thrown, if the input index is not 0.
 */
@Override
public FieldSet getSourceField(int input, int field) {
	if (input == 0) {
		if (!isAllFieldsConstant()) {
			return this.forwardedFrom(field);
		}
		// every field is forwarded to its own position
		return new FieldSet(field);
	}
	throw new IndexOutOfBoundsException();
}
/**
 * Collects the input fields that are forwarded to the given field of the output.
 *
 * @param dest The destination field in the output data.
 * @return A FieldSet with every source field whose forwarding targets contain
 *         <code>dest</code>, or <code>null</code> if no such field exists.
 */
public FieldSet forwardedFrom(int dest) {
	FieldSet sources = null;
	for (Map.Entry<Integer, FieldSet> mapping : this.forwardedFields.entrySet()) {
		if (!mapping.getValue().contains(dest)) {
			continue;
		}
		// The result set is only created once the first matching source field is found.
		FieldSet current = (sources != null) ? sources : new FieldSet();
		sources = current.addField(mapping.getKey());
	}
	return sources;
}
/**
 * Creates new semantic properties; the initial state is set up by <code>init()</code>.
 */
public SingleInputSemanticProperties() {
	this.init();
}
......@@ -95,6 +129,10 @@ public class SingleInputSemanticProperties extends SemanticProperties {
* @return the destination fields, or null if they do not exist
*/
public FieldSet getForwardedField(int sourceField) {
	// With all fields constant, every field is forwarded to its own position;
	// otherwise the explicitly registered targets (possibly null) are returned.
	return isAllFieldsConstant()
			? new FieldSet(sourceField)
			: this.forwardedFields.get(sourceField);
}
......@@ -145,7 +183,12 @@ public class SingleInputSemanticProperties extends SemanticProperties {
(forwardedFields == null || forwardedFields.isEmpty()) &&
(readFields == null || readFields.size() == 0);
}
/**
 * Renders the forwarding information as <code>SISP(&lt;forwarded fields&gt;)</code>.
 */
@Override
public String toString() {
	StringBuilder text = new StringBuilder("SISP(");
	text.append(this.forwardedFields);
	text.append(")");
	return text.toString();
}
private void init() {
this.forwardedFields = new HashMap<Integer,FieldSet>();
this.readFields = null;
......
......@@ -155,8 +155,8 @@ public class ConnectedComponents implements ProgramDescription {
* a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
* produces a (Target-vertex-ID, Component-ID) pair.
*/
@ConstantFieldsFirst("1 -> 0")
@ConstantFieldsSecond("1 -> 1")
@ConstantFieldsFirst("1 -> 1")
@ConstantFieldsSecond("1 -> 0")
public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
......@@ -178,8 +178,6 @@ public class ConnectedComponents implements ProgramDescription {
}
}
@Override
public String getDescription() {
return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
......
......@@ -341,19 +341,19 @@ public class FunctionAnnotation {
return semanticProperties;
}
private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties {
private static final long serialVersionUID = 1L;
private FieldSet nonForwardedFields;
/**
 * Creates semantic properties in which the given fields are excluded from
 * implicit forwarding. The excluded fields are also registered as written fields.
 *
 * @param nonForwardedFields The fields that are not forwarded.
 */
private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) {
	this.nonForwardedFields = nonForwardedFields;
	this.addWrittenFields(nonForwardedFields);
}
/**
* Returns the logical position where the given field is written to.
* In this variant of the semantic properties, all fields are assumed implicitly forwarded,
......@@ -362,52 +362,81 @@ public class FunctionAnnotation {
*/
@Override
public FieldSet getForwardedField(int sourceField) {
	if (isAllFieldsConstant()) {
		// every field is forwarded to its own position
		return new FieldSet(sourceField);
	}

	// Without an exclusion set there is nothing to forward implicitly. Fall back to the
	// explicit forwarding information, as getSourceField(int, int) does, rather than
	// failing with a NullPointerException.
	if (this.nonForwardedFields == null) {
		return super.getForwardedField(sourceField);
	}

	// Excluded fields are not forwarded; every other field keeps its position.
	return this.nonForwardedFields.contains(sourceField) ? null : new FieldSet(sourceField);
}
/**
 * Returns the input field from which the given output field was forwarded.
 * Fields that are not excluded from forwarding originate from the same position.
 *
 * @param input The index of the input; must be 0.
 * @param field The destination field in the output data.
 * @return The source field, or <code>null</code> if the field is excluded from forwarding.
 * @throws IndexOutOfBoundsException Thrown, if the input index is not 0.
 */
@Override
public FieldSet getSourceField(int input, int field) {
	if (input != 0) {
		throw new IndexOutOfBoundsException();
	}

	if (!isAllFieldsConstant()) {
		if (this.nonForwardedFields == null) {
			// no exclusion set: rely on the explicit forwarding information
			return super.getSourceField(input, field);
		}
		if (this.nonForwardedFields.contains(field)) {
			return null;
		}
	}

	// the field is forwarded from the same position
	return new FieldSet(field);
}
/**
 * Not supported: fields are forwarded implicitly in this variant.
 *
 * @throws UnsupportedOperationException Always thrown.
 */
@Override
public void addForwardedField(int sourceField, int destinationField) {
	final String message = "When defining fields as implicitly constant "
			+ "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields.";
	throw new UnsupportedOperationException(message);
}
/**
 * Not supported: fields are forwarded implicitly in this variant.
 *
 * @throws UnsupportedOperationException Always thrown.
 */
@Override
public void addForwardedField(int sourceField, FieldSet destinationFields) {
	final String message = "When defining fields as implicitly constant "
			+ "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields.";
	throw new UnsupportedOperationException(message);
}
/**
 * Not supported: fields are forwarded implicitly in this variant.
 *
 * @throws UnsupportedOperationException Always thrown.
 */
@Override
public void setForwardedField(int sourceField, FieldSet destinationFields) {
	final String message = "When defining fields as implicitly constant "
			+ "(such as through the ConstantFieldsExcept annotation), you cannot manually add forwarded fields.";
	throw new UnsupportedOperationException(message);
}
}
private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties {
private static final long serialVersionUID = 1L;
private FieldSet nonForwardedFields1;
private FieldSet nonForwardedFields2;
private ImplicitlyForwardingTwoInputSemanticProperties() {
	// No exclusion sets initially; they are supplied through the
	// setImplicitlyForwarding...Except(...) methods.
}
/**
 * Sets the fields of the first input that are excluded from implicit forwarding.
 *
 * @param nonForwardedFields The fields of the first input that are not forwarded.
 */
public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) {
	nonForwardedFields1 = nonForwardedFields;
}
/**
 * Sets the fields of the second input that are excluded from implicit forwarding.
 *
 * @param nonForwardedFields The fields of the second input that are not forwarded.
 */
public void setImplicitlyForwardingSecondExcept(FieldSet nonForwardedFields) {
	nonForwardedFields2 = nonForwardedFields;
}
@Override
public FieldSet getForwardedField1(int sourceField) {
if (isAllFieldsConstant()) {
return new FieldSet(sourceField);
}
if (this.nonForwardedFields1 == null) {
return super.getForwardedField1(sourceField);
}
......@@ -419,9 +448,13 @@ public class FunctionAnnotation {
}
}
}
@Override
public FieldSet getForwardedField2(int sourceField) {
if (isAllFieldsConstant()) {
return new FieldSet(sourceField);
}
if (this.nonForwardedFields2 == null) {
return super.getForwardedField2(sourceField);
}
......@@ -433,7 +466,36 @@ public class FunctionAnnotation {
}
}
}
/**
 * Returns the fields of the given input from which the given output field
 * was forwarded.
 *
 * <p>The decision is made per input: if an exclusion set was defined for the
 * requested input, all fields except the excluded ones originate from the same
 * position. If no exclusion set was defined for the requested input, the explicit
 * forwarding information of the superclass is used, no matter whether the other
 * input has an exclusion set.
 *
 * @param input The index of the input, either 0 (first input) or 1 (second input).
 * @param field The destination field in the output data.
 * @return The source fields of the given input, or <code>null</code> if the
 *         output field is not forwarded from that input.
 * @throws IndexOutOfBoundsException Thrown, if the input index is neither 0 nor 1.
 */
@Override
public FieldSet getSourceField(int input, int field) {
	if (input != 0 && input != 1) {
		throw new IndexOutOfBoundsException();
	}

	if (isAllFieldsConstant()) {
		// every field is forwarded to its own position
		return new FieldSet(field);
	}

	// Only the exclusion set of the requested input matters. An exclusion set on the
	// other input must not turn this input into an implicitly forwarding one.
	final FieldSet nonForwarded = (input == 0) ? this.nonForwardedFields1 : this.nonForwardedFields2;
	if (nonForwarded == null) {
		return super.getSourceField(input, field);
	}

	return nonForwarded.contains(field) ? null : new FieldSet(field);
}
@Override
public void addForwardedField1(int sourceField, int destinationField) {
if (this.nonForwardedFields1 == null) {
......@@ -444,7 +506,7 @@ public class FunctionAnnotation {
"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
}
}
@Override
public void addForwardedField1(int sourceField, FieldSet destinationFields) {
if (this.nonForwardedFields1 == null) {
......@@ -455,7 +517,7 @@ public class FunctionAnnotation {
"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
}
}
@Override
public void setForwardedField1(int sourceField, FieldSet destinationFields) {
if (this.nonForwardedFields1 == null) {
......@@ -466,7 +528,7 @@ public class FunctionAnnotation {
"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
}
}
@Override
public void addForwardedField2(int sourceField, int destinationField) {
if (this.nonForwardedFields2 == null) {
......@@ -477,7 +539,7 @@ public class FunctionAnnotation {
"(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields.");
}
}
@Override
public void addForwardedField2(int sourceField, FieldSet destinationFields) {
if (this.nonForwardedFields2 == null) {
......@@ -488,7 +550,7 @@ public class FunctionAnnotation {
"(such as through the ConstantFieldsSecondExcept annotation), you cannot manually add forwarded fields.");
}
}
@Override
public void setForwardedField2(int sourceField, FieldSet destinationFields) {
if (this.nonForwardedFields2 == null) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册