diff --git a/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java b/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java index e8c62923fe58aa799b3ceea2b1c8bc80215c18d7..cd8771d0c5d03ea15e831e6169388927772cbf1a 100644 --- a/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java +++ b/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterInstance.java @@ -25,6 +25,7 @@ import eu.stratosphere.nephele.instance.AbstractInstance; import eu.stratosphere.nephele.instance.AllocationID; import eu.stratosphere.nephele.instance.InstanceConnectionInfo; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.io.channels.ChannelID; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.topology.NetworkNode; @@ -116,7 +117,7 @@ class ClusterInstance extends AbstractInstance { && remainingCapacity.getDiskCapacity() >= reqType.getDiskCapacity()) { // reduce available capacity by what has been requested - remainingCapacity = new InstanceType(remainingCapacity.getIdentifier(), remainingCapacity + remainingCapacity = InstanceTypeFactory.construct(remainingCapacity.getIdentifier(), remainingCapacity .getNumberOfComputeUnits() - reqType.getNumberOfComputeUnits(), remainingCapacity.getNumberOfCores() - reqType.getNumberOfCores(), remainingCapacity.getMemorySize() - reqType.getMemorySize(), remainingCapacity.getDiskCapacity() @@ -147,7 +148,7 @@ class ClusterInstance extends AbstractInstance { final AllocatedSlice slice = this.allocatedSlices.remove(allocationID); if (slice != null) { - this.remainingCapacity = new InstanceType(this.remainingCapacity.getIdentifier(), this.remainingCapacity + this.remainingCapacity = InstanceTypeFactory.construct(this.remainingCapacity.getIdentifier(), this.remainingCapacity .getNumberOfComputeUnits() + slice.getType().getNumberOfComputeUnits(), this.remainingCapacity.getNumberOfCores() + slice.getType().getNumberOfCores(), this.remainingCapacity.getMemorySize() diff --git a/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java b/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java index e3b7f972ea45f1d73d92918a289053d3c4bd7b6e..1c731118a672e1e25c23c70171b2f3b13fad8608 100644 --- a/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java +++ b/nephele/nephele-clustermanager/src/main/java/eu/stratosphere/nephele/instance/cluster/ClusterManager.java @@ -48,6 +48,7 @@ import eu.stratosphere.nephele.instance.InstanceException; import eu.stratosphere.nephele.instance.InstanceListener; import eu.stratosphere.nephele.instance.InstanceManager; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.topology.NetworkNode; import eu.stratosphere.nephele.topology.NetworkTopology; @@ -345,16 +346,16 @@ public class ClusterManager implements InstanceManager { // parse entry try { // if successful add new instance type - final InstanceType instanceType = InstanceType.getTypeFromString(descr); + final InstanceType instanceType = InstanceTypeFactory.constructFromDescription(descr); LOG.info("Loaded instance type " + instanceType.getIdentifier() + " from the configuration"); - instanceTypes.add(InstanceType.getTypeFromString(descr)); + instanceTypes.add(instanceType); } catch (Throwable t) { LOG.error("Error parsing " + key + ":" + descr + ". Using default using default instance type: " + ConfigConstants.DEFAULT_INSTANCE_TYPE + " for instance type " + count + ".", t); // we need to add an instance type anyways, because otherwise a non-parsable instance description // would cause the numbering to be wrong. - instanceTypes.add(InstanceType.getTypeFromString(ConfigConstants.DEFAULT_INSTANCE_TYPE)); + instanceTypes.add(InstanceTypeFactory.constructFromDescription(ConfigConstants.DEFAULT_INSTANCE_TYPE)); } // Increase key index diff --git a/nephele/nephele-clustermanager/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java b/nephele/nephele-clustermanager/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java index 6e15ba29392b4cd7ce174b4339742357266e69cd..fc48c397b01fa235b8c920bcffa43fe9082727f1 100644 --- a/nephele/nephele-clustermanager/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java +++ b/nephele/nephele-clustermanager/src/test/java/eu/stratosphere/nephele/instance/cluster/HostInClusterTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.*; import eu.stratosphere.nephele.instance.InstanceConnectionInfo; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.instance.cluster.AllocatedSlice; import eu.stratosphere.nephele.instance.cluster.ClusterInstance; import eu.stratosphere.nephele.jobgraph.JobID; @@ -45,7 +46,8 @@ public class HostInClusterTest { int memorySize = 32 * 1024; int diskCapacity = 200; int pricePerHour = 10; - final InstanceType capacity = new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, + final InstanceType capacity = InstanceTypeFactory.construct(identifier, numComputeUnits, numCores, memorySize, + diskCapacity, pricePerHour); final InstanceConnectionInfo instanceConnectionInfo = new InstanceConnectionInfo(socket.getAddress(), socket .getPort(), 1235); @@ -80,7 +82,7 @@ public class HostInClusterTest { final int numCores = 8 / 8; final int memorySize = 32 * 1024 / 8; final int diskCapacity = 200 / 8; - final InstanceType type = new InstanceType("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1); + final InstanceType type = InstanceTypeFactory.construct("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1); for (int run = 0; run < 2; ++run) { // do this twice to check that everything is correctly freed @@ -95,10 +97,10 @@ public class HostInClusterTest { } // now no resources should be left - assertNull(host.createSlice(new InstanceType("dummy", 1, 0, 0, 0, 0), jobID)); - assertNull(host.createSlice(new InstanceType("dummy", 0, 1, 0, 0, 0), jobID)); - assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 1, 0, 0), jobID)); - assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 0, 1, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 1, 0, 0, 0, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 1, 0, 0, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 1, 0, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 0, 1, 0), jobID)); for (int i = 0; i < 8; ++i) { host.removeAllocatedSlice(slices[i].getAllocationID()); @@ -117,7 +119,7 @@ public class HostInClusterTest { final int numCores = 8 / 8; final int memorySize = 32 * 1024 / 8; final int diskCapacity = 200 / 8; - final InstanceType type = new InstanceType("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1); + final InstanceType type = InstanceTypeFactory.construct("dummy", numComputeUnits, numCores, memorySize, diskCapacity, -1); for (int run = 0; run < 2; ++run) { // do this twice to check that everything is correctly freed @@ -132,10 +134,10 @@ public class HostInClusterTest { } // now no resources should be left - assertNull(host.createSlice(new InstanceType("dummy", 1, 0, 0, 0, 0), jobID)); - assertNull(host.createSlice(new InstanceType("dummy", 0, 1, 0, 0, 0), jobID)); - assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 1, 0, 0), jobID)); - assertNull(host.createSlice(new InstanceType("dummy", 0, 0, 0, 1, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 1, 0, 0, 0, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 1, 0, 0, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 1, 0, 0), jobID)); + assertNull(host.createSlice(InstanceTypeFactory.construct("dummy", 0, 0, 0, 1, 0), jobID)); List removedSlices = host.removeAllAllocatedSlices(); diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/SerializableArrayList.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/SerializableArrayList.java new file mode 100644 index 0000000000000000000000000000000000000000..f87a0993e9d65373766042a3bfdcf1738914cb89 --- /dev/null +++ b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/SerializableArrayList.java @@ -0,0 +1,95 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.nephele.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +import eu.stratosphere.nephele.io.IOReadableWritable; +import eu.stratosphere.nephele.types.StringRecord; + +/** + * This class extends a standard {@link java.util.ArrayList} by implementing the + * {@link eu.stratosphere.nephele.io.IOReadableWritable} interface. As a result, array lists of this type can be used + * with Nephele's RPC system. + *

+ * This class is not thread-safe. + * + * @author warneke + * @param + * the type of object stored inside this array list + */ +public class SerializableArrayList extends ArrayList implements IOReadableWritable { + + /** + * Generated serial version UID. + */ + private static final long serialVersionUID = 8196856588290198537L; + + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + + out.writeInt(size()); + final Iterator it = iterator(); + while (it.hasNext()) { + + final E element = it.next(); + // Write out type + StringRecord.writeString(out, element.getClass().getName()); + // Write out element itself + element.write(out); + } + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") + // TODO: See if type safety can be improved here + @Override + public void read(DataInput in) throws IOException { + + // Make sure the list is empty + clear(); + final int numberOfElements = in.readInt(); + for (int i = 0; i < numberOfElements; i++) { + final String elementType = StringRecord.readString(in); + Class clazz = null; + try { + clazz = (Class) Class.forName(elementType); + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.stringifyException(e)); + } + + E element = null; + try { + element = clazz.newInstance(); + } catch (Exception e) { + throw new IOException(StringUtils.stringifyException(e)); + } + + element.read(in); + add(element); + } + } + +} diff --git a/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java b/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java index 1704396ee3da88a71cf9ddd55e10b9f922ab3fae..f17aec13355f75788b938eeeaf604ffbd28c7f84 100644 --- a/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java +++ b/nephele/nephele-ec2cloudmanager/src/main/java/eu/stratosphere/nephele/instance/cloud/CloudManager.java @@ -27,8 +27,6 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +43,7 @@ import eu.stratosphere.nephele.instance.InstanceException; import eu.stratosphere.nephele.instance.InstanceListener; import eu.stratosphere.nephele.instance.InstanceManager; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.topology.NetworkTopology; import eu.stratosphere.nephele.util.StringUtils; @@ -167,9 +166,6 @@ public class CloudManager extends TimerTask implements InstanceManager { throw new RuntimeException("Illegal configuration, cloudmgr.nrtypes is not configured"); } - // read instance types - final Pattern pattern = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$"); - for (int i = 0; i < num; ++i) { final String key = "cloudmgr.instancetype." + (i + 1); @@ -178,27 +174,7 @@ public class CloudManager extends TimerTask implements InstanceManager { throw new RuntimeException("Illegal configuration for " + key); } - try { - final Matcher m = pattern.matcher(type); - if (!m.matches()) { - throw new Exception(key + " does not match pattern " + pattern.toString()); - } - - final String identifier = m.group(1); - final int numComputeUnits = Integer.parseInt(m.group(2)); - final int numCores = Integer.parseInt(m.group(3)); - final int memorySize = Integer.parseInt(m.group(4)); - final int diskCapacity = Integer.parseInt(m.group(5)); - final int pricePerHour = Integer.parseInt(m.group(6)); - - final InstanceType instanceType = new InstanceType(identifier, numComputeUnits, numCores, memorySize, - diskCapacity, pricePerHour); - instanceTypes.add(instanceType); - - } catch (Exception e) { - LOG.error("Error parsing " + key + ":" + type, e); - throw new RuntimeException("Error parsing " + key + ":" + type, e); - } + instanceTypes.add(InstanceTypeFactory.constructFromDescription(type)); } // sort by price diff --git a/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudInstanceTest.java b/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudInstanceTest.java index f0eb55c25ae1c9cccce4cbbd87a89e9d3ba0d775..31e8b47cce6f8c1112d83ca99fa41a5e66e43ec0 100644 --- a/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudInstanceTest.java +++ b/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudInstanceTest.java @@ -22,7 +22,7 @@ import java.net.InetSocketAddress; import org.junit.Test; import eu.stratosphere.nephele.instance.InstanceConnectionInfo; -import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.instance.cloud.CloudInstance; import eu.stratosphere.nephele.io.channels.ChannelID; import eu.stratosphere.nephele.topology.NetworkTopology; @@ -34,7 +34,8 @@ public class CloudInstanceTest { final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology(); - final CloudInstance ci = new CloudInstance("i-1234ABCD", new InstanceType("m1.small", 1, 1, 2048, 40, 10), + final CloudInstance ci = new CloudInstance("i-1234ABCD", + InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"), "wenjun", new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121), 1234567890, networkTopology.getRootNode(), networkTopology); @@ -49,7 +50,8 @@ public class CloudInstanceTest { final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology(); - final CloudInstance ci = new CloudInstance("i-1234ABCD", new InstanceType("m1.small", 1, 1, 2048, 40, 10), + final CloudInstance ci = new CloudInstance("i-1234ABCD", + InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"), "wenjun", new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121), System.currentTimeMillis(), networkTopology.getRootNode(), networkTopology); diff --git a/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudManagerTest.java b/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudManagerTest.java index d5db60379277f057f90e0821f2d76122578de9ab..f126e35755754f3e0475713e9c233b2690b76cd0 100644 --- a/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudManagerTest.java +++ b/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/CloudManagerTest.java @@ -52,6 +52,7 @@ import eu.stratosphere.nephele.instance.InstanceConnectionInfo; import eu.stratosphere.nephele.instance.InstanceException; import eu.stratosphere.nephele.instance.InstanceListener; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.instance.cloud.CloudInstance; import eu.stratosphere.nephele.instance.cloud.CloudManager; import eu.stratosphere.nephele.instance.cloud.FloatingInstance; @@ -297,7 +298,7 @@ public class CloudManagerTest { // request instance try { - cm.requestInstance(jobID, conf, new InstanceType("m1.small", 1, 1, 2048, 40, 10)); + cm.requestInstance(jobID, conf, InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10")); } catch (InstanceException e) { e.printStackTrace(); } @@ -327,7 +328,8 @@ public class CloudManagerTest { // report heart beat final HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(8, 32L * 1024L * 1024L * 1024L, 32L * 1024L * 1024L * 1024L); - cm.reportHeartBeat(new InstanceConnectionInfo(InetAddress.getByName(instance.getDnsName()), 10000, 20000), hardwareDescription); + cm.reportHeartBeat(new InstanceConnectionInfo(InetAddress.getByName(instance.getDnsName()), 10000, 20000), + hardwareDescription); } catch (SecurityException e) { e.printStackTrace(); diff --git a/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/JobToInstancesMappingTest.java b/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/JobToInstancesMappingTest.java index 8450b0586685df0d91ae0733435854294665f2d2..b8d605b486b7bd60c331403094f5cc2ca4ec7a66 100644 --- a/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/JobToInstancesMappingTest.java +++ b/nephele/nephele-ec2cloudmanager/src/test/java/eu/stratosphere/nephele/instance/cloud/JobToInstancesMappingTest.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import org.junit.Test; import eu.stratosphere.nephele.instance.InstanceConnectionInfo; -import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.instance.cloud.CloudInstance; import eu.stratosphere.nephele.instance.cloud.JobToInstancesMapping; import eu.stratosphere.nephele.topology.NetworkTopology; @@ -36,7 +36,7 @@ public class JobToInstancesMappingTest { final NetworkTopology networkTopology = NetworkTopology.createEmptyTopology(); JobToInstancesMapping map = new JobToInstancesMapping("wenjun", "1234567", "abcdefg"); - CloudInstance ci = new CloudInstance("i-1234ABCD", new InstanceType("m1.small", 1, 1, 2048, 40, 10), "wenjun", + CloudInstance ci = new CloudInstance("i-1234ABCD", InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"), "wenjun", new InstanceConnectionInfo(new InetSocketAddress("localhost", 6122).getAddress(), 6122, 6121), 1234567890, networkTopology.getRootNode(), networkTopology); diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java b/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java similarity index 64% rename from nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java rename to nephele/nephele-management/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java index 870e2ee3d896c80376c5294d50be148bc3b5b2cd..c1e284d362ac46b00b7ee866c94c30476e39e22c 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java +++ b/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/instance/InstanceType.java @@ -15,9 +15,12 @@ package eu.stratosphere.nephele.instance; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import eu.stratosphere.nephele.io.IOReadableWritable; +import eu.stratosphere.nephele.types.StringRecord; /** * An instance type describes the hardware resources a task manager runs on. According @@ -26,18 +29,12 @@ import java.util.regex.PatternSyntaxException; * * @author warneke */ -public class InstanceType { - /** - * The pattern used to parse descriptions of instance types. - */ - private static Pattern descr_pattern = null; - - // ------------------------------------------------------------------------ +public class InstanceType implements IOReadableWritable { /** * The identifier for this instance type. */ - private final String identifier; + private String identifier; /** * The number of computational units of this instance type. @@ -47,27 +44,33 @@ public class InstanceType { * specified number of compute units expresses the fraction of the * CPU capacity promised to a user. */ - private final int numberOfComputeUnits; + private int numberOfComputeUnits = 0; /** * The number of CPU cores of this instance type. */ - private final int numberOfCores; + private int numberOfCores = 0; /** * The amount of main memory of this instance type (in MB). */ - private final int memorySize; + private int memorySize = 0; /** * The disk capacity of this instance type (in GB). */ - private final int diskCapacity; + private int diskCapacity = 0; /** * The price per hour that is charged for running instances of this type. */ - private final int pricePerHour; + private int pricePerHour = 0; + + /** + * Public constructor required for the serialization process. + */ + public InstanceType() { + } /** * Creates a new instance type. @@ -85,7 +88,7 @@ public class InstanceType { * @param pricePerHour * price per hour that is charged for running instances of this type */ - public InstanceType(String identifier, int numberOfComputeUnits, int numberOfCores, int memorySize, + InstanceType(String identifier, int numberOfComputeUnits, int numberOfCores, int memorySize, int diskCapacity, int pricePerHour) { this.identifier = identifier; @@ -166,60 +169,46 @@ public class InstanceType { */ public String toStringRepresentation() { StringBuilder bld = new StringBuilder(32); - bld.append(identifier); + bld.append(this.identifier); bld.append(','); - bld.append(numberOfComputeUnits); + bld.append(this.numberOfComputeUnits); bld.append(','); - bld.append(numberOfCores); + bld.append(this.numberOfCores); bld.append(','); - bld.append(memorySize); + bld.append(this.memorySize); bld.append(','); - bld.append(diskCapacity); + bld.append(this.diskCapacity); bld.append(','); - bld.append(pricePerHour); + bld.append(this.pricePerHour); return bld.toString(); } - // ------------------------------------------------------------------------ + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + + StringRecord.writeString(out, this.identifier); + out.writeInt(this.numberOfComputeUnits); + out.writeInt(this.numberOfCores); + out.writeInt(this.memorySize); + out.writeInt(this.diskCapacity); + out.writeInt(this.pricePerHour); + } /** - * Gets an instance type parsed from its string description. - * - * @param description - * The string description of the instance type. - * @return An instance that corresponds to the description. - * @throws IllegalArgumentException - * Thrown, if the string does not correctly describe an instance. - */ - public static final InstanceType getTypeFromString(String description) throws IllegalArgumentException { - if (descr_pattern == null) { - try { - descr_pattern = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$"); - } catch (PatternSyntaxException psex) { - throw new RuntimeException("Invalid Regex Pattern to parse instance description.", psex); - } - } - - try { - final Matcher m = descr_pattern.matcher(description); - - if (!m.matches()) { - throw new IllegalArgumentException("The value '" + description + "' does not match pattern " - + descr_pattern.toString()); - } - - final String identifier = m.group(1); - final int numComputeUnits = Integer.parseInt(m.group(2)); - final int numCores = Integer.parseInt(m.group(3)); - final int memorySize = Integer.parseInt(m.group(4)); - final int diskCapacity = Integer.parseInt(m.group(5)); - final int pricePerHour = Integer.parseInt(m.group(6)); - - return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour); - } catch (Exception e) { - throw new IllegalArgumentException("The value '" + description + "' does not match pattern " - + descr_pattern.toString()); - } + * {@inheritDoc} + */ + @Override + public void read(DataInput in) throws IOException { + + this.identifier = StringRecord.readString(in); + this.numberOfComputeUnits = in.readInt(); + this.numberOfCores = in.readInt(); + this.memorySize = in.readInt(); + this.diskCapacity = in.readInt(); + this.pricePerHour = in.readInt(); } } diff --git a/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java b/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java new file mode 100644 index 0000000000000000000000000000000000000000..e4d4afd6f77d56ea57b2113e6d1949c4197c6c0b --- /dev/null +++ b/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescription.java @@ -0,0 +1,101 @@ +package eu.stratosphere.nephele.instance; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import eu.stratosphere.nephele.io.IOReadableWritable; + +/** + * An instance type description provides details of instance type. Is can comprise both the hardware description from + * the instance type description (as provided by the operator/administrator of the instance) as well as the actual + * hardware description which has been determined on the compute instance itself. + * + * @author warneke + */ +public class InstanceTypeDescription implements IOReadableWritable { + + /** + * The instance type. + */ + private InstanceType instanceType = null; + + /** + * The hardware description as created by the {@link InstanceManager}. + */ + private HardwareDescription hardwareDescription = null; + + /** + * The number of available instances of this type. + */ + private int numberOfAvailableInstances = 0; + + /** + * Public default constructor required for serialization process. + */ + public InstanceTypeDescription() { + } + + /** + * Constructs a new instance type description + * + * @param instanceType + * the instance type + * @param hardwareDescription + * the hardware description as created by the {@link InstanceManager} + * @param numberOfAvailableInstances + * the number of available instances of this type + */ + InstanceTypeDescription(InstanceType instanceType, HardwareDescription hardwareDescription, + int numberOfAvailableInstances) { + + this.instanceType = instanceType; + this.hardwareDescription = hardwareDescription; + this.numberOfAvailableInstances = numberOfAvailableInstances; + } + + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + + if (this.instanceType == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + this.instanceType.write(out); + } + + if (this.hardwareDescription == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + this.hardwareDescription.write(out); + } + + out.writeInt(this.numberOfAvailableInstances); + } + + /** + * {@inheritDoc} + */ + @Override + public void read(DataInput in) throws IOException { + + if(in.readBoolean()) { + this.instanceType = new InstanceType(); + this.instanceType.read(in); + } else { + this.instanceType = null; + } + + if(in.readBoolean()) { + this.hardwareDescription = new HardwareDescription(); + this.hardwareDescription.read(in); + } + + this.numberOfAvailableInstances = in.readInt(); + } + +} diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/HardwareDescriptionFactory.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/HardwareDescriptionFactory.java index 04abbca987aeabd5dac00e78b3f13e33503027cf..e2784ac0655767e5a1ae6083be3bf0c7cd356dda 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/HardwareDescriptionFactory.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/HardwareDescriptionFactory.java @@ -78,6 +78,12 @@ public class HardwareDescriptionFactory { */ private static String os = null; + /** + * Private constructor, so class cannot be instantiated. + */ + private HardwareDescriptionFactory() { + } + /** * Extracts a hardware description object from the system. * diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..bd04f2409fd1991039b74dc29178cd98bf728dea --- /dev/null +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeDescriptionFactory.java @@ -0,0 +1,49 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.nephele.instance; + +/** + * This factory produces {@link InstanceTypeDescription} objects. + *

+ * This class is thread-safe. + * + * @author warneke + */ +public class InstanceTypeDescriptionFactory { + + /** + * Private constructor, so class cannot be instantiated. + */ + private InstanceTypeDescriptionFactory() { + } + + /** + * Constructs a new {@link InstaceTypeDescription} object. + * + * @param instanceType + * the instance type + * @param hardwareDescription + * the hardware description as created by the {@link InstanceManager} + * @param numberOfAvailableInstances + * the number of available instances of this type + * @return the instance type description + */ + public static InstanceTypeDescription construct(InstanceType instanceType, HardwareDescription hardwareDescription, + int numberOfAvailableInstances) { + + return new InstanceTypeDescription(instanceType, hardwareDescription, numberOfAvailableInstances); + } +} diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..4c30c199f1e1f28a41ae4531dfceb0d926080d9d --- /dev/null +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/InstanceTypeFactory.java @@ -0,0 +1,94 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.nephele.instance; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This factory constructs {@link InstanceType} objects. + * + * @author warneke + */ +public class InstanceTypeFactory { + + /** + * The logger used to report errors. + */ + private static final Log LOG = LogFactory.getLog(InstanceTypeFactory.class); + + /** + * The pattern used to parse the hardware descriptions of instance types. + */ + private static Pattern INSTANCE_TYPE_PATTERN = Pattern.compile("^([^,]+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)$"); + + /** + * Private constructor, so class cannot be instantiated. + */ + private InstanceTypeFactory() { + } + + /** + * Constructs an {@link InstanceType} object by parsing a hardware description string. + * + * @param description + * the hardware description reflected by this instance type + * @return an instance type reflecting the given hardware description or null if the description cannot + * be parsed + */ + public static InstanceType constructFromDescription(String description) { + + final Matcher m = INSTANCE_TYPE_PATTERN.matcher(description); + if (!m.matches()) { + LOG.error("Cannot extract instance type from string " + description); + return null; + } + + final String identifier = m.group(1); + final int numComputeUnits = Integer.parseInt(m.group(2)); + final int numCores = Integer.parseInt(m.group(3)); + final int memorySize = Integer.parseInt(m.group(4)); + final int diskCapacity = Integer.parseInt(m.group(5)); + final int pricePerHour = Integer.parseInt(m.group(6)); + + return new InstanceType(identifier, numComputeUnits, numCores, memorySize, diskCapacity, pricePerHour); + } + + /** + * Constructs an {@link InstanceType} from the given parameters. + * + * @param identifier + * identifier for this instance type + * @param numberOfComputeUnits + * number of computational units of this instance type + * @param numberOfCores + * number of CPU cores of this instance type + * @param memorySize + * amount of main memory of this instance type (in MB) + * @param diskCapacity + * disk capacity of this instance type (in GB) + * @param pricePerHour + * price per hour that is charged for running instances of this type + */ + public static InstanceType construct(String identifier, int numberOfComputeUnits, int numberOfCores, + int memorySize, int diskCapacity, int pricePerHour) { + + return new InstanceType(identifier, numberOfComputeUnits, numberOfCores, memorySize, diskCapacity, pricePerHour); + } +} diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java index d5fba3af41088b610b545802c769388870797ab0..6ec1172adaba7ad8a7e623f133107174cf19b518 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java @@ -25,11 +25,13 @@ import eu.stratosphere.nephele.configuration.GlobalConfiguration; import eu.stratosphere.nephele.instance.AllocatedResource; import eu.stratosphere.nephele.instance.AllocationID; import eu.stratosphere.nephele.instance.HardwareDescription; +import eu.stratosphere.nephele.instance.HardwareDescriptionFactory; import eu.stratosphere.nephele.instance.InstanceConnectionInfo; import eu.stratosphere.nephele.instance.InstanceException; import eu.stratosphere.nephele.instance.InstanceListener; import eu.stratosphere.nephele.instance.InstanceManager; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.topology.NetworkTopology; @@ -45,6 +47,8 @@ public class LocalInstanceManager implements InstanceManager { private AllocatedResource allocatedResource = null; + private HardwareDescription hardwareDescription; + private LocalTaskManagerThread localTaskManagerThread; private final NetworkTopology networkTopology; @@ -57,7 +61,7 @@ public class LocalInstanceManager implements InstanceManager { String descr = config.getString(ConfigConstants.JOBMANAGER_LOCALINSTANCE_TYPE_KEY, null); try { if (descr != null) { - type = InstanceType.getTypeFromString(descr); + type = InstanceTypeFactory.constructFromDescription(descr); } } catch (IllegalArgumentException iaex) { LogFactory.getLog(LocalInstanceManager.class).error( @@ -127,6 +131,8 @@ public class LocalInstanceManager implements InstanceManager { this.allocatedResource = new AllocatedResource(new LocalInstance(this.defaultInstanceType, instanceConnectionInfo, this.networkTopology.getRootNode(), this.networkTopology), new AllocationID()); + + this.hardwareDescription = hardwareDescription; } } } @@ -141,8 +147,8 @@ public class LocalInstanceManager implements InstanceManager { if (this.localTaskManagerThread != null) { // Interrupt the thread running the task manager this.localTaskManagerThread.interrupt(); - - while(!this.localTaskManagerThread.isTaskManagerShutDown()) { + + while (!this.localTaskManagerThread.isTaskManagerShutDown()) { try { Thread.sleep(100); } catch (InterruptedException e) { @@ -174,25 +180,27 @@ public class LocalInstanceManager implements InstanceManager { // ------------------------------------------------------------------------ /** - * Creates an instance type for the local machine that calls this method. The local instance is - * given the system's number of CPU cores, the amount of memory currently available to the system - * (actually 80% of it) and the amount of disc space in the temp directory. + * Creates a default instance type based on the hardware characteristics of the machine that calls this method. The + * default instance type contains the machine's number of CPU cores and size of physical memory. The disc capacity + * is calculated from the free space in the directory for temporary files. * - * @return An instance type for the local machine. + * @return the default instance type used for the local machine */ public static final InstanceType createDefaultInstanceType() { - final Runtime runtime = Runtime.getRuntime(); - final int numberOfCPUCores = runtime.availableProcessors(); - final int memorySizeInMB = (int) ((runtime.freeMemory() + (runtime.maxMemory() - runtime.totalMemory())) * 0.8f / (1024 * 1024)); + final HardwareDescription hardwareDescription = HardwareDescriptionFactory.extractFromSystem(); int diskCapacityInGB = 0; - final String tempDir = System.getProperty("java.io.tmpdir"); + final String tempDir = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH); if (tempDir != null) { File f = new File(tempDir); - diskCapacityInGB = (int) (f.getFreeSpace() * 0.8f / (1024 * 1024 * 1024)); + diskCapacityInGB = (int) (f.getFreeSpace() / (1024L * 1024L * 1024L)); } - return new InstanceType("default", numberOfCPUCores, numberOfCPUCores, memorySizeInMB, diskCapacityInGB, 0); + final int physicalMemory = (int) (hardwareDescription.getSizeOfPhysicalMemory() / (1024L * 1024L)); + + return InstanceTypeFactory.construct("default", hardwareDescription.getNumberOfCPUCores(), + hardwareDescription.getNumberOfCPUCores(), physicalMemory, diskCapacityInGB, 0); } } diff --git a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java index 29a1aee9c222fbadb1ae221e9cc30184f8fc9c1b..5eba2d01e396d6fc465a7777509d0b8a5bde9c34 100644 --- a/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java +++ b/pact/pact-compiler/src/main/java/eu/stratosphere/pact/compiler/PactCompiler.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.nephele.configuration.GlobalConfiguration; import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.pact.common.contract.CoGroupContract; import eu.stratosphere.pact.common.contract.Contract; import eu.stratosphere.pact.common.contract.CrossContract; @@ -335,12 +336,12 @@ public class PactCompiler { PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION); InstanceType type = null; try { - type = InstanceType.getTypeFromString(instanceDescr); + type = InstanceTypeFactory.constructFromDescription(instanceDescr); } catch (IllegalArgumentException iaex) { LOG.error("Invalid description of standard instance type in PACT configuration: " + instanceDescr + ". Using default instance type " + PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION + ".", iaex); - type = InstanceType.getTypeFromString(PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION); + type = InstanceTypeFactory.constructFromDescription(PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION); } this.pactInstanceType = type;