提交 daa64b4f 编写于 作者: P Philipp Richter

cloudmanager EC2 implementation continued

上级 dee05a44
......@@ -226,11 +226,9 @@ public class ClusterManager implements InstanceManager {
}
if (instanceListener != null) {
instanceListener.allocatedResourceDied(
removedSlice.getJobID(),
new AllocatedResource(
removedSlice.getHostingInstance(), removedSlice.getType(), removedSlice
.getAllocationID()));
instanceListener.allocatedResourceDied(removedSlice.getJobID(),
new AllocatedResource(removedSlice.getHostingInstance(), removedSlice.getType(),
removedSlice.getAllocationID()));
}
}
......@@ -697,50 +695,66 @@ public class ClusterManager implements InstanceManager {
* {@inheritDoc}
*/
@Override
public synchronized void requestInstance(JobID jobID, Configuration conf, InstanceType instanceType, int count)
throws InstanceException {
public synchronized void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, Integer> instanceMap,
List<String> splitAffinityList) throws InstanceException {
// TODO: Introduce topology awareness here
AllocatedSlice slice = null;
// Iterate over all instance types
final Iterator<Map.Entry<InstanceType, Integer>> it = instanceMap.entrySet().iterator();
while (it.hasNext()) {
// Try to match the instance type without slicing first
for (final ClusterInstance host : this.registeredHosts.values()) {
if (host.getType().equals(instanceType)) {
slice = host.createSlice(instanceType, jobID);
if (slice != null) {
break;
// Iterate over all requested instances of a specific type
final Map.Entry<InstanceType, Integer> entry = it.next();
for (int i = 0; i < entry.getValue().intValue(); i++) {
LOG.info("Trying to allocate instance of type " + entry.getKey().getIdentifier());
// TODO: Introduce topology awareness here
// TODO: Daniel: Code taken from AbstractScheduler..
AllocatedSlice slice = null;
// Try to match the instance type without slicing first
for (final ClusterInstance host : this.registeredHosts.values()) {
if (host.getType().equals(entry.getKey())) {
slice = host.createSlice(entry.getKey(), jobID);
if (slice != null) {
break;
}
}
}
}
}
// Use slicing now if necessary
if (slice == null) {
// Use slicing now if necessary
if (slice == null) {
for (final ClusterInstance host : this.registeredHosts.values()) {
slice = host.createSlice(entry.getKey(), jobID);
if (slice != null) {
break;
}
}
for (final ClusterInstance host : this.registeredHosts.values()) {
slice = host.createSlice(instanceType, jobID);
if (slice != null) {
break;
}
}
}
if (slice == null) {
throw new InstanceException("Could not find a suitable instance");
}
if (slice == null) {
throw new InstanceException("Could not find a suitable instance");
}
List<AllocatedSlice> allocatedSlices = this.slicesOfJobs.get(jobID);
if (allocatedSlices == null) {
allocatedSlices = new ArrayList<AllocatedSlice>();
this.slicesOfJobs.put(jobID, allocatedSlices);
}
allocatedSlices.add(slice);
List<AllocatedSlice> allocatedSlices = this.slicesOfJobs.get(jobID);
if (allocatedSlices == null) {
allocatedSlices = new ArrayList<AllocatedSlice>();
this.slicesOfJobs.put(jobID, allocatedSlices);
}
allocatedSlices.add(slice);
if (this.instanceListener != null) {
ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
this.instanceListener, slice);
clusterInstanceNotifier.start();
}
if (this.instanceListener != null) {
ClusterInstanceNotifier clusterInstanceNotifier = new ClusterInstanceNotifier(
this.instanceListener, slice);
clusterInstanceNotifier.start();
}
}
}
}
/**
......
......@@ -18,6 +18,7 @@ package eu.stratosphere.nephele.instance.cluster;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
......@@ -252,12 +253,13 @@ public class ClusterManagerTest {
// now we should be able to request two instances of type small and one of type medium
final JobID jobID = new JobID();
final Configuration conf = new Configuration();
Map<InstanceType, Integer> instanceMap = new HashMap<InstanceType, Integer>();
instanceMap.put(cm.getInstanceTypeByName(SMALL_INSTANCE_TYPE_NAME), 2);
instanceMap.put(cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME), 1);
try {
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(SMALL_INSTANCE_TYPE_NAME),1);
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(SMALL_INSTANCE_TYPE_NAME),1);
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME),1);
cm.requestInstance(jobID, conf, instanceMap, null);
} catch (InstanceException ie) {
fail(ie.getMessage());
}
......@@ -284,8 +286,9 @@ public class ClusterManagerTest {
// Try to allocate more resources which must result in an error
try {
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME),1);
Map<InstanceType, Integer> instancem = new HashMap<InstanceType, Integer>();
instancem.put(cm.getInstanceTypeByName(MEDIUM_INSTANCE_TYPE_NAME), 1);
cm.requestInstance(jobID, conf, instancem, null);
fail("ClusterManager allowed to request more instances than actually available");
......@@ -306,7 +309,9 @@ public class ClusterManagerTest {
// Now further allocations should be possible
try {
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME));
Map<InstanceType, Integer> instancem = new HashMap<InstanceType, Integer>();
instancem.put(cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME), 1);
cm.requestInstance(jobID, conf, instancem, null);
} catch (InstanceException ie) {
fail(ie.getMessage());
}
......@@ -345,7 +350,9 @@ public class ClusterManagerTest {
try {
cm.requestInstance(jobID, conf, cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME));
Map<InstanceType, Integer> instancem = new HashMap<InstanceType, Integer>();
instancem.put(cm.getInstanceTypeByName(LARGE_INSTANCE_TYPE_NAME), 1);
cm.requestInstance(jobID, conf, instancem, null);
} catch (InstanceException ie) {
fail(ie.getMessage());
......
......@@ -21,10 +21,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
......@@ -409,87 +411,6 @@ public class CloudManager extends TimerTask implements InstanceManager {
}
/**
* Returns all allocated instances by a specific user with an AWS access ID and an AWS secret key.
*
* @param owner
* the owner of the instances
* @param awsAccessId
* the access ID into AWS
* @param awsSecretKey
* the secret key used to generate signatures for authentication
* @return a list of allocated instances by a specific user with an AWS access ID and an AWS secret key
* @throws InstanceException
* something wrong happens to the global configuration
*/
private List<com.xerox.amazonws.ec2.ReservationDescription.Instance> describeInstances(String owner,
String awsAccessId, String awsSecretKey) throws InstanceException {
final Boolean isSecure = GlobalConfiguration.getBoolean(EC2WSSECUREKEY, false);
final String server = GlobalConfiguration.getString(EC2WSSERVERKEY, null);
if (server == null) {
LOG.error("Unable to contact cloud: web service server unknown");
return null;
}
final int port = GlobalConfiguration.getInteger(EC2WSPORTKEY, -1);
if (port < 0) {
LOG.error("cloud.ec2ws.port not defined in config file");
return null;
}
final com.xerox.amazonws.ec2.Jec2 ec2Client = new com.xerox.amazonws.ec2.Jec2(awsAccessId, awsSecretKey,
isSecure, server, port);
ec2Client.setResourcePrefix("/services/Eucalyptus");
ec2Client.setSignatureVersion(1);
List<com.xerox.amazonws.ec2.ReservationDescription> reservation = null;
try {
reservation = ec2Client.describeInstances(new String[0]);
} catch (Exception e) {
LOG.error("Error while communicating with cloud: " + StringUtils.stringifyException(e));
return null;
}
if (reservation == null) {
LOG.debug("EC2 describeInstances returned null");
return null;
}
final List<com.xerox.amazonws.ec2.ReservationDescription.Instance> returnInstances = new ArrayList<com.xerox.amazonws.ec2.ReservationDescription.Instance>();
final Iterator<com.xerox.amazonws.ec2.ReservationDescription> it = reservation.iterator();
while (it.hasNext()) {
final com.xerox.amazonws.ec2.ReservationDescription r = it.next();
// Check if owner matches
if (!owner.equals(r.getOwner())) {
continue;
}
final List<com.xerox.amazonws.ec2.ReservationDescription.Instance> instances = r.getInstances();
if (instances == null) {
LOG.debug("EC2 describesInstances includes no instances for owner " + owner);
continue;
}
final Iterator<com.xerox.amazonws.ec2.ReservationDescription.Instance> it2 = instances.iterator();
while (it2.hasNext()) {
returnInstances.add(it2.next());
}
// End while it2
}
// End while it
return returnInstances;
}
/**
* Returns a cloud instance if this instance is assigned to a job.
*
......@@ -532,37 +453,38 @@ public class CloudManager extends TimerTask implements InstanceManager {
LOG.warn("Supplied instance connection info is null");
return null;
}
synchronized (this.reservedInstances) {
if (this.reservedInstances.size() == 0) {
return null;
}
final Iterator<String> it = this.reservedInstances.keySet().iterator();
// Collect Jobs that have reserved instances
final HashSet<JobID> jobsWithReservedInstances = new HashSet<JobID>();
//TODO: Change Iteration here.. for each reserved instance, an AWS webservice call is initiated right now
while (it.hasNext()) {
for (JobID id : this.reservedInstances.values()) {
jobsWithReservedInstances.add(id);
}
// Now we call the webservice for each job..
for (JobID id : jobsWithReservedInstances) {
final String instanceID = it.next();
final JobID jobID = this.reservedInstances.get(instanceID);
JobToInstancesMapping mapping = null;
synchronized (this.jobToInstancesMap) {
mapping = this.jobToInstancesMap.get(jobID);
mapping = this.jobToInstancesMap.get(id);
}
if (mapping == null) {
LOG.error("Unknown mapping for job ID " + jobID);
LOG.error("Unknown mapping for job ID " + id);
continue;
}
AmazonEC2Client ec2client = EC2ClientFactory.getEC2Client(mapping.getAwsAccessId(),
mapping.getAwsSecretKey());
DescribeInstancesRequest request = new DescribeInstancesRequest();
DescribeInstancesResult result = ec2client.describeInstances(request);
......@@ -585,6 +507,7 @@ public class CloudManager extends TimerTask implements InstanceManager {
}
}
}
return null;
......@@ -628,26 +551,16 @@ public class CloudManager extends TimerTask implements InstanceManager {
}
/**
* Requests an instance for a new job. If there is a floating instance, this instance will be assigned to this job.
* If not, a new instance
* will be allocated in the cloud and directly reserved for this job.
*
* @param jobID
* the ID of the job, which needs to request an instance.
* @param conf
* the configuration of the job
* @param instanceType
* the type of the requesting instance
* @throws InstanceException
* something wrong happens to the job configuration
* {@inheritDoc}
*/
@Override
public synchronized void requestInstance(JobID jobID, Configuration conf, InstanceType instanceType, int count)
throws InstanceException {
public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, Integer> instanceMap,
List<String> splitAffinityList) throws InstanceException {
if (conf == null) {
throw new InstanceException("No job configuration provided, unable to acquire credentials");
}
// TODO: maybe load default credentials from config?
// First check, if all required configuration entries are available
final String owner = conf.getString("job.cloud.username", null);
......@@ -665,6 +578,8 @@ public class CloudManager extends TimerTask implements InstanceManager {
throw new InstanceException("Unable to allocate cloud instance: Cannot find AWS secret key");
}
final String sshKeyPair = conf.getString("job.cloud.sshkeypair", null);
JobToInstancesMapping jobToInstanceMapping = null;
// Check if there already exist a mapping for this job
......@@ -679,90 +594,121 @@ public class CloudManager extends TimerTask implements InstanceManager {
}
}
// Check if there is any floating instance with matching owner and type
// Map containing the instances that will actually be requested via the EC2 interface..
Map<InstanceType, Integer> instancesToBeRequested = new HashMap<InstanceType, Integer>();
final LinkedList<CloudInstance> floatinginstances = anyFloatingInstanceAvailable(owner, awsAccessId,
awsSecretKey, instanceType, count);
// First we check, if there are any floating instances available that we can use
if (floatinginstances.size() >= count) {
// we have enough floating instances.. no need to request any new instances.
// Iterate over all instance types
final Iterator<Map.Entry<InstanceType, Integer>> it = instanceMap.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<InstanceType, Integer> entry = it.next();
final InstanceType actualInstanceType = entry.getKey();
final int neededinstancecount = entry.getValue();
for (CloudInstance ci : floatinginstances) {
jobToInstanceMapping.assignInstanceToJob(ci);
this.instanceListener.resourceAllocated(jobID, ci.asAllocatedResource());
}
// Now check, if floating instances of specific type are available...
final LinkedList<CloudInstance> floatinginstances = anyFloatingInstanceAvailable(owner, awsAccessId,
awsSecretKey, actualInstanceType, neededinstancecount);
} else {
// we don't have enough floating instances..
if (floatinginstances.size() >= neededinstancecount) {
// We have enough floating instances of that particular type. Grab them!
for (CloudInstance ci : floatinginstances) {
jobToInstanceMapping.assignInstanceToJob(ci);
this.instanceListener.resourceAllocated(jobID, ci.asAllocatedResource());
}
// first, use the floating instances we have:
for (CloudInstance ci : floatinginstances) {
jobToInstanceMapping.assignInstanceToJob(ci);
this.instanceListener.resourceAllocated(jobID, ci.asAllocatedResource());
} else {
// We don't have enough floating instances.. first request those available
for (CloudInstance ci : floatinginstances) {
jobToInstanceMapping.assignInstanceToJob(ci);
this.instanceListener.resourceAllocated(jobID, ci.asAllocatedResource());
}
// Add instances that need to be requested to the map..
final int instancerequestcount = neededinstancecount - floatinginstances.size();
instancesToBeRequested.put(actualInstanceType, instancerequestcount);
}
// now we still have to request new instances..
int neededInstances = count - floatinginstances.size();
}// End iterating over instance types
LinkedList<String> instanceIDs = allocateCloudInstance(awsAccessId, awsSecretKey, instanceType,
neededInstances);
// Now, we need to request the EC2 instances..
for (String i : instanceIDs) {
this.reservedInstances.put(i, jobID);
LinkedList<String> instanceIDs = allocateCloudInstance(awsAccessId, awsSecretKey, instancesToBeRequested,
sshKeyPair);
}
for (String i : instanceIDs) {
this.reservedInstances.put(i, jobID);
}
}
/**
* Allocates an instance (virtual machine) in the cloud.
* Requests (allocates) instances (VMs) from Amazon EC2.
*
* @param awsAccessId
* the access ID into AWS
* @param awsSecretKey
* the secret key used to generate signatures for authentication
* @param instanceType
* the type of the allocating instance
* @return the ID of the allocated instance
* @param instancesToBeRequested
* Map containing desired instances types and count
* @param sshKeyPair
* Optional parameter to insert an EC2 SSH key/value pair
* @return
*/
private LinkedList<String> allocateCloudInstance(String awsAccessId, String awsSecretKey,
InstanceType instanceType, int count) {
// TODO: Implement count!!
Map<InstanceType, Integer> instancesToBeRequested, String sshKeyPair) {
final String imageID = GlobalConfiguration.getString("ec2.image.id", null);
if (imageID == null) {
LOG.error("Unable to allocate instance: Image ID is unknown");
return null;
}
final String jobManagerIPAddress = GlobalConfiguration.getString("jobmanager.rpc.address", null);
if (jobManagerIPAddress == null) {
LOG.error("JobManager IP address is not set (jobmanager.rpc.address)");
return null;
}
AmazonEC2Client ec2client = EC2ClientFactory.getEC2Client(awsAccessId, awsSecretKey);
com.amazonaws.services.ec2.model.RunInstancesRequest request = new RunInstancesRequest(imageID, count, count);
request.setInstanceType(instanceType.getIdentifier());
BlockDeviceMapping bdm = new BlockDeviceMapping();
bdm.setVirtualName("ephemeral0");
bdm.setDeviceName("/dev/sdb1");
//request.setKeyName(Configuration.ec2Key); //want to set an SSH key for the image?
LinkedList<BlockDeviceMapping> bdmlist = new LinkedList<BlockDeviceMapping>();
bdmlist.add(bdm);
request.setBlockDeviceMappings(bdmlist);
//TODO: User-Data (IP address of jobmanager)
//request.setUserData(userData);
RunInstancesResult result = ec2client.runInstances(request);
LinkedList<String> instanceIDs = new LinkedList<String>();
for(Instance i: result.getReservation().getInstances()){
instanceIDs.add(i.getInstanceId());
// Iterate over instance types..
final Iterator<Map.Entry<InstanceType, Integer>> it = instancesToBeRequested.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<InstanceType, Integer> entry = it.next();
final InstanceType actualInstanceType = entry.getKey();
final int neededinstancecount = entry.getValue();
RunInstancesRequest request = new RunInstancesRequest(imageID, neededinstancecount, neededinstancecount);
request.setInstanceType(actualInstanceType.getIdentifier());
// TODO: Make this configurable!
BlockDeviceMapping bdm = new BlockDeviceMapping();
bdm.setVirtualName("ephemeral0");
bdm.setDeviceName("/dev/sdb1");
if (sshKeyPair != null) {
request.setKeyName(sshKeyPair);
}
LinkedList<BlockDeviceMapping> bdmlist = new LinkedList<BlockDeviceMapping>();
bdmlist.add(bdm);
request.setBlockDeviceMappings(bdmlist);
// Setting User-Data parameters
request.setUserData(EC2Utilities.createTaskManagerUserData(jobManagerIPAddress));
// Request instances!
RunInstancesResult result = ec2client.runInstances(request);
for (Instance i : result.getReservation().getInstances()) {
instanceIDs.add(i.getInstanceId());
}
}
return instanceIDs;
......@@ -799,7 +745,6 @@ public class CloudManager extends TimerTask implements InstanceManager {
AmazonEC2Client ec2client = EC2ClientFactory.getEC2Client(awsAccessId, awsSecretKey);
DescribeInstancesRequest request = new DescribeInstancesRequest();
DescribeInstancesResult result = ec2client.describeInstances(request);
......@@ -953,4 +898,5 @@ public class CloudManager extends TimerTask implements InstanceManager {
// TODO Auto-generated method stub
return null;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.instance.cloud;
import java.util.Hashtable;
......@@ -22,7 +37,8 @@ public class EC2ClientFactory {
* @param awsSecretKey
* @return
*/
static AmazonEC2Client getEC2Client(String awsAccessId, String awsSecretKey){
static synchronized AmazonEC2Client getEC2Client(String awsAccessId, String awsSecretKey){
//Check if a client-object was already generated
if(ec2clients.containsKey(awsAccessId)){
......@@ -34,6 +50,8 @@ public class EC2ClientFactory {
BasicAWSCredentials credentials = new BasicAWSCredentials(awsAccessId, awsSecretKey);
AmazonEC2Client client = new AmazonEC2Client(credentials);
//TODO: Make endpoints configurable (US, EU, Asia etc).
//client.setEndpoint(arg0)
ec2clients.put(awsAccessId, client);
return client;
}
......
package eu.stratosphere.nephele.instance.cloud;
/**
* This class provides auxiliary methods needed to set up the custom EC2-image.
* @author casp
*
*/
public class EC2Utilities {
public static synchronized String createTaskManagerUserData(String JobManagerIpAddress) {
/*
* When the type is set to TASKMANAGER (in /tmp/STRATOSPHERE_TYPE
* then the AMI assumes that another file called /tmp/JOBMANAGER_ADDRESS
* exists containing the (internal) IP address of the jobmanager instance.
*/
final String taskManagerUserData = "#!/bin/bash \n echo TASKMANAGER >> /tmp/STRATOSPHERE_TYPE \n echo " + JobManagerIpAddress + " >> /tmp/JOBMANAGER_ADDRESS";
return new String(org.apache.commons.codec.binary.Base64.encodeBase64(taskManagerUserData.getBytes()));
}
}
......@@ -305,7 +305,9 @@ public class CloudManagerTest {
// request instance
try {
cm.requestInstance(jobID, conf, InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"),1);
Map<InstanceType, Integer> instanceMap = new HashMap<InstanceType, Integer>();
instanceMap.put(InstanceTypeFactory.constructFromDescription("m1.small,1,1,2048,40,10"), 1);
cm.requestInstance(jobID, conf, instanceMap, null);
} catch (InstanceException e) {
e.printStackTrace();
}
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.nephele.instance;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.configuration.Configuration;
......@@ -46,7 +47,7 @@ public interface InstanceManager {
* @throws InstanceException
* thrown if an error occurs during the instance request
*/
void requestInstance(JobID jobID, Configuration conf, InstanceType instanceType, int count) throws InstanceException;
void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, Integer> instanceMap, List<String> splitAffinityList) throws InstanceException;
/**
* Releases an allocated resource from a job.
......
......@@ -16,6 +16,9 @@
package eu.stratosphere.nephele.instance.local;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
......@@ -214,33 +217,6 @@ public class LocalInstanceManager implements InstanceManager {
}
}
/**
* {@inheritDoc}
*/
@Override
public void requestInstance(JobID jobID, Configuration conf, InstanceType instanceType, int count) throws InstanceException {
//TODO: Implement count - fail if count > 1
boolean assignmentSuccessful = false;
AllocatedResource allocatedResource = null;
synchronized (this.synchronizationObject) {
if (this.localInstance != null) { // Instance is available
if (this.allocatedResource == null) { // Instance is not used by another job
allocatedResource = new AllocatedResource(this.localInstance, instanceType, new AllocationID());
this.allocatedResource = allocatedResource;
assignmentSuccessful = true;
}
}
}
if (assignmentSuccessful) {
// Spawn a new thread to send the notification
new LocalInstanceNotifier(this.instanceListener, jobID, allocatedResource).start();
} else {
throw new InstanceException("No instance of type " + instanceType + " available");
}
}
/**
* {@inheritDoc}
......@@ -335,4 +311,43 @@ public class LocalInstanceManager implements InstanceManager {
return this.instanceTypeDescriptionMap;
}
@Override
public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, Integer> instanceMap,
List<String> splitAffinityList) throws InstanceException {
//TODO: This can be implemented way simpler...
// Iterate over all instance types
final Iterator<Map.Entry<InstanceType, Integer>> it = instanceMap.entrySet().iterator();
while (it.hasNext()) {
// Iterate over all requested instances of a specific type
final Map.Entry<InstanceType, Integer> entry = it.next();
for (int i = 0; i < entry.getValue().intValue(); i++) {
boolean assignmentSuccessful = false;
AllocatedResource allocatedResource = null;
synchronized (this.synchronizationObject) {
if (this.localInstance != null) { // Instance is available
if (this.allocatedResource == null) { // Instance is not used by another job
allocatedResource = new AllocatedResource(this.localInstance, entry.getKey(), new AllocationID());
this.allocatedResource = allocatedResource;
assignmentSuccessful = true;
}
}
}
if (assignmentSuccessful) {
// Spawn a new thread to send the notification
new LocalInstanceNotifier(this.instanceListener, jobID, allocatedResource).start();
} else {
throw new InstanceException("No instance of type " + entry.getKey() + " available");
}
}
}
}
}
......@@ -36,6 +36,7 @@ import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.DeploymentManager;
......@@ -136,22 +137,29 @@ public abstract class AbstractScheduler implements InstanceListener {
if (requiredInstances.isEmpty()) {
return;
}
this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(), requiredInstances,null);
/*
* ... stuff moved to instance manager
final Iterator<Map.Entry<InstanceType, Integer>> it = requiredInstances.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<InstanceType, Integer> entry = it.next();
/*for (int i = 0; i < entry.getValue().intValue(); i++) {
for (int i = 0; i < entry.getValue().intValue(); i++) {
LOG.info("Trying to allocate instance of type " + entry.getKey().getIdentifier());
this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
entry.getKey());
}*/
}
this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
entry.getKey(), entry.getValue().intValue());
}
}*/
// Switch vertex state to assigning
final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
......
......@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.junit.Test;
......@@ -92,11 +93,13 @@ public class ExecutionGraphTest {
* {@inheritDoc}
*/
@Override
public void requestInstance(JobID jobID, Configuration conf, InstanceType instanceType, int count)
throws InstanceException {
public void requestInstance(JobID jobID, Configuration conf, Map<InstanceType, Integer> instanceMap,
List<String> splitAffinityList) throws InstanceException {
throw new IllegalStateException("requestInstance called on TestInstanceManager");
}
/**
* {@inheritDoc}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册