提交 540c89d1 编写于 作者: S sewen

Next steps in optimizer refactoring.

上级 47c14935
......@@ -34,7 +34,7 @@ import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JSONGenerator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
/**
* Encapsulates the functionality necessary to compile and submit a pact program to a nephele cluster.
......
......@@ -39,7 +39,7 @@ import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.plan.PlanAssembler;
import eu.stratosphere.pact.common.plan.PlanAssemblerDescription;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.contextcheck.ContextChecker;
/**
......
......@@ -42,7 +42,7 @@ import eu.stratosphere.pact.client.nephele.api.PactProgram;
import eu.stratosphere.pact.client.nephele.api.ProgramInvocationException;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.jobgen.JSONGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
/**
* @author Stephan Ewen (stephan.ewen@tu-berlin.com)
......
......@@ -26,7 +26,7 @@ import javax.servlet.http.HttpServletResponse;
import eu.stratosphere.pact.client.nephele.api.PactProgram;
import eu.stratosphere.pact.compiler.jobgen.JSONGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
/**
* @author Stephan Ewen (stephan.ewen@tu-berlin.com)
......
......@@ -79,9 +79,9 @@ import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS;
/**
......
......@@ -43,7 +43,7 @@ import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
......
......@@ -19,6 +19,7 @@ import java.util.ArrayList;
import eu.stratosphere.pact.common.type.Key;
import eu.stratosphere.pact.common.util.FieldList;
import eu.stratosphere.pact.common.util.FieldSet;
/**
*
......@@ -182,6 +183,18 @@ public class Ordering
return newOrdering;
}
public boolean groupsFields(FieldSet fields) {
if (fields.size() > this.indexes.size()) {
return false;
}
for (int i = 0; i < fields.size(); i++) {
if (!fields.contains(this.indexes.get(i)))
return false;
}
return true;
}
// --------------------------------------------------------------------------------------------
/* (non-Javadoc)
......
......@@ -19,9 +19,11 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -53,11 +55,14 @@ import eu.stratosphere.pact.compiler.plan.DataSinkNode;
import eu.stratosphere.pact.compiler.plan.DataSourceNode;
import eu.stratosphere.pact.compiler.plan.MapNode;
import eu.stratosphere.pact.compiler.plan.MatchNode;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.ReduceNode;
import eu.stratosphere.pact.compiler.plan.SinkJoiner;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.SinkPlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.SourcePlanNode;
/**
* The optimizer that takes the user specified pact plan and creates an optimized plan that contains
......@@ -673,37 +678,36 @@ public class PactCompiler {
rootNode.accept(branchingVisitor);
// the final step is now to generate the actual plan alternatives
List<? extends PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
if (bestPlan.size() != 1) {
throw new CompilerException("Error in compiler: more than one best plan was created!");
}
//
// // check if the best plan's root is a data sink (single sink plan)
// // if so, directly take it. if it is a sink joiner node, get its contained sinks
// OptimizerNode bestPlanRoot = bestPlan.get(0);
// List<DataSinkNode> bestPlanSinks = new ArrayList<DataSinkNode>(4);
//
// if (bestPlanRoot instanceof DataSinkNode) {
// bestPlanSinks.add((DataSinkNode) bestPlanRoot);
// check if the best plan's root is a data sink (single sink plan)
// if so, directly take it. if it is a sink joiner node, get its contained sinks
PlanNode bestPlanRoot = bestPlan.get(0);
List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
if (bestPlanRoot instanceof SinkPlanNode) {
bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
// } else if (bestPlanRoot instanceof SinkJoiner) {
// ((SinkJoiner) bestPlanRoot).getDataSinks(bestPlanSinks);
// }
}
// connect nodes bidirectional
new NodeConnector().connectNodes(bestPlanSinks);
// // connect nodes bidirectional
// new NodeConnector().connectNodes(bestPlanSinks);
//
// // insert temporary dams, as they may be necessary in non-tree graphs to prevent deadlocks
// Configuration config = GlobalConfiguration.getConfiguration();
// new DeadlockResolver(config.getBoolean("channel.network.allowSpilling",true)).resolveDeadlocks(bestPlanSinks);
//
// // finalize the plan
// OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, pactPlan.getJobName(), memoryMegabytes);
// plan.setInstanceTypeName(instanceName);
// plan.setPlanConfiguration(pactPlan.getPlanConfiguration());
// return plan;
return null;
// finalize the plan
OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, pactPlan.getJobName(), memoryMegabytes);
plan.setInstanceTypeName(instanceName);
plan.setPlanConfiguration(pactPlan.getPlanConfiguration());
return plan;
}
/**
......@@ -716,12 +720,13 @@ public class PactCompiler {
*/
public static OptimizedPlan createPreOptimizedPlan(Plan pactPlan)
{
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(null, -1, 1, false);
pactPlan.accept(graphCreator);
OptimizedPlan optPlan = new OptimizedPlan(graphCreator.sources, graphCreator.sinks, graphCreator.con2node.values(),
pactPlan.getJobName());
optPlan.setPlanConfiguration(pactPlan.getPlanConfiguration());
return optPlan;
// GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(null, -1, 1, false);
// pactPlan.accept(graphCreator);
// OptimizedPlan optPlan = new OptimizedPlan(graphCreator.sources, graphCreator.sinks, graphCreator.con2node.values(),
// pactPlan.getJobName());
// optPlan.setPlanConfiguration(pactPlan.getPlanConfiguration());
// return optPlan;
return null;
}
/**
......@@ -948,185 +953,185 @@ public class PactCompiler {
}
};
// /**
// * Utility class that traverses a plan to connect all nodes.
// */
// private static final class NodeConnector implements Visitor<OptimizerNode> {
// private final Set<OptimizerNode> allNodes; // a set of all nodes in the optimizer plan
//
// /**
// * Creates a new node connector.
// */
// private NodeConnector() {
// this.allNodes = new HashSet<OptimizerNode>();
// }
//
// private void connectNodes(List<DataSinkNode> sinks) {
//
// // traverse the graph
// for (DataSinkNode node : sinks) {
// node.accept(this);
// }
// }
//
// /*
// * (non-Javadoc)
// * @see
// * eu.stratosphere.pact.common.plan.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable)
// */
// @Override
// public boolean preVisit(OptimizerNode visitable) {
// // if we come here again, prevent a further descend
// if (!this.allNodes.add(visitable)) {
// return false;
// }
//
// for (PactConnection conn : visitable.getIncomingConnections()) {
// conn.getSourcePact().addOutConn(conn);
// }
//
// return true;
// }
//
// /*
// * (non-Javadoc)
// * @see
// * eu.stratosphere.pact.common.plan.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable)
// */
// @Override
// public void postVisit(OptimizerNode visitable) {
// // do nothing
// }
// }
/**
* Utility class that traverses a plan to connect all nodes.
*/
private static final class NodeConnector implements Visitor<PlanNode>
{
private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
// /**
// * Utility class that traverses a plan to collect all nodes and add them to the OptimizedPlan.
// * Besides collecting all nodes, this traversal assigns the memory to the nodes.
// */
// private static final class PlanFinalizer implements Visitor<OptimizerNode>
// {
// private final Set<OptimizerNode> allNodes; // a set of all nodes in the optimizer plan
//
// private final List<DataSourceNode> sources; // all data source nodes in the optimizer plan
//
// private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
//
// private int memoryConsumers; // a counter of all memory consumers
//
// private int memoryPerInstance; // the amount of memory per instance
//
// /**
// * Creates a new plan finalizer.
// */
// private PlanFinalizer() {
// this.allNodes = new HashSet<OptimizerNode>();
// this.sources = new ArrayList<DataSourceNode>();
// this.sinks = new ArrayList<DataSinkNode>();
// }
//
// private OptimizedPlan createFinalPlan(List<DataSinkNode> sinks, String jobName, int memoryPerInstance)
// {
// if (LOG.isDebugEnabled())
// LOG.debug("Available memory per instance: " + memoryPerInstance);
//
// this.memoryPerInstance = memoryPerInstance;
// this.memoryConsumers = 0;
//
// // traverse the graph
// for (DataSinkNode node : sinks) {
// node.accept(this);
// }
//
// // assign the memory to each node
// if (this.memoryConsumers > 0) {
// final int memoryPerTask = this.memoryPerInstance / this.memoryConsumers;
//
// if (LOG.isDebugEnabled())
// LOG.debug("Memory per consumer: "+memoryPerTask);
//
// for (OptimizerNode node : this.allNodes) {
// final int consumerCount = node.getMemoryConsumerCount();
// if (consumerCount > 0) {
// node.setMemoryPerTask(memoryPerTask * consumerCount);
// if (LOG.isDebugEnabled())
// LOG.debug("Assigned " + (memoryPerTask * consumerCount) + " MB to " +
// node.getPactContract().getName());
// }
// }
// }
//
// return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName);
// }
//
// /*
// * (non-Javadoc)
// * @see
// * eu.stratosphere.pact.common.plan.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable)
// */
// @Override
// public boolean preVisit(OptimizerNode visitable) {
// // if we come here again, prevent a further descend
// if (!this.allNodes.add(visitable)) {
// return false;
// }
//
// for (PactConnection conn : visitable.getIncomingConnections()) {
//
// // check for memory consuming temp connection
// switch(conn.getTempMode()) {
// case NONE:
// // do nothing
// break;
// case TEMP_SENDER_SIDE:
// // reduce available memory
// this.memoryPerInstance -= PactCompiler.DEFAULT_TEMP_TASK_MEMORY *
// conn.getSourcePact().getDegreeOfParallelism();
// LOG.debug("Memory reduced to "+this.memoryPerInstance+ " due to TempTask");
// break;
// case TEMP_RECEIVER_SIDE:
// // reduce available memory
// this.memoryPerInstance -= PactCompiler.DEFAULT_TEMP_TASK_MEMORY *
// conn.getTargetPact().getDegreeOfParallelism();
// LOG.debug("Memory reduced to "+this.memoryPerInstance+ " due to TempTask");
// break;
// }
// }
//
// for (PactConnection conn : visitable.getOutConns()) {
// if(conn.getShipStrategy().type() == ShipStrategyType.PARTITION_RANGE) {
// // One memory consumer for the histogram
// this.memoryConsumers += visitable.getInstancesPerMachine();
// //Reduce available memory because of temp task to avoid spilling
// this.memoryPerInstance -= PactCompiler.DEFAULT_TEMP_TASK_MEMORY *
// conn.getSourcePact().getDegreeOfParallelism();
// //TODO: is this correct reducing memory per INSTANCE by multiplying required
// //memory * the TOTAL DoP?
// LOG.debug("Memory reduced to "+this.memoryPerInstance+ " due to TempTask");
// }
// }
//
// if (visitable instanceof DataSinkNode) {
// this.sinks.add((DataSinkNode) visitable);
// } else if (visitable instanceof DataSourceNode) {
// this.sources.add((DataSourceNode) visitable);
// }
//
// // count the memory consumption
// this.memoryConsumers += visitable.getMemoryConsumerCount() * visitable.getInstancesPerMachine();
//
// return true;
// }
//
// /*
// * (non-Javadoc)
// * @see
// * eu.stratosphere.pact.common.plan.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable)
// */
// @Override
// public void postVisit(OptimizerNode visitable) {
// // do nothing
// }
// }
/**
* Creates a new node connector.
*/
private NodeConnector() {
this.allNodes = new HashSet<PlanNode>();
}
private void connectNodes(List<SinkPlanNode> sinks) {
// traverse the graph
for (SinkPlanNode node : sinks) {
node.accept(this);
}
}
/*
* (non-Javadoc)
* @see
* eu.stratosphere.pact.common.plan.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable)
*/
@Override
public boolean preVisit(PlanNode visitable) {
// if we come here again, prevent a further descend
if (!this.allNodes.add(visitable)) {
return false;
}
for (Iterator<Channel> iter = visitable.getInputs(); iter.hasNext();) {
final Channel conn = iter.next();
conn.getSource().addOutgoingChannel(conn);
}
return true;
}
/*
* (non-Javadoc)
* @see
* eu.stratosphere.pact.common.plan.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable)
*/
@Override
public void postVisit(PlanNode visitable) {
// do nothing
}
}
/**
* Utility class that traverses a plan to collect all nodes and add them to the OptimizedPlan.
* Besides collecting all nodes, this traversal assigns the memory to the nodes.
*/
private static final class PlanFinalizer implements Visitor<PlanNode>
{
private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
private int memoryConsumers; // a counter of all memory consumers
private int memoryPerInstance; // the amount of memory per instance
/**
* Creates a new plan finalizer.
*/
private PlanFinalizer() {
this.allNodes = new HashSet<PlanNode>();
this.sources = new ArrayList<SourcePlanNode>();
this.sinks = new ArrayList<SinkPlanNode>();
}
private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, int memoryPerInstance)
{
if (LOG.isDebugEnabled())
LOG.debug("Available memory per instance: " + memoryPerInstance);
this.memoryPerInstance = memoryPerInstance;
this.memoryConsumers = 0;
// traverse the graph
for (SinkPlanNode node : sinks) {
node.accept(this);
}
// assign the memory to each node
if (this.memoryConsumers > 0) {
final int memoryPerTask = this.memoryPerInstance / this.memoryConsumers;
if (LOG.isDebugEnabled())
LOG.debug("Memory per consumer: "+memoryPerTask);
for (PlanNode node : this.allNodes) {
final int consumerCount = node.getMemoryConsumerCount();
if (consumerCount > 0) {
node.setMemoryPerTask(memoryPerTask * consumerCount);
if (LOG.isDebugEnabled())
LOG.debug("Assigned " + (memoryPerTask * consumerCount) + " MB to " +
node.getPactContract().getName());
}
}
}
return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName);
}
/*
* (non-Javadoc)
* @see
* eu.stratosphere.pact.common.plan.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable)
*/
@Override
public boolean preVisit(OptimizerNode visitable) {
// if we come here again, prevent a further descend
if (!this.allNodes.add(visitable)) {
return false;
}
for (PactConnection conn : visitable.getIncomingConnections()) {
// check for memory consuming temp connection
switch(conn.getTempMode()) {
case NONE:
// do nothing
break;
case TEMP_SENDER_SIDE:
// reduce available memory
this.memoryPerInstance -= PactCompiler.DEFAULT_TEMP_TASK_MEMORY *
conn.getSourcePact().getDegreeOfParallelism();
LOG.debug("Memory reduced to "+this.memoryPerInstance+ " due to TempTask");
break;
case TEMP_RECEIVER_SIDE:
// reduce available memory
this.memoryPerInstance -= PactCompiler.DEFAULT_TEMP_TASK_MEMORY *
conn.getTargetPact().getDegreeOfParallelism();
LOG.debug("Memory reduced to "+this.memoryPerInstance+ " due to TempTask");
break;
}
}
for (PactConnection conn : visitable.getOutConns()) {
if(conn.getShipStrategy().type() == ShipStrategyType.PARTITION_RANGE) {
// One memory consumer for the histogram
this.memoryConsumers += visitable.getInstancesPerMachine();
//Reduce available memory because of temp task to avoid spilling
this.memoryPerInstance -= PactCompiler.DEFAULT_TEMP_TASK_MEMORY *
conn.getSourcePact().getDegreeOfParallelism();
//TODO: is this correct reducing memory per INSTANCE by multiplying required
//memory * the TOTAL DoP?
LOG.debug("Memory reduced to "+this.memoryPerInstance+ " due to TempTask");
}
}
if (visitable instanceof DataSinkNode) {
this.sinks.add((DataSinkNode) visitable);
} else if (visitable instanceof DataSourceNode) {
this.sources.add((DataSourceNode) visitable);
}
// count the memory consumption
this.memoryConsumers += visitable.getMemoryConsumerCount() * visitable.getInstancesPerMachine();
return true;
}
/*
* (non-Javadoc)
* @see
* eu.stratosphere.pact.common.plan.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable)
*/
@Override
public void postVisit(OptimizerNode visitable) {
// do nothing
}
}
/**
* Utility class to resolve pipeline deadlocks in non-tree graphs.
......
......@@ -60,7 +60,7 @@ public abstract class CostEstimator {
public void costOperator(PlanNode n)
{
// initialize costs objects with currently unknown costs
final Costs costs = new Costs();
final Costs costs = new Costs(0, 0);
final long availableMemory = n.getTotalAvailableMemory();
// add the shipping strategy costs
......@@ -86,6 +86,17 @@ public abstract class CostEstimator {
default:
throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy());
}
switch (channel.getLocalStrategy()) {
case NONE:
break;
case SORT:
case COMBININGSORT:
addLocalSortCost(channel, availableMemory, costs);
break;
default:
throw new CompilerException("Unsupported local strategy for input: " + channel.getLocalStrategy());
}
}
Channel firstInput = null;
......
......@@ -31,10 +31,10 @@ import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
import eu.stratosphere.pact.compiler.plan.PactConnection.TempMode;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.UnionNode;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.PartitionShipStrategy;
......
......@@ -59,11 +59,11 @@ import eu.stratosphere.pact.compiler.plan.DataSinkNode;
import eu.stratosphere.pact.compiler.plan.DataSourceNode;
import eu.stratosphere.pact.compiler.plan.MapNode;
import eu.stratosphere.pact.compiler.plan.MatchNode;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
import eu.stratosphere.pact.compiler.plan.ReduceNode;
import eu.stratosphere.pact.compiler.plan.UnionNode;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.runtime.plugable.PactRecordComparatorFactory;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.PartitionShipStrategy;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ShipStrategyType;
......@@ -121,7 +121,7 @@ public class JobGraphGenerator implements Visitor<OptimizerNode> {
}
/**
* Translates a {@link eu.stratosphere.pact.compiler.plan.OptimizedPlan} into a
* Translates a {@link eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan} into a
* {@link eu.stratosphere.nephele.jobgraph.JobGraph}.
* This is an 1-to-1 mapping. No optimization whatsoever is applied.
*
......
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler.plan;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import eu.stratosphere.pact.common.contract.Contract;
import eu.stratosphere.pact.common.contract.ReduceContract;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* TODO: add Java doc
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
*/
public class CombinerNode extends OptimizerNode
{
private PactConnection input;
public CombinerNode(ReduceContract reducer, OptimizerNode predecessor, float reducingFactor) {
super(reducer);
this.input = new PactConnection(predecessor, this, new ForwardSS());
this.setLocalStrategy(LocalStrategy.COMBININGSORT);
this.globalProps = predecessor.globalProps;
this.localProps = predecessor.localProps;
this.setDegreeOfParallelism(predecessor.getDegreeOfParallelism());
this.setInstancesPerMachine(predecessor.getInstancesPerMachine());
// set the estimates
this.estimatedCardinality.putAll(predecessor.estimatedCardinality);
long estKeyCard = getEstimatedCardinality(new FieldSet(getPactContract().getKeyColumnNumbers(0)));
if (predecessor.estimatedNumRecords >= 1 && estKeyCard >= 1
&& predecessor.estimatedOutputSize >= -1) {
this.estimatedNumRecords = (long) (predecessor.estimatedNumRecords * reducingFactor);
this.estimatedOutputSize = (long) (predecessor.estimatedOutputSize * reducingFactor);
} else {
this.estimatedNumRecords = predecessor.estimatedNumRecords;
this.estimatedOutputSize = predecessor.estimatedOutputSize;
}
// copy the child's branch-plan map
if (this.branchPlan == null) {
this.branchPlan = predecessor.branchPlan;
} else if (predecessor.branchPlan != null) {
this.branchPlan.putAll(predecessor.branchPlan);
}
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getName()
*/
@Override
public String getName() {
return "Combine";
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isMemoryConsumer()
*/
@Override
public int getMemoryConsumerCount() {
switch(this.localStrategy) {
case COMBININGSORT: return 1;
default: return 0;
}
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getPactContract()
*/
@Override
public ReduceContract getPactContract() {
return (ReduceContract) super.getPactContract();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#setInputs(java.util.Map)
*/
@Override
public void setInputs(Map<Contract, OptimizerNode> contractToNode) {
throw new UnsupportedOperationException();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getIncomingConnections()
*/
@Override
public List<PactConnection> getIncomingConnections() {
return Collections.singletonList(this.input);
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeOutputEstimates(eu.stratosphere.pact.compiler.DataStatistics)
*/
@Override
public void computeOutputEstimates(DataStatistics statistics) {
throw new UnsupportedOperationException();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeInterestingPropertiesForInputs(eu.stratosphere.pact.compiler.costs.CostEstimator)
*/
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
throw new UnsupportedOperationException();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeUnclosedBranchStack()
*/
@Override
public void computeUnclosedBranchStack() {
throw new UnsupportedOperationException();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans(eu.stratosphere.pact.compiler.costs.CostEstimator)
*/
@Override
public List<? extends OptimizerNode> getAlternativePlans(CostEstimator estimator) {
throw new UnsupportedOperationException();
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#accept(eu.stratosphere.pact.common.plan.Visitor)
*/
@Override
public void accept(Visitor<OptimizerNode> visitor) {
if (visitor.preVisit(this)) {
this.input.getSourcePact().accept(visitor);
visitor.postVisit(this);
}
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isFieldKept(int, int)
*/
@Override
public boolean isFieldKept(int input, int fieldNumber) {
return false;
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#readReadsAnnotation()
*/
@Override
protected void readConstantAnnotation() {
// DO NOTHING
}
}
......@@ -26,8 +26,11 @@ import eu.stratosphere.pact.common.contract.Ordering;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.SinkPlanNode;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
......@@ -35,9 +38,12 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
*
* @author Stephan Ewen
*/
public class DataSinkNode extends OptimizerNode {
public class DataSinkNode extends OptimizerNode
{
protected PactConnection input; // The input edge
private List<PlanNode> cachedPlans; // plan candidate cache
protected PactConnection input; // The input edges
/**
* Creates a new DataSinkNode for the given contract.
......@@ -50,58 +56,29 @@ public class DataSinkNode extends OptimizerNode {
setLocalStrategy(LocalStrategy.NONE);
}
// /**
// * Copy constructor to create a copy of a DataSinkNode with a different predecessor. The predecessor
// * is assumed to be of the same type and merely a copy with different strategies, as they
// * are created in the process of the plan enumeration.
// *
// * @param template
// * The node to create a copy of.
// * @param pred
// * The new predecessor.
// * @param conn
// * The old connection to copy properties from.
// * @param globalProps
// * The global properties of this copy.
// * @param localProps
// * The local properties of this copy.
// */
// protected DataSinkNode(DataSinkNode template, OptimizerNode pred, PactConnection conn,
// GlobalProperties globalProps, LocalProperties localProps) {
// super(template, globalProps, localProps);
//
// this.input = new PactConnection(conn, pred, this);
//
// // copy the child's branch-plan map
// if(pred.branchPlan != null && pred.branchPlan.size() > 0)
// this.branchPlan = new HashMap<OptimizerNode, OptimizerNode>(pred.branchPlan);
// else
// this.branchPlan = null;
// }
// --------------------------------------------------------------------------------------
/**
* Gets the <tt>PactConnection</tt> through which this node receives its input.
*
* @return The input connection.
*/
public PactConnection getInConn() {
public PactConnection getInputConnection() {
return this.input;
}
/**
* Sets the <tt>PactConnection</tt> through which this node receives its input.
*
* @param conn
* The input connection to set.
* @param conn The input connection to set.
*/
public void setInputConnection(PactConnection conn) {
this.input = conn;
}
/**
* TODO
*
*/
public OptimizerNode getPredNode() {
public OptimizerNode getPredecessorNode() {
if(this.input != null) {
return input.getSourcePact();
} else {
......@@ -197,16 +174,16 @@ public class DataSinkNode extends OptimizerNode {
@Override
public void computeOutputEstimates(DataStatistics statistics) {
// we copy the output estimates from the input
if (this.getPredNode() == null) {
if (this.getPredecessorNode() == null) {
throw new CompilerException();
}
if (this.estimatedCardinality.size() > 0)
this.estimatedCardinality.clear();
this.estimatedCardinality.putAll(getPredNode().getEstimatedCardinalities());
this.estimatedNumRecords = getPredNode().getEstimatedNumRecords();
this.estimatedOutputSize = getPredNode().getEstimatedOutputSize();
this.estimatedCardinality.putAll(getPredecessorNode().getEstimatedCardinalities());
this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
}
/*
......@@ -228,7 +205,7 @@ public class DataSinkNode extends OptimizerNode {
// in both cases create a range partitioned only IP
InterestingProperties partitioningProps = new InterestingProperties();
partitioningProps.getGlobalProperties().setPartitioning(PartitioningProperty.RANGE_PARTITIONED, partitioning);
partitioningProps.getGlobalProperties().setRangePartitioned(partitioning);
estimator.addRangePartitionCost(this.input, partitioningProps.getMaximalCosts());
this.input.addInterestingProperties(partitioningProps);
......@@ -241,7 +218,7 @@ public class DataSinkNode extends OptimizerNode {
// global sort case: create IP for range partitioned and sorted
InterestingProperties globalSortProps = new InterestingProperties();
globalSortProps.getGlobalProperties().setPartitioning(PartitioningProperty.RANGE_PARTITIONED, partitioning);
globalSortProps.getGlobalProperties().setRangePartitioned(partitioning);
estimator.addRangePartitionCost(this.input, globalSortProps.getMaximalCosts());
globalSortProps.getLocalProperties().setOrdering(partitioning);
......@@ -274,11 +251,11 @@ public class DataSinkNode extends OptimizerNode {
return;
}
addClosedBranches(this.getPredNode().closedBranchingNodes);
addClosedBranches(getPredecessorNode().closedBranchingNodes);
List<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
// TODO: check if merge is necessary
result = mergeLists(result, this.getPredNode().getBranchesForParent(this));
result = mergeLists(result, getPredecessorNode().getBranchesForParent(this));
this.openBranches = result;
}
......@@ -296,86 +273,49 @@ public class DataSinkNode extends OptimizerNode {
// Recursive Optimization
// --------------------------------------------------------------------------------------------
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans()
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getAlternativePlans(eu.stratosphere.pact.compiler.costs.CostEstimator)
*/
@Override
public List<OptimizerNode> getAlternativePlans(CostEstimator estimator)
public List<PlanNode> getAlternativePlans(CostEstimator estimator)
{
// final Ordering po = getPactContract().getPartitionOrdering();
// final Ordering lo = getPactContract().getLocalOrder();
//
// // the alternative plans are the ones that we have incoming, plus the attached output node
final List<OptimizerNode> outputPlans = new ArrayList<OptimizerNode>();
//
// // step down to all producer nodes and calculate alternative plans
// final List<? extends OptimizerNode> subPlans = this.getPredNode().getAlternativePlans(estimator);
//
// // build all possible alternative plans for this node
// for(OptimizerNode subPlan : subPlans) {
//
// final GlobalProperties gp = subPlan.getGlobalPropertiesForParent(this).createCopy();
// final LocalProperties lp = subPlan.getLocalPropertiesForParent(this).createCopy();
//
// final ShipStrategy ss;
// final LocalStrategy ls;
//
// if (po != null && !po.isMetBy(gp.getOrdering())) {
// // requires global sort
//
// ShipStrategy s = this.input.getShipStrategy();
// if (s.type() == ShipStrategyType.NONE || s.type() == ShipStrategyType.PARTITION_RANGE) {
// // strategy not fixed a priori, or strategy fixed, but valid
// ss = new PartitionRangeSS(po.getInvolvedIndexes());
// } else {
// // strategy is set a priory --> via compiler hint
// // this input plan cannot produce a valid plan
// continue;
// }
//
// gp.setPartitioning(PartitionProperty.RANGE_PARTITIONED, po.getInvolvedIndexes());
// gp.setOrdering(po);
// } else {
// ss = new ForwardSS();
// }
//
// if (lo != null && !lo.isMetBy(lp.getOrdering())) {
// // requires local sort
// if (this.localStrategy == LocalStrategy.NONE || this.localStrategy == LocalStrategy.SORT) {
// // strategy not fixed a priori, or strategy fixed, but valid
// ls = LocalStrategy.SORT;
// } else {
// // strategy is set a priory --> via compiler hint
// // this input plan cannot produce a valid plan
// continue;
// }
//
// lp.setOrdering(lo);
// } else {
// ls = LocalStrategy.NONE;
// }
//
// DataSinkNode ns = new DataSinkNode(this, subPlan, this.input, gp, lp);
// ns.input.setShipStrategy(ss);
// ns.setLocalStrategy(ls);
//
// // set the costs
// estimator.costOperator(ns);
//
// // add the plan
// outputPlans.add(ns);
// }
//
// // prune the plans
// prunePlanAlternatives(outputPlans);
//
// // check if the list does not contain any plan. That may happen, if the channels specify
// // incompatible shipping strategies.
// if (outputPlans.isEmpty()) {
// throw new CompilerException("Could not create a valid plan for the DataSink contract '"
// + getPactContract().getName() + "'. The compiler hints specified incompatible shipping strategies.");
// }
// check if we have a cached version
if (this.cachedPlans != null) {
return this.cachedPlans;
}
// calculate alternative subplans for predecessor
List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
List<InterestingProperties> ips = this.input.getInterestingProperties();
for (PlanNode p : subPlans) {
if (ips.isEmpty()) {
// create a simple forwarding channel
Channel c = new Channel(p);
c.setShipStrategy(ShipStrategyType.FORWARD);
c.setLocalStrategy(LocalStrategy.NONE);
outputPlans.add(new SinkPlanNode(this, c));
} else {
for (InterestingProperties ip : ips) {
// create a channel that realizes the properties
Channel c = ip.createChannelRealizingProperties(p);
outputPlans.add(new SinkPlanNode(this, c));
}
}
}
// cost and prune the plans
for (PlanNode node : outputPlans) {
estimator.costOperator(node);
}
prunePlanAlternatives(outputPlans);
// cache the result only if we have multiple outputs --> this function gets invoked multiple times
if (isBranching()) {
this.cachedPlans = outputPlans;
}
return outputPlans;
}
......@@ -411,8 +351,8 @@ public class DataSinkNode extends OptimizerNode {
@Override
public void accept(Visitor<OptimizerNode> visitor) {
if (visitor.preVisit(this)) {
if (this.getPredNode() != null) {
this.getPredNode().accept(visitor);
if (getPredecessorNode() != null) {
getPredecessorNode().accept(visitor);
} else {
throw new CompilerException();
}
......
......@@ -25,6 +25,7 @@ import eu.stratosphere.pact.common.contract.CompilerHints;
import eu.stratosphere.pact.common.contract.Contract;
import eu.stratosphere.pact.common.contract.GenericDataSource;
import eu.stratosphere.pact.common.generic.io.InputFormat;
import eu.stratosphere.pact.common.io.FileInputFormat;
import eu.stratosphere.pact.common.io.statistics.BaseStatistics;
import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.util.FieldSet;
......@@ -262,7 +263,12 @@ public class DataSourceNode extends OptimizerNode
SourcePlanNode candidate = new SourcePlanNode(this);
candidate.updatePropertiesWithUniqueSets(getUniqueFields());
candidate.setCosts(new Costs(0, this.inputSize));
final Costs costs = new Costs(0, 0);
if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatClass())) {
costs.addSecondaryStorageCost(this.inputSize);
}
candidate.setCosts(costs);
// since there is only a single plan for the data-source, return a list with that element only
List<PlanNode> plans = new ArrayList<PlanNode>(1);
......
......@@ -61,28 +61,24 @@ public class MapNode extends SingleInputNode
return "Map";
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#isMemoryConsumer()
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getMemoryConsumerCount()
*/
@Override
public int getMemoryConsumerCount() {
return 0;
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeInterestingProperties()
/* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#computeInterestingPropertiesForInputs(eu.stratosphere.pact.compiler.costs.CostEstimator)
*/
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
// the map itself has no interesting properties.
// check, if there is an output contract that tells us that certain properties are preserved.
// if so, propagate to the child.
List<InterestingProperties> thisNodesIntProps = getInterestingProperties();
List<InterestingProperties> props = InterestingProperties.filterInterestingPropertiesForInput(thisNodesIntProps, this, 0);
if (props.isEmpty()) {
this.inConn.setNoInterestingProperties();
} else {
......
......@@ -33,6 +33,7 @@ import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.stubs.StubAnnotation.OutCardBounds;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
......@@ -80,6 +81,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
protected int instancesPerMachine = -1; // the number of parallel instance that will run on the same machine
private long minimalGuaranteedMemory;
protected int id = -1; // the id for this node.
// ------------------------------------------------------------------------
......@@ -212,7 +215,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* The cost estimator used to estimate the costs of each plan alternative.
* @return A list containing all plan alternatives.
*/
public abstract List<? extends PlanNode> getAlternativePlans(CostEstimator estimator);
public abstract List<PlanNode> getAlternativePlans(CostEstimator estimator);
/**
* This method implements the visit of a depth-first graph traversing visitor. Implementors must first
......@@ -1043,8 +1046,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
return bld.toString();
}
// ============================================================================================
// --------------------------------------------------------------------------------------------
// Branching and Pruning
// --------------------------------------------------------------------------------------------
/**
*
*/
protected static final class UnclosedBranchDescriptor
{
protected OptimizerNode branchingNode;
......@@ -1069,4 +1077,53 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
return this.joinedPathsVector;
}
}
protected void prunePlanAlternatives(List<PlanNode> plans)
{
// for each interesting property, which plans are cheapest
final PlanNode[] toKeep = new PlanNode[this.intProps.size()];
PlanNode cheapest = null; // the overall cheapest plan
// go over all plans from the list
for (PlanNode candidate : plans) {
// check if that plan is the overall cheapest
if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
cheapest = candidate;
}
// find the interesting properties that this plan matches
for (int i = 0; i < this.intProps.size(); i++) {
if (this.intProps.get(i).isMetBy(candidate)) {
final PlanNode previous = toKeep[i];
// the candidate meets them. if it is the first one to meet the interesting properties,
// or the previous one was more expensive, keep it
if (previous == null || previous.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
toKeep[i] = candidate;
}
}
}
}
// all plans are set now
plans.clear();
// add the cheapest plan
if (cheapest != null) {
plans.add(cheapest);
cheapest.setPruningMarker(); // remember that that plan is in the set
}
final Costs cheapestCosts = cheapest.getCumulativeCosts();
// add all others, which are optimal for some interesting properties
for (int i = 0; i < toKeep.length; i++) {
final PlanNode n = toKeep[i];
if (n != null && !n.isPruneMarkerSet()) {
final Costs maxDelta = this.intProps.get(i).getMaximalCosts();
if (!cheapestCosts.isOtherMoreThanDeltaAbove(n.getCumulativeCosts(), maxDelta)) {
n.setPruningMarker();
plans.add(n);
}
}
}
}
}
......@@ -22,9 +22,6 @@ import java.util.List;
import java.util.Map;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
/**
......@@ -43,7 +40,7 @@ public class PactConnection implements EstimateProvider
private List<InterestingProperties> interestingProps; // local properties that succeeding nodes are interested in
private ShipStrategyType shipStrategy; // The data distribution strategy
private ShipStrategyType shipStrategy; // The data distribution strategy, if preset
/**
* Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>.
......@@ -182,26 +179,6 @@ public class PactConnection implements EstimateProvider
this.interestingProps = Collections.emptyList();
}
/**
* Gets the global properties of the data after this connection.
*
* @return The global data properties of the output data.
*/
// public GlobalProperties getGlobalProperties() {
// return PactConnection.getGlobalPropertiesAfterConnection(this.sourcePact, this.targetPact, this.shipStrategy);
// }
/**
* Gets the local properties of the data after this connection.
*
* @return The local data properties of the output data.
*/
public LocalProperties getLocalProperties() {
return PactConnection.getLocalPropertiesAfterConnection(this.sourcePact, this.targetPact, this.shipStrategy);
}
// --------------------------------------------------------------------------------------------
/* (non-Javadoc)
......@@ -270,41 +247,4 @@ public class PactConnection implements EstimateProvider
return buf.toString();
}
/**
* Gets the local properties of the sources output after it crossed a pact connection with the given
* strategy. Local properties are only maintained on <tt>FORWARD</tt> connections.
*
* @return The properties of the data after a channel using the given strategy.
*/
public static LocalProperties getLocalPropertiesAfterConnection(OptimizerNode source, OptimizerNode target, ShipStrategyType shipMode) {
// LocalProperties lp = source.getLocalPropertiesForParent(target);
//
// if (shipMode == null || shipMode.type() == ShipStrategyType.NONE) {
// throw new CompilerException("Cannot determine properties if shipping strategy is not defined.");
// }
// else if (shipMode.type() == ShipStrategyType.FORWARD) {
// if (source.getDegreeOfParallelism() > target.getDegreeOfParallelism()) {
// // any order is destroyed by the random merging of the inputs
// lp.setOrdering(null);
//// lp.setGrouped(false, null);
// }
// }
// else {
// lp.reset();
// }
//
//// if (lp.isGrouped() == false &&
//// shipMode.type() != ShipStrategyType.BROADCAST &&
//// shipMode.type() != ShipStrategyType.SFR) {
////
//// if (source.getUniqueFields().size() > 0) {
//// //TODO allow list of grouped fields, up to now only add the first one
//// lp.setGrouped(true, source.getUniqueFields().iterator().next());
//// }
//// }
//
// return lp;
return null;
}
}
\ No newline at end of file
......@@ -25,10 +25,13 @@ import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.GlobalProperties;
import eu.stratosphere.pact.compiler.LocalProperties;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.PartitioningProperty;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.compiler.plan.candidate.Channel;
import eu.stratosphere.pact.compiler.plan.candidate.PlanNode;
import eu.stratosphere.pact.compiler.plan.candidate.SingleInputPlanNode;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
......@@ -37,17 +40,16 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
* @author Stephan Ewen
* @author Fabian Hueske
*/
public class ReduceNode extends SingleInputNode {
private float combinerReducingFactor = 1.0f; // the factor by which the combiner reduces the data
public class ReduceNode extends SingleInputNode
{
/**
* Creates a new ReduceNode for the given contract.
*
* @param pactContract
* The reduce contract object.
*/
public ReduceNode(ReduceContract pactContract) {
public ReduceNode(ReduceContract pactContract)
{
super(pactContract);
// see if an internal hint dictates the strategy to use
......@@ -67,27 +69,6 @@ public class ReduceNode extends SingleInputNode {
}
}
// /**
// * Copy constructor to create a copy of a ReduceNode with a different predecessor. The predecessor
// * is assumed to be of the same type and merely a copy with different strategies, as they
// * are created in the process of the plan enumeration.
// *
// * @param template
// * The ReduceNode to create a copy of.
// * @param pred
// * The new predecessor.
// * @param conn
// * The old connection to copy properties from.
// * @param globalProps
// * The global properties of this copy.
// * @param localProps
// * The local properties of this copy.
// */
// protected ReduceNode(ReduceNode template, OptimizerNode pred, PactConnection conn, GlobalProperties globalProps,
// LocalProperties localProps) {
// super(template, pred, conn, globalProps, localProps);
// }
// ------------------------------------------------------------------------
/**
......@@ -110,30 +91,6 @@ public class ReduceNode extends SingleInputNode {
return getPactContract().isCombinable();
}
/**
* Provides the optimizers decision whether an external combiner should be used or not.
* Current implementation is based on heuristics!
*
* @return True, if an external combiner should be used, False otherwise
*/
public boolean useExternalCombiner() {
// if (!isCombineable()) {
// return false;
// }
// // else
//
// if(this.inConn.getShipStrategy().type() == ShipStrategyType.PARTITION_HASH) {
// return true;
// }
//
// if(this.inConn.getShipStrategy().type() == ShipStrategyType.PARTITION_RANGE) {
// return true;
// }
// strategy is neither PARTITION_HASH nor PARTITION_RANGE
return false;
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.compiler.plan.OptimizerNode#getName()
......@@ -171,14 +128,14 @@ public class ReduceNode extends SingleInputNode {
// add the first interesting properties: partitioned and grouped
InterestingProperties ip1 = new InterestingProperties();
ip1.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, this.keys);
ip1.getGlobalProperties().setAnyPartitioning(this.keys);
ip1.getLocalProperties().setGroupedFields(this.keys);
estimator.addHashPartitioningCost(this.inConn, ip1.getMaximalCosts());
estimator.addLocalSortCost(this.inConn, -1, ip1.getMaximalCosts());
// add the second interesting properties: partitioned only
InterestingProperties ip2 = new InterestingProperties();
ip2.getGlobalProperties().setPartitioning(PartitioningProperty.ANY, this.keys);
ip2.getGlobalProperties().setAnyPartitioning(this.keys);
estimator.addHashPartitioningCost(this.inConn, ip2.getMaximalCosts());
InterestingProperties.mergeUnionOfInterestingProperties(props, ip1);
......@@ -187,126 +144,38 @@ public class ReduceNode extends SingleInputNode {
}
@Override
protected void computeValidPlanAlternatives(List<? extends PlanNode> altSubPlans,
CostEstimator estimator, List<PlanNode> outputPlans)
protected void createPlanAlternatives(List<Channel> inputs, List<PlanNode> outputPlans)
{
//
// FieldSet keySet = new FieldSet(getPactContract().getKeyColumnNumbers(0));
//
// // we have to check if all shipStrategies are the same or at least compatible
// ShipStrategy ss = new NoneSS();
//
// // check hint and use if available
// ShipStrategy hintSS = this.inConn.getShipStrategy();
//
// if(hintSS.type() == ShipStrategyType.BROADCAST || hintSS.type() == ShipStrategyType.SFR)
// // invalid strategy: we do not produce an alternative node
// return;
// else
// ss = hintSS;
//
// for(OptimizerNode subPlan : altSubPlans) {
//
// GlobalProperties gp;
// LocalProperties lp;
//
// if (ss.type() == ShipStrategyType.NONE) {
//
// gp = subPlan.getGlobalPropertiesForParent(this);
// lp = subPlan.getLocalPropertiesForParent(this);
//
// if ((partitioningIsOnRightFields(gp) && gp.getPartitioning().isPartitioned())
// || isFieldSetUnique(keySet, 0) ){
// ss = new ForwardSS();
// } else {
// ss = new PartitionHashSS(this.keyList);
// }
//
// gp = PactConnection.getGlobalPropertiesAfterConnection(subPlan, this, 0, ss);
// lp = PactConnection.getLocalPropertiesAfterConnection(subPlan, this, ss);
//
// } else {
// // fixed strategy
// gp = PactConnection.getGlobalPropertiesAfterConnection(subPlan, this, 0, ss);
// lp = PactConnection.getLocalPropertiesAfterConnection(subPlan, this, ss);
//
// if (!((partitioningIsOnRightFields(gp) && gp.getPartitioning().isPartitioned())
// || isFieldSetUnique(keySet, 0))) {
// // the shipping strategy is fixed to a value that does not leave us with
// // the necessary properties. this candidate cannot produce a valid child
// continue;
// }
// }
//
// boolean localStrategyNeeded = false;
// if (lp.getOrdering() == null || lp.getOrdering().groupsFieldSet(keySet) == false) {
// localStrategyNeeded = true;
// }
//
//// if (localStrategyNeeded && lp.isGrouped() == true) {
//// localStrategyNeeded = !lp.getGroupedFields().equals(keySet);
//// }
//
// if (localStrategyNeeded) {
// localStrategyNeeded = !isFieldSetUnique(keySet, 0);
// }
//
//
// LocalStrategy ls = getLocalStrategy();
//
// // see, whether we need a local strategy
// if (localStrategyNeeded) {
//
// // we need one
// if (ls != LocalStrategy.NONE) {
// if (ls != LocalStrategy.COMBININGSORT && ls != LocalStrategy.SORT) {
// // no valid plan possible
// continue;
// }
// }
// // local strategy free to choose
// else {
// ls = isCombineable() ? LocalStrategy.COMBININGSORT : LocalStrategy.SORT;
// }
// }
//
// // adapt the local properties
// if (ls == LocalStrategy.COMBININGSORT || ls == LocalStrategy.SORT) {
// Ordering ordering = new Ordering();
// for (Integer index :keySet) {
// ordering.appendOrdering(index, null, Order.ASCENDING);
// }
// lp.setOrdering(ordering);
// lp.setGroupedFields(keySet);
// }
//
// // ----------------------------------------------------------------
// // see, if we have a combiner before shipping
//
// OptimizerNode reducePred = subPlan;
//
// if (isCombineable() && ss.type() != ShipStrategyType.FORWARD) {
// // this node contains the estimates for the costs of the combiner,
// // as well as the updated size and cardinality estimates
//
// OptimizerNode combiner = new CombinerNode(getPactContract(), subPlan, this.combinerReducingFactor);
// combiner.setDegreeOfParallelism(subPlan.getDegreeOfParallelism());
// estimator.costOperator(combiner);
// reducePred = combiner;
// }
//
// ReduceNode n = new ReduceNode(this, reducePred, this.inConn, gp, lp);
// n.inConn.setShipStrategy(ss);
// n.setLocalStrategy(ls);
//
// // compute, which of the properties survive, depending on the output contract
// n.getGlobalProperties().filterByNodesConstantSet(this, 0);
// n.getLocalProperties().filterByNodesConstantSet(this, 0);
//
// estimator.costOperator(n);
//
// outputPlans.add(n);
// }
final LocalStrategy defaultStrat = isCombineable() ?
(this.localStrategy != LocalStrategy.NONE ? this.localStrategy : LocalStrategy.COMBININGSORT) :
LocalStrategy.SORT;
for (Channel c : inputs) {
final GlobalProperties gprops = c.getGlobalProperties();
final LocalProperties lprops = c.getLocalProperties();
// check that the partitioning is correct
if (gprops.getPartitioning().isPartitionedOnKey() && this.keys.isValidSubset(gprops.getPartitionedFields())) {
// check that the order is there
if (lprops.getOrdering() != null && lprops.getOrdering().groupsFields(this.keys)) {
if (c.getShipStrategy() == ShipStrategyType.FORWARD) {
// valid candidate. change local strategy according to the hint
c.setLocalStrategy(defaultStrat);
outputPlans.add(new SingleInputPlanNode(this, c, LocalStrategy.NONE));
} else {
// plug in a combiner
Channel toCombiner = new Channel(c.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD);
toCombiner.setLocalStrategy(LocalStrategy.COMBININGSORT, c.getLocalStrategyKeys(), c.getLocalStrategySortOrder());
SingleInputPlanNode combiner = new SingleInputPlanNode(this, toCombiner, LocalStrategy.NONE);
Channel toReducer = new Channel(combiner);
toReducer.setShipStrategy(c.getShipStrategy(), c.getShipStrategyKeys(), c.getShipStrategySortOrder());
toReducer.setLocalStrategy(defaultStrat, c.getLocalStrategyKeys(), c.getLocalStrategySortOrder());
outputPlans.add(new SingleInputPlanNode(this, toReducer, LocalStrategy.NONE));
}
}
}
}
}
/**
......@@ -316,9 +185,9 @@ public class ReduceNode extends SingleInputNode {
*/
protected long computeNumberOfProcessedKeys() {
if(this.getPredNode() != null) {
if (getPredecessorNode() != null) {
// return key count of predecessor
return this.getPredNode().getEstimatedCardinality(this.keys);
return getPredecessorNode().getEstimatedCardinality(this.keys);
} else
return -1;
}
......@@ -333,36 +202,34 @@ public class ReduceNode extends SingleInputNode {
return 1;
}
/**
* TODO
*/
private void computeCombinerReducingFactor() {
if (!isCombineable())
return;
long numRecords = 0;
if (this.getPredNode() != null && this.getPredNode().estimatedNumRecords != -1)
numRecords = this.getPredNode().estimatedNumRecords;
else
return;
long numKeys = computeNumberOfProcessedKeys();
if(numKeys == -1)
return;
int parallelism = getDegreeOfParallelism();
if (parallelism < 1)
parallelism = 32;
float inValsPerKey = numRecords / (float)numKeys;
float valsPerNode = inValsPerKey / parallelism;
// each node will process at least one key
if (valsPerNode < 1)
valsPerNode = 1;
this.combinerReducingFactor = 1 / valsPerNode;
}
// private void computeCombinerReducingFactor() {
// if (!isCombineable())
// return;
//
// long numRecords = 0;
//
// if (getPredecessorNode() != null && getPredecessorNode().estimatedNumRecords != -1)
// numRecords = getPredecessorNode().estimatedNumRecords;
// else
// return;
//
// long numKeys = computeNumberOfProcessedKeys();
// if(numKeys == -1)
// return;
//
// int parallelism = getDegreeOfParallelism();
// if (parallelism < 1)
// parallelism = 32;
//
// float inValsPerKey = numRecords / (float)numKeys;
// float valsPerNode = inValsPerKey / parallelism;
// // each node will process at least one key
// if (valsPerNode < 1)
// valsPerNode = 1;
//
// this.combinerReducingFactor = 1 / valsPerNode;
// }
/**
* Computes the number of stub calls.
......@@ -398,22 +265,7 @@ public class ReduceNode extends SingleInputNode {
}
super.computeOutputEstimates(statistics);
// check if preceding node is available
this.computeCombinerReducingFactor();
}
public boolean partitioningIsOnRightFields(GlobalProperties gp) {
// FieldList partitionedFields = gp.getPartitionedFields();
// if (partitionedFields == null || partitionedFields.size() == 0) {
// return false;
// }
//
// if (gp.getPartitioning() == PartitionProperty.RANGE_PARTITIONED) {
// return this.keyList.equals(partitionedFields);
// }
//
// return this.keyList.containsAll(partitionedFields);
return false;
// this.computeCombinerReducingFactor();
}
@Override
......
......@@ -204,7 +204,10 @@ public abstract class SingleInputNode extends OptimizerNode
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
createPlanAlternatives(candidates, outputPlans);
// prune the plans
// cost and prune the plans
for (PlanNode node : outputPlans) {
estimator.costOperator(node);
}
prunePlanAlternatives(outputPlans);
// cache the result only if we have multiple outputs --> this function gets invoked multiple times
......
......@@ -21,7 +21,6 @@ import eu.stratosphere.pact.common.contract.DualInputContract;
import eu.stratosphere.pact.common.generic.AbstractStub;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.costs.CostEstimator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy.ForwardSS;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
......@@ -30,7 +29,7 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
* transform it into a plan with a single root node. That way, the code that makes sure no costs are double-counted and that
* candidate selection works correctly with nodes that have multiple outputs is transparently reused.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Stephan Ewen
*/
public class SinkJoiner extends TwoInputNode
{
......
......@@ -68,7 +68,7 @@ public class Channel implements EstimateProvider
private TempMode tempMode;
private int replicationFactor;
private int replicationFactor = 1;
// --------------------------------------------------------------------------------------------
......@@ -110,6 +110,21 @@ public class Channel implements EstimateProvider
this.shipSortOrder = sortDirection;
}
public ShipStrategyType getShipStrategy() {
return this.shipStrategy;
}
public FieldList getShipStrategyKeys() {
return this.shipKeys;
}
public boolean[] getShipStrategySortOrder() {
return this.shipSortOrder;
}
public void setLocalStrategy(LocalStrategy strategy) {
setLocalStrategy(strategy, null, null);
}
......@@ -124,12 +139,16 @@ public class Channel implements EstimateProvider
this.localSortOrder = sortDirection;
}
public ShipStrategyType getShipStrategy() {
return this.shipStrategy;
public LocalStrategy getLocalStrategy() {
return this.localStrategy;
}
public FieldList getShipStrategyKeys() {
return this.shipKeys;
public FieldList getLocalStrategyKeys() {
return this.localKeys;
}
public boolean[] getLocalStrategySortOrder() {
return this.localSortOrder;
}
......@@ -154,6 +173,15 @@ public class Channel implements EstimateProvider
this.tempMode = tempMode;
}
/**
* Sets the replication factor of the connection.
*
* @param factor The replication factor of the connection.
*/
public void setReplicationFactor(int factor) {
this.replicationFactor = factor;
}
/**
* Returns the replication factor of the connection.
*
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler.plan;
package eu.stratosphere.pact.compiler.plan.candidate;
import java.util.Collection;
......@@ -26,24 +26,25 @@ import eu.stratosphere.pact.common.plan.Visitor;
* It works on this representation during its optimization. Finally, this plan is translated to a schedule
* for the nephele runtime system.
*
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
* @author Fabian Hüske (fabian.hueske@tu-berlin.de)
* @author Stephan Ewen
* @author Fabian Hüske
*/
public class OptimizedPlan implements Visitable<OptimizerNode> {
public class OptimizedPlan implements Visitable<PlanNode>
{
/**
* The data sources in the plan.
*/
private final Collection<DataSourceNode> dataSources;
private final Collection<SourcePlanNode> dataSources;
/**
* The data sinks in the plan.
*/
private final Collection<DataSinkNode> dataSinks;
private final Collection<SinkPlanNode> dataSinks;
/**
* All nodes in the optimizer plan.
*/
private final Collection<OptimizerNode> allNodes;
private final Collection<PlanNode> allNodes;
/**
* Name of the PACT job
......@@ -73,8 +74,9 @@ public class OptimizedPlan implements Visitable<OptimizerNode> {
* @param jobName
* The name of the PACT job
*/
public OptimizedPlan(Collection<DataSourceNode> sources, Collection<DataSinkNode> sinks,
Collection<OptimizerNode> allNodes, String jobName) {
public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
Collection<PlanNode> allNodes, String jobName)
{
this.dataSources = sources;
this.dataSinks = sinks;
this.allNodes = allNodes;
......@@ -86,7 +88,7 @@ public class OptimizedPlan implements Visitable<OptimizerNode> {
*
* @return The data sources.
*/
public Collection<DataSourceNode> getDataSources() {
public Collection<SourcePlanNode> getDataSources() {
return dataSources;
}
......@@ -95,7 +97,7 @@ public class OptimizedPlan implements Visitable<OptimizerNode> {
*
* @return The data sinks.
*/
public Collection<DataSinkNode> getDataSinks() {
public Collection<SinkPlanNode> getDataSinks() {
return dataSinks;
}
......@@ -104,7 +106,7 @@ public class OptimizedPlan implements Visitable<OptimizerNode> {
*
* @return All nodes.
*/
public Collection<OptimizerNode> getAllNodes() {
public Collection<PlanNode> getAllNodes() {
return allNodes;
}
......@@ -164,8 +166,8 @@ public class OptimizedPlan implements Visitable<OptimizerNode> {
* @see eu.stratosphere.pact.common.plan.Visitable#accept(eu.stratosphere.pact.common.plan.Visitor)
*/
@Override
public void accept(Visitor<OptimizerNode> visitor) {
for (DataSinkNode node : dataSinks) {
public void accept(Visitor<PlanNode> visitor) {
for (SinkPlanNode node : this.dataSinks) {
node.accept(visitor);
}
}
......
......@@ -15,10 +15,13 @@
package eu.stratosphere.pact.compiler.plan.candidate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import eu.stratosphere.pact.common.contract.Contract;
import eu.stratosphere.pact.common.plan.Visitable;
import eu.stratosphere.pact.common.util.FieldSet;
import eu.stratosphere.pact.compiler.Costs;
......@@ -36,6 +39,8 @@ public abstract class PlanNode implements Visitable<PlanNode>
{
protected final OptimizerNode template;
protected final List<Channel> outChannels;
private final LocalStrategy localStrategy; // The local strategy (sorting / hashing, ...)
......@@ -57,7 +62,7 @@ public abstract class PlanNode implements Visitable<PlanNode>
private int memoryPerTask; // the amount of memory dedicated to each task, in MiBytes
protected boolean pFlag; // flag for the internal pruning algorithm
private boolean pFlag; // flag for the internal pruning algorithm
// --------------------------------------------------------------------------------------------
......@@ -72,6 +77,7 @@ public abstract class PlanNode implements Visitable<PlanNode>
public PlanNode(OptimizerNode template, LocalStrategy strategy, LocalProperties localProps, GlobalProperties globalProps)
{
this.outChannels = new ArrayList<Channel>(2);
this.template = template;
this.localStrategy = strategy == null ? LocalStrategy.NONE : strategy;
this.localProps = localProps;
......@@ -83,6 +89,24 @@ public abstract class PlanNode implements Visitable<PlanNode>
// Accessors
// --------------------------------------------------------------------------------------------
/**
* Gets the optimizer's pact node for which this plan candidate node was created.
*
* @return The template optimizer's node.
*/
public OptimizerNode getOriginalOptimizerNode() {
return this.template;
}
/**
* Gets the pact contract this node represents in the plan.
*
* @return The pact contract this node represents in the plan.
*/
public Contract getPactContract() {
return this.template.getPactContract();
}
/**
* Gets the memory dedicated to each task for this node.
*
......@@ -177,13 +201,31 @@ public abstract class PlanNode implements Visitable<PlanNode>
}
// --------------------------------------------------------------------------------------------
// Input and Predecessors
// Input, Predecessors, Successors
// --------------------------------------------------------------------------------------------
public abstract Iterator<Channel> getInputs();
public abstract Iterator<PlanNode> getPredecessors();
/**
* Adds a channel to a successor node to this node.
*
* @param channel The channel to the successor.
*/
public void addOutgoingChannel(Channel channel) {
this.outChannels.add(channel);
}
/**
* Gets a list of all outgoing channels leading to successors.
*
* @return A list of all channels leading to successors.
*/
public List<Channel> getOutgoingChannels() {
return this.outChannels;
}
// --------------------------------------------------------------------------------------------
// Branching and Pruning
// --------------------------------------------------------------------------------------------
......@@ -201,7 +243,7 @@ public abstract class PlanNode implements Visitable<PlanNode>
// protected boolean areBranchCompatible(PlanNode subPlan1, PlanNode subPlan2)
// {
// if (subPlan1 == null || subPlan2 == null)
// throw new CompilerException("SubPlans may not be null.");
// throw new NullPointerException();
//
// // if there is no open branch, the children are always compatible.
// // in most plans, that will be the dominant case
......@@ -212,7 +254,7 @@ public abstract class PlanNode implements Visitable<PlanNode>
// final PlanNode nodeToCompare = subPlan1.branchPlan.get(this.lastJoinedBranchNode);
// return nodeToCompare == subPlan2.branchPlan.get(this.lastJoinedBranchNode);
// }
//
// /**
// * Takes the given list of plans that are candidates for this node in the final plan and retains for each distinct
// * set of interesting properties only the cheapest plan.
......@@ -272,103 +314,6 @@ public abstract class PlanNode implements Visitable<PlanNode>
// plans.clear();
// plans.addAll(result);
// }
// }
//
// private final <T extends OptimizerNode> void prunePlansWithCommonBranchAlternatives(List<T> plans) {
// List<List<T>> toKeep = new ArrayList<List<T>>(this.intProps.size()); // for each interesting property, which plans
// // are cheapest
// for (int i = 0; i < this.intProps.size(); i++) {
// toKeep.add(null);
// }
//
// T cheapest = null; // the overall cheapest plan
//
// // go over all plans from the list
// for (T candidate : plans) {
// // check if that plan is the overall cheapest
// if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
// cheapest = candidate;
// }
//
// // find the interesting properties that this plan matches
// for (int i = 0; i < this.intProps.size(); i++) {
// if (this.intProps.get(i).isMetBy(candidate)) {
// // the candidate meets them
// if (toKeep.get(i) == null) {
// // first one to meet the interesting properties, so store it
// List<T> l = new ArrayList<T>(2);
// l.add(candidate);
// toKeep.set(i, l);
// } else {
// // others met that one before
// // see if that one is more expensive and not more general than
// // one of the others. If so, drop it.
// List<T> l = toKeep.get(i);
// boolean met = false;
// boolean replaced = false;
//
// for (int k = 0; k < l.size(); k++) {
// T other = l.get(k);
//
// // check if the candidate is both cheaper and at least as general
// if (other.getGlobalProperties().isMetBy(candidate.getGlobalProperties())
// && other.getLocalProperties().isMetBy(candidate.getLocalProperties())
// && other.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
// // replace that one with the candidate
// l.set(k, replaced ? null : candidate);
// replaced = true;
// met = true;
// } else {
// // check if the previous plan is more general and not more expensive than the candidate
// met |= (candidate.getGlobalProperties().isMetBy(other.getGlobalProperties())
// && candidate.getLocalProperties().isMetBy(other.getLocalProperties()) && candidate
// .getCumulativeCosts().compareTo(other.getCumulativeCosts()) >= 0);
// }
// }
//
// if (!met) {
// l.add(candidate);
// }
// }
// }
// }
// }
//
// // all plans are set now
// plans.clear();
//
// // add the cheapest plan
// if (cheapest != null) {
// plans.add(cheapest);
// cheapest.pFlag = true; // remember that that plan is in the set
// }
//
// Costs cheapestCosts = cheapest.cumulativeCosts;
//
// // add all others, which are optimal for some interesting properties
// for (int i = 0; i < toKeep.size(); i++) {
// List<T> l = toKeep.get(i);
//
// if (l != null) {
// Costs maxDelta = this.intProps.get(i).getMaximalCosts();
//
// for (T plan : l) {
// if (plan != null && !plan.pFlag) {
// plan.pFlag = true;
//
// // check, if that plan is not more than the delta above the costs of the
// if (!cheapestCosts.isOtherMoreThanDeltaAbove(plan.getCumulativeCosts(), maxDelta)) {
// plans.add(plan);
// }
// }
// }
// }
// }
//
// // reset the flags
// for (T p : plans) {
// p.pFlag = false;
// }
// }
// --------------------------------------------------------------------------------------------
......@@ -389,4 +334,20 @@ public abstract class PlanNode implements Visitable<PlanNode>
this.localProps.setUniqueFields(unique);
}
}
/**
* Sets the pruning marker to true.
*/
public void setPruningMarker() {
this.pFlag = true;
}
/**
* Checks whether the pruning marker was set.
*
* @return True, if the pruning marker was set, false otherwise.
*/
public boolean isPruneMarkerSet() {
return this.pFlag;
}
}
......@@ -22,6 +22,7 @@ import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.compiler.CompilerException;
import eu.stratosphere.pact.compiler.Costs;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
......@@ -41,6 +42,10 @@ public class SingleInputPlanNode extends PlanNode
{
super(template, localStrategy);
this.input = input;
if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
this.input.setReplicationFactor(getDegreeOfParallelism());
}
}
// --------------------------------------------------------------------------------------------
......
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler.plan.candidate;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.runtime.task.util.TaskConfig.LocalStrategy;
/**
* Plan candidate node for data flow sinks.
*
* @author Stephan Ewen
*/
public class SinkPlanNode extends SingleInputPlanNode
{
/**
* Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that
* local sorting and range partitioning are handled by the incoming channel already.
*
* @param template The template optimizer node that this candidate is created for.
*/
public SinkPlanNode(OptimizerNode template, Channel input) {
super(template, input, LocalStrategy.NONE);
}
}
......@@ -43,7 +43,7 @@ import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.util.DummyCoGroupStub;
import eu.stratosphere.pact.compiler.util.DummyCrossStub;
import eu.stratosphere.pact.compiler.util.DummyInputFormat;
......
......@@ -39,9 +39,9 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.MatchNode;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.util.DummyInputFormat;
import eu.stratosphere.pact.compiler.util.DummyMatchStub;
import eu.stratosphere.pact.compiler.util.DummyOutputFormat;
......
......@@ -37,10 +37,10 @@ import eu.stratosphere.pact.common.plan.Visitor;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.OptimizerNode;
import eu.stratosphere.pact.compiler.plan.PactConnection;
import eu.stratosphere.pact.compiler.plan.ReduceNode;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.compiler.util.DummyInputFormat;
import eu.stratosphere.pact.compiler.util.DummyOutputFormat;
import eu.stratosphere.pact.compiler.util.IdentityReduce;
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.nephele.util.StringUtils;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.util.Constants;
import eu.stratosphere.pact.test.util.minicluster.ClusterProvider;
import eu.stratosphere.pact.test.util.minicluster.ClusterProviderPool;
......
......@@ -42,7 +42,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.util.TestBase;
/**
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -42,7 +42,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -36,7 +36,7 @@ import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -40,7 +40,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
import eu.stratosphere.pact.test.contracts.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import eu.stratosphere.pact.test.util.FailingTestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.graph.EnumTriangles;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -46,7 +46,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.parser.DecimalTextIntParser;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.util.TestBase;
@RunWith(Parameterized.class)
......
......@@ -46,7 +46,7 @@ import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.type.base.parser.DecimalTextIntParser;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.util.TestBase;
@RunWith(Parameterized.class)
......
......@@ -33,7 +33,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.datamining.KMeansIteration;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.testPrograms.mergeOnlyJoin.MergeOnlyJoin;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.graph.PairwiseSP;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.testPrograms.tpch10.TPCHQuery10;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.relational.TPCHQuery3;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.testPrograms.tpch3Unioned.TPCHQuery3Unioned;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.testPrograms.tpch4.TPCHQuery4;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.test.testPrograms.tpch9.TPCHQuery9;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.relational.TPCHQueryAsterix;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -33,7 +33,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.sort.TeraSort;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.relational.WebLogAnalysis;
import eu.stratosphere.pact.test.util.TestBase;
......
......@@ -27,9 +27,10 @@ import org.junit.runners.Parameterized.Parameters;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.common.plan.Plan;
import eu.stratosphere.pact.compiler.DataStatistics;
import eu.stratosphere.pact.compiler.PactCompiler;
import eu.stratosphere.pact.compiler.jobgen.JobGraphGenerator;
import eu.stratosphere.pact.compiler.plan.OptimizedPlan;
import eu.stratosphere.pact.compiler.plan.candidate.OptimizedPlan;
import eu.stratosphere.pact.example.wordcount.WordCount;
import eu.stratosphere.pact.test.util.TestBase;
......@@ -205,7 +206,7 @@ public class WordCountITCase extends TestBase {
.getURIPrefix()
+ textPath, getFilesystemProvider().getURIPrefix() + resultPath);
PactCompiler pc = new PactCompiler();
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
JobGraphGenerator jgg = new JobGraphGenerator();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册