提交 fc128def 编写于 作者: D Daniel Warneke

Refactores instance type as a preparation for the new instance description interface

上级 f06f1959
......@@ -25,6 +25,7 @@ import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocationID;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkNode;
......@@ -116,7 +117,7 @@ class ClusterInstance extends AbstractInstance {
&& remainingCapacity.getDiskCapacity() >= reqType.getDiskCapacity()) {
// reduce available capacity by what has been requested
remainingCapacity = new InstanceType(remainingCapacity.getIdentifier(), remainingCapacity
remainingCapacity = InstanceTypeFactory.construct(remainingCapacity.getIdentifier(), remainingCapacity
.getNumberOfComputeUnits()
- reqType.getNumberOfComputeUnits(), remainingCapacity.getNumberOfCores() - reqType.getNumberOfCores(),
remainingCapacity.getMemorySize() - reqType.getMemorySize(), remainingCapacity.getDiskCapacity()
......@@ -147,7 +148,7 @@ class ClusterInstance extends AbstractInstance {
final AllocatedSlice slice = this.allocatedSlices.remove(allocationID);
if (slice != null) {
this.remainingCapacity = new InstanceType(this.remainingCapacity.getIdentifier(), this.remainingCapacity
this.remainingCapacity = InstanceTypeFactory.construct(this.remainingCapacity.getIdentifier(), this.remainingCapacity
.getNumberOfComputeUnits()
+ slice.getType().getNumberOfComputeUnits(), this.remainingCapacity.getNumberOfCores()
+ slice.getType().getNumberOfCores(), this.remainingCapacity.getMemorySize()
......
......@@ -48,6 +48,7 @@ import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
......@@ -345,16 +346,16 @@ public class ClusterManager implements InstanceManager {
// parse entry
try {
// if successful add new instance type
final InstanceType instanceType = InstanceType.getTypeFromString(descr);
final InstanceType instanceType = InstanceTypeFactory.constructFromDescription(descr);
LOG.info("Loaded instance type " + instanceType.getIdentifier() + " from the configuration");
instanceTypes.add(InstanceType.getTypeFromString(descr));
instanceTypes.add(instanceType);
} catch (Throwable t) {
LOG.error("Error parsing " + key + ":" + descr + ". Using default using default instance type: " +
ConfigConstants.DEFAULT_INSTANCE_TYPE + " for instance type " + count + ".", t);
// we need to add an instance type anyways, because otherwise a non-parsable instance description
// would cause the numbering to be wrong.
instanceTypes.add(InstanceType.getTypeFromString(ConfigConstants.DEFAULT_INSTANCE_TYPE));
instanceTypes.add(InstanceTypeFactory.constructFromDescription(ConfigConstants.DEFAULT_INSTANCE_TYPE));
}
// Increase key index
......
......@@ -26,6 +26,7 @@ import static org.junit.Assert.*;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.instance.cluster.AllocatedSlice;
import eu.stratosphere.nephele.instance.cluster.ClusterInstance;
import eu.stratosphere.nephele.jobgraph.JobID;
......@@ -45,7 +46,8 @@ public class HostInClusterTest {
int memorySize = 32 * 1024;
int diskCapacity = 200;
int pricePerHour = 10;
final InstanceType capacity = new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity,
final InstanceType capacity = InstanceTypeFactory.construct(identifier, numComputeUnits, numCores, memorySize,
diskCapacity,
pricePerHour);
final InstanceConnectionInfo instanceConnectionInfo = new InstanceConnectionInfo(socket.getAddress(), socket
.getPort(), 1235);
......@@ -80,7 +82,7 @@ public class HostInClusterTest {
final int numCores = 8 / 8;
final int memorySize = 32 * 1024 / 8;
final int diskCapacity = 200 / 8;
final InstanceType type = new InstanceType("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1);
final InstanceType type = InstanceTypeFactory.construct("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1);
for (int run = 0; run < 2; ++run) {
// do this twice to check that everything is correctly freed
......@@ -95,10 +97,10 @@ public class HostInClusterTest {
}
// now no resources should be left
assertNull(host.createSlice(new InstanceType("dummy", 1, 0, 0, 0, 0), jobID));
assertNull(host.createSlice(new InstanceType("dummy", 0, 1, 0, 0, 0), jobID));
assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 1, 0, 0), jobID));
assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 0, 1, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 1, 0, 0, 0, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 1, 0, 0, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 1, 0, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 0, 1, 0), jobID));
for (int i = 0; i < 8; ++i) {
host.removeAllocatedSlice(slices[i].getAllocationID());
......@@ -117,7 +119,7 @@ public class HostInClusterTest {
final int numCores = 8 / 8;
final int memorySize = 32 * 1024 / 8;
final int diskCapacity = 200 / 8;
final InstanceType type = new InstanceType("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1);
final InstanceType type = InstanceTypeFactory.construct("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1);
for (int run = 0; run < 2; ++run) {
// do this twice to check that everything is correctly freed
......@@ -132,10 +134,10 @@ public class HostInClusterTest {
}
// now no resources should be left
assertNull(host.createSlice(new InstanceType("dummy", 1, 0, 0, 0, 0), jobID));
assertNull(host.createSlice(new InstanceType("dummy", 0, 1, 0, 0, 0), jobID));
assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 1, 0, 0), jobID));
assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 0, 1, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 1, 0, 0, 0, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 1, 0, 0, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 1, 0, 0), jobID));
assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 0, 1, 0), jobID));
List<AllocatedSlice> removedSlices = host.removeAllAllocatedSlices();
......
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.types.StringRecord;
/**
* This class extends a standard {@link java.util.ArrayList} by implementing the
* {@link eu.stratosphere.nephele.io.IOReadableWritable} interface. As a result, array lists of this type can be used
* with Nephele's RPC system.
* <p>
* This class is not thread-safe.
*
* @author warneke
* @param <E>
* the type of object stored inside this array list
*/
public class SerializableArrayList<E extends IOReadableWritable> extends ArrayList<E> implements IOReadableWritable {
/**
* Generated serial version UID.
*/
private static final long serialVersionUID = 8196856588290198537L;
/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(size());
final Iterator<E> it = iterator();
while (it.hasNext()) {
final E element = it.next();
// Write out type
StringRecord.writeString(out, element.getClass().getName());
// Write out element itself
element.write(out);
}
}
/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
// TODO: See if type safety can be improved here
@Override
public void read(DataInput in) throws IOException {
// Make sure the list is empty
clear();
final int numberOfElements = in.readInt();
for (int i = 0; i < numberOfElements; i++) {
final String elementType = StringRecord.readString(in);
Class<E> clazz = null;
try {
clazz = (Class<E>) Class.forName(elementType);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
}
E element = null;
try {
element = clazz.newInstance();
} catch (Exception e) {
throw new IOException(StringUtils.stringifyException(e));
}
element.read(in);
add(element);
}
}
}
......@@ -27,8 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -45,6 +43,7 @@ import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkTopology;
import eu.stratosphere.nephele.util.StringUtils;
......@@ -167,9 +166,6 @@ public class CloudManager extends TimerTask implements InstanceManager {
throw new RuntimeException("Illegal configuration, cloudmgr.nrtypes is not configured");
}
// read instance types
final Pattern pattern = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$");
for (int i = 0; i < num; ++i) {
final String key = "cloudmgr.instancetype." + (i + 1);
......@@ -178,27 +174,7 @@ public class CloudManager extends TimerTask implements InstanceManager {
throw new RuntimeException("Illegal configuration for " + key);
}
try {
final Matcher m = pattern.matcher(type);
if (!m.matches()) {
throw new Exception(key + " does not match pattern " + pattern.toString());
}
final String identifier = m.group(1);
final int numComputeUnits = Integer.parseInt(m.group(2));
final int numCores = Integer.parseInt(m.group(3));
final int memorySize = Integer.parseInt(m.group(4));
final int diskCapacity = Integer.parseInt(m.group(5));
final int pricePerHour = Integer.parseInt(m.group(6));
final InstanceType instanceType = new InstanceType(identifier, numComputeUnits, numCores, memorySize,
diskCapacity, pricePerHour);
instanceTypes.add(instanceType);
} catch (Exception e) {
LOG.error("Error parsing " + key + ":" + type, e);
throw new RuntimeException("Error parsing " + key + ":" + type, e);
}
instanceTypes.add(InstanceTypeFactory.constructFromDescription(type));
}
// sort by price
......
......@@ -22,7 +22,7 @@ import java.net.InetSocketAddress;
import org.junit.Test;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.instance.cloud.CloudInstance;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.topology.NetworkTopology;
......@@ -34,7 +34,8 @@ public class CloudInstanceTest {
final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology();
final CloudInstance ci = new CloudInstance("i-1234ABCD", new InstanceType("m1.small", 1, 1, 2048, 40, 10),
final CloudInstance ci = new CloudInstance("i-1234ABCD",
InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"),
"wenjun", new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121),
1234567890, networkTopology.getRootNode(), networkTopology);
......@@ -49,7 +50,8 @@ public class CloudInstanceTest {
final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology();
final CloudInstance ci = new CloudInstance("i-1234ABCD", new InstanceType("m1.small", 1, 1, 2048, 40, 10),
final CloudInstance ci = new CloudInstance("i-1234ABCD",
InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"),
"wenjun", new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121),
System.currentTimeMillis(), networkTopology.getRootNode(), networkTopology);
......
......@@ -52,6 +52,7 @@ import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.instance.cloud.CloudInstance;
import eu.stratosphere.nephele.instance.cloud.CloudManager;
import eu.stratosphere.nephele.instance.cloud.FloatingInstance;
......@@ -297,7 +298,7 @@ public class CloudManagerTest {
// request instance
try {
cm.requestInstance(jobID, conf, new InstanceType("m1.small", 1, 1, 2048, 40, 10));
cm.requestInstance(jobID, conf, InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"));
} catch (InstanceException e) {
e.printStackTrace();
}
......@@ -327,7 +328,8 @@ public class CloudManagerTest {
// report heart beat
final HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(8,
32L * 1024L * 1024L * 1024L, 32L * 1024L * 1024L * 1024L);
cm.reportHeartBeat(new InstanceConnectionInfo(InetAddress.getByName(instance.getDnsName()), 10000, 20000), hardwareDescription);
cm.reportHeartBeat(new InstanceConnectionInfo(InetAddress.getByName(instance.getDnsName()), 10000, 20000),
hardwareDescription);
} catch (SecurityException e) {
e.printStackTrace();
......
......@@ -23,7 +23,7 @@ import java.util.ArrayList;
import org.junit.Test;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.instance.cloud.CloudInstance;
import eu.stratosphere.nephele.instance.cloud.JobToInstancesMapping;
import eu.stratosphere.nephele.topology.NetworkTopology;
......@@ -36,7 +36,7 @@ public class JobToInstancesMappingTest {
final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology();
JobToInstancesMapping map = new JobToInstancesMapping("wenjun", "1234567", "abcdefg");
CloudInstance ci = new CloudInstance("i-1234ABCD", new InstanceType("m1.small", 1, 1, 2048, 40, 10), "wenjun",
CloudInstance ci = new CloudInstance("i-1234ABCD", InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"), "wenjun",
new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121), 1234567890,
networkTopology.getRootNode(), networkTopology);
......
......@@ -15,9 +15,12 @@
package eu.stratosphere.nephele.instance;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.types.StringRecord;
/**
* An instance type describes the hardware resources a task manager runs on. According
......@@ -26,18 +29,12 @@ import java.util.regex.PatternSyntaxException;
*
* @author warneke
*/
public class InstanceType {
/**
* The pattern used to parse descriptions of instance types.
*/
private static Pattern descr_pattern = null;
// ------------------------------------------------------------------------
public class InstanceType implements IOReadableWritable {
/**
* The identifier for this instance type.
*/
private final String identifier;
private String identifier;
/**
* The number of computational units of this instance type.
......@@ -47,27 +44,33 @@ public class InstanceType {
* specified number of compute units expresses the fraction of the
* CPU capacity promised to a user.
*/
private final int numberOfComputeUnits;
private int numberOfComputeUnits = 0;
/**
* The number of CPU cores of this instance type.
*/
private final int numberOfCores;
private int numberOfCores = 0;
/**
* The amount of main memory of this instance type (in MB).
*/
private final int memorySize;
private int memorySize = 0;
/**
* The disk capacity of this instance type (in GB).
*/
private final int diskCapacity;
private int diskCapacity = 0;
/**
* The price per hour that is charged for running instances of this type.
*/
private final int pricePerHour;
private int pricePerHour = 0;
/**
* Public constructor required for the serialization process.
*/
public InstanceType() {
}
/**
* Creates a new instance type.
......@@ -85,7 +88,7 @@ public class InstanceType {
* @param pricePerHour
* price per hour that is charged for running instances of this type
*/
public InstanceType(String identifier, int numberOfComputeUnits, int numberOfCores, int memorySize,
InstanceType(String identifier, int numberOfComputeUnits, int numberOfCores, int memorySize,
int diskCapacity, int pricePerHour) {
this.identifier = identifier;
......@@ -166,60 +169,46 @@ public class InstanceType {
*/
public String toStringRepresentation() {
StringBuilder bld = new StringBuilder(32);
bld.append(identifier);
bld.append(this.identifier);
bld.append(',');
bld.append(numberOfComputeUnits);
bld.append(this.numberOfComputeUnits);
bld.append(',');
bld.append(numberOfCores);
bld.append(this.numberOfCores);
bld.append(',');
bld.append(memorySize);
bld.append(this.memorySize);
bld.append(',');
bld.append(diskCapacity);
bld.append(this.diskCapacity);
bld.append(',');
bld.append(pricePerHour);
bld.append(this.pricePerHour);
return bld.toString();
}
// ------------------------------------------------------------------------
/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
StringRecord.writeString(out, this.identifier);
out.writeInt(this.numberOfComputeUnits);
out.writeInt(this.numberOfCores);
out.writeInt(this.memorySize);
out.writeInt(this.diskCapacity);
out.writeInt(this.pricePerHour);
}
/**
* Gets an instance type parsed from its string description.
*
* @param description
* The string description of the instance type.
* @return An instance that corresponds to the description.
* @throws IllegalArgumentException
* Thrown, if the string does not correctly describe an instance.
*/
public static final InstanceType getTypeFromString(String description) throws IllegalArgumentException {
if (descr_pattern == null) {
try {
descr_pattern = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$");
} catch (PatternSyntaxException psex) {
throw new RuntimeException("Invalid Regex Pattern to parse instance description.", psex);
}
}
try {
final Matcher m = descr_pattern.matcher(description);
if (!m.matches()) {
throw new IllegalArgumentException("The value '" + description + "' does not match pattern "
+ descr_pattern.toString());
}
final String identifier = m.group(1);
final int numComputeUnits = Integer.parseInt(m.group(2));
final int numCores = Integer.parseInt(m.group(3));
final int memorySize = Integer.parseInt(m.group(4));
final int diskCapacity = Integer.parseInt(m.group(5));
final int pricePerHour = Integer.parseInt(m.group(6));
return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour);
} catch (Exception e) {
throw new IllegalArgumentException("The value '" + description + "' does not match pattern "
+ descr_pattern.toString());
}
* {@inheritDoc}
*/
@Override
public void read(DataInput in) throws IOException {
this.identifier = StringRecord.readString(in);
this.numberOfComputeUnits = in.readInt();
this.numberOfCores = in.readInt();
this.memorySize = in.readInt();
this.diskCapacity = in.readInt();
this.pricePerHour = in.readInt();
}
}
package eu.stratosphere.nephele.instance;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.io.IOReadableWritable;
/**
* An instance type description provides details of instance type. Is can comprise both the hardware description from
* the instance type description (as provided by the operator/administrator of the instance) as well as the actual
* hardware description which has been determined on the compute instance itself.
*
* @author warneke
*/
public class InstanceTypeDescription implements IOReadableWritable {
/**
* The instance type.
*/
private InstanceType instanceType = null;
/**
* The hardware description as created by the {@link InstanceManager}.
*/
private HardwareDescription hardwareDescription = null;
/**
* The number of available instances of this type.
*/
private int numberOfAvailableInstances = 0;
/**
* Public default constructor required for serialization process.
*/
public InstanceTypeDescription() {
}
/**
* Constructs a new instance type description
*
* @param instanceType
* the instance type
* @param hardwareDescription
* the hardware description as created by the {@link InstanceManager}
* @param numberOfAvailableInstances
* the number of available instances of this type
*/
InstanceTypeDescription(InstanceType instanceType, HardwareDescription hardwareDescription,
int numberOfAvailableInstances) {
this.instanceType = instanceType;
this.hardwareDescription = hardwareDescription;
this.numberOfAvailableInstances = numberOfAvailableInstances;
}
/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
if (this.instanceType == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
this.instanceType.write(out);
}
if (this.hardwareDescription == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
this.hardwareDescription.write(out);
}
out.writeInt(this.numberOfAvailableInstances);
}
/**
* {@inheritDoc}
*/
@Override
public void read(DataInput in) throws IOException {
if(in.readBoolean()) {
this.instanceType = new InstanceType();
this.instanceType.read(in);
} else {
this.instanceType = null;
}
if(in.readBoolean()) {
this.hardwareDescription = new HardwareDescription();
this.hardwareDescription.read(in);
}
this.numberOfAvailableInstances = in.readInt();
}
}
......@@ -78,6 +78,12 @@ public class HardwareDescriptionFactory {
*/
private static String os = null;
/**
* Private constructor, so class cannot be instantiated.
*/
private HardwareDescriptionFactory() {
}
/**
* Extracts a hardware description object from the system.
*
......
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
/**
* This factory produces {@link InstanceTypeDescription} objects.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public class InstanceTypeDescriptionFactory {
/**
* Private constructor, so class cannot be instantiated.
*/
private InstanceTypeDescriptionFactory() {
}
/**
* Constructs a new {@link InstaceTypeDescription} object.
*
* @param instanceType
* the instance type
* @param hardwareDescription
* the hardware description as created by the {@link InstanceManager}
* @param numberOfAvailableInstances
* the number of available instances of this type
* @return the instance type description
*/
public static InstanceTypeDescription construct(InstanceType instanceType, HardwareDescription hardwareDescription,
int numberOfAvailableInstances) {
return new InstanceTypeDescription(instanceType, hardwareDescription, numberOfAvailableInstances);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This factory constructs {@link InstanceType} objects.
*
* @author warneke
*/
public class InstanceTypeFactory {
/**
* The logger used to report errors.
*/
private static final Log LOG = LogFactory.getLog(InstanceTypeFactory.class);
/**
* The pattern used to parse the hardware descriptions of instance types.
*/
private static Pattern INSTANCE_TYPE_PATTERN = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$");
/**
* Private constructor, so class cannot be instantiated.
*/
private InstanceTypeFactory() {
}
/**
* Constructs an {@link InstanceType} object by parsing a hardware description string.
*
* @param description
* the hardware description reflected by this instance type
* @return an instance type reflecting the given hardware description or <code>null</code> if the description cannot
* be parsed
*/
public static InstanceType constructFromDescription(String description) {
final Matcher m = INSTANCE_TYPE_PATTERN.matcher(description);
if (!m.matches()) {
LOG.error("Cannot extract instance type from string " + description);
return null;
}
final String identifier = m.group(1);
final int numComputeUnits = Integer.parseInt(m.group(2));
final int numCores = Integer.parseInt(m.group(3));
final int memorySize = Integer.parseInt(m.group(4));
final int diskCapacity = Integer.parseInt(m.group(5));
final int pricePerHour = Integer.parseInt(m.group(6));
return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour);
}
/**
* Constructs an {@link InstanceType} from the given parameters.
*
* @param identifier
* identifier for this instance type
* @param numberOfComputeUnits
* number of computational units of this instance type
* @param numberOfCores
* number of CPU cores of this instance type
* @param memorySize
* amount of main memory of this instance type (in MB)
* @param diskCapacity
* disk capacity of this instance type (in GB)
* @param pricePerHour
* price per hour that is charged for running instances of this type
*/
public static InstanceType construct(String identifier, int numberOfComputeUnits, int numberOfCores,
int memorySize, int diskCapacity, int pricePerHour) {
return new InstanceType(identifier, numberOfComputeUnits, numberOfCores, memorySize, diskCapacity, pricePerHour);
}
}
......@@ -25,11 +25,13 @@ import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.AllocationID;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkTopology;
......@@ -45,6 +47,8 @@ public class LocalInstanceManager implements InstanceManager {
private AllocatedResource allocatedResource = null;
private HardwareDescription hardwareDescription;
private LocalTaskManagerThread localTaskManagerThread;
private final NetworkTopology networkTopology;
......@@ -57,7 +61,7 @@ public class LocalInstanceManager implements InstanceManager {
String descr = config.getString(ConfigConstants.JOBMANAGER_LOCALINSTANCE_TYPE_KEY, null);
try {
if (descr != null) {
type = InstanceType.getTypeFromString(descr);
type = InstanceTypeFactory.constructFromDescription(descr);
}
} catch (IllegalArgumentException iaex) {
LogFactory.getLog(LocalInstanceManager.class).error(
......@@ -127,6 +131,8 @@ public class LocalInstanceManager implements InstanceManager {
this.allocatedResource = new AllocatedResource(new LocalInstance(this.defaultInstanceType,
instanceConnectionInfo, this.networkTopology.getRootNode(), this.networkTopology),
new AllocationID());
this.hardwareDescription = hardwareDescription;
}
}
}
......@@ -141,8 +147,8 @@ public class LocalInstanceManager implements InstanceManager {
if (this.localTaskManagerThread != null) {
// Interrupt the thread running the task manager
this.localTaskManagerThread.interrupt();
while(!this.localTaskManagerThread.isTaskManagerShutDown()) {
while (!this.localTaskManagerThread.isTaskManagerShutDown()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
......@@ -174,25 +180,27 @@ public class LocalInstanceManager implements InstanceManager {
// ------------------------------------------------------------------------
/**
* Creates an instance type for the local machine that calls this method. The local instance is
* given the system's number of CPU cores, the amount of memory currently available to the system
* (actually 80% of it) and the amount of disc space in the temp directory.
* Creates a default instance type based on the hardware characteristics of the machine that calls this method. The
* default instance type contains the machine's number of CPU cores and size of physical memory. The disc capacity
* is calculated from the free space in the directory for temporary files.
*
* @return An instance type for the local machine.
* @return the default instance type used for the local machine
*/
public static final InstanceType createDefaultInstanceType() {
final Runtime runtime = Runtime.getRuntime();
final int numberOfCPUCores = runtime.availableProcessors();
final int memorySizeInMB = (int) ((runtime.freeMemory() + (runtime.maxMemory() - runtime.totalMemory())) * 0.8f / (1024 * 1024));
final HardwareDescription hardwareDescription = HardwareDescriptionFactory.extractFromSystem();
int diskCapacityInGB = 0;
final String tempDir = System.getProperty("java.io.tmpdir");
final String tempDir = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
if (tempDir != null) {
File f = new File(tempDir);
diskCapacityInGB = (int) (f.getFreeSpace() * 0.8f / (1024 * 1024 * 1024));
diskCapacityInGB = (int) (f.getFreeSpace() / (1024L * 1024L * 1024L));
}
return new InstanceType("default", numberOfCPUCores, numberOfCPUCores, memorySizeInMB, diskCapacityInGB, 0);
final int physicalMemory = (int) (hardwareDescription.getSizeOfPhysicalMemory() / (1024L * 1024L));
return InstanceTypeFactory.construct("default", hardwareDescription.getNumberOfCPUCores(),
hardwareDescription.getNumberOfCPUCores(), physicalMemory, diskCapacityInGB, 0);
}
}
......@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.pact.common.contract.CoGroupContract;
import eu.stratosphere.pact.common.contract.Contract;
import eu.stratosphere.pact.common.contract.CrossContract;
......@@ -335,12 +336,12 @@ public class PactCompiler {
PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION);
InstanceType type = null;
try {
type = InstanceType.getTypeFromString(instanceDescr);
type = InstanceTypeFactory.constructFromDescription(instanceDescr);
}
catch (IllegalArgumentException iaex) {
LOG.error("Invalid description of standard instance type in PACT configuration: " + instanceDescr +
". Using default instance type " + PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION + ".", iaex);
type = InstanceType.getTypeFromString(PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION);
type = InstanceTypeFactory.constructFromDescription(PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION);
}
this.pactInstanceType = type;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册