From aaa642821e5d85a2721abb750084de070a14ab2d Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Sun, 8 Apr 2018 12:52:14 -0700 Subject: [PATCH] adding client and docker files --- tools/aws_benchmarking/client/Dockerfile | 7 + .../client/cluster_launcher.py | 374 ++++++++++++++++++ .../aws_benchmarking/client/requirements.txt | 6 + tools/aws_benchmarking/pserver.sh.template | 2 - tools/aws_benchmarking/server/Dockerfile | 10 + .../cluster_master.py} | 198 ++++++++-- .../server/pserver.sh.template | 2 + .../{ => server}/requirements.txt | 0 .../server/trainer.sh.template | 2 + tools/aws_benchmarking/trainer.sh.template | 2 - 10 files changed, 556 insertions(+), 47 deletions(-) create mode 100644 tools/aws_benchmarking/client/Dockerfile create mode 100644 tools/aws_benchmarking/client/cluster_launcher.py create mode 100644 tools/aws_benchmarking/client/requirements.txt delete mode 100644 tools/aws_benchmarking/pserver.sh.template create mode 100644 tools/aws_benchmarking/server/Dockerfile rename tools/aws_benchmarking/{paddle_banchmarking_aws.py => server/cluster_master.py} (66%) create mode 100644 tools/aws_benchmarking/server/pserver.sh.template rename tools/aws_benchmarking/{ => server}/requirements.txt (100%) create mode 100644 tools/aws_benchmarking/server/trainer.sh.template delete mode 100644 tools/aws_benchmarking/trainer.sh.template diff --git a/tools/aws_benchmarking/client/Dockerfile b/tools/aws_benchmarking/client/Dockerfile new file mode 100644 index 0000000000..812c5d4bce --- /dev/null +++ b/tools/aws_benchmarking/client/Dockerfile @@ -0,0 +1,7 @@ +FROM python:2.7.14-stretch + +ENV HOME /root +COPY ./ /root/ +WORKDIR /root +RUN pip install -r /root/requirements.txt +ENTRYPOINT ["python", "cluster_launcher.py"] \ No newline at end of file diff --git a/tools/aws_benchmarking/client/cluster_launcher.py b/tools/aws_benchmarking/client/cluster_launcher.py new file mode 100644 index 0000000000..eaccffc204 --- /dev/null +++ b/tools/aws_benchmarking/client/cluster_launcher.py @@ -0,0 +1,374 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +import time +import math +import logging +import copy + +import netaddr +import boto3 +import namesgenerator +import paramiko +from scp import SCPClient +import requests + +parser = argparse.ArgumentParser(description=__doc__) +parser.add_argument( + '--key_name', type=str, default="", help="required, key pair name") +parser.add_argument( + '--security_group_id', + type=str, + default="", + help="required, the security group id associated with your VPC") + +parser.add_argument( + '--vpc_id', + type=str, + default="", + help="The VPC in which you wish to run test") +parser.add_argument( + '--subnet_id', + type=str, + default="", + help="The Subnet_id in which you wish to run test") + +parser.add_argument( + '--pserver_instance_type', + type=str, + default="p2.8xlarge", + help="your pserver instance type, p2.8xlarge by default") +parser.add_argument( + '--trainer_instance_type', + type=str, + default="p2.8xlarge", + help="your trainer instance type, p2.8xlarge by default") + +parser.add_argument( + '--task_name', + type=str, + default="", + help="the name you want to identify your job") +parser.add_argument( + '--pserver_image_id', + type=str, + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, \ + use ami-1ae93962 for us-east-2") +parser.add_argument( + '--trainer_image_id', + type=str, + default="ami-da2c1cbf", + help="ami id for system image, default one has nvidia-docker ready, \ + use ami-1ae93962 for us-west-2") + +parser.add_argument( + '--availability_zone', + type=str, + default="us-east-2a", + help="aws zone id to place ec2 instances") + +parser.add_argument( + '--trainer_count', type=int, default=1, help="Trainer count") + +parser.add_argument( + '--pserver_count', type=int, default=1, help="Pserver count") + +parser.add_argument( + '--action', type=str, default="serve", help="create|cleanup|status") + +parser.add_argument('--pem_path', type=str, help="private key file") + +parser.add_argument( + '--pserver_port', type=str, default="5436", help="pserver port") + +parser.add_argument( + '--docker_image', type=str, default="busybox", help="training docker image") + +parser.add_argument( + '--master_server_port', type=int, default=5436, help="master server port") + +parser.add_argument( + '--master_server_public_ip', type=str, help="master server public ip") + +args = parser.parse_args() + +logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') + +ec2client = boto3.client('ec2') + + +def print_arguments(): + print('----------- Configuration Arguments -----------') + for arg, value in sorted(vars(args).iteritems()): + print('%s: %s' % (arg, value)) + print('------------------------------------------------') + + +def create_subnet(): + # if no vpc id provided, list vpcs + logging.info("start creating subnet") + if not args.vpc_id: + logging.info("no vpc provided, trying to find the default one") + vpcs_desc = ec2client.describe_vpcs( + Filters=[{ + "Name": "isDefault", + "Values": ["true", ] + }], ) + if len(vpcs_desc["Vpcs"]) == 0: + raise ValueError('No default VPC') + args.vpc_id = vpcs_desc["Vpcs"][0]["VpcId"] + vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"] + + logging.info("default vpc fount with id %s and CidrBlock %s" % + (args.vpc_id, vpc_cidrBlock)) + + if not vpc_cidrBlock: + logging.info("trying to find cidrblock for vpc") + vpcs_desc = ec2client.describe_vpcs( + Filters=[{ + "Name": "vpc-id", + "Values": [args.vpc_id, ], + }], ) + if len(vpcs_desc["Vpcs"]) == 0: + raise ValueError('No VPC found') + vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"] + logging.info("cidrblock for vpc is %s" % vpc_cidrBlock) + + # list subnets in vpc in order to create a new one + + logging.info("trying to find ip blocks for new subnet") + subnets_desc = ec2client.describe_subnets( + Filters=[{ + "Name": "vpc-id", + "Values": [args.vpc_id, ], + }], ) + + ips_taken = [] + for subnet_dec in subnets_desc["Subnets"]: + ips_taken.append(subnet_dec["CidrBlock"]) + + ip_blocks_avaliable = netaddr.IPSet( + [vpc_cidrBlock]) ^ netaddr.IPSet(ips_taken) + # adding 10 addresses as buffer + cidr_prefix = 32 - math.ceil( + math.log(args.pserver_count + args.trainer_count + 10, 2)) + if cidr_prefix <= 16: + raise ValueError('Too many nodes to fit in current VPC') + + for ipnetwork in ip_blocks_avaliable.iter_cidrs(): + try: + subnet_cidr = ipnetwork.subnet(int(cidr_prefix)).next() + logging.info("subnet ip block found %s" % (subnet_cidr)) + break + except Exception: + pass + + if not subnet_cidr: + raise ValueError( + 'No avaliable subnet to fit required nodes in current VPC') + + logging.info("trying to create subnet") + subnet_desc = ec2client.create_subnet( + CidrBlock=str(subnet_cidr), + VpcId=args.vpc_id, + AvailabilityZone=args.availability_zone) + + subnet_id = subnet_desc["Subnet"]["SubnetId"] + + subnet_waiter = ec2client.get_waiter('subnet_available') + # sleep for 1s before checking its state + time.sleep(1) + subnet_waiter.wait(SubnetIds=[subnet_id, ]) + + logging.info("subnet created") + + logging.info("adding tags to newly created subnet") + ec2client.create_tags( + Resources=[subnet_id, ], + Tags=[{ + "Key": "Task_name", + 'Value': args.task_name + }]) + return subnet_id + + +def run_instances(image_id, instance_type, count=1, role="MASTER", cmd=""): + response = ec2client.run_instances( + ImageId=image_id, + InstanceType=instance_type, + MaxCount=count, + MinCount=count, + UserData=cmd, + DryRun=False, + InstanceInitiatedShutdownBehavior="stop", + KeyName=args.key_name, + Placement={'AvailabilityZone': args.availability_zone}, + NetworkInterfaces=[{ + 'DeviceIndex': 0, + 'SubnetId': args.subnet_id, + "AssociatePublicIpAddress": True, + 'Groups': args.security_group_ids + }], + TagSpecifications=[{ + 'ResourceType': "instance", + 'Tags': [{ + "Key": 'Task_name', + "Value": args.task_name + "_master" + }, { + "Key": 'Role', + "Value": role + }] + }]) + + instance_ids = [] + for instance in response["Instances"]: + instance_ids.append(instance["InstanceId"]) + + if len(instance_ids) > 0: + logging.info(str(len(instance_ids)) + " instance(s) created") + else: + logging.info("no instance created") + #create waiter to make sure it's running + + logging.info("waiting for instance to become accessible") + waiter = ec2client.get_waiter('instance_status_ok') + waiter.wait( + Filters=[{ + "Name": "instance-status.status", + "Values": ["ok"] + }, { + "Name": "instance-status.reachability", + "Values": ["passed"] + }, { + "Name": "instance-state-name", + "Values": ["running"] + }], + InstanceIds=instance_ids) + + instances_response = ec2client.describe_instances(InstanceIds=instance_ids) + + return instances_response["Reservations"][0]["Instances"] + + +def generate_task_name(): + return namesgenerator.get_random_name() + + +def init_args(): + + if not args.task_name: + args.task_name = generate_task_name() + logging.info("task name generated %s" % (args.task_name)) + + if not args.pem_path: + args.pem_path = os.path.expanduser("~") + "/" + args.key_name + ".pem" + if args.security_group_id: + args.security_group_ids = (args.security_group_id, ) + + +def create(): + + init_args() + + # create subnet + if not args.subnet_id: + args.subnet_id = create_subnet() + + # create master node + + master_instance_response = run_instances( + image_id="ami-7a05351f", instance_type="t2.nano") + + logging.info("master server started") + + args.master_server_public_ip = master_instance_response[0][ + "PublicIpAddress"] + args.master_server_ip = master_instance_response[0]["PrivateIpAddress"] + + logging.info("master server started, master_ip=%s, task_name=%s" % + (args.master_server_public_ip, args.task_name)) + + # cp config file and pems to master node + + ssh_key = paramiko.RSAKey.from_private_key_file(args.pem_path) + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_client.connect( + hostname=args.master_server_public_ip, username="ubuntu", pkey=ssh_key) + + with SCPClient(ssh_client.get_transport()) as scp: + scp.put(os.path.expanduser("~") + "/" + ".aws", + recursive=True, + remote_path='/home/ubuntu/') + scp.put(args.pem_path, + remote_path='/home/ubuntu/' + args.key_name + ".pem") + + logging.info("credentials and pem copied to master") + + # set arguments and start docker + kick_off_cmd = "docker run -d -v /home/ubuntu/.aws:/root/.aws/" + kick_off_cmd += " -v /home/ubuntu/" + args.key_name + ".pem:/root/" + args.key_name + ".pem" + kick_off_cmd += " -p " + str(args.master_server_port) + ":" + str( + args.master_server_port) + kick_off_cmd += " putcn/paddle_aws_master" + + args_to_pass = copy.copy(args) + args_to_pass.action = "serve" + del args_to_pass.pem_path + del args_to_pass.security_group_ids + del args_to_pass.master_server_public_ip + for arg, value in sorted(vars(args_to_pass).iteritems()): + kick_off_cmd += ' --%s %s' % (arg, value) + + logging.info(kick_off_cmd) + stdin, stdout, stderr = ssh_client.exec_command(command=kick_off_cmd) + return_code = stdout.channel.recv_exit_status() + logging.info(return_code) + if return_code != 0: + raise Exception("Error while kicking off master") + + logging.info( + "master sercer finished init process, visit %s to check master log" % + (get_master_web_url("/logs"))) + + +def cleanup(): + print requests.post(get_master_web_url("/cleanup")).text + + +def status(): + print requests.post(get_master_web_url("/logs")).text + + +def get_master_web_url(path): + return "http://" + args.master_server_public_ip + ":" + args.master_server_port + path + + +if __name__ == "__main__": + print_arguments() + if args.action == "create": + if not args.key_name or not args.security_group_id: + raise ValueError("key_name and security_group_id are required") + create() + elif args.action == "cleanup": + if not args.master_server_public_ip: + raise ValueError("master_server_public_ip is required") + cleanup() + elif args.action == "status": + if not args.master_server_public_ip: + raise ValueError("master_server_public_ip is required") + status() diff --git a/tools/aws_benchmarking/client/requirements.txt b/tools/aws_benchmarking/client/requirements.txt new file mode 100644 index 0000000000..9454801f20 --- /dev/null +++ b/tools/aws_benchmarking/client/requirements.txt @@ -0,0 +1,6 @@ +netaddr==0.7.19 +boto3==1.6.21 +namesgenerator==0.3 +paramiko==2.4.1 +scp +requests diff --git a/tools/aws_benchmarking/pserver.sh.template b/tools/aws_benchmarking/pserver.sh.template deleted file mode 100644 index e6642c2db4..0000000000 --- a/tools/aws_benchmarking/pserver.sh.template +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -nvidia-docker run -p {PSERVER_PORT}:{PSERVER_PORT} -e "TRAINING_ROLE=PSERVER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file diff --git a/tools/aws_benchmarking/server/Dockerfile b/tools/aws_benchmarking/server/Dockerfile new file mode 100644 index 0000000000..1593242cdc --- /dev/null +++ b/tools/aws_benchmarking/server/Dockerfile @@ -0,0 +1,10 @@ +# A image for building paddle binaries +# Use cuda devel base image for both cpu and gpu environment +FROM python:2.7.14-stretch + +ENV HOME /root +# Add bash enhancements +COPY ./ /root/ +WORKDIR /root +RUN pip install -r /root/requirements.txt +ENTRYPOINT ["python", "cluster_master.py"] \ No newline at end of file diff --git a/tools/aws_benchmarking/paddle_banchmarking_aws.py b/tools/aws_benchmarking/server/cluster_master.py similarity index 66% rename from tools/aws_benchmarking/paddle_banchmarking_aws.py rename to tools/aws_benchmarking/server/cluster_master.py index 68285406c4..a4f54e4441 100644 --- a/tools/aws_benchmarking/paddle_banchmarking_aws.py +++ b/tools/aws_benchmarking/server/cluster_master.py @@ -18,12 +18,15 @@ import json import math import time import threading +import logging import netaddr import boto3 import namesgenerator import paramiko +from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + # You must have aws_access_key_id, aws_secret_access_key, region set in # ~/.aws/credentials and ~/.aws/config @@ -101,7 +104,7 @@ parser.add_argument( help="trainer bash file path") parser.add_argument( - '--action', type=str, default="create", help="create|cleanup|status") + '--action', type=str, default="serve", help="create|cleanup|serve") parser.add_argument('--pem_path', type=str, help="private key file") @@ -111,15 +114,25 @@ parser.add_argument( parser.add_argument( '--docker_image', type=str, default="busybox", help="training docker image") +parser.add_argument( + '--master_server_port', type=int, default=5436, help="master server port") + +parser.add_argument( + '--master_server_ip', type=str, default="", help="master server private ip") + args = parser.parse_args() ec2client = boto3.client('ec2') +logging.basicConfig( + filename='master.log', level=logging.INFO, format='%(asctime)s %(message)s') + def create_subnet(): # if no vpc id provided, list vpcs + logging.info("start creating subnet") if not args.vpc_id: - print("no vpc provided, trying to find the default one") + logging.info("no vpc provided, trying to find the default one") vpcs_desc = ec2client.describe_vpcs( Filters=[{ "Name": "isDefault", @@ -130,11 +143,11 @@ def create_subnet(): args.vpc_id = vpcs_desc["Vpcs"][0]["VpcId"] vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"] - print("default vpc fount with id %s and CidrBlock %s" % - (args.vpc_id, vpc_cidrBlock)) + logging.info("default vpc fount with id %s and CidrBlock %s" % + (args.vpc_id, vpc_cidrBlock)) if not vpc_cidrBlock: - print("trying to find cidrblock for vpc") + logging.info("trying to find cidrblock for vpc") vpcs_desc = ec2client.describe_vpcs( Filters=[{ "Name": "vpc-id", @@ -143,11 +156,11 @@ def create_subnet(): if len(vpcs_desc["Vpcs"]) == 0: raise ValueError('No VPC found') vpc_cidrBlock = vpcs_desc["Vpcs"][0]["CidrBlock"] - print("cidrblock for vpc is %s" % vpc_cidrBlock) + logging.info("cidrblock for vpc is %s" % vpc_cidrBlock) # list subnets in vpc in order to create a new one - print("trying to find ip blocks for new subnet") + logging.info("trying to find ip blocks for new subnet") subnets_desc = ec2client.describe_subnets( Filters=[{ "Name": "vpc-id", @@ -169,7 +182,7 @@ def create_subnet(): for ipnetwork in ip_blocks_avaliable.iter_cidrs(): try: subnet_cidr = ipnetwork.subnet(int(cidr_prefix)).next() - print("subnet ip block found %s" % (subnet_cidr)) + logging.info("subnet ip block found %s" % (subnet_cidr)) break except Exception: pass @@ -178,7 +191,7 @@ def create_subnet(): raise ValueError( 'No avaliable subnet to fit required nodes in current VPC') - print("trying to create subnet") + logging.info("trying to create subnet") subnet_desc = ec2client.create_subnet( CidrBlock=str(subnet_cidr), VpcId=args.vpc_id, @@ -191,9 +204,9 @@ def create_subnet(): time.sleep(1) subnet_waiter.wait(SubnetIds=[subnet_id, ]) - print("subnet created") + logging.info("subnet created") - print("adding tags to newly created subnet") + logging.info("adding tags to newly created subnet") ec2client.create_tags( Resources=[subnet_id, ], Tags=[{ @@ -249,12 +262,12 @@ def run_instances(image_id, instance_type, count, role, cmd=""): instance_ids.append(instance["InstanceId"]) if len(instance_ids) > 0: - print(str(len(instance_ids)) + " instance(s) created") + logging.info(str(len(instance_ids)) + " instance(s) created") else: - print("no instance created") + logging.info("no instance created") #create waiter to make sure it's running - print("waiting for instance to become accessible") + logging.info("waiting for instance to become accessible") waiter = ec2client.get_waiter('instance_status_ok') waiter.wait( Filters=[{ @@ -281,8 +294,8 @@ def create_pservers(): instance_type=args.pserver_instance_type, count=args.pserver_count, role="PSERVER", ) - except Exception, e: - print e + except Exception: + logging.exception("error while trying to create pservers") cleanup(args.task_name) @@ -293,8 +306,11 @@ def create_trainers(kickoff_cmd, pserver_endpoints_str): cmd = kickoff_cmd.format( PSERVER_HOSTS=pserver_endpoints_str, DOCKER_IMAGE=args.docker_image, - TRAINER_INDEX=str(i)) - print(cmd) + TRAINER_INDEX=str(i), + TASK_NAME=args.task_name, + MASTER_ENDPOINT=args.master_server_ip + ":" + + str(args.master_server_port)) + logging.info(cmd) responses.append( run_instances( image_id=args.trainer_image_id, @@ -303,13 +319,14 @@ def create_trainers(kickoff_cmd, pserver_endpoints_str): role="TRAINER", cmd=cmd, )[0]) return responses - except Exception, e: - print e + except Exception: + logging.exception("error while trying to create trainers") cleanup(args.task_name) def cleanup(task_name): #shutdown all ec2 instances + print("going to clean up " + task_name + " instances") instances_response = ec2client.describe_instances(Filters=[{ "Name": "tag:Task_name", "Values": [task_name] @@ -327,7 +344,7 @@ def cleanup(task_name): 'instance_terminated') instance_termination_waiter.wait(InstanceIds=instance_ids) -#delete the subnet created + #delete the subnet created subnet = ec2client.describe_subnets(Filters=[{ "Name": "tag:Task_name", @@ -337,6 +354,7 @@ def cleanup(task_name): if len(subnet["Subnets"]) > 0: ec2client.delete_subnet(SubnetId=subnet["Subnets"][0]["SubnetId"]) # no subnet delete waiter, just leave it. + logging.info("Clearnup done") return @@ -349,38 +367,47 @@ def kickoff_pserver(host, pserver_endpoints_str): cmd = (script_to_str(args.pserver_bash_file)).format( PSERVER_HOSTS=pserver_endpoints_str, DOCKER_IMAGE=args.docker_image, - PSERVER_PORT=args.pserver_port) - print(cmd) + PSERVER_PORT=args.pserver_port, + TASK_NAME=args.task_name, + MASTER_ENDPOINT=args.master_server_ip + ":" + + str(args.master_server_port)) + logging.info(cmd) stdin, stdout, stderr = ssh_client.exec_command(command=cmd) return_code = stdout.channel.recv_exit_status() - print(return_code) + logging.info(return_code) if return_code != 0: raise Exception("Error while kicking off pserver training process") - except Exception, e: - print e + except Exception: + logging.exception("Error while kicking off pserver training process") cleanup(args.task_name) finally: ssh_client.close() -def main(): +def init_args(): + if not args.task_name: args.task_name = generate_task_name() - print("task name generated", args.task_name) - - if not args.subnet_id: - print("creating subnet for this task") - args.subnet_id = create_subnet() - print("subnet %s created" % (args.subnet_id)) + logging.info("task name generated %s" % (args.task_name)) if not args.pem_path: args.pem_path = os.path.expanduser("~") + "/" + args.key_name + ".pem" if args.security_group_id: args.security_group_ids = (args.security_group_id, ) - print("creating pservers") + args.trainers_job_done_count = 0 + + +def create_cluster(): + + if not args.subnet_id: + logging.info("creating subnet for this task") + args.subnet_id = create_subnet() + logging.info("subnet %s created" % (args.subnet_id)) + + logging.info("creating pservers") pserver_create_response = create_pservers() - print("pserver created, collecting pserver ips") + logging.info("pserver created, collecting pserver ips") pserver_endpoints = [] for pserver in pserver_create_response: @@ -389,7 +416,7 @@ def main(): pserver_endpoints_str = ",".join(pserver_endpoints) - print("kicking off pserver training process") + logging.info("kicking off pserver training process") pserver_threads = [] for pserver in pserver_create_response: pserver_thread = threading.Thread( @@ -401,29 +428,114 @@ def main(): for pserver_thread in pserver_threads: pserver_thread.join() - print("all pserver training process started") + logging.info("all pserver training process started") - print("creating trainers and kicking off trainer training process") + logging.info("creating trainers and kicking off trainer training process") create_trainers( kickoff_cmd=script_to_str(args.trainer_bash_file), pserver_endpoints_str=pserver_endpoints_str) - print("trainers created") + logging.info("trainers created") + + +def start_server(args): + class S(BaseHTTPRequestHandler): + def _set_headers(self): + self.send_response(200) + self.send_header('Content-type', 'text/text') + self.end_headers() + + def do_HEAD(self): + self._set_headers() + + def do_404(self): + self.send_response(404) + self.send_header('Content-type', 'text/text') + self.end_headers() + logging.info("Received invalid GET request" + self.path) + self.wfile.write("NO ACTION FOUND") + + def do_GET(self): + self._set_headers() + request_path = self.path + if request_path == "/status" or request_path == "/logs": + logging.info("Received request to return status") + with open("master.log", "r") as logfile: + self.wfile.write(logfile.read().strip()) + else: + self.do_404() + + def do_POST(self): + + request_path = self.path + + if request_path == "/save_data": + self._set_headers() + logging.info("Received request to save data") + self.wfile.write("DATA SAVED!") + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + if args.task_name: + with open(args.task_name + ".txt", "a") as text_file: + text_file.write(post_data + "\n") + + elif request_path == "/cleanup": + self._set_headers() + logging.info("Received request to cleanup cluster") + cleanup(args.task_name) + self.wfile.write("cleanup in progress") + + elif request_path == "/trainer_job_done": + self._set_headers() + logging.info("Received request to increase job done count") + args.trainers_job_done_count += 1 + self.wfile.write( + str(args.trainers_job_done_count) + " tainers job done") + if args.trainers_job_done_count >= args.trainer_count: + logging.info("going to clean up") + cleanup(args.task_name) + + else: + self.do_404() + + server_address = ('', args.master_server_port) + httpd = HTTPServer(server_address, S) + logging.info("HTTP server is starting") + httpd.serve_forever() def print_arguments(): - print('----------- Configuration Arguments -----------') + logging.info('----------- Configuration Arguments -----------') for arg, value in sorted(vars(args).iteritems()): - print('%s: %s' % (arg, value)) - print('------------------------------------------------') + logging.info('%s: %s' % (arg, value)) + logging.info('------------------------------------------------') if __name__ == "__main__": print_arguments() if args.action == "create": + logging.info("going to create cluster") if not args.key_name or not args.security_group_id: raise ValueError("key_name and security_group_id are required") - main() + init_args() + create_cluster() elif args.action == "cleanup": + logging.info("going to cleanup cluster") if not args.task_name: raise ValueError("task_name is required") cleanup(args.task_name) + elif args.action == "serve": + # serve mode + if not args.master_server_ip: + raise ValueError( + "No master server ip set, please run with --action create") + + logging.info("going to start serve and create cluster") + + init_args() + + logging.info("starting server in another thread") + server_thread = threading.Thread(target=start_server, args=(args, )) + server_thread.start() + + create_cluster() + server_thread.join() diff --git a/tools/aws_benchmarking/server/pserver.sh.template b/tools/aws_benchmarking/server/pserver.sh.template new file mode 100644 index 0000000000..6fbf2c5230 --- /dev/null +++ b/tools/aws_benchmarking/server/pserver.sh.template @@ -0,0 +1,2 @@ +#!/bin/bash +nvidia-docker run -p {PSERVER_PORT}:{PSERVER_PORT} -e "MASTER_ENDPOINT={MASTER_ENDPOINT}" -e "TASK_NAME={TASK_NAME}" -e "TRAINING_ROLE=PSERVER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file diff --git a/tools/aws_benchmarking/requirements.txt b/tools/aws_benchmarking/server/requirements.txt similarity index 100% rename from tools/aws_benchmarking/requirements.txt rename to tools/aws_benchmarking/server/requirements.txt diff --git a/tools/aws_benchmarking/server/trainer.sh.template b/tools/aws_benchmarking/server/trainer.sh.template new file mode 100644 index 0000000000..a83408733d --- /dev/null +++ b/tools/aws_benchmarking/server/trainer.sh.template @@ -0,0 +1,2 @@ +#!/bin/bash +nvidia-docker run -e "MASTER_ENDPOINT={MASTER_ENDPOINT}" -e "TASK_NAME={TASK_NAME}" -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file diff --git a/tools/aws_benchmarking/trainer.sh.template b/tools/aws_benchmarking/trainer.sh.template deleted file mode 100644 index 05a7d3b91d..0000000000 --- a/tools/aws_benchmarking/trainer.sh.template +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -nvidia-docker run -e "TRAINER_INDEX={TRAINER_INDEX}" -e "TRAINING_ROLE=TRAINER" -e "PSERVER_HOSTS={PSERVER_HOSTS}" {DOCKER_IMAGE} \ No newline at end of file -- GitLab