提交 86d206c4 编写于 作者: T Till Rohrmann 提交者: Stephan Ewen

Rework the TaskManager to a slot-based model and remove legacy cloud code

Squashed commit of the following:

  - Post merge cleanup
  - Renamed fractionMemory into memoryFraction.
  - Removed Local and QueueScheduler and replaced them with a unified DefaultScheduler.
  - Removed Local and ClusterManager and replaced them with a unified DefaultInstanceManager.
  - Removed connection IDs from execution edges
  - Removed InstanceType, InstanceRequestMap, InstanceTypeDescription, InstanceTypeDescriptionTypeFactory, PendingRequestsMap
  - Fixed problems with test cases.
  - Introduced a simple slot system for scheduling.
  - Removed subtasks per instance
  - Added registerTaskManager to the JobManager RPC calls. registerTaskManager is called only once, at which point the hardware description information is sent.

Add: Merge the cloud model removal with the new network stack
上级 7b6b5a2e
......@@ -47,6 +47,7 @@ public class AvroExternalJarProgramITCase {
try {
testMiniCluster = new NepheleMiniCluster();
testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
testMiniCluster.setTaskManagerNumSlots(4);
testMiniCluster.start();
String jarFile = JAR_FILE;
......
......@@ -42,6 +42,8 @@ public class LocalExecutor extends PlanExecutor {
private static boolean DEFAULT_OVERWRITE = false;
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
private final Object lock = new Object(); // we lock to ensure singleton execution
private NepheleMiniCluster nephele;
......@@ -54,6 +56,8 @@ public class LocalExecutor extends PlanExecutor {
private int taskManagerDataPort = -1;
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private String configDir;
private String hdfsConfigFile;
......@@ -129,6 +133,10 @@ public class LocalExecutor extends PlanExecutor {
/**
 * Stores the flag that this executor hands to the mini cluster through
 * {@code setDefaultAlwaysCreateDirectory(...)} before starting it.
 *
 * @param defaultAlwaysCreateDirectory The new value of the flag.
 */
public void setDefaultAlwaysCreateDirectory(final boolean defaultAlwaysCreateDirectory) {
	this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}
/**
 * Stores the number of task manager slots. The value is forwarded to the
 * mini cluster through {@code setTaskManagerNumSlots(...)} before it is started.
 *
 * @param taskManagerNumSlots The number of slots to use.
 */
public void setTaskManagerNumSlots(final int taskManagerNumSlots) {
	this.taskManagerNumSlots = taskManagerNumSlots;
}
/**
 * Gets the number of task manager slots held by this executor. The field is
 * initialized with {@code DEFAULT_TASK_MANAGER_NUM_SLOTS}.
 *
 * @return The currently stored number of slots.
 */
public int getTaskManagerNumSlots() {
	return taskManagerNumSlots;
}
// --------------------------------------------------------------------------------------------
......@@ -157,6 +165,7 @@ public class LocalExecutor extends PlanExecutor {
}
nephele.setDefaultOverwriteFiles(defaultOverwriteFiles);
nephele.setDefaultAlwaysCreateDirectory(defaultAlwaysCreateDirectory);
nephele.setTaskManagerNumSlots(taskManagerNumSlots);
// start it up
this.nephele.start();
......
......@@ -46,6 +46,8 @@ public class NepheleMiniCluster {
private static final boolean DEFAULT_LAZY_MEMORY_ALLOCATION = true;
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
// --------------------------------------------------------------------------------------------
private final Object startStopLock = new Object();
......@@ -56,7 +58,9 @@ public class NepheleMiniCluster {
private int taskManagerDataPort = DEFAULT_TM_DATA_PORT;
private int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
private int numTaskTracker = DEFAULT_NUM_TASK_MANAGER;
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private long memorySize = DEFAULT_MEMORY_SIZE;
......@@ -149,9 +153,13 @@ public class NepheleMiniCluster {
this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}
/**
 * Stores the given value in the {@code numTaskManager} field.
 *
 * @param numTaskManager The new value of the field.
 */
public void setNumTaskManager(final int numTaskManager) {
	this.numTaskManager = numTaskManager;
}
/**
 * Stores the given value in the {@code numTaskTracker} field.
 *
 * @param numTaskTracker The new value of the field.
 */
public void setNumTaskTracker(final int numTaskTracker) {
	this.numTaskTracker = numTaskTracker;
}
/**
 * Gets the value of the {@code numTaskTracker} field.
 *
 * @return The currently stored value.
 */
public int getNumTaskTracker() {
	return this.numTaskTracker;
}
/**
 * Gets the value of the {@code numTaskManager} field.
 *
 * @return The currently stored value.
 */
public int getNumTaskManager() {
	return this.numTaskManager;
}
/**
 * Stores the number of slots per task manager. When the default mini cluster
 * configuration is built, this value is passed along and written under
 * {@code ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}.
 *
 * @param taskManagerNumSlots The number of slots to configure.
 */
public void setTaskManagerNumSlots(final int taskManagerNumSlots) {
	this.taskManagerNumSlots = taskManagerNumSlots;
}
/**
 * Gets the number of slots per task manager. The field is initialized with
 * {@code DEFAULT_TASK_MANAGER_NUM_SLOTS}.
 *
 * @return The currently stored number of slots.
 */
public int getTaskManagerNumSlots() {
	return this.taskManagerNumSlots;
}
// ------------------------------------------------------------------------
// Life cycle and Job Submission
......@@ -172,7 +180,7 @@ public class NepheleMiniCluster {
} else {
Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
defaultAlwaysCreateDirectory, numTaskManager);
defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskTracker);
GlobalConfiguration.includeConfiguration(conf);
}
......@@ -196,7 +204,7 @@ public class NepheleMiniCluster {
// start the job manager
jobManager = new JobManager(ExecutionMode.LOCAL);
waitForJobManagerToBecomeReady(numTaskManager);
waitForJobManagerToBecomeReady(numTaskTracker);
}
}
......@@ -236,7 +244,8 @@ public class NepheleMiniCluster {
public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory, int numTaskManager)
boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory,
int taskManagerNumSlots, int numTaskManager)
{
final Configuration config = new Configuration();
......@@ -284,6 +293,8 @@ public class NepheleMiniCluster {
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize/numTaskManager);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
return config;
}
......
......@@ -77,7 +77,7 @@ public class Client {
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
// Disable Local Execution when using a Client
ContextEnvironment.disableLocalExecution();
......@@ -104,8 +104,7 @@ public class Client {
throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
}
final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port);
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
// Disable Local Execution when using a Client
ContextEnvironment.disableLocalExecution();
......
......@@ -21,7 +21,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.junit.Assert;
......@@ -34,8 +33,6 @@ import eu.stratosphere.nephele.client.JobProgressResult;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.RecentJobEvent;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
......@@ -202,18 +199,18 @@ public class CliFrontendListCancelTest {
}
@Override
public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() throws IOException {
public void logBufferUtilization(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void logBufferUtilization(JobID jobID) throws IOException {
public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
public int getAvailableSlots() {
return 1;
}
}
}
......@@ -70,9 +70,10 @@ public class WordCount {
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
*/
@SuppressWarnings("serial")
public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
......
......@@ -95,14 +95,20 @@ public class DefaultCostEstimator extends CostEstimator {
@Override
public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
// assumption: we need to ship the whole data over the network to each node.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.setNetworkCost(Costs.UNKNOWN);
// if our replication factor is not positive (zero or negative), we cannot calculate broadcast costs
if (replicationFactor > 0) {
// assumption: we need to ship the whole data over the network to each node.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.setNetworkCost(Costs.UNKNOWN);
} else {
costs.addNetworkCost(replicationFactor * estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
} else {
costs.addNetworkCost(replicationFactor * estOutShipSize);
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 200);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor * 100);
}
// --------------------------------------------------------------------------------------------
......
......@@ -42,11 +42,6 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
public abstract IterationNode getIterationNode();
// --------------------------------------------------------------------------------------------
/**
 * Reports that this node does not require memory for its tasks.
 *
 * @return Always {@code false}.
 */
@Override
public boolean isMemoryConsumer() {
return false;
}
public boolean isOnDynamicPath() {
return true;
......
......@@ -122,20 +122,12 @@ public class BinaryUnionNode extends TwoInputNode {
final RequestedLocalProperties noLocalProps = new RequestedLocalProperties();
final int dop = getDegreeOfParallelism();
final int subPerInstance = getSubtasksPerInstance();
final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
final boolean globalDopChange1 = numInstances != inNumInstances1;
final boolean globalDopChange2 = numInstances != inNumInstances2;
final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1;
final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2;
final boolean dopChange1 = dop != inDop1;
final boolean dopChange2 = dop != inDop2;
// enumerate all pairwise combination of the children's plans together with
// all possible operator strategy combination
......@@ -154,15 +146,11 @@ public class BinaryUnionNode extends TwoInputNode {
Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
if (this.input1.getShipStrategy() == null) {
// free to choose the ship strategy
igps.parameterizeChannel(c1, globalDopChange1, localDopChange1);
igps.parameterizeChannel(c1, dopChange1);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() ||
c1.getShipStrategy().compensatesForLocalDOPChanges())) {
if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
} else {
......@@ -173,10 +161,8 @@ public class BinaryUnionNode extends TwoInputNode {
c1.setShipStrategy(this.input1.getShipStrategy());
}
if (globalDopChange1) {
if (dopChange1) {
c1.adjustGlobalPropertiesForFullParallelismChange();
} else if (localDopChange1) {
c1.adjustGlobalPropertiesForLocalParallelismChange();
}
}
......@@ -184,15 +170,11 @@ public class BinaryUnionNode extends TwoInputNode {
Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
if (this.input2.getShipStrategy() == null) {
// free to choose the ship strategy
igps.parameterizeChannel(c2, globalDopChange2, localDopChange2);
igps.parameterizeChannel(c2, dopChange2);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() ||
c2.getShipStrategy().compensatesForLocalDOPChanges())) {
if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
} else {
......@@ -203,10 +185,8 @@ public class BinaryUnionNode extends TwoInputNode {
c2.setShipStrategy(this.input2.getShipStrategy());
}
if (globalDopChange2) {
if (dopChange2) {
c2.adjustGlobalPropertiesForFullParallelismChange();
} else if (localDopChange2) {
c2.adjustGlobalPropertiesForLocalParallelismChange();
}
}
......@@ -224,20 +204,20 @@ public class BinaryUnionNode extends TwoInputNode {
if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) {
// adjust c2 to c1
c2 = c2.clone();
p1.parameterizeChannel(c2,globalDopChange2);
p1.parameterizeChannel(c2,dopChange2);
} else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) {
// adjust c1 to c2
c1 = c1.clone();
p2.parameterizeChannel(c1,globalDopChange1);
p2.parameterizeChannel(c1,dopChange1);
} else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) {
boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 ||
c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize();
if (adjustC1) {
c2 = c2.clone();
p1.parameterizeChannel(c2, globalDopChange2);
p1.parameterizeChannel(c2, dopChange2);
} else {
c1 = c1.clone();
p2.parameterizeChannel(c1, globalDopChange1);
p2.parameterizeChannel(c1, dopChange1);
}
} else {
// this should never happen, as it implies both realize a different strategy, which is
......
......@@ -65,9 +65,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// --------------------------------------------------------------------------------------------
/**
* Creates a new node with a single input for the optimizer plan.
* Creates a new node for the bulk iteration.
*
* @param iteration The PACT that the node represents.
* @param iteration The bulk iteration the node represents.
*/
public BulkIterationNode(BulkIterationBase<?> iteration) {
super(iteration);
......@@ -124,14 +124,12 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
// check if the root of the step function has the same DOP as the iteration
if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
nextPartialSolution.getSubtasksPerInstance() != getSubtasksPerInstance() )
if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism())
{
// add a no-op to the root to express the re-partitioning
NoOpNode noop = new NoOpNode();
noop.setDegreeOfParallelism(getDegreeOfParallelism());
noop.setSubtasksPerInstance(getSubtasksPerInstance());
PactConnection noOpConn = new PactConnection(nextPartialSolution, noop);
noop.setIncomingConnection(noOpConn);
nextPartialSolution.addOutgoingConnection(noOpConn);
......@@ -198,12 +196,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
/**
 * Provides the operator descriptors for this node: a list holding exactly one
 * freshly created {@code NoOpDescriptor}.
 *
 * @return A single-element list containing a new {@code NoOpDescriptor}.
 */
protected List<OperatorDescriptorSingle> getPossibleProperties() {
	final OperatorDescriptorSingle noOpDescriptor = new NoOpDescriptor();
	return Collections.singletonList(noOpDescriptor);
}
/**
 * Reports that this node requires memory for its tasks.
 *
 * @return Always {@code true}.
 */
@Override
public boolean isMemoryConsumer() {
return true;
}
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
final InterestingProperties intProps = getInterestingProperties().clone();
......@@ -306,12 +299,11 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
// attach a no-op node through which we create the properties of the original input
Channel toNoOp = new Channel(candidate);
globPropsReq.parameterizeChannel(toNoOp, false, false);
globPropsReq.parameterizeChannel(toNoOp, false);
locPropsReq.parameterizeChannel(toNoOp);
UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
......
......@@ -86,11 +86,6 @@ public class DataSinkNode extends OptimizerNode {
return "Data Sink";
}
/**
 * Checks whether the sink's contract defines a partition ordering or a local order.
 *
 * @return {@code true} if either the partition ordering or the local order of the
 *         contract is non-null, {@code false} otherwise.
 */
@Override
public boolean isMemoryConsumer() {
	// the partition ordering is checked first; the local order is only consulted if it is absent
	if (getPactContract().getPartitionOrdering() != null) {
		return true;
	}
	return getPactContract().getLocalOrder() != null;
}
@Override
public List<PactConnection> getIncomingConnections() {
return Collections.singletonList(this.input);
......@@ -194,21 +189,16 @@ public class DataSinkNode extends OptimizerNode {
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
final int dop = getDegreeOfParallelism();
final int subPerInstance = getSubtasksPerInstance();
final int inDop = getPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance();
final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1);
final boolean globalDopChange = numInstances != inNumInstances;
final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance;
final boolean dopChange = dop != inDop;
InterestingProperties ips = this.input.getInterestingProperties();
for (PlanNode p : subPlans) {
for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
for (RequestedLocalProperties lp : ips.getLocalProperties()) {
Channel c = new Channel(p);
gp.parameterizeChannel(c, globalDopChange, localDopChange);
gp.parameterizeChannel(c, dopChange);
lp.parameterizeChannel(c);
c.setRequiredLocalProps(lp);
c.setRequiredGlobalProps(gp);
......
......@@ -55,7 +55,6 @@ public class DataSourceNode extends OptimizerNode {
if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
setDegreeOfParallelism(1);
setSubtasksPerInstance(1);
this.sequentialInput = true;
} else {
this.sequentialInput = false;
......@@ -77,12 +76,6 @@ public class DataSourceNode extends OptimizerNode {
return "Data Source";
}
/**
 * Reports that this node does not require memory for its tasks.
 *
 * @return Always {@code false}.
 */
@Override
public boolean isMemoryConsumer() {
return false;
}
@Override
public void setDegreeOfParallelism(int degreeOfParallelism) {
// if unsplittable, DOP remains at 1
......@@ -90,15 +83,6 @@ public class DataSourceNode extends OptimizerNode {
super.setDegreeOfParallelism(degreeOfParallelism);
}
}
/**
 * Sets the number of subtasks per instance, unless this source is flagged as a
 * sequential input. For sequential inputs the request is ignored and the current
 * value is kept.
 *
 * @param instancesPerMachine The requested number of subtasks per instance.
 */
@Override
public void setSubtasksPerInstance(int instancesPerMachine) {
	// sequential (unsplittable) inputs keep their current setting
	if (this.sequentialInput) {
		return;
	}
	super.setSubtasksPerInstance(instancesPerMachine);
}
@Override
public List<PactConnection> getIncomingConnections() {
......
......@@ -46,7 +46,6 @@ public class GroupReduceNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
setDegreeOfParallelism(1);
setSubtasksPerInstance(1);
}
}
......
......@@ -262,13 +262,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
*/
@Override
public abstract void accept(Visitor<OptimizerNode> visitor);
/**
* Checks, whether this node requires memory for its tasks or not.
*
* @return True, if this node contains logic that requires memory usage, false otherwise.
*/
public abstract boolean isMemoryConsumer();
/**
* Checks whether a field is modified by the user code or whether it is kept unchanged.
......@@ -408,7 +401,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* @param degreeOfParallelism
* The degree of parallelism to set.
* @throws IllegalArgumentException
* If the degree of parallelism is smaller than one.
* If the degree of parallelism is smaller than one and not -1.
*/
public void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1) {
......@@ -416,48 +409,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
}
this.degreeOfParallelism = degreeOfParallelism;
}
/**
 * Returns how many parallel instances of the contract are to run on the
 * same compute instance (logical machine).
 *
 * @return The number of subtask instances per machine.
 */
public int getSubtasksPerInstance() {
	return subtasksPerInstance;
}
/**
 * Sets the number of parallel task instances of the contract that are
 * to be executed on the same computing instance (logical machine).
 *
 * @param instancesPerMachine The instances per machine.
 * @throws IllegalArgumentException If the number of instances per machine is smaller than one.
 */
public void setSubtasksPerInstance(int instancesPerMachine) {
	if (instancesPerMachine < 1) {
		// include the rejected value so the failure can be diagnosed from the stack trace alone
		throw new IllegalArgumentException(
			"The number of subtasks per instance must be at least one, but was " + instancesPerMachine + ".");
	}
	this.subtasksPerInstance = instancesPerMachine;
}
/**
 * Returns the minimal memory that is guaranteed to each subtask of the
 * tasks represented by this OptimizerNode.
 *
 * @return The minimal guaranteed memory per subtask, in bytes.
 */
public long getMinimalMemoryPerSubTask() {
	return minimalMemoryPerSubTask;
}
/**
 * Stores the minimal memory that is guaranteed to each subtask of the
 * tasks represented by this OptimizerNode.
 *
 * @param minimalGuaranteedMemory The minimal guaranteed memory per subtask, in bytes.
 */
public void setMinimalMemoryPerSubTask(final long minimalGuaranteedMemory) {
	this.minimalMemoryPerSubTask = minimalGuaranteedMemory;
}
/**
* Gets the amount of memory that all subtasks of this task have jointly available.
......
......@@ -36,7 +36,6 @@ public class ReduceNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
setDegreeOfParallelism(1);
setSubtasksPerInstance(1);
}
}
......
......@@ -206,22 +206,6 @@ public abstract class SingleInputNode extends OptimizerNode {
protected abstract List<OperatorDescriptorSingle> getPossibleProperties();
/**
 * Checks whether any of the possible operator descriptors of this node needs memory.
 * That is the case if a descriptor's strategy has a materializing first dam, or if
 * any of its possible requested local properties is non-trivial.
 *
 * @return {@code true} if at least one descriptor matches, {@code false} otherwise.
 */
@Override
public boolean isMemoryConsumer() {
	final List<OperatorDescriptorSingle> descriptors = getPossibleProperties();
	for (final OperatorDescriptorSingle descriptor : descriptors) {
		final boolean damMaterializes = descriptor.getStrategy().firstDam().isMaterializing();
		if (damMaterializes) {
			return true;
		}

		for (final RequestedLocalProperties requested : descriptor.getPossibleLocalProperties()) {
			if (requested.isTrivial()) {
				continue;
			}
			return true;
		}
	}
	return false;
}
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
// get what we inherit and what is preserved by our user code
......@@ -284,30 +268,21 @@ public abstract class SingleInputNode extends OptimizerNode {
final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
final int dop = getDegreeOfParallelism();
final int subPerInstance = getSubtasksPerInstance();
final int inDop = getPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance();
final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1);
final boolean globalDopChange = numInstances != inNumInstances;
final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance;
final boolean dopChange = inDop != dop;
// create all candidates
for (PlanNode child : subPlans) {
if (this.inConn.getShipStrategy() == null) {
// pick the strategy ourselves
for (RequestedGlobalProperties igps: intGlobal) {
final Channel c = new Channel(child, this.inConn.getMaterializationMode());
igps.parameterizeChannel(c, globalDopChange, localDopChange);
igps.parameterizeChannel(c, dopChange);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
if (globalDopChange && !c.getShipStrategy().isNetworkStrategy()) {
c.getGlobalProperties().reset();
}
if (localDopChange && !(c.getShipStrategy().isNetworkStrategy() ||
c.getShipStrategy().compensatesForLocalDOPChanges())) {
if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
c.getGlobalProperties().reset();
}
......@@ -332,12 +307,10 @@ public abstract class SingleInputNode extends OptimizerNode {
c.setShipStrategy(this.inConn.getShipStrategy());
}
if (globalDopChange) {
if (dopChange) {
c.adjustGlobalPropertiesForFullParallelismChange();
} else if (localDopChange) {
c.adjustGlobalPropertiesForLocalParallelismChange();
}
// check whether we meet any of the accepted properties
for (RequestedGlobalProperties rgps: allValidGlobals) {
if (rgps.isMetBy(c.getGlobalProperties())) {
......
......@@ -42,7 +42,6 @@ public class SinkJoiner extends TwoInputNode {
this.input2 = conn2;
setDegreeOfParallelism(1);
setSubtasksPerInstance(1);
}
@Override
......
......@@ -251,22 +251,6 @@ public abstract class TwoInputNode extends OptimizerNode {
}
protected abstract List<OperatorDescriptorDual> getPossibleProperties();
/**
 * Checks whether any of the possible operator descriptors of this node needs memory.
 * That is the case if a descriptor's strategy has a materializing first or second dam,
 * or if any of its local property pairs has a non-trivial side.
 *
 * @return {@code true} if at least one descriptor matches, {@code false} otherwise.
 */
@Override
public boolean isMemoryConsumer() {
	for (final OperatorDescriptorDual descriptor : this.possibleProperties) {
		// the first dam is checked before the second one
		if (descriptor.getStrategy().firstDam().isMaterializing()) {
			return true;
		}
		if (descriptor.getStrategy().secondDam().isMaterializing()) {
			return true;
		}

		for (final LocalPropertiesPair pair : descriptor.getPossibleLocalProperties()) {
			// the second side is only inspected when the first side is trivial
			if (!pair.getProperties1().isTrivial() || !pair.getProperties2().isTrivial()) {
				return true;
			}
		}
	}
	return false;
}
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
......@@ -348,20 +332,12 @@ public abstract class TwoInputNode extends OptimizerNode {
final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
final int dop = getDegreeOfParallelism();
final int subPerInstance = getSubtasksPerInstance();
final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
final boolean globalDopChange1 = numInstances != inNumInstances1;
final boolean globalDopChange2 = numInstances != inNumInstances2;
final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1;
final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2;
final boolean dopChange1 = dop != inDop1;
final boolean dopChange2 = dop != inDop2;
// enumerate all pairwise combination of the children's plans together with
// all possible operator strategy combination
......@@ -380,15 +356,11 @@ public abstract class TwoInputNode extends OptimizerNode {
final Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
if (this.input1.getShipStrategy() == null) {
// free to choose the ship strategy
igps1.parameterizeChannel(c1, globalDopChange1, localDopChange1);
igps1.parameterizeChannel(c1, dopChange1);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() ||
c1.getShipStrategy().compensatesForLocalDOPChanges())) {
if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
} else {
......@@ -399,10 +371,8 @@ public abstract class TwoInputNode extends OptimizerNode {
c1.setShipStrategy(this.input1.getShipStrategy());
}
if (globalDopChange1) {
if (dopChange1) {
c1.adjustGlobalPropertiesForFullParallelismChange();
} else if (localDopChange1) {
c1.adjustGlobalPropertiesForLocalParallelismChange();
}
}
......@@ -411,15 +381,11 @@ public abstract class TwoInputNode extends OptimizerNode {
final Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
if (this.input2.getShipStrategy() == null) {
// free to choose the ship strategy
igps2.parameterizeChannel(c2, globalDopChange2, localDopChange2);
igps2.parameterizeChannel(c2, dopChange2);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() ||
c2.getShipStrategy().compensatesForLocalDOPChanges())) {
if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
} else {
......@@ -430,10 +396,8 @@ public abstract class TwoInputNode extends OptimizerNode {
c2.setShipStrategy(this.input2.getShipStrategy());
}
if (globalDopChange2) {
if (dopChange2) {
c2.adjustGlobalPropertiesForFullParallelismChange();
} else if (localDopChange2) {
c2.adjustGlobalPropertiesForLocalParallelismChange();
}
}
......
......@@ -155,8 +155,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
solutionSetDeltaUpdateAux.setDegreeOfParallelism(getDegreeOfParallelism());
solutionSetDeltaUpdateAux.setSubtasksPerInstance(getSubtasksPerInstance());
PactConnection conn = new PactConnection(solutionSetDelta, solutionSetDeltaUpdateAux);
solutionSetDeltaUpdateAux.setIncomingConnection(conn);
solutionSetDelta.addOutgoingConnection(conn);
......@@ -217,11 +216,6 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// Properties and Optimization
// --------------------------------------------------------------------------------------------
/**
 * Reports that this node requires memory for its tasks.
 *
 * @return Always {@code true}.
 */
@Override
public boolean isMemoryConsumer() {
return true;
}
@Override
protected List<OperatorDescriptorDual> getPossibleProperties() {
return new ArrayList<OperatorDescriptorDual>(1);
......@@ -331,13 +325,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
// attach a no-op node through which we create the properties of the original input
Channel toNoOp = new Channel(candidate);
globPropsReqWorkset.parameterizeChannel(toNoOp, false, false);
globPropsReqWorkset.parameterizeChannel(toNoOp, false);
locPropsReqWorkset.parameterizeChannel(toNoOp);
UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
......@@ -518,7 +511,6 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
setDegreeOfParallelism(1);
setSubtasksPerInstance(1);
}
public void setInputs(PactConnection input1, PactConnection input2) {
......
......@@ -53,8 +53,7 @@ public final class RequestedGlobalProperties implements Cloneable {
/**
* Sets the partitioning property for the global properties.
*
* @param partitioning The new partitioning to set.
* @param partitionedFields
* @param partitionedFields
*/
public void setHashPartitioned(FieldSet partitionedFields) {
if (partitionedFields == null) {
......@@ -218,7 +217,7 @@ public final class RequestedGlobalProperties implements Cloneable {
* @param globalDopChange
* @param localDopChange
*/
public void parameterizeChannel(Channel channel, boolean globalDopChange, boolean localDopChange) {
public void parameterizeChannel(Channel channel, boolean globalDopChange) {
// if we request nothing, then we need no special strategy. forward, if the number of instances remains
// the same, randomly repartition otherwise
if (isTrivial()) {
......@@ -228,8 +227,7 @@ public final class RequestedGlobalProperties implements Cloneable {
final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
// if we have no global parallelism change, check if we have already compatible global properties
if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
// we meet already everything, so go forward
if (!globalDopChange && isMetBy(inGlobals)) {
channel.setShipStrategy(ShipStrategyType.FORWARD);
return;
}
......
......@@ -48,8 +48,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
// create an input node for combine with same DOP as input node
GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
......
......@@ -48,8 +48,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle
// create an input node for combine with same DOP as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
......
......@@ -85,9 +85,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
// create an input node for combine with same DOP as input node
GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
......
......@@ -44,9 +44,9 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
// create an input node for combine with same DOP as input node
GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
}
@Override
......
......@@ -56,8 +56,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
// create an input node for combine with same DOP as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
......
......@@ -68,11 +68,11 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
private TempMode tempMode;
private long tempMemory;
private double relativeTempMemory;
private long memoryGlobalStrategy;
private double relativeMemoryGlobalStrategy;
private long memoryLocalStrategy;
private double relativeMemoryLocalStrategy;
private int replicationFactor = 1;
......@@ -200,17 +200,17 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
*
* @return The relative temp memory.
*/
public long getTempMemory() {
return this.tempMemory;
public double getRelativeTempMemory() {
return this.relativeTempMemory;
}
/**
* Sets the memory for materializing the channel's result from this Channel.
*
* @param tempMemory The memory for materialization.
* @param relativeTempMemory The memory for materialization.
*/
public void setTempMemory(long tempMemory) {
this.tempMemory = tempMemory;
public void setRelativeTempMemory(double relativeTempMemory) {
this.relativeTempMemory = relativeTempMemory;
}
/**
......@@ -286,20 +286,20 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
this.localStrategyComparator = localStrategyComparator;
}
public long getMemoryGlobalStrategy() {
return memoryGlobalStrategy;
public double getRelativeMemoryGlobalStrategy() {
return relativeMemoryGlobalStrategy;
}
public void setMemoryGlobalStrategy(long memoryGlobalStrategy) {
this.memoryGlobalStrategy = memoryGlobalStrategy;
public void setRelativeMemoryGlobalStrategy(double relativeMemoryGlobalStrategy) {
this.relativeMemoryGlobalStrategy = relativeMemoryGlobalStrategy;
}
public long getMemoryLocalStrategy() {
return memoryLocalStrategy;
public double getRelativeMemoryLocalStrategy() {
return relativeMemoryLocalStrategy;
}
public void setMemoryLocalStrategy(long memoryLocalStrategy) {
this.memoryLocalStrategy = memoryLocalStrategy;
public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
}
public boolean isOnDynamicPath() {
......@@ -437,33 +437,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
}
throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
}
public void adjustGlobalPropertiesForLocalParallelismChange() {
if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
"change before the ship strategy is set.");
}
// make sure the properties are acquired
if (this.globalProps == null) {
getGlobalProperties();
}
// some strategies globally reestablish properties
switch (this.shipStrategy) {
case FORWARD:
this.globalProps.reset();
return;
case NONE: // excluded by sanity check. just here to silence compiler warnings check completion
case BROADCAST:
case PARTITION_HASH:
case PARTITION_RANGE:
case PARTITION_RANDOM:
return;
}
throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
}
// --------------------------------------------------------------------------------------------
......
......@@ -65,12 +65,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
protected Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree
private long memoryPerSubTask; // the amount of memory dedicated to each task, in bytes
private double relativeMemoryPerSubTask; // the relative amount of memory dedicated to each task
private int degreeOfParallelism;
private int subtasksPerInstance;
private boolean pFlag; // flag for the internal pruning algorithm
// --------------------------------------------------------------------------------------------
......@@ -83,8 +81,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
this.driverStrategy = strategy;
this.degreeOfParallelism = template.getDegreeOfParallelism();
this.subtasksPerInstance = template.getSubtasksPerInstance();
// check, if there is branch at this node. if yes, this candidate must be associated with
// the branching template node.
if (template.isBranching()) {
......@@ -166,17 +163,17 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
*
* @return The relative memory per task.
*/
public long getMemoryPerSubTask() {
return this.memoryPerSubTask;
public double getRelativeMemoryPerSubTask() {
return this.relativeMemoryPerSubTask;
}
/**
* Sets the memory dedicated to each task for this node.
*
* @param memoryPerTask The memory per sub-task, in bytes.
* @param relativeMemoryPerSubtask The relative memory per sub-task
*/
public void setMemoryPerSubTask(long memoryPerTask) {
this.memoryPerSubTask = memoryPerTask;
public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) {
this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
}
/**
......@@ -303,18 +300,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
this.degreeOfParallelism = parallelism;
}
public void setSubtasksPerInstance(int subTasksPerInstance) {
this.subtasksPerInstance = subTasksPerInstance;
}
public int getDegreeOfParallelism() {
return this.degreeOfParallelism;
}
public int getSubtasksPerInstance() {
return this.subtasksPerInstance;
}
public long getGuaranteedAvailableMemory() {
return this.template.getMinimalMemoryAcrossAllSubTasks();
}
......
......@@ -252,9 +252,6 @@ public class PlanJSONDumpGenerator {
writer.print(",\n\t\t\"parallelism\": \""
+ (n.getDegreeOfParallelism() >= 1 ? n.getDegreeOfParallelism() : "default") + "\"");
writer.print(",\n\t\t\"subtasks_per_instance\": \""
+ (n.getSubtasksPerInstance() >= 1 ? n.getSubtasksPerInstance() : "default") + "\"");
// output node predecessors
Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
String child1name = "", child2name = "";
......
......@@ -22,9 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
import eu.stratosphere.api.common.aggregators.AggregatorWithName;
import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
......@@ -101,7 +98,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, true);
private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class);
// private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class);
private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
......@@ -186,13 +183,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
}
// now that all have been created, make sure that all share their instances with the one
// with the highest degree of parallelism
if (program.getInstanceTypeName() != null) {
this.maxDegreeVertex.setInstanceType(program.getInstanceTypeName());
} else {
LOG.warn("No instance type assigned to JobVertex.");
}
for (AbstractJobVertex vertex : this.vertices.values()) {
if (vertex != this.maxDegreeVertex) {
vertex.setVertexToShareInstancesWith(this.maxDegreeVertex);
......@@ -231,7 +221,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
* @param node
* The node that is currently processed.
* @return True, if the visitor should descend to the node's children, false if not.
* @see eu.stratosphere.util.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable)
* @see eu.stratosphere.util.Visitor#preVisit(eu.stratosphere.util.Visitable)
*/
@Override
public boolean preVisit(PlanNode node) {
......@@ -260,8 +250,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// operator with the tail, if they have the same DOP. not merging is currently not
// implemented
PlanNode root = iterationNode.getRootOfStepFunction();
if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism() ||
root.getSubtasksPerInstance() != node.getSubtasksPerInstance())
if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism())
{
throw new CompilerException("Error: The final operator of the step " +
"function has a different degree of parallelism than the iteration operator itself.");
......@@ -278,14 +267,12 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode();
PlanNode solutionSetDelta = iterationNode.getSolutionSetDeltaPlanNode();
if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism() ||
nextWorkSet.getSubtasksPerInstance() != node.getSubtasksPerInstance())
if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism())
{
throw new CompilerException("It is currently not supported that the final operator of the step " +
"function has a different degree of parallelism than the iteration operator itself.");
}
if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism() ||
solutionSetDelta.getSubtasksPerInstance() != node.getSubtasksPerInstance())
if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism())
{
throw new CompilerException("It is currently not supported that the final operator of the step " +
"function has a different degree of parallelism than the iteration operator itself.");
......@@ -364,11 +351,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if (this.maxDegreeVertex == null || this.maxDegreeVertex.getNumberOfSubtasks() < pd) {
this.maxDegreeVertex = vertex;
}
// set the number of tasks per instance
if (node.getSubtasksPerInstance() >= 1) {
vertex.setNumberOfSubtasksPerInstance(node.getSubtasksPerInstance());
}
// check whether this vertex is part of an iteration step function
if (this.currentIteration != null) {
......@@ -377,10 +359,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if (iterationNode.getDegreeOfParallelism() < pd) {
throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
}
if (iterationNode.getSubtasksPerInstance() < node.getSubtasksPerInstance()) {
throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, number of subtasks-per-node than the iteration operator.");
}
// store the id of the iterations the step functions participate in
IterationDescriptor descr = this.iterations.get(this.currentIteration);
new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId());
......@@ -401,7 +380,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
*
* @param node
* The node currently processed during the post-visit.
* @see eu.stratosphere.util.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable)
* @see eu.stratosphere.util.Visitor#postVisit(eu.stratosphere.util.Visitable)
*/
@Override
public void postVisit(PlanNode node) {
......@@ -739,7 +718,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
inConn.getLocalStrategy() == LocalStrategy.NONE &&
pred.getOutgoingChannels().size() == 1 &&
node.getDegreeOfParallelism() == pred.getDegreeOfParallelism() &&
node.getSubtasksPerInstance() == pred.getSubtasksPerInstance() &&
node.getBroadcastInputs().isEmpty();
// cannot chain the nodes that produce the next workset or the next solution set, if they are not the
......@@ -879,7 +857,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
c.getLocalStrategy() == LocalStrategy.NONE &&
c.getTempMode() == TempMode.NONE &&
successor.getDegreeOfParallelism() == pspn.getDegreeOfParallelism() &&
successor.getSubtasksPerInstance() == pspn.getSubtasksPerInstance() &&
!(successor instanceof NAryUnionPlanNode) &&
successor != iteration.getRootOfStepFunction() &&
iteration.getInput().getLocalStrategy() == LocalStrategy.NONE;
......@@ -948,7 +925,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
c.getLocalStrategy() == LocalStrategy.NONE &&
c.getTempMode() == TempMode.NONE &&
successor.getDegreeOfParallelism() == wspn.getDegreeOfParallelism() &&
successor.getSubtasksPerInstance() == wspn.getSubtasksPerInstance() &&
!(successor instanceof NAryUnionPlanNode) &&
successor != iteration.getNextWorkSetPlanNode() &&
iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE;
......@@ -995,17 +971,17 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
}
private void assignDriverResources(PlanNode node, TaskConfig config) {
final long mem = node.getMemoryPerSubTask();
if (mem > 0) {
config.setMemoryDriver(mem);
final double relativeMem = node.getRelativeMemoryPerSubTask();
if (relativeMem > 0) {
config.setRelativeMemoryDriver(relativeMem);
config.setFilehandlesDriver(this.defaultMaxFan);
config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
}
}
private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
if (c.getMemoryLocalStrategy() > 0) {
config.setMemoryInput(inputNum, c.getMemoryLocalStrategy());
if (c.getRelativeMemoryLocalStrategy() > 0) {
config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
config.setFilehandlesInput(inputNum, this.defaultMaxFan);
config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
}
......@@ -1020,13 +996,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
* channel is then the channel into the union node, the local strategy channel the one from the union to the
* actual target operator.
*
* @param channelForGlobalStrategy
* @param channelForLocalStrategy
* @param channel
* @param inputNumber
* @param sourceVertex
* @param sourceConfig
* @param targetVertex
* @param targetConfig
* @param isBroadcast
* @throws JobGraphDefinitionException
* @throws CompilerException
*/
......@@ -1133,10 +1109,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if (needsMemory) {
// sanity check
if (tm == null || tm == TempMode.NONE || channel.getTempMemory() < 1) {
if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
}
config.setInputMaterializationMemory(inputNum, channel.getTempMemory());
config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
}
}
}
......@@ -1153,11 +1129,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
final long memForBackChannel = bulkNode.getMemoryPerSubTask();
if (memForBackChannel <= 0) {
final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
if (relativeMemForBackChannel <= 0) {
throw new CompilerException("Bug: No memory has been assigned to the iteration back channel.");
}
headConfig.setBackChannelMemory(memForBackChannel);
headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
// --------------------------- create the sync task ---------------------------
final JobOutputVertex sync = new JobOutputVertex("Sync(" +
......@@ -1219,7 +1195,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTail);
// connect the fake tail
......@@ -1262,7 +1237,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class);
fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTailTerminationCriterion.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTailTerminationCriterion);
// connect the fake tail
......@@ -1310,14 +1284,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
final long mem = iterNode.getMemoryPerSubTask();
if (mem <= 0) {
final double relativeMemory = iterNode.getRelativeMemoryPerSubTask();
if (relativeMemory <= 0) {
throw new CompilerException("Bug: No memory has been assigned to the workset iteration.");
}
headConfig.setIsWorksetIteration();
headConfig.setBackChannelMemory(mem / 2);
headConfig.setSolutionSetMemory(mem / 2);
headConfig.setRelativeBackChannelMemory(relativeMemory / 2);
headConfig.setRelativeSolutionSetMemory(relativeMemory / 2);
// set the solution set serializer and comparator
headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer());
......@@ -1396,7 +1370,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTail);
// connect the fake tail
......@@ -1435,7 +1408,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTail);
// connect the fake tail
......@@ -1502,9 +1474,9 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
private AbstractJobVertex containingVertex;
@SuppressWarnings("unchecked")
TaskInChain(@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> chainedTask, TaskConfig taskConfig, String taskName) {
this.chainedTask = (Class<? extends ChainedDriver<?, ?>>) chainedTask;
TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
String taskName) {
this.chainedTask = chainedTask;
this.taskConfig = taskConfig;
this.taskName = taskName;
}
......
......@@ -12,7 +12,6 @@
**********************************************************************************************************************/
package eu.stratosphere.pact.compiler;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
......@@ -37,12 +36,6 @@ import eu.stratosphere.compiler.costs.DefaultCostEstimator;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.util.LogUtils;
import eu.stratosphere.util.OperatingSystem;
import eu.stratosphere.util.Visitor;
......@@ -72,8 +65,6 @@ public abstract class CompilerTestBase implements java.io.Serializable {
protected transient PactCompiler noStatsCompiler;
protected transient InstanceTypeDescription instanceType;
private transient int statCounter;
// ------------------------------------------------------------------------
......@@ -85,29 +76,22 @@ public abstract class CompilerTestBase implements java.io.Serializable {
@Before
public void setup() {
InetSocketAddress dummyAddr = new InetSocketAddress("localhost", 12345);
this.dataStats = new DataStatistics();
this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator(), dummyAddr);
this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator());
this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator(), dummyAddr);
this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator());
this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
// create the instance type description
InstanceType iType = InstanceTypeFactory.construct("standard", 6, 2, 4096, 100, 0);
HardwareDescription hDesc = HardwareDescriptionFactory.construct(2, 4096 * 1024 * 1024, 2000 * 1024 * 1024);
this.instanceType = InstanceTypeDescriptionFactory.construct(iType, hDesc, DEFAULT_PARALLELISM * 2);
}
// ------------------------------------------------------------------------
public OptimizedPlan compileWithStats(Plan p) {
return this.withStatsCompiler.compile(p, this.instanceType);
return this.withStatsCompiler.compile(p);
}
public OptimizedPlan compileNoStats(Plan p) {
return this.noStatsCompiler.compile(p, this.instanceType);
return this.noStatsCompiler.compile(p);
}
public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
......
......@@ -98,6 +98,11 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
/**
* The config parameter defining the number of task slots of a task manager.
*/
public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";
/**
* The number of incoming network IO threads (e.g. incoming connection threads used in NettyConnectionManager
* for the ServerBootstrap.)
......@@ -290,12 +295,7 @@ public final class ConfigConstants {
/**
* The default degree of parallelism for operations.
*/
public static final int DEFAULT_PARALLELIZATION_DEGREE = -1;
/**
* The default intra-node parallelism.
*/
public static final int DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE = -1;
public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
// ------------------------------ Runtime ---------------------------------
......
......@@ -40,6 +40,7 @@ public final class ClassUtils {
throws ClassNotFoundException {
if (!className.contains("Protocol")) {
System.out.println(className);
throw new ClassNotFoundException("Only use this method for protocols!");
}
......
......@@ -37,11 +37,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
*/
private String instanceName;
/**
* The type of the instance the vertex is now assigned to.
*/
private String instanceType;
/**
* Constructs a new event.
*
......@@ -51,16 +46,13 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
* identifies the vertex this event refers to
* @param instanceName
* the name of the instance the vertex is now assigned to
* @param instanceType
* the type of the instance the vertex is now assigned to
*/
public VertexAssignmentEvent(final long timestamp, final ManagementVertexID managementVertexID,
final String instanceName, final String instanceType) {
final String instanceName) {
super(timestamp);
this.managementVertexID = managementVertexID;
this.instanceName = instanceName;
this.instanceType = instanceType;
}
/**
......@@ -90,16 +82,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
return this.instanceName;
}
/**
* Returns the type of the instance the vertex is now assigned to.
*
* @return the type of the instance the vertex is now assigned to
*/
public String getInstanceType() {
return this.instanceType;
}
@Override
public void read(final DataInput in) throws IOException {
......@@ -107,7 +89,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
this.managementVertexID.read(in);
this.instanceName = StringRecord.readString(in);
this.instanceType = StringRecord.readString(in);
}
......@@ -118,7 +99,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
this.managementVertexID.write(out);
StringRecord.writeString(out, this.instanceName);
StringRecord.writeString(out, this.instanceType);
}
......@@ -149,16 +129,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
}
}
if (this.instanceType == null) {
if (vae.getInstanceType() != null) {
return false;
}
} else {
if (!this.instanceType.equals(vae.getInstanceType())) {
return false;
}
}
return true;
}
......
......@@ -19,7 +19,6 @@ import eu.stratosphere.runtime.io.channels.ChannelType;
/**
* Objects of this class represent a pair of {@link eu.stratosphere.runtime.io.serialization.io.channels.InputChannel} and {@link AbstractOutputChannel} objects
* within an {@link ExecutionGraph}, Nephele's internal scheduling representation for jobs.
*
*/
public final class ExecutionEdge {
......@@ -51,42 +50,34 @@ public final class ExecutionEdge {
}
public ExecutionGate getInputGate() {
return this.inputGate;
}
public ExecutionGate getOutputGate() {
return this.outputGate;
}
public ChannelID getOutputChannelID() {
return this.outputChannelID;
}
/**
 * Returns the ID of the input channel of this edge.
 *
 * @return the input channel ID
 */
public ChannelID getInputChannelID() {
    return inputChannelID;
}
/**
 * Returns the output gate index stored for this edge.
 *
 * @return the output gate index
 */
public int getOutputGateIndex() {
    return outputGateIndex;
}
/**
 * Returns the input gate index stored for this edge.
 *
 * @return the input gate index
 */
public int getInputGateIndex() {
    return inputGateIndex;
}
/**
 * Returns the channel type of this edge, as reported by the group edge it belongs to.
 *
 * @return the channel type of the owning group edge
 */
public ChannelType getChannelType() {
    final ChannelType typeOfGroupEdge = groupEdge.getChannelType();
    return typeOfGroupEdge;
}
/**
 * Returns the connection ID of this edge, as reported by the group edge it belongs to.
 *
 * @return the connection ID of the owning group edge
 */
public int getConnectionID() {
    final int idOfGroupEdge = groupEdge.getConnectionID();
    return idOfGroupEdge;
}
}
......@@ -38,8 +38,6 @@ import eu.stratosphere.nephele.execution.ExecutionListener;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.channels.ChannelID;
......@@ -160,18 +158,18 @@ public class ExecutionGraph implements ExecutionListener {
*
* @param job
* the user's job graph
* @param instanceManager
* the instance manager
* @param defaultParallelism
* the number of subtasks assigned to job vertices whose number of subtasks is unset (-1)
* @throws GraphConversionException
* thrown if the job graph is not valid and no execution graph can be constructed from it
*/
public ExecutionGraph(final JobGraph job, final InstanceManager instanceManager)
public ExecutionGraph(final JobGraph job, final int defaultParallelism)
throws GraphConversionException {
this(job.getJobID(), job.getName(), job.getJobConfiguration());
// Start constructing the new execution graph from given job graph
try {
constructExecutionGraph(job, instanceManager);
constructExecutionGraph(job, defaultParallelism);
} catch (GraphConversionException e) {
throw e; // forward graph conversion exceptions
} catch (Exception e) {
......@@ -217,7 +215,6 @@ public class ExecutionGraph implements ExecutionListener {
final ExecutionGroupVertex groupVertex = it2.next();
if (groupVertex.isNumberOfMembersUserDefined()) {
groupVertex.createInitialExecutionVertices(groupVertex.getUserDefinedNumberOfMembers());
groupVertex.repairSubtasksPerInstance();
}
}
......@@ -253,12 +250,12 @@ public class ExecutionGraph implements ExecutionListener {
*
* @param jobGraph
* the job graph to create the execution graph from
* @param instanceManager
* the instance manager
* @param defaultParallelism
* the number of subtasks assigned to job vertices whose number of subtasks is unset (-1)
* @throws GraphConversionException
* thrown if the job graph is not valid and no execution graph can be constructed from it
*/
private void constructExecutionGraph(final JobGraph jobGraph, final InstanceManager instanceManager)
private void constructExecutionGraph(final JobGraph jobGraph, final int defaultParallelism)
throws GraphConversionException {
// Clean up temporary data structures
......@@ -272,8 +269,11 @@ public class ExecutionGraph implements ExecutionListener {
// Convert job vertices to execution vertices and initialize them
final AbstractJobVertex[] all = jobGraph.getAllJobVertices();
for (int i = 0; i < all.length; i++) {
final ExecutionVertex createdVertex = createVertex(all[i], instanceManager, initialExecutionStage,
jobGraph.getJobConfiguration());
if(all[i].getNumberOfSubtasks() == -1){
all[i].setNumberOfSubtasks(defaultParallelism);
}
final ExecutionVertex createdVertex = createVertex(all[i], initialExecutionStage);
temporaryVertexMap.put(all[i], createdVertex);
temporaryGroupVertexMap.put(all[i], createdVertex.getGroupVertex());
}
......@@ -444,37 +444,15 @@ public class ExecutionGraph implements ExecutionListener {
*
* @param jobVertex
* the job vertex to create the execution vertex from
* @param instanceManager
* the instanceManager
* @param initialExecutionStage
* the initial execution stage all group vertices are added to
* @param jobConfiguration
* the configuration object originally attached to the {@link JobGraph}
* @return the new execution vertex
* @throws GraphConversionException
* thrown if the job vertex is of an unknown subclass
*/
private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final InstanceManager instanceManager,
final ExecutionStage initialExecutionStage, final Configuration jobConfiguration)
private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final ExecutionStage initialExecutionStage)
throws GraphConversionException {
// If the user has requested instance type, check if the type is known by the current instance manager
InstanceType instanceType = null;
boolean userDefinedInstanceType = false;
if (jobVertex.getInstanceType() != null) {
userDefinedInstanceType = true;
instanceType = instanceManager.getInstanceTypeByName(jobVertex.getInstanceType());
if (instanceType == null) {
throw new GraphConversionException("Requested instance type " + jobVertex.getInstanceType()
+ " is not known to the instance manager");
}
}
if (instanceType == null) {
instanceType = instanceManager.getDefaultInstanceType();
}
// Create an initial execution vertex for the job vertex
final Class<? extends AbstractInvokable> invokableClass = jobVertex.getInvokableClass();
if (invokableClass == null) {
......@@ -491,8 +469,7 @@ public class ExecutionGraph implements ExecutionListener {
ExecutionGroupVertex groupVertex = null;
try {
groupVertex = new ExecutionGroupVertex(jobVertex.getName(), jobVertex.getID(), this,
jobVertex.getNumberOfSubtasks(), instanceType, userDefinedInstanceType,
jobVertex.getNumberOfSubtasksPerInstance(), jobVertex.getVertexToShareInstancesWith() != null ? true
jobVertex.getNumberOfSubtasks(), jobVertex.getVertexToShareInstancesWith() != null ? true
: false, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature,
invokableClass);
} catch (Throwable t) {
......@@ -506,39 +483,6 @@ public class ExecutionGraph implements ExecutionListener {
throw new GraphConversionException(StringUtils.stringifyException(e));
}
// Check if the user's specifications for the number of subtasks are valid
final int minimumNumberOfSubtasks = jobVertex.getMinimumNumberOfSubtasks(groupVertex.getEnvironment()
.getInvokable());
final int maximumNumberOfSubtasks = jobVertex.getMaximumNumberOfSubtasks(groupVertex.getEnvironment()
.getInvokable());
if (jobVertex.getNumberOfSubtasks() != -1) {
if (jobVertex.getNumberOfSubtasks() < 1) {
throw new GraphConversionException("Cannot split task " + jobVertex.getName() + " into "
+ jobVertex.getNumberOfSubtasks() + " subtasks");
}
if (jobVertex.getNumberOfSubtasks() < minimumNumberOfSubtasks) {
throw new GraphConversionException("Number of subtasks must be at least " + minimumNumberOfSubtasks);
}
if (maximumNumberOfSubtasks != -1) {
if (jobVertex.getNumberOfSubtasks() > maximumNumberOfSubtasks) {
throw new GraphConversionException("Number of subtasks for vertex " + jobVertex.getName()
+ " can be at most " + maximumNumberOfSubtasks);
}
}
}
// Check number of subtasks per instance
if (jobVertex.getNumberOfSubtasksPerInstance() != -1 && jobVertex.getNumberOfSubtasksPerInstance() < 1) {
throw new GraphConversionException("Cannot set number of subtasks per instance to "
+ jobVertex.getNumberOfSubtasksPerInstance() + " for vertex " + jobVertex.getName());
}
// Assign min/max to the group vertex (settings are actually applied in applyUserDefinedSettings)
groupVertex.setMinMemberSize(minimumNumberOfSubtasks);
groupVertex.setMaxMemberSize(maximumNumberOfSubtasks);
// Register input and output vertices separately
if (jobVertex instanceof AbstractJobInputVertex) {
......@@ -579,8 +523,7 @@ public class ExecutionGraph implements ExecutionListener {
jobVertex.getNumberOfBackwardConnections());
// Assign initial instance to vertex (may be overwritten later on when user settings are applied)
ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(instanceType), instanceType,
null));
ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(), null));
return ev;
}
......@@ -852,6 +795,48 @@ public class ExecutionGraph implements ExecutionListener {
return this.indexToCurrentExecutionStage;
}
/**
 * Computes the maximum degree of parallelism of the job represented by this execution graph,
 * i.e. the largest value returned by {@link ExecutionStage#getMaxNumberSubtasks()} over all
 * stages of the graph.
 *
 * @return the highest per-stage subtask count, or <code>0</code> if no stage reports a larger value
 */
public int getMaxNumberSubtasks() {
    int highestDegree = 0;
    for (final Iterator<ExecutionStage> stageIt = this.stages.iterator(); stageIt.hasNext();) {
        highestDegree = Math.max(highestDegree, stageIt.next().getMaxNumberSubtasks());
    }
    return highestDegree;
}
/**
 * Determines the number of slots required to run this execution graph. The result is the
 * largest value returned by {@link ExecutionStage#getRequiredSlots()} over all stages of
 * the graph.
 *
 * @return the highest per-stage slot requirement, or <code>0</code> if no stage reports a larger value
 */
public int getRequiredSlots() {
    int highestRequirement = 0;
    for (final Iterator<ExecutionStage> stageIt = this.stages.iterator(); stageIt.hasNext();) {
        highestRequirement = Math.max(highestRequirement, stageIt.next().getRequiredSlots());
    }
    return highestRequirement;
}
/**
* Returns the stage which is currently executed.
*
......@@ -1318,25 +1303,16 @@ public class ExecutionGraph implements ExecutionListener {
return this.jobName;
}
@Override
public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
// TODO Auto-generated method stub
}
public void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {}
@Override
public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
// TODO Auto-generated method stub
}
public void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {}
/**
* Reconstructs the execution pipelines for the entire execution graph.
*/
private void reconstructExecutionPipelines() {
final Iterator<ExecutionStage> it = this.stages.iterator();
while (it.hasNext()) {
......@@ -1344,40 +1320,18 @@ public class ExecutionGraph implements ExecutionListener {
}
}
/**
 * Calculates the connection IDs of the graph to avoid deadlocks in the data flow at runtime.
 * The calculation is triggered on every output group vertex of the last execution stage; all
 * of them share a single set of already visited group vertices.
 */
private void calculateConnectionIDs() {
    final Set<ExecutionGroupVertex> visited = new HashSet<ExecutionGroupVertex>();
    final ExecutionStage finalStage = getStage(getNumberOfStages() - 1);

    int memberIndex = 0;
    while (memberIndex < finalStage.getNumberOfStageMembers()) {
        final ExecutionGroupVertex member = finalStage.getStageMember(memberIndex++);
        if (!member.isOutputVertex()) {
            continue;
        }
        // Every output vertex starts at connection ID 0; the ID returned by the call is not used here.
        member.calculateConnectionID(0, visited);
    }
}
/**
 * Provides an iterator over the execution stages of this graph.
 *
 * @return an iterator obtained from the graph's collection of execution stages
 */
public Iterator<ExecutionStage> iterator() {
    final Iterator<ExecutionStage> stageIterator = stages.iterator();
    return stageIterator;
}
@Override
public int getPriority() {
    // This graph always reports the constant priority 1.
    final int priority = 1;
    return priority;
}
......@@ -1388,7 +1342,22 @@ public class ExecutionGraph implements ExecutionListener {
* the update command to be asynchronously executed on this graph
*/
public void executeCommand(final Runnable command) {
    // Hand the command over to the graph's executor service.
    executorService.execute(command);
}
/**
 * Triggers the connection ID calculation on every output group vertex of the last execution
 * stage. All calls share one set of already visited group vertices.
 */
private void calculateConnectionIDs() {
// Shared across all output vertices so a group vertex reached twice is only recorded once.
final Set<ExecutionGroupVertex> alreadyVisited = new HashSet<ExecutionGroupVertex>();
final ExecutionStage lastStage = getStage(getNumberOfStages() - 1);
for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) {
final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i);
// The start ID is reset to 0 for each member of the last stage.
int currentConnectionID = 0;
if (groupVertex.isOutputVertex()) {
// NOTE(review): the returned ID is stored but never read afterwards - confirm this is intended.
currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited);
}
}
}
}
......@@ -18,7 +18,6 @@ import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.jobgraph.JobVertexID;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.channels.ChannelType;
......@@ -68,41 +67,11 @@ public final class ExecutionGroupVertex {
*/
private final CopyOnWriteArrayList<ExecutionVertex> groupMembers = new CopyOnWriteArrayList<ExecutionVertex>();
/**
* Maximum number of execution vertices this group vertex can manage.
*/
private volatile int maxMemberSize = 1;
/**
* Minimum number of execution vertices this group vertex can manage.
*/
private volatile int minMemberSize = 1;
/**
* The user defined number of execution vertices, -1 if the user has not specified it.
*/
private final int userDefinedNumberOfMembers;
/**
* The instance type to be used for execution vertices this group vertex manages.
*/
private volatile InstanceType instanceType = null;
/**
* Stores whether the instance type is user defined.
*/
private final boolean userDefinedInstanceType;
/**
* Stores the number of subtasks per instance.
*/
private volatile int numberOfSubtasksPerInstance = -1;
/**
* Stores whether the number of subtasks per instance is user defined.
*/
private final boolean userDefinedNumberOfSubtasksPerInstance;
/**
* Number of retries in case of an error before the task represented by this vertex is considered as failed.
*/
......@@ -175,12 +144,6 @@ public final class ExecutionGroupVertex {
* the execution graph this group vertex belongs to
* @param userDefinedNumberOfMembers
* the user defined number of subtasks, -1 if the user did not specify the number
* @param instanceType
* the instance type to be used for execution vertices this group vertex manages.
* @param userDefinedInstanceType
* <code>true</code> if the instance type is user defined, <code>false</code> otherwise
* @param numberOfSubtasksPerInstance
* the user defined number of subtasks per instance, -1 if the user did not specify the number
* @param userDefinedVertexToShareInstanceWith
* <code>true</code> if the user specified another vertex to share instances with, <code>false</code>
* otherwise
......@@ -197,24 +160,13 @@ public final class ExecutionGroupVertex {
* throws if an error occurs while instantiating the {@link AbstractInvokable}
*/
public ExecutionGroupVertex(final String name, final JobVertexID jobVertexID, final ExecutionGraph executionGraph,
final int userDefinedNumberOfMembers, final InstanceType instanceType,
final boolean userDefinedInstanceType, final int numberOfSubtasksPerInstance,
final boolean userDefinedVertexToShareInstanceWith, final int numberOfExecutionRetries,
final Configuration configuration, final ExecutionSignature signature,
final int userDefinedNumberOfMembers, final boolean userDefinedVertexToShareInstanceWith,
final int numberOfExecutionRetries, final Configuration configuration, final ExecutionSignature signature,
final Class<? extends AbstractInvokable> invokableClass) throws Exception {
this.name = (name != null) ? name : "";
this.jobVertexID = jobVertexID;
this.userDefinedNumberOfMembers = userDefinedNumberOfMembers;
this.instanceType = instanceType;
this.userDefinedInstanceType = userDefinedInstanceType;
if (numberOfSubtasksPerInstance != -1) {
this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance;
this.userDefinedNumberOfSubtasksPerInstance = true;
} else {
this.numberOfSubtasksPerInstance = 1;
this.userDefinedNumberOfSubtasksPerInstance = false;
}
if (numberOfExecutionRetries >= 0) {
this.numberOfExecutionRetries = numberOfExecutionRetries;
} else {
......@@ -308,32 +260,6 @@ public final class ExecutionGroupVertex {
}
}
/**
 * Stores the maximum number of members this group vertex can have. The value is taken over
 * without any validation.
 *
 * @param maxSize
 *        the maximum number of members this group vertex can have
 */
void setMaxMemberSize(final int maxSize) {
    // TODO: Add checks here
    maxMemberSize = maxSize;
}
/**
 * Stores the minimum number of members this group vertex must have. The value is taken over
 * without any validation.
 *
 * @param minSize
 *        the minimum number of members this group vertex must have
 */
void setMinMemberSize(final int minSize) {
    // TODO: Add checks here
    minMemberSize = minSize;
}
/**
* Returns the current number of members this group vertex has.
*
......@@ -344,24 +270,6 @@ public final class ExecutionGroupVertex {
return this.groupMembers.size();
}
/**
 * Reports the upper bound on the number of members of this group vertex.
 *
 * @return the currently stored maximum number of members
 */
public int getMaximumNumberOfGroupMembers() {
    return maxMemberSize;
}
/**
 * Reports the lower bound on the number of members of this group vertex.
 *
 * @return the currently stored minimum number of members
 */
public int getMinimumNumberOfGroupMember() {
    return minMemberSize;
}
/**
* Wires this group vertex to the specified group vertex and creates
* a back link.
......@@ -376,10 +284,6 @@ public final class ExecutionGroupVertex {
* the channel type to be used for this edge
* @param userDefinedChannelType
* <code>true</code> if the channel type is user defined, <code>false</code> otherwise
* @param compressionLevel
* the compression level to be used for this edge
* @param userDefinedCompressionLevel
* <code>true</code> if the compression level is user defined, <code>false</code> otherwise
* @param distributionPattern
* the distribution pattern to create the wiring between the group members
* @param isBroadcast
......@@ -480,10 +384,10 @@ public final class ExecutionGroupVertex {
* @throws GraphConversionException
* thrown if the number of execution vertices for this group vertex cannot be set to the desired value
*/
void createInitialExecutionVertices(final int initalNumberOfVertices) throws GraphConversionException {
void createInitialExecutionVertices(final int initialNumberOfVertices) throws GraphConversionException {
// If the requested number of group vertices does not change, do nothing
if (initalNumberOfVertices == this.getCurrentNumberOfGroupMembers()) {
if (initialNumberOfVertices == this.getCurrentNumberOfGroupMembers()) {
return;
}
......@@ -517,25 +421,14 @@ public final class ExecutionGroupVertex {
* }
*/
if (initalNumberOfVertices < this.getMinimumNumberOfGroupMember()) {
throw new GraphConversionException("Number of members must be at least "
+ this.getMinimumNumberOfGroupMember());
}
if ((this.getMaximumNumberOfGroupMembers() != -1)
&& (initalNumberOfVertices > this.getMaximumNumberOfGroupMembers())) {
throw new GraphConversionException("Number of members cannot exceed "
+ this.getMaximumNumberOfGroupMembers());
}
final ExecutionVertex originalVertex = this.getGroupMember(0);
int currentNumberOfExecutionVertices = this.getCurrentNumberOfGroupMembers();
while (currentNumberOfExecutionVertices++ < initalNumberOfVertices) {
while (currentNumberOfExecutionVertices++ < initialNumberOfVertices) {
final ExecutionVertex vertex = originalVertex.splitVertex();
vertex.setAllocatedResource(new AllocatedResource(DummyInstance
.createDummyInstance(this.instanceType), this.instanceType, null));
.createDummyInstance(), null));
this.groupMembers.add(vertex);
}
......@@ -645,53 +538,6 @@ public final class ExecutionGroupVertex {
return this.userDefinedNumberOfMembers;
}
/**
 * Tells whether the instance type of this group vertex was defined by the user.
 *
 * @return <code>true</code> if the instance type is user defined, <code>false</code> otherwise
 */
boolean isInstanceTypeUserDefined() {
    return userDefinedInstanceType;
}
/**
 * Replaces the instance type of this group vertex and clears the allocated resource of every
 * group member.
 *
 * @param instanceType
 *        the new instance type, must not be <code>null</code>
 * @throws IllegalArgumentException
 *         thrown if <code>instanceType</code> is <code>null</code>
 * @throws GraphConversionException
 *         thrown if the current instance type is user defined and therefore must not be overwritten
 */
void setInstanceType(final InstanceType instanceType) throws GraphConversionException {

    if (instanceType == null) {
        throw new IllegalArgumentException("Argument instanceType must not be null");
    }

    if (this.userDefinedInstanceType) {
        // NOTE(review): the message names the requested type, not the user defined one - confirm intent.
        final String rejectedType = instanceType.getIdentifier();
        throw new GraphConversionException("Cannot overwrite user defined instance type " + rejectedType);
    }

    this.instanceType = instanceType;

    // Drop the instance allocation of all members; reassignInstances is expected to redo it.
    int memberIndex = 0;
    while (memberIndex < this.groupMembers.size()) {
        this.groupMembers.get(memberIndex++).setAllocatedResource(null);
    }
}
/**
 * Returns the instance type currently stored for this group vertex.
 *
 * @return the instance type of this group vertex
 */
InstanceType getInstanceType() {
    return instanceType;
}
/**
 * Tells whether the number of subtasks per instance was defined by the user.
 *
 * @return <code>true</code> if the number of subtasks per instance is user defined,
 *         <code>false</code> otherwise
 */
boolean isNumberOfSubtasksPerInstanceUserDefined() {
    return userDefinedNumberOfSubtasksPerInstance;
}
/**
 * Sets the number of subtasks per instance. A user defined value may only be "overwritten"
 * with the identical value.
 *
 * @param numberOfSubtasksPerInstance
 *        the new number of subtasks per instance
 * @throws GraphConversionException
 *         thrown if the number is user defined and the given value differs from it
 */
void setNumberOfSubtasksPerInstance(final int numberOfSubtasksPerInstance) throws GraphConversionException {

    if (this.userDefinedNumberOfSubtasksPerInstance) {
        if (this.numberOfSubtasksPerInstance != numberOfSubtasksPerInstance) {
            throw new GraphConversionException("Cannot overwrite user defined number of subtasks per instance");
        }
    }

    this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance;
}
/**
 * Returns the number of subtasks per instance currently stored for this group vertex.
 *
 * @return the number of subtasks per instance
 */
int getNumberOfSubtasksPerInstance() {
    return numberOfSubtasksPerInstance;
}
/**
* Returns the number of retries in case of an error before the task represented by this vertex is considered as
......@@ -766,27 +612,13 @@ public final class ExecutionGroupVertex {
}
/**
 * Re-aligns the allocated resources of the group members with the number of subtasks per
 * instance: every member receives the allocated resource of the first member of its block of
 * <code>numberOfSubtasksPerInstance</code> consecutive members.
 */
void repairSubtasksPerInstance() {

    int position = 0;
    for (final ExecutionVertex member : this.groupMembers) {
        // Integer division rounds the position down to the start index of the member's block.
        final int blockStart = (position / this.numberOfSubtasksPerInstance) * this.numberOfSubtasksPerInstance;
        position++;
        member.setAllocatedResource(this.groupMembers.get(blockStart).getAllocatedResource());
    }
}
void repairInstanceSharing(final Set<AllocatedResource> availableResources) {
// Number of required resources by this group vertex
final int numberOfRequiredInstances = (this.groupMembers.size() / this.numberOfSubtasksPerInstance)
+ (((this.groupMembers.size() % this.numberOfSubtasksPerInstance) != 0) ? 1 : 0);
final int numberOfRequiredSlots = this.groupMembers.size();
// Number of resources to be replaced
final int resourcesToBeReplaced = Math.min(availableResources.size(), numberOfRequiredInstances);
final int resourcesToBeReplaced = Math.min(availableResources.size(), numberOfRequiredSlots);
// Build the replacement map if necessary
final Map<AllocatedResource, AllocatedResource> replacementMap = new HashMap<AllocatedResource, AllocatedResource>();
......
......@@ -15,18 +15,10 @@ package eu.stratosphere.nephele.executiongraph;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceRequestMap;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.runtime.io.channels.ChannelType;
/**
......@@ -35,15 +27,9 @@ import eu.stratosphere.runtime.io.channels.ChannelType;
* job can only start to execute if the execution of its preceding stage is complete.
* <p>
* This class is thread-safe.
*
*/
public final class ExecutionStage {
/**
* The log object used for debugging.
*/
private static final Log LOG = LogFactory.getLog(ExecutionStage.class);
/**
* The execution graph that this stage belongs to.
*/
......@@ -241,69 +227,6 @@ public final class ExecutionStage {
return null;
}
/**
 * Collects which instance types, and how many instances of each, are required to execute this
 * stage. Only vertices in the given execution state are considered, and every distinct instance
 * is counted once. An instance that is a {@link DummyInstance} raises the maximum count of its
 * type by one and, if its group vertex is an input vertex, the minimum count as well. Finally,
 * every type whose minimum is still zero gets its minimum set to its maximum. The given map is
 * not cleared before collecting.
 *
 * @param instanceRequestMap
 *        the map receiving the instance types and the required number of instances per type
 * @param executionState
 *        the execution state the considered vertices must be in
 */
public void collectRequiredInstanceTypes(final InstanceRequestMap instanceRequestMap,
        final ExecutionState executionState) {

    final Set<AbstractInstance> seenInstances = new HashSet<AbstractInstance>();
    final ExecutionGroupVertexIterator groups = new ExecutionGroupVertexIterator(this.getExecutionGraph(), true,
        this.stageNum);

    while (groups.hasNext()) {

        final ExecutionGroupVertex group = groups.next();
        for (final Iterator<ExecutionVertex> members = group.iterator(); members.hasNext();) {

            final ExecutionVertex member = members.next();
            if (member.getExecutionState() != executionState) {
                continue;
            }

            // add() returns false for an instance that has been collected before
            final AbstractInstance instance = member.getAllocatedResource().getInstance();
            if (!seenInstances.add(instance)) {
                continue;
            }

            if (!(instance instanceof DummyInstance)) {
                LOG.debug("Execution Vertex " + member.getName() + " (" + member.getID()
                    + ") is already assigned to non-dummy instance, skipping...");
                continue;
            }

            final InstanceType type = instance.getType();
            final int raisedMaximum = instanceRequestMap.getMaximumNumberOfInstances(type) + 1;
            instanceRequestMap.setMaximumNumberOfInstances(type, raisedMaximum);

            if (group.isInputVertex()) {
                final int raisedMinimum = instanceRequestMap.getMinimumNumberOfInstances(type) + 1;
                instanceRequestMap.setMinimumNumberOfInstances(type, raisedMinimum);
            }
        }
    }

    // Types without an explicit minimum fall back to their maximum.
    final Iterator<Map.Entry<InstanceType, Integer>> maxima = instanceRequestMap.getMaximumIterator();
    while (maxima.hasNext()) {
        final Map.Entry<InstanceType, Integer> maximum = maxima.next();
        final InstanceType type = maximum.getKey();
        if (instanceRequestMap.getMinimumNumberOfInstances(type) == 0) {
            instanceRequestMap.setMinimumNumberOfInstances(type, maximum.getValue());
        }
    }
}
/**
* Returns the execution graph that this stage belongs to.
*
......@@ -446,4 +369,37 @@ public final class ExecutionStage {
}
}
}
/**
 * Determines the largest current number of group members among all group vertices of this
 * stage.
 *
 * @return the highest member count of any group vertex in this stage, or <code>0</code> if no
 *         group vertex reports a larger value
 */
public int getMaxNumberSubtasks() {

    int maxDegree = 0;

    for (int i = 0; i < this.getNumberOfStageMembers(); i++) {
        // Read the member count exactly once so the value that is compared is also the value that
        // is kept, even if the group's member list changes concurrently.
        final int currentMembers = this.getStageMember(i).getCurrentNumberOfGroupMembers();
        maxDegree = Math.max(maxDegree, currentMembers);
    }

    return maxDegree;
}
/**
 * Counts the distinct instances referenced by the allocated resources of all execution
 * vertices in this stage.
 *
 * @return the number of distinct {@link Instance} objects assigned to vertices of this stage
 */
public int getRequiredSlots() {

    final Set<Instance> distinctInstances = new HashSet<Instance>();

    int memberIndex = 0;
    while (memberIndex < this.getNumberOfStageMembers()) {
        final Iterator<ExecutionVertex> members = this.getStageMember(memberIndex++).iterator();
        while (members.hasNext()) {
            distinctInstances.add(members.next().getAllocatedResource().getInstance());
        }
    }

    return distinctInstances.size();
}
}
......@@ -855,7 +855,6 @@ public final class ExecutionVertex {
* <code>false</code> otherwise
*/
public boolean decrementRetriesLeftAndCheck() {
    // Decrement first, then report whether the decremented counter is still positive.
    final int remainingRetries = this.retriesLeft.decrementAndGet();
    return remainingRetries > 0;
}
......
......@@ -74,6 +74,7 @@ public enum InternalJobStatus {
* the internal job status to be converted
* @return the corresponding job status or <code>null</code> if no corresponding job status exists
*/
@SuppressWarnings("incomplete-switch")
public static JobStatus toJobStatus(InternalJobStatus status) {
switch (status) {
......
......@@ -17,8 +17,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.managementgraph.ManagementEdge;
import eu.stratosphere.nephele.managementgraph.ManagementEdgeID;
import eu.stratosphere.nephele.managementgraph.ManagementGate;
......@@ -120,12 +120,11 @@ public class ManagementGraphFactory {
final ExecutionVertex ev = iterator.next();
final ManagementGroupVertex parent = groupMap.get(ev.getGroupVertex());
final AbstractInstance instance = ev.getAllocatedResource().getInstance();
final Instance instance = ev.getAllocatedResource().getInstance();
final ManagementVertex managementVertex = new ManagementVertex(
parent,
ev.getID().toManagementVertexID(),
(instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(),
instance.getType().toString(),
(instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(),
ev.getIndexInVertexGroup()
);
managementVertex.setExecutionState(ev.getExecutionState());
......
......@@ -23,7 +23,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
/**
* An allocated resource object unambiguously defines the
* hardware resources which have been assigned to an {@link eu.stratosphere.nephele.executiongraph.ExecutionVertex} for
* executing a task. The allocated resource is comprised of an {@link eu.stratosphere.nephele.instance.AbstractInstance}
* executing a task. The allocated resource is comprised of an {@link Instance}
* which identifies the node the task is scheduled to run on as well as an
* {@link eu.stratosphere.nephele.instance.AllocationID} which determines the resources the task is scheduled to
* allocate within the node.
......@@ -36,12 +36,7 @@ public final class AllocatedResource {
/**
* The instance a task is scheduled to run on.
*/
private final AbstractInstance instance;
/**
* The instance type this allocated resource represents.
*/
private final InstanceType instanceType;
private final Instance instance;
/**
* The allocation ID identifying the resources within the instance
......@@ -60,24 +55,20 @@ public final class AllocatedResource {
*
* @param instance
* the instance a task is scheduled to run on.
* @param instanceType
* the instance type this allocated resource represents
* @param allocationID
* the allocation ID identifying the allocated resources within the instance
*/
public AllocatedResource(final AbstractInstance instance, final InstanceType instanceType,
final AllocationID allocationID) {
public AllocatedResource(final Instance instance, final AllocationID allocationID) {
this.instance = instance;
this.instanceType = instanceType;
this.allocationID = allocationID;
}
/**
* Returns the instance a task is scheduled to run on.
*
*
* @return the instance a task is scheduled to run on
*/
public AbstractInstance getInstance() {
public Instance getInstance() {
return this.instance;
}
......@@ -90,15 +81,6 @@ public final class AllocatedResource {
return this.allocationID;
}
/**
 * Reports the instance type represented by this allocated resource.
 *
 * @return the instance type stored in this allocated resource
 */
public InstanceType getInstanceType() {
    return instanceType;
}
@Override
public boolean equals(final Object obj) {
......@@ -120,16 +102,6 @@ public final class AllocatedResource {
}
}
if (this.instanceType == null) {
if (allocatedResource.instance != null) {
return false;
}
} else {
if (!this.instanceType.equals(allocatedResource.getInstanceType())) {
return false;
}
}
return true;
}
......
......@@ -11,65 +11,38 @@
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance.cluster;
package eu.stratosphere.nephele.instance;
import eu.stratosphere.nephele.instance.AllocationID;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.jobgraph.JobID;
/**
* An allocated slice is a part of an instance which is assigned to a job.
* An allocated slot is a part of an instance which is assigned to a job.
* <p>
* This class is thread-safe.
*
*/
class AllocatedSlice {
public class AllocatedSlot {
/**
* The allocation ID which identifies the resources occupied by this slice.
* The allocation ID which identifies the resources occupied by this slot.
*/
private final AllocationID allocationID;
/**
* The machine hosting the slice.
*/
private final ClusterInstance hostingInstance;
/**
* The type describing the characteristics of the allocated slice.
*/
private final InstanceType type;
/**
* The ID of the job this slot belongs to.
*/
private final JobID jobID;
/**
* Time at which this machine was allocated, in milliseconds (see {@link System#currentTimeMillis()}).
*/
private final long allocationTime;
/**
* Creates a new allocated slice on the given hosting instance.
*
* @param hostingInstance
* the instance hosting the slice
* @param type
* the type describing the characteristics of the allocated slice
* @param jobID
* the ID of the job this slice belongs to
* @param allocationTime
* the time the instance was allocated
*/
public AllocatedSlice(final ClusterInstance hostingInstance, final InstanceType type, final JobID jobID,
final long allocationTime) {
public AllocatedSlot(final JobID jobID) {
this.allocationID = new AllocationID();
this.hostingInstance = hostingInstance;
this.type = type;
this.jobID = jobID;
this.allocationTime = allocationTime;
}
/**
......@@ -81,25 +54,6 @@ class AllocatedSlice {
return this.allocationID;
}
/**
* The type describing the characteristics of
* this allocated slice.
*
* @return the type describing the characteristics of the slice
*/
public InstanceType getType() {
return this.type;
}
/**
* Returns the time the instance was allocated.
*
* @return the time the instance was allocated
*/
public long getAllocationTime() {
return this.allocationTime;
}
/**
* Returns the ID of the job this allocated slice belongs to.
*
......@@ -108,13 +62,4 @@ class AllocatedSlice {
public JobID getJobID() {
return this.jobID;
}
/**
* Returns the instance hosting this slice.
*
* @return the instance hosting this slice
*/
public ClusterInstance getHostingInstance() {
return this.hostingInstance;
}
}
......@@ -17,8 +17,8 @@ import eu.stratosphere.nephele.AbstractID;
/**
* An allocation ID unambiguously identifies the allocated resources
* within an {@link AbstractInstance}. The ID is necessary if an {@link InstanceManager} decides to partition
* {@link AbstractInstance}s
* within an {@link Instance}. The ID is necessary if an {@link InstanceManager} decides to partition
* {@link Instance}s
* without the knowledge of Nephele's scheduler.
*
*/
......
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Collection;
import java.util.TimerTask;
import java.util.Timer;
/**
* In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
* compute resources,
* provisioning available compute resources to the JobManager and keeping track of the availability of the utilized
* compute resources in order
* to report unexpected resource outages.
*
*/
public class DefaultInstanceManager implements InstanceManager {
// ------------------------------------------------------------------------
// Internal Constants
// ------------------------------------------------------------------------
/**
* The log object used to report debugging and error information.
*/
private static final Log LOG = LogFactory.getLog(DefaultInstanceManager.class);
/**
* Default duration after which a host is purged in case it did not send
* a heart-beat message.
*/
private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
/**
* The key to retrieve the clean up interval from the configuration.
*/
private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
// ------------------------------------------------------------------------
// Fields
// ------------------------------------------------------------------------
private final Object lock = new Object();
/**
* Duration after which a host is purged in case it did not send a
* heart-beat message.
*/
private final long cleanUpInterval;
/**
* Set of hosts known to run a task manager that are thus able to execute
* tasks.
*/
private final Map<InstanceConnectionInfo, Instance> registeredHosts;
/**
* The network topology of the cluster.
*/
private final NetworkTopology networkTopology;
/**
* Object that is notified if instances become available or vanish.
*/
private InstanceListener instanceListener;
private boolean shutdown;
/**
 * Periodic task which looks for hosts whose heart-beat is overdue. For every such
 * host all allocated slots are removed, the host is dropped from the set of
 * registered hosts and the instance listener (if one is set) is told, job by job,
 * which allocated resources died.
 */
private final TimerTask cleanupStaleMachines = new TimerTask() {

	@Override
	public void run() {

		synchronized (DefaultInstanceManager.this.lock) {

			// Entries of hosts which are considered dead; removed in one go after the scan
			final List<Map.Entry<InstanceConnectionInfo, Instance>> deadHostEntries =
				new ArrayList<Map.Entry<InstanceConnectionInfo, Instance>>();

			// Resources lost through dead hosts, grouped by the job they were assigned to
			final Map<JobID, List<AllocatedResource>> lostResourcesByJob =
				new HashMap<JobID, List<AllocatedResource>>();

			for (final Map.Entry<InstanceConnectionInfo, Instance> hostEntry : registeredHosts.entrySet()) {

				final Instance candidate = hostEntry.getValue();
				if (candidate.isStillAlive(cleanUpInterval)) {
					continue;
				}

				// No heart-beat within the clean up interval: give up all slots of this host
				for (final AllocatedSlot lostSlot : candidate.removeAllocatedSlots()) {

					final JobID owningJob = lostSlot.getJobID();
					List<AllocatedResource> lostResources = lostResourcesByJob.get(owningJob);
					if (lostResources == null) {
						lostResources = new ArrayList<AllocatedResource>();
						lostResourcesByJob.put(owningJob, lostResources);
					}

					lostResources.add(new AllocatedResource(candidate, lostSlot.getAllocationID()));
				}

				deadHostEntries.add(hostEntry);
			}

			registeredHosts.entrySet().removeAll(deadHostEntries);

			// Inform the listener about the lost resources of each affected job
			for (final Map.Entry<JobID, List<AllocatedResource>> lost : lostResourcesByJob.entrySet()) {
				if (instanceListener != null) {
					instanceListener.allocatedResourcesDied(lost.getKey(), lost.getValue());
				}
			}
		}
	}
};
// ------------------------------------------------------------------------
// Constructor and set-up
// ------------------------------------------------------------------------
/**
 * Constructor.
 * <p>
 * Reads the clean up interval (configured in seconds) from the global configuration,
 * creates an empty network topology and schedules the periodic task which purges
 * hosts that stopped sending heart-beat messages.
 */
public DefaultInstanceManager() {

	this.registeredHosts = new HashMap<InstanceConnectionInfo, Instance>();

	// The interval is configured in seconds; validate it in seconds and convert afterwards.
	// (Previously the value was converted to milliseconds first, so the ten second check
	// compared milliseconds against 10 and the fallback yielded 120 ms instead of 120 s.)
	long cleanUpIntervalSecs = GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL);

	if (cleanUpIntervalSecs < 10) { // Clean up interval must be at least ten seconds
		LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
			+ " secs.");
		cleanUpIntervalSecs = DEFAULT_CLEANUP_INTERVAL;
	}

	// Internally the interval is kept in milliseconds
	this.cleanUpInterval = cleanUpIntervalSecs * 1000L;

	this.networkTopology = NetworkTopology.createEmptyTopology();

	// look for crashed hosts once per second, starting one second from now
	final boolean runTimerAsDaemon = true;
	new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
}
/**
 * Stops the periodic clean up task, destroys the proxies of all registered hosts
 * and forgets the hosts. Calling this method more than once has no further effect.
 */
@Override
public void shutdown() {

	synchronized (this.lock) {

		if (!this.shutdown) {

			this.cleanupStaleMachines.cancel();

			for (final Instance registeredHost : this.registeredHosts.values()) {
				registeredHost.destroyProxies();
			}

			this.registeredHosts.clear();
			this.shutdown = true;
		}
	}
}
/**
 * Gives the slot behind the given allocated resource back to the instance hosting it.
 *
 * @param allocatedResource
 *        the allocated resource to be released
 * @throws InstanceException
 *         declared by the interface; not thrown by the code visible here
 */
@Override
public void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException {

	synchronized (this.lock) {
		// the slot is identified on its hosting instance by the allocation ID
		allocatedResource.getInstance().releaseSlot(allocatedResource.getAllocationID());
	}
}
/**
 * Creates the {@link Instance} object representing a newly registered host and
 * places it in the network topology. If the topology already contains a stub node
 * for the host (looked up by host name, then by address), the stub is removed and
 * the new instance is attached to the stub's parent; otherwise it is attached to
 * the root node.
 *
 * @param instanceConnectionInfo
 *        the connection information for the instance
 * @param hardwareDescription
 *        the hardware description provided by the new instance
 * @param numberOfSlots
 *        number of slots available on the instance
 * @return the new {@link Instance} object (the code below never returns <code>null</code>)
 */
private Instance createNewHost(final InstanceConnectionInfo instanceConnectionInfo,
		final HardwareDescription hardwareDescription, int numberOfSlots) {

	String lookupName = instanceConnectionInfo.hostname();
	NetworkNode attachTo = this.networkTopology.getRootNode();

	/*
	 * First attempt: match by host name. If the host name is reported as FQDN,
	 * strip the domain parts one by one (from the right) until a stub node is
	 * found or no dot is left in the name.
	 */
	NetworkNode stubNode = this.networkTopology.getNodeByName(lookupName);
	int lastDot;
	while (stubNode == null && (lastDot = lookupName.lastIndexOf('.')) != -1) {
		lookupName = lookupName.substring(0, lastDot);
		stubNode = this.networkTopology.getNodeByName(lookupName);
	}

	// Second attempt: match by the textual form of the address with all '/' removed
	// NOTE(review): if address() is a java.net.InetAddress, toString() yields "hostname/ip",
	// so stripping '/' concatenates both parts whenever a host name is present - confirm.
	if (stubNode == null) {
		lookupName = instanceConnectionInfo.address().toString().replaceAll("/", "");
		stubNode = this.networkTopology.getNodeByName(lookupName);
	}

	if (stubNode != null) {
		// The new instance takes the stub's place in the tree: inherit the parent, drop the stub
		final NetworkNode stubParent = stubNode.getParentNode();
		if (stubParent != null) {
			attachTo = stubParent;
		}
		stubNode.remove();
	}

	LOG.info("Creating instance for " + instanceConnectionInfo + ", parent is "
		+ attachTo.getName());

	return new Instance(instanceConnectionInfo, attachTo, this.networkTopology, hardwareDescription,
		numberOfSlots);
}
/**
 * Records a heart-beat for the host with the given connection info. Heart-beats of
 * hosts which have not been registered are logged as an error and otherwise ignored.
 *
 * @param instanceConnectionInfo
 *        the connection info of the host sending the heart-beat
 */
@Override
public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo) {

	synchronized (this.lock) {

		final Instance sender = registeredHosts.get(instanceConnectionInfo);

		if (sender != null) {
			sender.reportHeartBeat();
		} else {
			LOG.error("Task manager with connection info " + instanceConnectionInfo + " has not been registered.");
		}
	}
}
/**
 * Registers a task manager with this instance manager. A second registration with the
 * same connection info is logged as an error and ignored. On success the new host is
 * stored and receives its first heart-beat.
 *
 * @param instanceConnectionInfo
 *        the connection info identifying the task manager
 * @param hardwareDescription
 *        the hardware description reported by the task manager
 * @param numberOfSlots
 *        the number of slots the task manager offers
 */
@Override
public void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
		HardwareDescription hardwareDescription, int numberOfSlots) {

	synchronized (this.lock) {

		if (registeredHosts.containsKey(instanceConnectionInfo)) {
			LOG.error("Task manager with connection info " + instanceConnectionInfo
				+ " has already been registered.");
			return;
		}

		final Instance newHost = createNewHost(instanceConnectionInfo, hardwareDescription, numberOfSlots);

		if (newHost == null) {
			LOG.error("Could not create a new host object for register task manager for connection info "
				+ instanceConnectionInfo);
			return;
		}

		this.registeredHosts.put(instanceConnectionInfo, newHost);
		LOG.info("New number of registered hosts is " + this.registeredHosts.size());

		// count the registration itself as the first sign of life
		newHost.reportHeartBeat();
	}
}
/**
 * Allocates the requested number of slots for the given job from the registered
 * hosts and, if an instance listener is set, notifies it asynchronously through an
 * {@link InstanceNotifier} thread.
 * <p>
 * The request is all-or-nothing: if the required number of slots cannot be
 * allocated, every slot allocated so far by this call is released again before
 * the exception is thrown.
 *
 * @param jobID
 *        the ID of the job the slots are requested for
 * @param conf
 *        a configuration object; not used by this implementation
 * @param requiredSlots
 *        the number of slots to allocate
 * @throws InstanceException
 *         thrown if the required number of slots cannot be allocated
 */
@Override
public void requestInstance(JobID jobID, Configuration conf, int requiredSlots)
		throws InstanceException {

	synchronized (this.lock) {

		final List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();

		try {
			final Iterator<Instance> clusterIterator = this.registeredHosts.values().iterator();

			// Stop visiting hosts as soon as the request is satisfied
			while (allocatedResources.size() < requiredSlots && clusterIterator.hasNext()) {
				final Instance instance = clusterIterator.next();

				while (instance.getNumberOfAvailableSlots() > 0 && allocatedResources.size() < requiredSlots) {
					allocatedResources.add(instance.allocateSlot(jobID));
				}
			}

			if (allocatedResources.size() < requiredSlots) {
				throw new InstanceException("Cannot allocate the required number of slots: " + requiredSlots + ".");
			}
		} catch (InstanceException e) {
			// Roll back the partial allocation, otherwise the slots would stay occupied
			// although nobody was ever told about them
			for (final AllocatedResource partial : allocatedResources) {
				partial.getInstance().releaseSlot(partial.getAllocationID());
			}
			throw e;
		}

		if (this.instanceListener != null) {
			// The notification is sent from a separate thread
			final InstanceNotifier instanceNotifier = new InstanceNotifier(
				this.instanceListener, jobID, allocatedResources);
			instanceNotifier.start();
		}
	}
}
/**
 * Returns the network topology maintained by this instance manager. The same
 * topology object is returned for every job.
 *
 * @param jobID
 *        the ID of the job; ignored by this implementation
 * @return the network topology of the cluster
 */
@Override
public NetworkTopology getNetworkTopology(JobID jobID) {

	final NetworkTopology clusterTopology = this.networkTopology;
	return clusterTopology;
}
/**
 * Sets the listener which is notified when resources become available or die.
 *
 * @param instanceListener
 *        the instance listener to set for this instance manager
 */
@Override
public void setInstanceListener(InstanceListener instanceListener) {

	// guarded by the same lock under which the listener is read
	synchronized (this.lock) {
		this.instanceListener = instanceListener;
	}
}
/**
 * Looks up a registered instance by its name.
 *
 * @param name
 *        the name of the instance, must not be <code>null</code>
 * @return the first registered instance with the given name or <code>null</code> if there is none
 * @throws IllegalArgumentException
 *         thrown if <code>name</code> is <code>null</code>
 */
@Override
public Instance getInstanceByName(String name) {

	if (name == null) {
		throw new IllegalArgumentException("Argument name must not be null");
	}

	Instance match = null;

	synchronized (this.lock) {
		for (final Instance candidate : this.registeredHosts.values()) {
			if (name.equals(candidate.getName())) {
				match = candidate;
				break;
			}
		}
	}

	return match;
}
/**
 * Returns the number of currently registered hosts.
 *
 * @return the number of registered hosts
 */
@Override
public int getNumberOfTaskTrackers() {

	// registeredHosts is a plain HashMap which is modified under this lock by the
	// clean up task and by registrations, so it must be read under the lock as well
	synchronized (this.lock) {
		return this.registeredHosts.size();
	}
}
/**
 * Returns the total number of slots offered by all registered hosts, regardless of
 * whether they are currently allocated.
 *
 * @return the sum of the slots of all registered hosts
 */
@Override
public int getNumberOfSlots() {

	// Iterate under the lock: the clean up task removes hosts from this map on the timer
	// thread, and an unguarded iteration could fail with a ConcurrentModificationException
	synchronized (this.lock) {
		int slots = 0;

		for (final Instance instance : this.registeredHosts.values()) {
			slots += instance.getNumberOfSlots();
		}

		return slots;
	}
}
}
......@@ -14,32 +14,30 @@
package eu.stratosphere.nephele.instance;
/**
* A DummyInstance is a stub implementation of the {@link AbstractInstance} interface.
* A DummyInstance is a stub implementation of the {@link Instance} interface.
* Dummy instances are used to plan a job execution but must be replaced with
* concrete instances before the job execution starts.
*
*/
public class DummyInstance extends AbstractInstance {
public class DummyInstance extends Instance {
private static int nextID = 0;
private final String name;
public static synchronized DummyInstance createDummyInstance(InstanceType type) {
public static synchronized DummyInstance createDummyInstance() {
return new DummyInstance(type, nextID++);
return new DummyInstance(nextID++);
}
/**
* Constructs a new dummy instance of the given instance type.
*
* @param type
* the type of the new dummy instance
* @param id
* the ID of the dummy instance
*/
private DummyInstance(InstanceType type, int id) {
super(type, null, null, null, null);
private DummyInstance(int id) {
super(null, null, null, null, 0);
this.name = "DummyInstance_" + Integer.toString(id);
}
......
......@@ -11,23 +11,14 @@
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.jobmanager.scheduler.local;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractExecutionListener;
package eu.stratosphere.nephele.instance;
/**
* This is a wrapper class for the {@link LocalScheduler} to receive
* notifications about state changes of vertices belonging
* to scheduled jobs.
* <p>
* This class is thread-safe.
*
* Convenience class to extract hardware specifics of the computer executing this class
*/
public class LocalExecutionListener extends AbstractExecutionListener {
public class Hardware {
public LocalExecutionListener(final LocalScheduler scheduler, final ExecutionVertex executionVertex) {
super(scheduler, executionVertex);
/**
 * Returns the number of processors the Java runtime reports as available.
 *
 * @return the value of {@link Runtime#availableProcessors()}
 */
public static int getNumberCPUCores() {
	final Runtime currentRuntime = Runtime.getRuntime();
	return currentRuntime.availableProcessors();
}
}
......@@ -15,8 +15,12 @@ package eu.stratosphere.nephele.instance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.Collection;
import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
......@@ -24,28 +28,22 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
import eu.stratosphere.runtime.io.channels.ChannelID;
/**
* An abstract instance represents a resource a {@link eu.stratosphere.nephele.taskmanager.TaskManager} runs on.
* An instance represents a resource a {@link eu.stratosphere.nephele.taskmanager.TaskManager} runs on.
*
*/
public abstract class AbstractInstance extends NetworkNode {
/**
* The type of the instance.
*/
private final InstanceType instanceType;
public class Instance extends NetworkNode {
/**
* The connection info identifying the instance.
*/
......@@ -56,16 +54,29 @@ public abstract class AbstractInstance extends NetworkNode {
*/
private final HardwareDescription hardwareDescription;
/**
* Number of slots available on the node
*/
private final int numberOfSlots;
/**
* Allocated slots on this instance
*/
private final Map<AllocationID, AllocatedSlot> allocatedSlots = new HashMap<AllocationID, AllocatedSlot>();
/**
* Stores the RPC stub object for the instance's task manager.
*/
private TaskOperationProtocol taskManager = null;
/**
* Time when the last heart beat has been received from the task manager running on this instance.
*/
private long lastReceivedHeartBeat = System.currentTimeMillis();
/**
* Constructs an abstract instance object.
*
* @param instanceType
* the type of the instance
* @param instanceConnectionInfo
* the connection info identifying the instance
* @param parentNode
......@@ -75,13 +86,13 @@ public abstract class AbstractInstance extends NetworkNode {
* @param hardwareDescription
* the hardware description provided by the instance itself
*/
public AbstractInstance(final InstanceType instanceType, final InstanceConnectionInfo instanceConnectionInfo,
final NetworkNode parentNode, final NetworkTopology networkTopology,
final HardwareDescription hardwareDescription) {
public Instance(final InstanceConnectionInfo instanceConnectionInfo,
final NetworkNode parentNode, final NetworkTopology networkTopology,
final HardwareDescription hardwareDescription, int numberOfSlots) {
super((instanceConnectionInfo == null) ? null : instanceConnectionInfo.toString(), parentNode, networkTopology);
this.instanceType = instanceType;
this.instanceConnectionInfo = instanceConnectionInfo;
this.hardwareDescription = hardwareDescription;
this.numberOfSlots = numberOfSlots;
}
/**
......@@ -114,15 +125,6 @@ public abstract class AbstractInstance extends NetworkNode {
}
}
/**
* Returns the type of the instance.
*
* @return the type of the instance
*/
public final InstanceType getType() {
return this.instanceType;
}
/**
* Returns the instance's connection information object.
*
......@@ -209,7 +211,7 @@ public abstract class AbstractInstance extends NetworkNode {
/**
* Kills the task identified by the given ID at the instance's
* {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
*
*
* @param id
* the ID identifying the task to be killed
* @throws IOException
......@@ -221,6 +223,31 @@ public abstract class AbstractInstance extends NetworkNode {
return getTaskManagerProxy().killTask(id);
}
/**
 * Marks this instance as alive by storing the current system time as the time of
 * the last received heart beat.
 */
public synchronized void reportHeartBeat() {
	final long now = System.currentTimeMillis();
	this.lastReceivedHeartBeat = now;
}
/**
 * Checks whether the last heart beat of this instance is recent enough for the
 * instance to be considered alive.
 *
 * @param cleanUpInterval
 *        duration (in milliseconds) after which an instance is considered dead
 *        if it has not received any heart beat
 * @return <code>true</code> if the time of the last heart beat plus <code>cleanUpInterval</code>
 *         has not yet passed, <code>false</code> otherwise
 */
public synchronized boolean isStillAlive(final long cleanUpInterval) {

	final long deadline = this.lastReceivedHeartBeat + cleanUpInterval;

	return deadline >= System.currentTimeMillis();
}
@Override
public boolean equals(final Object obj) {
......@@ -229,11 +256,11 @@ public abstract class AbstractInstance extends NetworkNode {
return super.equals(obj);
}
if (!(obj instanceof AbstractInstance)) {
if (!(obj instanceof Instance)) {
return false;
}
final AbstractInstance abstractInstance = (AbstractInstance) obj;
final Instance abstractInstance = (Instance) obj;
return this.instanceConnectionInfo.equals(abstractInstance.getInstanceConnectionInfo());
}
......@@ -282,7 +309,6 @@ public abstract class AbstractInstance extends NetworkNode {
* thrown if an error occurs during this remote procedure call
*/
public synchronized void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
getTaskManagerProxy().invalidateLookupCacheEntries(channelIDs);
}
......@@ -294,4 +320,43 @@ public abstract class AbstractInstance extends NetworkNode {
destroyTaskManagerProxy();
}
/**
 * Returns the total number of slots of this instance as passed to the constructor.
 *
 * @return the number of slots of this instance
 */
public int getNumberOfSlots() {
	return this.numberOfSlots;
}
/**
 * Returns the number of slots which are currently not allocated.
 * <p>
 * Synchronized like {@link #allocateSlot(JobID)} and {@link #releaseSlot(AllocationID)},
 * which modify the map of allocated slots under this instance's monitor.
 *
 * @return the total number of slots minus the number of allocated slots
 */
public synchronized int getNumberOfAvailableSlots() {
	return this.numberOfSlots - this.allocatedSlots.size();
}
/**
 * Allocates one slot of this instance for the given job.
 *
 * @param jobID
 *        the ID of the job the slot is allocated for
 * @return the allocated resource referring to this instance and the new slot's allocation ID
 * @throws InstanceException
 *         thrown if all slots of this instance are already allocated
 */
public synchronized AllocatedResource allocateSlot(JobID jobID) throws InstanceException {

	// guard: refuse to hand out more slots than this instance has
	if (this.allocatedSlots.size() >= this.numberOfSlots) {
		throw new InstanceException("Overbooking instance " + instanceConnectionInfo + ".");
	}

	final AllocatedSlot freshSlot = new AllocatedSlot(jobID);
	final AllocationID slotID = freshSlot.getAllocationID();
	this.allocatedSlots.put(slotID, freshSlot);

	return new AllocatedResource(this, slotID);
}
/**
 * Releases the slot with the given allocation ID.
 *
 * @param allocationID
 *        the allocation ID of the slot to release
 * @throws RuntimeException
 *         thrown if no slot with the given allocation ID is allocated on this instance
 */
public synchronized void releaseSlot(AllocationID allocationID) {

	// the map never holds null values, so a null result means the ID was unknown
	final AllocatedSlot released = this.allocatedSlots.remove(allocationID);

	if (released == null) {
		throw new RuntimeException("There is no slot registered with allocation ID " + allocationID + ".");
	}
}
/**
 * Returns the slots which are currently allocated on this instance.
 * <p>
 * The returned collection is a snapshot taken under this instance's monitor. Handing
 * out the live view of the internal map would let callers iterate it while
 * {@link #allocateSlot(JobID)} or {@link #releaseSlot(AllocationID)} modify the map.
 *
 * @return a copy of the currently allocated slots
 */
public synchronized Collection<AllocatedSlot> getAllocatedSlots() {
	return new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
}
/**
 * Removes all allocated slots from this instance and returns them.
 * <p>
 * Snapshot and removal happen atomically under this instance's monitor. Previously
 * the snapshot was taken without synchronization and each slot was released
 * individually, so a concurrent release between the two steps made the release
 * fail with a {@link RuntimeException}.
 *
 * @return the slots which were allocated on this instance before the call
 */
public synchronized Collection<AllocatedSlot> removeAllocatedSlots() {

	final Collection<AllocatedSlot> slots = new ArrayList<AllocatedSlot>(this.allocatedSlots.values());
	this.allocatedSlots.clear();

	return slots;
}
}
......@@ -13,157 +13,32 @@
package eu.stratosphere.nephele.instance;
import java.util.List;
import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkTopology;
/**
* In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
* compute resources,
* provisioning available compute resources to the JobManager and keeping track of the availability of the utilized
* compute resources in order
* to report unexpected resource outages.
*
*/
public interface InstanceManager {
/**
* Requests an instance of the provided instance type from the instance manager.
*
* @param jobID
* the ID of the job this instance is requested for
* @param conf
* a configuration object including additional request information (e.g. credentials)
* @param instanceRequestMap
* a map specifying the instances requested by this call
* @param count
* the number of instances
* @throws InstanceException
* thrown if an error occurs during the instance request
*/
void requestInstance(JobID jobID, Configuration conf, InstanceRequestMap instanceRequestMap,
List<String> splitAffinityList) throws InstanceException;
/**
* Releases an allocated resource from a job.
*
* @param jobID
* the ID of the job the instance has been used for
* @param conf
* a configuration object including additional release information (e.g. credentials)
* @param allocatedResource
* the allocated resource to be released
* @throws InstanceException
* thrown if an error occurs during the release process
*/
void releaseAllocatedResource(JobID jobID, Configuration conf, AllocatedResource allocatedResource)
throws InstanceException;
/**
* Suggests a suitable instance type according to the provided hardware characteristics.
*
* @param minNumComputeUnits
* the minimum number of compute units
* @param minNumCPUCores
* the minimum number of CPU cores
* @param minMemorySize
* the minimum number of main memory (in MB)
* @param minDiskCapacity
* the minimum hard disk capacity (in GB)
* @param maxPricePerHour
* the maximum price per hour for the instance
* @return the instance type matching the requested hardware profile best or <code>null</code> if no such instance
* type is available
*/
InstanceType getSuitableInstanceType(int minNumComputeUnits, int minNumCPUCores, int minMemorySize,
int minDiskCapacity, int maxPricePerHour);
void shutdown();
/**
* Reports a heart beat message of an instance.
*
* @param instanceConnectionInfo
* the {@link InstanceConnectionInfo} object attached to the heart beat message
* @param hardwareDescription
* a hardware description with details on the instance's compute resources.
*/
void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription);
void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException;
/**
* Translates the name of an instance type to the corresponding instance type object.
*
* @param instanceTypeName
* the name of the instance type
* @return the instance type object matching the name or <code>null</code> if no such instance type exists
*/
InstanceType getInstanceTypeByName(String instanceTypeName);
void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo);
/**
* Returns the default instance type used by the instance manager.
*
* @return the default instance type
*/
InstanceType getDefaultInstanceType();
void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
HardwareDescription hardwareDescription, int numberOfSlots);
void requestInstance(JobID jobID, Configuration conf, int requiredSlots)
throws InstanceException;
/**
* Returns the network topology for the job with the given ID. The network topology
* for the job might only be an excerpt of the overall network topology. It only
* includes those instances as leaf nodes which are really allocated for the
* execution of the job.
*
* @param jobID
* the ID of the job to get the topology for
* @return the network topology for the job
*/
NetworkTopology getNetworkTopology(JobID jobID);
/**
* Sets the {@link InstanceListener} object which is supposed to be
* notified about instance availability and deaths.
*
* @param instanceListener
* the instance listener to set for this instance manager
*/
void setInstanceListener(InstanceListener instanceListener);
/**
* Returns a map of all instance types which are currently available to Nephele. The map contains a description of
* the hardware characteristics for each instance type as provided in the configuration file. Moreover, it contains
* the actual hardware description as reported by task managers running on the individual instances. If available,
* the map also contains the maximum number instances Nephele can allocate of each instance type (i.e. if no other
* job occupies instances).
*
* @return a list of all instance types available to Nephele
*/
Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes();
/**
* Returns the {@link AbstractInstance} with the given name.
*
* @param name
* the name of the instance
* @return the instance with the given name or <code>null</code> if no such instance could be found
*/
AbstractInstance getInstanceByName(String name);
Instance getInstanceByName(String name);
/**
* Cancels all pending instance requests that might still exist for the job with the given ID.
*
* @param jobID
* the ID of the job to cancel the pending instance requests for
*/
void cancelPendingRequests(JobID jobID);
/**
* Shuts the instance manager down and stops all its internal processes.
*/
void shutdown();
/**
*
* @return the number of available (registered) TaskTrackers
*/
int getNumberOfTaskTrackers();
int getNumberOfSlots();
}
......@@ -11,7 +11,7 @@
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance.cluster;
package eu.stratosphere.nephele.instance;
import java.util.List;
......@@ -21,14 +21,14 @@ import eu.stratosphere.nephele.jobgraph.JobID;
/**
* This class is an auxiliary class to send the notification
* about the availability of an {@link AbstractInstance} to the given {@link InstanceListener} object. The notification
* must be sent from
* about the availability of an {@link eu.stratosphere.nephele.instance.Instance} to the given {@link
* InstanceListener} object. The notification must be sent from
* a separate thread, otherwise the atomic operation of requesting an instance
* for a vertex and switching to the state ASSINING could not be guaranteed.
* for a vertex and switching to the state ASSIGNING could not be guaranteed.
* This class is thread-safe.
*
*/
public class ClusterInstanceNotifier extends Thread {
public class InstanceNotifier extends Thread {
/**
* The {@link InstanceListener} object to send the notification to.
......@@ -55,8 +55,8 @@ public class ClusterInstanceNotifier extends Thread {
* @param allocatedResources
* the resources with has been allocated for the job
*/
public ClusterInstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
final List<AllocatedResource> allocatedResources) {
public InstanceNotifier(final InstanceListener instanceListener, final JobID jobID,
final List<AllocatedResource> allocatedResources) {
this.instanceListener = instanceListener;
this.jobID = jobID;
this.allocatedResources = allocatedResources;
......
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * Records, per {@link InstanceType}, how many instances a job asks for. Two independent bounds are kept for every
 * type: a minimum and a maximum. As described by the original design, the {@link InstanceManager} is expected to
 * reject a job when it cannot provide the minimum number of instances, and to aim for the maximum number while still
 * starting the job if fewer instances are available.
 * <p>
 * This class is not thread-safe.
 */
public final class InstanceRequestMap {

	/**
	 * Lower bound of instances to request, keyed by instance type.
	 */
	private final Map<InstanceType, Integer> minimumMap = new HashMap<InstanceType, Integer>();

	/**
	 * Upper bound of instances to request, keyed by instance type.
	 */
	private final Map<InstanceType, Integer> maximumMap = new HashMap<InstanceType, Integer>();

	/**
	 * Looks up the count stored for an instance type in one of the two bound maps.
	 * 
	 * @param counts
	 *        the map to consult
	 * @param type
	 *        the instance type to look up
	 * @return the stored count or <code>0</code> if the map holds no entry for the type
	 */
	private static int countOrZero(final Map<InstanceType, Integer> counts, final InstanceType type) {
		final Integer stored = counts.get(type);
		return (stored == null) ? 0 : stored.intValue();
	}

	/**
	 * Stores the minimum number of instances to request for the given instance type, replacing any previous value.
	 * 
	 * @param instanceType
	 *        the type of instance to request
	 * @param number
	 *        the minimum number of instances to request
	 */
	public void setMinimumNumberOfInstances(final InstanceType instanceType, final int number) {
		this.minimumMap.put(instanceType, number);
	}

	/**
	 * Stores the maximum number of instances to request for the given instance type, replacing any previous value.
	 * 
	 * @param instanceType
	 *        the type of instance to request
	 * @param number
	 *        the maximum number of instances to request
	 */
	public void setMaximumNumberOfInstances(final InstanceType instanceType, final int number) {
		this.maximumMap.put(instanceType, number);
	}

	/**
	 * Stores the same value as both the minimum and the maximum number of instances for the given instance type.
	 * 
	 * @param instanceType
	 *        the type of instance to request
	 * @param number
	 *        the minimum and the maximum number of instances to request
	 */
	public void setNumberOfInstances(final InstanceType instanceType, final int number) {
		this.minimumMap.put(instanceType, number);
		this.maximumMap.put(instanceType, number);
	}

	/**
	 * Returns the minimum number of instances to request for the given instance type.
	 * 
	 * @param instanceType
	 *        the type of instance to request
	 * @return the minimum number of instances for that type, or <code>0</code> if none has been set
	 */
	public int getMinimumNumberOfInstances(final InstanceType instanceType) {
		return countOrZero(this.minimumMap, instanceType);
	}

	/**
	 * Returns the maximum number of instances to request for the given instance type.
	 * 
	 * @param instanceType
	 *        the type of instance to request
	 * @return the maximum number of instances for that type, or <code>0</code> if none has been set
	 */
	public int getMaximumNumberOfInstances(final InstanceType instanceType) {
		return countOrZero(this.maximumMap, instanceType);
	}

	/**
	 * Checks whether this request map holds no entry at all, neither a minimum nor a maximum for any instance type.
	 * 
	 * @return <code>true</code> if both bound maps are empty, <code>false</code> otherwise
	 */
	public boolean isEmpty() {
		return this.maximumMap.isEmpty() && this.minimumMap.isEmpty();
	}

	/**
	 * Returns an {@link Iterator} over the maximum number of instances to be requested for each instance type.
	 * 
	 * @return an iterator over the (instance type, maximum count) entries
	 */
	public Iterator<Map.Entry<InstanceType, Integer>> getMaximumIterator() {
		return this.maximumMap.entrySet().iterator();
	}

	/**
	 * Returns an {@link Iterator} over the minimum number of instances to be requested for each instance type.
	 * 
	 * @return an iterator over the (instance type, minimum count) entries
	 */
	public Iterator<Map.Entry<InstanceType, Integer>> getMinimumIterator() {
		return this.minimumMap.entrySet().iterator();
	}

	/**
	 * Returns the number of different instance types stored in this request map. Both bound maps must hold the same
	 * number of entries for this value to be defined.
	 * 
	 * @return the number of different instance types stored in this request map
	 * @throws IllegalStateException
	 *         if the minimum and the maximum map differ in size
	 */
	public int size() {
		final int maximumEntries = this.maximumMap.size();
		final int minimumEntries = this.minimumMap.size();
		if (maximumEntries == minimumEntries) {
			return maximumEntries;
		}

		throw new IllegalStateException("InstanceRequestMap is in an inconsistent state");
	}

	/**
	 * Removes all minimum and maximum entries from this request map.
	 */
	public void clear() {
		this.minimumMap.clear();
		this.maximumMap.clear();
	}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
/**
 * Describes the hardware resources of the kind of machine a task manager runs on: an identifier, the number of
 * compute units and CPU cores, the amount of main memory, the disk capacity and a price per hour. Instances of this
 * class can be written to and read from a binary stream.
 */
public final class InstanceType implements IOReadableWritable {

	/**
	 * Name identifying this instance type.
	 */
	private String identifier;

	/**
	 * Number of computational units of this instance type. A computational unit is a virtual compute capacity: a
	 * host with a single-core 2 GHz CPU may possess 20 compute units (1*20), while a dual-core 2.5 GHz CPU may
	 * possess 50 compute units (2*25). The value expresses the fraction of the CPU capacity promised to a user.
	 */
	private int numberOfComputeUnits;

	/**
	 * Number of CPU cores of this instance type.
	 */
	private int numberOfCores;

	/**
	 * Amount of main memory of this instance type (in MB).
	 */
	private int memorySize;

	/**
	 * Disk capacity of this instance type (in GB).
	 */
	private int diskCapacity;

	/**
	 * Price per hour charged for running instances of this type.
	 */
	private int pricePerHour;

	/**
	 * Creates an instance type without identifier and with all numeric properties set to zero. The public no-argument
	 * constructor is required by the serialization process, which fills the object through {@link #read(DataInput)}.
	 */
	public InstanceType() {
	}

	/**
	 * Creates a fully specified instance type.
	 * 
	 * @param identifier
	 *        identifier for this instance type
	 * @param numberOfComputeUnits
	 *        number of computational units of this instance type
	 * @param numberOfCores
	 *        number of CPU cores of this instance type
	 * @param memorySize
	 *        amount of main memory of this instance type (in MB)
	 * @param diskCapacity
	 *        disk capacity of this instance type (in GB)
	 * @param pricePerHour
	 *        price per hour that is charged for running instances of this type
	 */
	InstanceType(final String identifier, final int numberOfComputeUnits, final int numberOfCores,
			final int memorySize, final int diskCapacity, final int pricePerHour) {

		this.identifier = identifier;
		this.pricePerHour = pricePerHour;
		this.diskCapacity = diskCapacity;
		this.memorySize = memorySize;
		this.numberOfCores = numberOfCores;
		this.numberOfComputeUnits = numberOfComputeUnits;
	}

	/**
	 * Returns the identifier of this instance type.
	 * 
	 * @return the instance type's identifier
	 */
	public String getIdentifier() {
		return this.identifier;
	}

	/**
	 * Returns the number of computational units of this instance type.
	 * 
	 * @return the instance type's number of computational units
	 */
	public int getNumberOfComputeUnits() {
		return this.numberOfComputeUnits;
	}

	/**
	 * Returns the number of CPU cores of this instance type.
	 * 
	 * @return the instance type's number of CPU cores
	 */
	public int getNumberOfCores() {
		return this.numberOfCores;
	}

	/**
	 * Returns the amount of main memory of this instance type.
	 * 
	 * @return the instance type's amount of main memory
	 */
	public int getMemorySize() {
		return this.memorySize;
	}

	/**
	 * Returns the disk capacity of this instance type.
	 * 
	 * @return the instance type's disk capacity
	 */
	public int getDiskCapacity() {
		return this.diskCapacity;
	}

	/**
	 * Returns the price per hour of this instance type.
	 * 
	 * @return the instance type's price per hour
	 */
	public int getPricePerHour() {
		return this.pricePerHour;
	}

	/**
	 * Renders this instance type as
	 * <code>identifier (computeUnits,cores,memorySize,diskCapacity,pricePerHour)</code>.
	 */
	@Override
	public String toString() {
		// The left operand of every '+' is a String, so the char separators are concatenated, never added.
		return this.identifier + " (" + this.numberOfComputeUnits + ',' + this.numberOfCores + ','
			+ this.memorySize + ',' + this.diskCapacity + ',' + this.pricePerHour + ')';
	}

	/**
	 * Writes the identifier followed by the five numeric properties. The field order must mirror
	 * {@link #read(DataInput)}.
	 */
	@Override
	public void write(final DataOutput out) throws IOException {
		StringRecord.writeString(out, this.identifier);

		final int[] properties = { this.numberOfComputeUnits, this.numberOfCores, this.memorySize,
			this.diskCapacity, this.pricePerHour };
		for (int i = 0; i < properties.length; i++) {
			out.writeInt(properties[i]);
		}
	}

	/**
	 * Restores the identifier followed by the five numeric properties, in the order produced by
	 * {@link #write(DataOutput)}.
	 */
	@Override
	public void read(final DataInput in) throws IOException {
		final String readIdentifier = StringRecord.readString(in);
		this.identifier = readIdentifier;

		final int readComputeUnits = in.readInt();
		this.numberOfComputeUnits = readComputeUnits;

		final int readCores = in.readInt();
		this.numberOfCores = readCores;

		final int readMemory = in.readInt();
		this.memorySize = readMemory;

		final int readDisk = in.readInt();
		this.diskCapacity = readDisk;

		final int readPrice = in.readInt();
		this.pricePerHour = readPrice;
	}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
/**
 * An instance type description provides details of an instance type. It can comprise both the {@link InstanceType}
 * (as provided by the operator/administrator of the instance) and the actual {@link HardwareDescription} which has
 * been determined on the compute instance itself, plus the maximum number of available instances of that type.
 * <p>
 * Both the instance type and the hardware description are optional; a boolean presence flag precedes each of them
 * in the serialized form.
 */
public final class InstanceTypeDescription implements IOReadableWritable {

	/**
	 * The instance type, may be <code>null</code>.
	 */
	private InstanceType instanceType = null;

	/**
	 * The hardware description as created by the {@link InstanceManager}, may be <code>null</code>.
	 */
	private HardwareDescription hardwareDescription = null;

	/**
	 * The maximum number of available instances of this type.
	 */
	private int maximumNumberOfAvailableInstances = 0;

	/**
	 * Public default constructor required for serialization process.
	 */
	public InstanceTypeDescription() {
	}

	/**
	 * Constructs a new instance type description.
	 * 
	 * @param instanceType
	 *        the instance type
	 * @param hardwareDescription
	 *        the hardware description as created by the {@link InstanceManager}
	 * @param maximumNumberOfAvailableInstances
	 *        the maximum number of available instances of this type
	 */
	InstanceTypeDescription(final InstanceType instanceType, final HardwareDescription hardwareDescription,
			final int maximumNumberOfAvailableInstances) {

		this.instanceType = instanceType;
		this.hardwareDescription = hardwareDescription;
		this.maximumNumberOfAvailableInstances = maximumNumberOfAvailableInstances;
	}

	/**
	 * Writes a presence flag and, if present, the instance type; then a presence flag and, if present, the hardware
	 * description; finally the maximum number of available instances.
	 */
	@Override
	public void write(final DataOutput out) throws IOException {

		if (this.instanceType == null) {
			out.writeBoolean(false);
		} else {
			out.writeBoolean(true);
			this.instanceType.write(out);
		}

		if (this.hardwareDescription == null) {
			out.writeBoolean(false);
		} else {
			out.writeBoolean(true);
			this.hardwareDescription.write(out);
		}

		out.writeInt(this.maximumNumberOfAvailableInstances);
	}

	/**
	 * Restores the state written by {@link #write(DataOutput)}. Optional members whose presence flag is
	 * <code>false</code> are reset to <code>null</code>, so an object that is reused for deserialization never keeps
	 * values from a previous read.
	 */
	@Override
	public void read(final DataInput in) throws IOException {

		if (in.readBoolean()) {
			this.instanceType = new InstanceType();
			this.instanceType.read(in);
		} else {
			this.instanceType = null;
		}

		if (in.readBoolean()) {
			this.hardwareDescription = new HardwareDescription();
			this.hardwareDescription.read(in);
		} else {
			// Mirror the instance type branch: drop a stale description left over from an earlier read.
			this.hardwareDescription = null;
		}

		this.maximumNumberOfAvailableInstances = in.readInt();
	}

	/**
	 * Returns the hardware description as created by the {@link InstanceManager}.
	 * 
	 * @return the instance's hardware description or <code>null</code> if no description is available
	 */
	public HardwareDescription getHardwareDescription() {
		return this.hardwareDescription;
	}

	/**
	 * Returns the instance type as determined by the {@link InstanceManager}.
	 * 
	 * @return the instance type or <code>null</code> if none has been set
	 */
	public InstanceType getInstanceType() {
		return this.instanceType;
	}

	/**
	 * Returns the maximum number of instances the {@link InstanceManager} can at most allocate of this instance type
	 * (i.e. when no other jobs are occupying any resources).
	 * 
	 * @return the maximum number of instances of this type; by the convention of the original documentation
	 *         <code>-1</code> denotes that the number is unknown to the {@link InstanceManager}
	 */
	public int getMaximumNumberOfAvailableInstances() {
		return this.maximumNumberOfAvailableInstances;
	}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * This factory constructs {@link InstanceType} objects, either from a textual hardware description or from
 * individual property values.
 */
public class InstanceTypeFactory {

	/**
	 * The logger used to report errors.
	 */
	private static final Log LOG = LogFactory.getLog(InstanceTypeFactory.class);

	/**
	 * The pattern used to parse the hardware descriptions of instance types: an identifier without commas followed by
	 * five comma-separated non-negative integers.
	 */
	private static final Pattern INSTANCE_TYPE_PATTERN = Pattern
		.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$");

	/**
	 * Private constructor, so class cannot be instantiated.
	 */
	private InstanceTypeFactory() {
	}

	/**
	 * Constructs an {@link InstanceType} object by parsing a hardware description string of the form
	 * <code>identifier,computeUnits,cores,memorySize,diskCapacity,pricePerHour</code>.
	 * 
	 * @param description
	 *        the hardware description reflected by this instance type
	 * @return an instance type reflecting the given hardware description or <code>null</code> if the description
	 *         cannot be parsed, i.e. it is <code>null</code>, does not match the expected format or contains a number
	 *         that does not fit into an <code>int</code>
	 */
	public static InstanceType constructFromDescription(String description) {

		if (description == null) {
			LOG.error("Cannot extract instance type from string " + description);
			return null;
		}

		final Matcher m = INSTANCE_TYPE_PATTERN.matcher(description);
		if (!m.matches()) {
			LOG.error("Cannot extract instance type from string " + description);
			return null;
		}

		try {
			final String identifier = m.group(1);
			final int numComputeUnits = Integer.parseInt(m.group(2));
			final int numCores = Integer.parseInt(m.group(3));
			final int memorySize = Integer.parseInt(m.group(4));
			final int diskCapacity = Integer.parseInt(m.group(5));
			final int pricePerHour = Integer.parseInt(m.group(6));

			return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour);
		} catch (NumberFormatException e) {
			// The pattern only guarantees digits; a digit sequence beyond the int range still fails to parse.
			LOG.error("Cannot extract instance type from string " + description, e);
			return null;
		}
	}

	/**
	 * Constructs an {@link InstanceType} from the given parameters.
	 * 
	 * @param identifier
	 *        identifier for this instance type
	 * @param numberOfComputeUnits
	 *        number of computational units of this instance type
	 * @param numberOfCores
	 *        number of CPU cores of this instance type
	 * @param memorySize
	 *        amount of main memory of this instance type (in MB)
	 * @param diskCapacity
	 *        disk capacity of this instance type (in GB)
	 * @param pricePerHour
	 *        price per hour that is charged for running instances of this type
	 * @return the newly created instance type
	 */
	public static InstanceType construct(String identifier, int numberOfComputeUnits, int numberOfCores,
			int memorySize, int diskCapacity, int pricePerHour) {

		return new InstanceType(identifier, numberOfComputeUnits, numberOfCores, memorySize, diskCapacity, pricePerHour);
	}
}
......@@ -13,34 +13,48 @@
package eu.stratosphere.nephele.instance;
/**
* This factory produces {@link InstanceTypeDescription} objects.
* <p>
* This class is thread-safe.
*
*/
public class InstanceTypeDescriptionFactory {
/**
* Private constructor, so class cannot be instantiated.
*/
private InstanceTypeDescriptionFactory() {
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.taskmanager.TaskManager;
import java.util.ArrayList;
import java.util.List;
public class LocalInstanceManager extends DefaultInstanceManager {
private List<TaskManager> taskManagers = new ArrayList<TaskManager>();
public LocalInstanceManager() throws Exception{
int numTaskManager = GlobalConfiguration.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
ExecutionMode execMode = numTaskManager == 1 ? ExecutionMode.LOCAL : ExecutionMode.CLUSTER;
for (int i=0; i < numTaskManager; i++){
Configuration tm = new Configuration();
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
GlobalConfiguration.includeConfiguration(tm);
taskManagers.add(new TaskManager(execMode));
}
}
/**
* Constructs a new {@link InstanceTypeDescription} object.
*
* @param instanceType
* the instance type
* @param hardwareDescription
* the hardware description as created by the {@link InstanceManager}
* @param numberOfAvailableInstances
* the number of available instances of this type
* @return the instance type description
*/
public static InstanceTypeDescription construct(InstanceType instanceType, HardwareDescription hardwareDescription,
int numberOfAvailableInstances) {
return new InstanceTypeDescription(instanceType, hardwareDescription, numberOfAvailableInstances);
@Override
public void shutdown(){
for(TaskManager taskManager: taskManagers){
taskManager.shutdown();
}
super.shutdown();
}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance.cluster;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocationID;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
/**
 * Representation of a host of a compute cluster.
 * <p>
 * The instance keeps track of the slices that have been allocated on it and of the capacity that is still free.
 * Every method that reads or modifies this state is <code>synchronized</code> on the instance.
 */
class ClusterInstance extends AbstractInstance {

	/**
	 * A map of slices allocated on this host.
	 */
	private final Map<AllocationID, AllocatedSlice> allocatedSlices = new HashMap<AllocationID, AllocatedSlice>();

	/**
	 * The remaining capacity of this host that can be used by instances.
	 */
	private InstanceType remainingCapacity;

	/**
	 * Time when the last heart beat has been received from the task manager running on this instance.
	 */
	private long lastReceivedHeartBeat = System.currentTimeMillis();

	/**
	 * Constructs a new cluster instance whose remaining capacity initially equals its total capacity.
	 * 
	 * @param instanceConnectionInfo
	 *        the instance connection info identifying the host
	 * @param capacity
	 *        capacity of this host
	 * @param parentNode
	 *        the parent node of this node in the network topology
	 * @param networkTopology
	 *        the network topology this node is part of
	 * @param hardwareDescription
	 *        the hardware description reported by the instance itself
	 */
	public ClusterInstance(final InstanceConnectionInfo instanceConnectionInfo, final InstanceType capacity,
			final NetworkNode parentNode, final NetworkTopology networkTopology,
			final HardwareDescription hardwareDescription) {

		super(capacity, instanceConnectionInfo, parentNode, networkTopology, hardwareDescription);

		this.remainingCapacity = capacity;
	}

	/**
	 * Updates the time of last received heart beat to the current system time.
	 */
	synchronized void reportHeartBeat() {
		this.lastReceivedHeartBeat = System.currentTimeMillis();
	}

	/**
	 * Returns whether the host is still alive.
	 * 
	 * @param cleanUpInterval
	 *        duration (in milliseconds) after which a host is
	 *        considered dead if it has not received any heart beats
	 * @return <code>true</code> if the host has received a heart beat before the <code>cleanUpInterval</code> duration
	 *         has expired, <code>false</code> otherwise
	 */
	synchronized boolean isStillAlive(final long cleanUpInterval) {

		// Compare the elapsed time instead of computing "last heart beat + interval": the sum overflows for very
		// large intervals (e.g. Long.MAX_VALUE) and would wrongly declare a live host dead.
		final long elapsedSinceHeartBeat = System.currentTimeMillis() - this.lastReceivedHeartBeat;

		return elapsedSinceHeartBeat <= cleanUpInterval;
	}

	/**
	 * Tries to create a new slice on this instance.
	 * 
	 * @param reqType
	 *        the type describing the hardware characteristics of the slice
	 * @param jobID
	 *        the ID of the job the new slice belongs to
	 * @return a new {@link AllocatedSlice} object if a slice with the given hardware characteristics could
	 *         still be accommodated on this instance or <code>null</code> if the instance's remaining resources
	 *         were insufficient to host the desired slice
	 */
	synchronized AllocatedSlice createSlice(final InstanceType reqType, final JobID jobID) {

		// check whether we can accommodate the instance
		final boolean fits = this.remainingCapacity.getNumberOfComputeUnits() >= reqType.getNumberOfComputeUnits()
			&& this.remainingCapacity.getNumberOfCores() >= reqType.getNumberOfCores()
			&& this.remainingCapacity.getMemorySize() >= reqType.getMemorySize()
			&& this.remainingCapacity.getDiskCapacity() >= reqType.getDiskCapacity();

		if (!fits) {
			// we cannot accommodate the instance
			return null;
		}

		// reduce available capacity by what has been requested
		adjustRemainingCapacity(-reqType.getNumberOfComputeUnits(), -reqType.getNumberOfCores(),
			-reqType.getMemorySize(), -reqType.getDiskCapacity());

		final long allocationTime = System.currentTimeMillis();
		final AllocatedSlice slice = new AllocatedSlice(this, reqType, jobID, allocationTime);
		this.allocatedSlices.put(slice.getAllocationID(), slice);

		return slice;
	}

	/**
	 * Removes the slice identified by the given allocation ID from
	 * this instance and frees up the allocated resources.
	 * 
	 * @param allocationID
	 *        the allocation ID of the slice to be removed
	 * @return the slice which has been removed from the instance or <code>null</code> if no slice
	 *         with the given allocation ID could be found
	 */
	synchronized AllocatedSlice removeAllocatedSlice(final AllocationID allocationID) {

		final AllocatedSlice slice = this.allocatedSlices.remove(allocationID);
		if (slice != null) {
			// give the resources of the removed slice back to the pool of remaining capacity
			adjustRemainingCapacity(slice.getType().getNumberOfComputeUnits(), slice.getType().getNumberOfCores(),
				slice.getType().getMemorySize(), slice.getType().getDiskCapacity());
		}

		return slice;
	}

	/**
	 * Removes all allocated slices on this instance and frees
	 * up their allocated resources.
	 * 
	 * @return a list of all removed slices
	 */
	synchronized List<AllocatedSlice> removeAllAllocatedSlices() {

		// Iterate over a snapshot, because removeAllocatedSlice modifies the underlying map.
		final List<AllocatedSlice> slices = new ArrayList<AllocatedSlice>(this.allocatedSlices.values());
		for (final AllocatedSlice slice : slices) {
			removeAllocatedSlice(slice.getAllocationID());
		}

		return slices;
	}

	/**
	 * Replaces the remaining capacity by a new {@link InstanceType} whose compute units, cores, memory size and disk
	 * capacity are shifted by the given deltas; identifier and price per hour are carried over unchanged. Callers
	 * must hold the lock of this instance.
	 * 
	 * @param computeUnitsDelta
	 *        change of the number of compute units (negative to reserve, positive to release)
	 * @param coresDelta
	 *        change of the number of CPU cores
	 * @param memoryDelta
	 *        change of the memory size
	 * @param diskDelta
	 *        change of the disk capacity
	 */
	private void adjustRemainingCapacity(final int computeUnitsDelta, final int coresDelta, final int memoryDelta,
			final int diskDelta) {

		this.remainingCapacity = InstanceTypeFactory.construct(this.remainingCapacity.getIdentifier(),
			this.remainingCapacity.getNumberOfComputeUnits() + computeUnitsDelta,
			this.remainingCapacity.getNumberOfCores() + coresDelta,
			this.remainingCapacity.getMemorySize() + memoryDelta,
			this.remainingCapacity.getDiskCapacity() + diskDelta,
			this.remainingCapacity.getPricePerHour());
	}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance.cluster;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import eu.stratosphere.nephele.instance.InstanceType;
/**
 * Keeps track of the pending requests of a job, i.e. of the number of instances per {@link InstanceType} which have
 * been requested but could not be provided yet.
 * <p>
 * This class is not thread-safe.
 */
public final class PendingRequestsMap {

	/**
	 * Number of instances still outstanding for the owning job, keyed by instance type.
	 */
	private final Map<InstanceType, Integer> pendingRequests = new HashMap<InstanceType, Integer>();

	/**
	 * Checks whether at least one instance type still has an entry in this map.
	 * 
	 * @return <code>true</code> if the job this object belongs to has pending instance requests, <code>false</code>
	 *         otherwise
	 */
	boolean hasPendingRequests() {
		final boolean nothingPending = this.pendingRequests.isEmpty();
		return !nothingPending;
	}

	/**
	 * Registers a pending request for the given number of instances of the given type. If the type already has
	 * pending instances, the given number is added to the existing count.
	 * 
	 * @param instanceType
	 *        the requested instance type
	 * @param numberOfInstances
	 *        the requested number of instances of this type
	 */
	void addRequest(final InstanceType instanceType, final int numberOfInstances) {

		final Integer alreadyPending = this.pendingRequests.get(instanceType);
		final int total = (alreadyPending == null) ? numberOfInstances
			: alreadyPending.intValue() + numberOfInstances;

		this.pendingRequests.put(instanceType, Integer.valueOf(total));
	}

	/**
	 * Returns an iterator over the (instance type, pending count) entries of this map.
	 * 
	 * @return an iterator for the pending requests encapsulated in this map
	 */
	Iterator<Map.Entry<InstanceType, Integer>> iterator() {
		return this.pendingRequests.entrySet().iterator();
	}

	/**
	 * Decreases the number of pending instances of the given type by one. The entry is removed once its count
	 * reaches zero; unknown instance types are ignored.
	 * 
	 * @param instanceType
	 *        the instance type for which the number of remaining instances shall be decreased
	 */
	void decreaseNumberOfPendingInstances(final InstanceType instanceType) {

		final Integer currentlyPending = this.pendingRequests.get(instanceType);
		if (currentlyPending != null) {
			final int remaining = currentlyPending.intValue() - 1;
			if (remaining != 0) {
				this.pendingRequests.put(instanceType, Integer.valueOf(remaining));
			} else {
				this.pendingRequests.remove(instanceType);
			}
		}
	}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance.local;
import java.util.List;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.jobgraph.JobID;
/**
 * Auxiliary thread which reports the availability of a list of {@link AllocatedResource} objects to an
 * {@link InstanceListener}. According to the original design notes, the notification has to be sent from a separate
 * thread because otherwise the atomic operation of requesting an instance for a vertex and switching to the state
 * ASSIGNING could not be guaranteed.
 * <p>
 * All fields of this class are final and assigned once in the constructor.
 */
public class LocalInstanceNotifier extends Thread {

	/**
	 * The resources allocated for the job.
	 */
	private final List<AllocatedResource> allocatedResources;

	/**
	 * The ID of the job the allocated resources belong to.
	 */
	private final JobID jobID;

	/**
	 * The {@link InstanceListener} object to send the notification to.
	 */
	private final InstanceListener instanceListener;

	/**
	 * Constructs a new instance notifier object. The notification itself is only sent once the thread runs.
	 * 
	 * @param instanceListener
	 *        the listener object to send the notification to
	 * @param jobID
	 *        the ID of the job the newly allocated resources belong to
	 * @param allocatedResources
	 *        the resources allocated for the job
	 */
	public LocalInstanceNotifier(final InstanceListener instanceListener, final JobID jobID, final List<AllocatedResource> allocatedResources) {
		this.allocatedResources = allocatedResources;
		this.jobID = jobID;
		this.instanceListener = instanceListener;
	}

	/**
	 * Informs the listener that the resources have been allocated for the job.
	 */
	@Override
	public void run() {
		final InstanceListener listener = this.instanceListener;
		listener.resourcesAllocated(this.jobID, this.allocatedResources);
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册