提交 2d324b62，作者：Xi Chen

GA for creating and cleaning instances

上级 ad4bef71
...@@ -17,7 +17,7 @@ import os ...@@ -17,7 +17,7 @@ import os
import json import json
import math import math
import time import time
import base64 import threading
import netaddr import netaddr
import boto3 import boto3
...@@ -29,16 +29,11 @@ import paramiko ...@@ -29,16 +29,11 @@ import paramiko
parser = argparse.ArgumentParser(description=__doc__) parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument( parser.add_argument(
'--key_name', '--key_name', type=str, default="", help="required, key pair name")
type=str,
default="",
required=True,
help="required, key pair name")
parser.add_argument( parser.add_argument(
'--security_group_id', '--security_group_id',
type=str, type=str,
default="", default="",
required=True,
help="required, the security group id associated with your VPC") help="required, the security group id associated with your VPC")
parser.add_argument( parser.add_argument(
...@@ -55,13 +50,13 @@ parser.add_argument( ...@@ -55,13 +50,13 @@ parser.add_argument(
parser.add_argument( parser.add_argument(
'--pserver_instance_type', '--pserver_instance_type',
type=str, type=str,
default="p2.xlarge", default="p2.8xlarge",
help="your pserver instance type") help="your pserver instance type, p2.8xlarge by default")
parser.add_argument( parser.add_argument(
'--trainer_instance_type', '--trainer_instance_type',
type=str, type=str,
default="p2.xlarge", default="p2.8xlarge",
help="your trainer instance type") help="your trainer instance type, p2.8xlarge by default")
parser.add_argument( parser.add_argument(
'--task_name', '--task_name',
...@@ -71,13 +66,21 @@ parser.add_argument( ...@@ -71,13 +66,21 @@ parser.add_argument(
parser.add_argument( parser.add_argument(
'--pserver_image_id', '--pserver_image_id',
type=str, type=str,
default="ami-1ae93962", default="ami-da2c1cbf",
help="ami id for system image, default one has nvidia-docker ready") help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-east-2"
)
parser.add_argument( parser.add_argument(
'--trainer_image_id', '--trainer_image_id',
type=str, type=str,
default="ami-1ae93962", default="ami-da2c1cbf",
help="ami id for system image, default one has nvidia-docker ready") help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-west-2"
)
parser.add_argument(
'--availability_zone',
type=str,
default="us-east-2a",
help="aws zone id to place ec2 instances")
parser.add_argument( parser.add_argument(
'--trainer_count', type=int, default=1, help="Trainer count") '--trainer_count', type=int, default=1, help="Trainer count")
...@@ -88,17 +91,18 @@ parser.add_argument( ...@@ -88,17 +91,18 @@ parser.add_argument(
parser.add_argument( parser.add_argument(
'--pserver_bash_file', '--pserver_bash_file',
type=str, type=str,
required=False,
default=os.path.join(os.path.dirname(__file__), "pserver.sh.template"), default=os.path.join(os.path.dirname(__file__), "pserver.sh.template"),
help="pserver bash file path") help="pserver bash file path")
parser.add_argument( parser.add_argument(
'--trainer_bash_file', '--trainer_bash_file',
type=str, type=str,
required=False,
default=os.path.join(os.path.dirname(__file__), "trainer.sh.template"), default=os.path.join(os.path.dirname(__file__), "trainer.sh.template"),
help="trainer bash file path") help="trainer bash file path")
parser.add_argument(
'--action', type=str, default="create", help="create|cleanup|status")
parser.add_argument('--pem_path', type=str, help="private key file") parser.add_argument('--pem_path', type=str, help="private key file")
parser.add_argument( parser.add_argument(
...@@ -176,7 +180,9 @@ def create_subnet(): ...@@ -176,7 +180,9 @@ def create_subnet():
print("trying to create subnet") print("trying to create subnet")
subnet_desc = ec2client.create_subnet( subnet_desc = ec2client.create_subnet(
CidrBlock=str(subnet_cidr), VpcId=args.vpc_id) CidrBlock=str(subnet_cidr),
VpcId=args.vpc_id,
AvailabilityZone=args.availability_zone)
subnet_id = subnet_desc["Subnet"]["SubnetId"] subnet_id = subnet_desc["Subnet"]["SubnetId"]
...@@ -211,8 +217,6 @@ def script_to_str(file_path): ...@@ -211,8 +217,6 @@ def script_to_str(file_path):
def run_instances(image_id, instance_type, count, role, cmd=""): def run_instances(image_id, instance_type, count, role, cmd=""):
if cmd:
cmd = base64.b64encode(cmd)
response = ec2client.run_instances( response = ec2client.run_instances(
ImageId=image_id, ImageId=image_id,
InstanceType=instance_type, InstanceType=instance_type,
...@@ -222,6 +226,7 @@ def run_instances(image_id, instance_type, count, role, cmd=""): ...@@ -222,6 +226,7 @@ def run_instances(image_id, instance_type, count, role, cmd=""):
DryRun=False, DryRun=False,
InstanceInitiatedShutdownBehavior="stop", InstanceInitiatedShutdownBehavior="stop",
KeyName=args.key_name, KeyName=args.key_name,
Placement={'AvailabilityZone': args.availability_zone},
NetworkInterfaces=[{ NetworkInterfaces=[{
'DeviceIndex': 0, 'DeviceIndex': 0,
'SubnetId': args.subnet_id, 'SubnetId': args.subnet_id,
...@@ -270,59 +275,94 @@ def run_instances(image_id, instance_type, count, role, cmd=""): ...@@ -270,59 +275,94 @@ def run_instances(image_id, instance_type, count, role, cmd=""):
def create_pservers(): def create_pservers():
return run_instances( try:
image_id=args.pserver_image_id, return run_instances(
instance_type=args.pserver_instance_type, image_id=args.pserver_image_id,
count=args.pserver_count, instance_type=args.pserver_instance_type,
role="PSERVER", ) count=args.pserver_count,
role="PSERVER", )
except Exception, e:
print e
cleanup(args.task_name)
def create_trainers(kickoff_cmd, pserver_endpoints_str): def create_trainers(kickoff_cmd, pserver_endpoints_str):
responses = [] try:
for i in xrange(args.trainer_count): responses = []
cmd = kickoff_cmd.format( for i in xrange(args.trainer_count):
PSERVER_HOSTS=pserver_endpoints_str, cmd = kickoff_cmd.format(
DOCKER_IMAGE=args.docker_image, PSERVER_HOSTS=pserver_endpoints_str,
TRAINER_INDEX=str(i)) DOCKER_IMAGE=args.docker_image,
print(cmd) TRAINER_INDEX=str(i))
responses.append( print(cmd)
run_instances( responses.append(
image_id=args.trainer_image_id, run_instances(
instance_type=args.trainer_instance_type, image_id=args.trainer_image_id,
count=1, instance_type=args.trainer_instance_type,
role="TRAINER", count=1,
cmd=cmd, )[0]) role="TRAINER",
return responses cmd=cmd, )[0])
return responses
except Exception, e:
print e
cleanup(args.task_name)
def cleanup(task_name): def cleanup(task_name):
#shutdown all ec2 instances #shutdown all ec2 instances
instances = ec2client.describe_instances(Filters=[{ instances_response = ec2client.describe_instances(Filters=[{
"Name": "tag", "Name": "tag:Task_name",
"Value": "Task_name=" + task_name "Values": [task_name]
}]) }])
instance_ids = [] instance_ids = []
for instance in instances["Reservations"][0]["Instances"]: if len(instances_response["Reservations"]) > 0:
instance_ids.append(instance["InstanceId"]) for reservation in instances_response["Reservations"]:
for instance in reservation["Instances"]:
instance_ids.append(instance["InstanceId"])
ec2client.stop_instances(InstanceIds=instance_ids) ec2client.terminate_instances(InstanceIds=instance_ids)
instance_stop_waiter = ec2client.get_waiter('instance_stopped') instance_termination_waiter = ec2client.get_waiter(
instance_stop_waiter.wait(InstanceIds=instance_ids) 'instance_terminated')
instance_termination_waiter.wait(InstanceIds=instance_ids)
#delete the subnet created #delete the subnet created
subnet = ec2client.describe_subnets(Filters=[{ subnet = ec2client.describe_subnets(Filters=[{
"Name": "tag", "Name": "tag:Task_name",
"Value": "Task_name=" + task_name "Values": [task_name]
}]) }])
ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"]) if len(subnet["Subnets"]) > 0:
ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"])
# no subnet delete waiter, just leave it. # no subnet delete waiter, just leave it.
return return
def kickoff_pserver(host, pserver_endpoints_str):
try:
ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=host, username="ubuntu", pkey=ssh_key)
cmd = (script_to_str(args.pserver_bash_file)).format(
PSERVER_HOSTS=pserver_endpoints_str,
DOCKER_IMAGE=args.docker_image,
PSERVER_PORT=args.pserver_port)
print(cmd)
stdin, stdout, stderr = ssh_client.exec_command(command=cmd)
return_code = stdout.channel.recv_exit_status()
print(return_code)
if return_code != 0:
raise Exception("Error while kicking off pserver training process")
except Exception, e:
print e
cleanup(args.task_name)
finally:
ssh_client.close()
def main(): def main():
if not args.task_name: if not args.task_name:
args.task_name = generate_task_name() args.task_name = generate_task_name()
...@@ -349,37 +389,25 @@ def main(): ...@@ -349,37 +389,25 @@ def main():
pserver_endpoints_str = ",".join(pserver_endpoints) pserver_endpoints_str = ",".join(pserver_endpoints)
# ssh to pservers to start training
ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path)
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print("kicking off pserver training process") print("kicking off pserver training process")
pserver_threads = []
for pserver in pserver_create_response: for pserver in pserver_create_response:
try: pserver_thread = threading.Thread(
ssh_client.connect( target=kickoff_pserver,
hostname=pserver["PublicIpAddress"], args=(pserver["PublicIpAddress"], pserver_endpoints_str))
username="ubuntu", pserver_thread.start()
pkey=ssh_key) pserver_threads.append(pserver_thread)
cmd = (script_to_str(args.pserver_bash_file)).format(
PSERVER_HOSTS=pserver_endpoints_str, for pserver_thread in pserver_threads:
DOCKER_IMAGE=args.docker_image) pserver_thread.join()
print(cmd)
stdin, stdout, stderr = ssh_client.exec_command(command=cmd) print("all pserver training process started")
if stderr.read():
raise Exception(
"Error while kicking off pserver training process")
#print(stdout.read())
except Exception, e:
print e
cleanup(args.task_name)
finally:
ssh_client.close()
print("creating trainers and kicking off trainer training process") print("creating trainers and kicking off trainer training process")
create_trainers( create_trainers(
kickoff_cmd=script_to_str(args.trainer_bash_file), kickoff_cmd=script_to_str(args.trainer_bash_file),
pserver_endpoints_str=pserver_endpoints_str) pserver_endpoints_str=pserver_endpoints_str)
print("trainers created")
def print_arguments(): def print_arguments():
...@@ -391,4 +419,11 @@ def print_arguments(): ...@@ -391,4 +419,11 @@ def print_arguments():
if __name__ == "__main__": if __name__ == "__main__":
print_arguments() print_arguments()
main() if args.action == "create":
if not args.key_name or not args.security_group_id:
raise ValueError("key_name and security_group_id are required")
main()
elif args.action == "cleanup":
if not args.task_name:
raise ValueError("task_name is required")
cleanup(args.task_name)
nvidia-docker run -i -e "TRAINING_ROLE=PSERVER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} #!/bin/bash
\ No newline at end of file nvidia-docker run -p {PSERVER_PORT}:{PSERVER_PORT} -e "TRAINING_ROLE=PSERVER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE}
\ No newline at end of file
nvidia-docker run -i -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} #!/bin/bash
\ No newline at end of file nvidia-docker run -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册