diff --git a/tools/aws_benchmarking/paddle_banchmarking_aws.py b/tools/aws_benchmarking/paddle_banchmarking_aws.py index c63e4b0742e4d5a7108d92c675d54a54ecc640be..68285406c461fe8ad5be27fefd519320db81f5c9 100644 --- a/tools/aws_benchmarking/paddle_banchmarking_aws.py +++ b/tools/aws_benchmarking/paddle_banchmarking_aws.py @@ -17,7 +17,7 @@ import os import json import math import time -import base64 +import threading import netaddr import boto3 @@ -29,16 +29,11 @@ import paramiko parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( - '--key_name', - type=str, - default="", - required=True, - help="required, key pair name") + '--key_name', type=str, default="", help="required, key pair name") parser.add_argument( '--security_group_id', type=str, default="", - required=True, help="required, the security group id associated with your VPC") parser.add_argument( @@ -55,13 +50,13 @@ parser.add_argument( parser.add_argument( '--pserver_instance_type', type=str, - default="p2.xlarge", - help="your pserver instance type") + default="p2.8xlarge", + help="your pserver instance type, p2.8xlarge by default") parser.add_argument( '--trainer_instance_type', type=str, - default="p2.xlarge", - help="your trainer instance type") + default="p2.8xlarge", + help="your trainer instance type, p2.8xlarge by default") parser.add_argument( '--task_name', @@ -71,13 +66,21 @@ parser.add_argument( parser.add_argument( '--pserver_image_id', type=str, - default="ami-1ae93962", - help="ami id for system image, default one has nvidia-docker ready") + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-east-2" +) parser.add_argument( '--trainer_image_id', type=str, - default="ami-1ae93962", - help="ami id for system image, default one has nvidia-docker ready") + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, use ami-1ae93962 for us-west-2" +) + +parser.add_argument( + '--availability_zone', + type=str, + default="us-east-2a", + help="aws zone id to place ec2 instances") parser.add_argument( '--trainer_count', type=int, default=1, help="Trainer count") @@ -88,17 +91,18 @@ parser.add_argument( parser.add_argument( '--pserver_bash_file', type=str, - required=False, default=os.path.join(os.path.dirname(__file__), "pserver.sh.template"), help="pserver bash file path") parser.add_argument( '--trainer_bash_file', type=str, - required=False, default=os.path.join(os.path.dirname(__file__), "trainer.sh.template"), help="trainer bash file path") +parser.add_argument( + '--action', type=str, default="create", help="create|cleanup|status") + parser.add_argument('--pem_path', type=str, help="private key file") parser.add_argument( @@ -176,7 +180,9 @@ def create_subnet(): print("trying to create subnet") subnet_desc = ec2client.create_subnet( - CidrBlock=str(subnet_cidr), VpcId=args.vpc_id) + CidrBlock=str(subnet_cidr), + VpcId=args.vpc_id, + AvailabilityZone=args.availability_zone) subnet_id = subnet_desc["Subnet"]["SubnetId"] @@ -211,8 +217,6 @@ def script_to_str(file_path): def run_instances(image_id, instance_type, count, role, cmd=""): - if cmd: - cmd = base64.b64encode(cmd) response = ec2client.run_instances( ImageId=image_id, InstanceType=instance_type, @@ -222,6 +226,7 @@ def run_instances(image_id, instance_type, count, role, cmd=""): DryRun=False, InstanceInitiatedShutdownBehavior="stop", KeyName=args.key_name, + Placement={'AvailabilityZone': args.availability_zone}, NetworkInterfaces=[{ 'DeviceIndex': 0, 'SubnetId': args.subnet_id, @@ -270,59 +275,94 @@ def run_instances(image_id, instance_type, count, role, cmd=""): def create_pservers(): - return run_instances( - image_id=args.pserver_image_id, - instance_type=args.pserver_instance_type, - count=args.pserver_count, - role="PSERVER", ) + try: + return run_instances( + image_id=args.pserver_image_id, + instance_type=args.pserver_instance_type, + count=args.pserver_count, + role="PSERVER", ) + except Exception, e: + print e + cleanup(args.task_name) def create_trainers(kickoff_cmd, pserver_endpoints_str): - responses = [] - for i in xrange(args.trainer_count): - cmd = kickoff_cmd.format( - PSERVER_HOSTS=pserver_endpoints_str, - DOCKER_IMAGE=args.docker_image, - TRAINER_INDEX=str(i)) - print(cmd) - responses.append( - run_instances( - image_id=args.trainer_image_id, - instance_type=args.trainer_instance_type, - count=1, - role="TRAINER", - cmd=cmd, )[0]) - return responses + try: + responses = [] + for i in xrange(args.trainer_count): + cmd = kickoff_cmd.format( + PSERVER_HOSTS=pserver_endpoints_str, + DOCKER_IMAGE=args.docker_image, + TRAINER_INDEX=str(i)) + print(cmd) + responses.append( + run_instances( + image_id=args.trainer_image_id, + instance_type=args.trainer_instance_type, + count=1, + role="TRAINER", + cmd=cmd, )[0]) + return responses + except Exception, e: + print e + cleanup(args.task_name) def cleanup(task_name): #shutdown all ec2 instances - instances = ec2client.describe_instances(Filters=[{ - "Name": "tag", - "Value": "Task_name=" + task_name + instances_response = ec2client.describe_instances(Filters=[{ + "Name": "tag:Task_name", + "Values": [task_name] }]) instance_ids = [] - for instance in instances["Reservations"][0]["Instances"]: - instance_ids.append(instance["InstanceId"]) + if len(instances_response["Reservations"]) > 0: + for reservation in instances_response["Reservations"]: + for instance in reservation["Instances"]: + instance_ids.append(instance["InstanceId"]) - ec2client.stop_instances(InstanceIds=instance_ids) + ec2client.terminate_instances(InstanceIds=instance_ids) - instance_stop_waiter = ec2client.get_waiter('instance_stopped') - instance_stop_waiter.wait(InstanceIds=instance_ids) + instance_termination_waiter = ec2client.get_waiter( + 'instance_terminated') + instance_termination_waiter.wait(InstanceIds=instance_ids) - #delete the subnet created +#delete the subnet created subnet = ec2client.describe_subnets(Filters=[{ - "Name": "tag", - "Value": "Task_name=" + task_name + "Name": "tag:Task_name", + "Values": [task_name] }]) - ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"]) + if len(subnet["Subnets"]) > 0: + ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"]) # no subnet delete waiter, just leave it. return +def kickoff_pserver(host, pserver_endpoints_str): + try: + ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path) + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.connect(hostname=host, username="ubuntu", pkey=ssh_key) + cmd = (script_to_str(args.pserver_bash_file)).format( + PSERVER_HOSTS=pserver_endpoints_str, + DOCKER_IMAGE=args.docker_image, + PSERVER_PORT=args.pserver_port) + print(cmd) + stdin, stdout, stderr = ssh_client.exec_command(command=cmd) + return_code = stdout.channel.recv_exit_status() + print(return_code) + if return_code != 0: + raise Exception("Error while kicking off pserver training process") + except Exception, e: + print e + cleanup(args.task_name) + finally: + ssh_client.close() + + def main(): if not args.task_name: args.task_name = generate_task_name() @@ -349,37 +389,25 @@ def main(): pserver_endpoints_str = ",".join(pserver_endpoints) - # ssh to pservers to start training - ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path) - ssh_client = paramiko.SSHClient() - ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - print("kicking off pserver training process") + pserver_threads = [] for pserver in pserver_create_response: - try: - ssh_client.connect( - hostname=pserver["PublicIpAddress"], - username="ubuntu", - pkey=ssh_key) - cmd = (script_to_str(args.pserver_bash_file)).format( - PSERVER_HOSTS=pserver_endpoints_str, - DOCKER_IMAGE=args.docker_image) - print(cmd) - stdin, stdout, stderr = ssh_client.exec_command(command=cmd) - if stderr.read(): - raise Exception( - "Error while kicking off pserver training process") - #print(stdout.read()) - except Exception, e: - print e - cleanup(args.task_name) - finally: - ssh_client.close() + pserver_thread = threading.Thread( + target=kickoff_pserver, + args=(pserver["PublicIpAddress"], pserver_endpoints_str)) + pserver_thread.start() + pserver_threads.append(pserver_thread) + + for pserver_thread in pserver_threads: + pserver_thread.join() + + print("all pserver training process started") print("creating trainers and kicking off trainer training process") create_trainers( kickoff_cmd=script_to_str(args.trainer_bash_file), pserver_endpoints_str=pserver_endpoints_str) + print("trainers created") def print_arguments(): @@ -391,4 +419,11 @@ def print_arguments(): if __name__ == "__main__": print_arguments() - main() + if args.action == "create": + if not args.key_name or not args.security_group_id: + raise ValueError("key_name and security_group_id are required") + main() + elif args.action == "cleanup": + if not args.task_name: + raise ValueError("task_name is required") + cleanup(args.task_name) diff --git a/tools/aws_benchmarking/pserver.sh.template b/tools/aws_benchmarking/pserver.sh.template index ddfe2f9d3167f280aaa8d00f87907d9754f82436..e6642c2db496e50ec76977593bc7eab589d39839 100644 --- a/tools/aws_benchmarking/pserver.sh.template +++ b/tools/aws_benchmarking/pserver.sh.template @@ -1 +1,2 @@ -nvidia-docker run -i -e "TRAINING_ROLE=PSERVER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file +#!/bin/bash +nvidia-docker run -p {PSERVER_PORT}:{PSERVER_PORT} -e "TRAINING_ROLE=PSERVER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file diff --git a/tools/aws_benchmarking/trainer.sh.template b/tools/aws_benchmarking/trainer.sh.template index 70aceb88144201b8de2f5eeaa4068fe2e61767bc..05a7d3b91db42b32955193ee5689f1911a07a907 100644 --- a/tools/aws_benchmarking/trainer.sh.template +++ b/tools/aws_benchmarking/trainer.sh.template @@ -1 +1,2 @@ -nvidia-docker run -i -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file +#!/bin/bash +nvidia-docker run -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file