From 4efcb9df45675dc1b0bc58eb03dd425e31a506ce Mon Sep 17 00:00:00 2001
From: MrChengmo
Date: Wed, 23 Sep 2020 13:39:13 +0800
Subject: [PATCH] update fleet run for multi device support

---
 python/paddle/distributed/fleet/launch.py    |  44 ++-----
 .../paddle/distributed/fleet/launch_utils.py | 123 ++++++++++++------
 2 files changed, 91 insertions(+), 76 deletions(-)

diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py
index c055574d287..1a87fc2c300 100644
--- a/python/paddle/distributed/fleet/launch.py
+++ b/python/paddle/distributed/fleet/launch.py
@@ -106,6 +106,14 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         help="The path for each process's log.If it's not set, the log will printed to default pipe."
     )
 
+    base_group.add_argument(
+        "--gpus",
+        type=str,
+        default=None,
+        help="It's for gpu training and the training process will run on the gpus,"
+        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
+    )
+
     base_group.add_argument(
         "training_script",
         type=str,
@@ -124,13 +132,6 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         type=str,
         default="127.0.0.1",
         help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
-    collective_group.add_argument(
-        "--gpus",
-        type=str,
-        default=None,
-        help="It's for gpu training and the training process will run on the gpus,"
-        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
-    )
 
     ps_group = parser.add_argument_group("Parameter-Server Parameters")
     # for parameter server
@@ -193,35 +194,6 @@ def get_cluster_from_args(args, gpus):
     return get_cluster(node_ips, node_ip, trainer_endpoints, gpus)
 
 
-def get_gpus(gpus):
-    if gpus is None:
-        gpus_num = fluid.core.get_cuda_device_count()
-        res_gpus = [str(x) for x in range(0, gpus_num)]
-    else:
-        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
-        if cuda_visible_devices is None or cuda_visible_devices == "":
-            res_gpus = [x.strip() for x in gpus.split(',')]
-        else:
-            # change gpus into relative values
-            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
-            # therefore gpus=0,1,2,3
-            cuda_visible_devices_list = cuda_visible_devices.split(',')
-            for x in gpus.split(','):
-                assert x in cuda_visible_devices_list, "Can't find "\
-                    "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
-                    % (x, cuda_visible_devices)
-            res_gpus = [
-                cuda_visible_devices_list.index(x.strip())
-                for x in gpus.split(',')
-            ]
-            logger.info("Change selected_gpus into reletive values. --ips:{} "
-                        "will change into relative_ips:{} according to your "
-                        "CUDA_VISIBLE_DEVICES:{}".format(
-                            gpus, res_gpus, cuda_visible_devices_list))
-
-    return res_gpus
-
-
 def launch_collective(args):
     # parse arguments, used for cloud-single-machine and local
     gpus = get_gpus(args.gpus)
diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index ecc1afbd0c9..9e40f2ac60a 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -526,6 +526,35 @@ def watch_local_trainers(procs, nranks):
     return alive
 
 
+def get_gpus(gpus):
+    if gpus is None:
+        gpus_num = fluid.core.get_cuda_device_count()
+        res_gpus = [str(x) for x in range(0, gpus_num)]
+    else:
+        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+        if cuda_visible_devices is None or cuda_visible_devices == "":
+            res_gpus = [x.strip() for x in gpus.split(',')]
+        else:
+            # change gpus into relative values
+            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
+            # therefore gpus=0,1,2,3
+            cuda_visible_devices_list = cuda_visible_devices.split(',')
+            for x in gpus.split(','):
+                assert x in cuda_visible_devices_list, "Can't find "\
+                    "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
+                    % (x, cuda_visible_devices)
+            res_gpus = [
+                cuda_visible_devices_list.index(x.strip())
+                for x in gpus.split(',')
+            ]
+            logger.info("Change selected_gpus into relative values. --ips:{} "
+                        "will change into relative_ips:{} according to your "
+                        "CUDA_VISIBLE_DEVICES:{}".format(
+                            gpus, res_gpus, cuda_visible_devices_list))
+
+    return res_gpus
+
+
 def direct_start(args):
     # run ps-cpu mode on paddlecloud, using given envs
     cmd = [sys.executable, "-u", args.training_script] + \
@@ -636,6 +665,7 @@ class ParameterServerLauncher(object):
                 args.workers.split(",")
             ) == self.worker_num, "The worker_num and workers doesn't match. Expect workers endpoints num epual to worker_num, but received workers enpoint num: {} and worker_num {}".format(
                 len(args.workers.split(",")), self.worker_num)
+
             self.worker_endpoints = args.workers
         else:
             ports = get_ports(self.worker_num, self.server_num)
@@ -750,9 +780,9 @@ class ParameterServerLauncher(object):
         self.gloo_rendezvous_dir = tempfile.mkdtemp()
 
         # 3. subproces start
-        self.procs = []
-        self.cmds = []
-        self.log_fns = []
+        self.procs = {"worker": [], "server": [], "heter_worker": []}
+        self.cmds = {"worker": [], "server": [], "heter_worker": []}
+        self.log_fns = {"worker": [], "server": [], "heter_worker": []}
 
         self.start_pod_server(args, pod)
         self.start_pod_worker(args, pod)
@@ -762,30 +792,37 @@ class ParameterServerLauncher(object):
             "Please check servers, workers and heter_worker logs in {}/workerlog.*, {}/serverlog.* and {}/heterlog.*".
             format(args.log_dir, args.log_dir, args.log_dir))
 
-        # only wait worker to finish here
-        for i, proc in enumerate(self.procs):
-            if i < len(pod.servers) and i > len(pod.servers) + len(pod.workers):
-                continue
-            self.procs[i].proc.wait()
-            if len(self.log_fns) > 0:
-                self.log_fns[i].close()
-        logger.info(
-            "all workers exit, going to finish parameter server and heter_worker"
-        )
-
-        for i in range(
-                len(pod.servers + pod.workers),
-                len(pod.servers + pod.workers + pod.heter_workers)):
-            if len(self.log_fns) > 0:
-                self.log_fns[i].close()
-            self.procs[i].proc.terminate()
-        logger.info("all heter worker are killed")
-
-        for i in range(len(pod.servers)):
-            if len(self.log_fns) > 0:
-                self.log_fns[i].close()
-            self.procs[i].proc.terminate()
-        logger.info("all parameter server are killed", file=sys.stderr)
+        # 4. wait for training to finish
+        if len(self.procs["worker"]) > 0:
+            # if the node has worker procs,
+            # only wait for the workers to finish here
+            for i, proc in enumerate(self.procs["worker"]):
+                self.procs["worker"][i].proc.wait()
+                if len(self.log_fns["worker"]) > 0:
+                    self.log_fns["worker"][i].close()
+            logger.info(
+                "all workers exit, going to finish parameter server and heter_worker"
+            )
+            if len(self.procs["server"]) > 0:
+                for i, proc in enumerate(self.procs["server"]):
+                    self.log_fns["server"][i].close()
+                    self.procs["server"][i].proc.terminate()
+                logger.info("all parameter servers are killed")
+            if len(self.procs["heter_worker"]) > 0:
+                for i, proc in enumerate(self.procs["heter_worker"]):
+                    self.log_fns["heter_worker"][i].close()
+                    self.procs["heter_worker"][i].proc.terminate()
+                logger.info("all heter_workers are killed")
+        else:
+            # if the node has no worker procs,
+            # block until the server and heter_worker procs exit
+            if len(self.procs["server"]) > 0:
+                for i, proc in enumerate(self.procs["server"]):
+                    self.procs["server"][i].proc.wait()
+
+            if len(self.procs["heter_worker"]) > 0:
+                for i, proc in enumerate(self.procs["heter_worker"]):
+                    self.procs["heter_worker"][i].proc.wait()
 
         if os.path.exists(self.gloo_rendezvous_dir):
             shutil.rmtree(self.gloo_rendezvous_dir)
@@ -814,7 +851,7 @@ class ParameterServerLauncher(object):
 
             cmd = [sys.executable, "-u", args.training_script
                    ] + args.training_script_args
-            self.cmds.append(cmd)
+            self.cmds["server"].append(cmd)
 
             if idx == 0:
                 logger.info(
@@ -827,7 +864,7 @@ class ParameterServerLauncher(object):
             if args.log_dir is not None:
                 os.system("mkdir -p {}".format(args.log_dir))
                 fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
-                self.log_fns.append(fn)
+                self.log_fns["server"].append(fn)
                 proc = subprocess.Popen(
                     cmd, env=current_env, stdout=fn, stderr=fn)
             else:
@@ -841,7 +878,7 @@ class ParameterServerLauncher(object):
             tp.log_offset = fn.tell() if fn else None
             tp.cmd = cmd
 
-            self.procs.append(tp)
+            self.procs["server"].append(tp)
 
     def start_pod_worker(self, args, pod):
         default_env = os.environ.copy()
@@ -850,13 +887,16 @@ class ParameterServerLauncher(object):
         current_env.pop("https_proxy", None)
 
         heter_device_num = 0
+        device_list = []
         if args.heter_worker_device == "gpu":
-            heter_device_num = fluid.core.get_cuda_device_count()
+            device_list = get_gpus(args.gpus)
+            heter_device_num = len(device_list)
         elif args.heter_worker_device == "xpu":
             heter_device_num = fluid.core.get_xpu_device_count()
+            device_list = [str(x) for x in range(0, heter_device_num)]
 
         for idx, cur_worker in enumerate(pod.workers):
-            device_id = str(idx % heter_device_num)
+            device_id = str(device_list[idx % heter_device_num])
             proc_env = {
                 "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints,
"PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints, @@ -878,7 +918,7 @@ class ParameterServerLauncher(object): cmd = [sys.executable, "-u", args.training_script ] + args.training_script_args - self.cmds.append(cmd) + self.cmds["worker"].append(cmd) if idx == 0: logger.info( @@ -891,7 +931,7 @@ class ParameterServerLauncher(object): if args.log_dir is not None: os.system("mkdir -p {}".format(args.log_dir)) fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w") - self.log_fns.append(fn) + self.log_fns["worker"].append(fn) proc = subprocess.Popen( cmd, env=current_env, stdout=fn, stderr=fn) else: @@ -905,7 +945,7 @@ class ParameterServerLauncher(object): tp.log_offset = fn.tell() if fn else None tp.cmd = cmd - self.procs.append(tp) + self.procs["worker"].append(tp) def start_pod_heter_worker(self, args, pod): default_env = os.environ.copy() @@ -914,14 +954,17 @@ class ParameterServerLauncher(object): current_env.pop("https_proxy", None) heter_device_num = 0 + device_list = [] if args.heter_worker_device == "gpu": - heter_device_num = fluid.core.get_cuda_device_count() + device_list = get_gpus(args.gpus) + heter_device_num = len(device_list) elif args.heter_worker_device == "xpu": heter_device_num = fluid.core.get_xpu_device_count() + device_list = [str(x) for x in range(0, heter_device_num)] assert heter_device_num != 0 for idx, cur_heter_worker in enumerate(pod.heter_workers): - device_id = str(idx % heter_device_num) + device_id = str(device_list[idx % heter_device_num]) proc_env = { "PADDLE_PSERVERS_IP_PORT_LIST": self.server_endpoints, "PADDLE_TRAINER_ENDPOINTS": self.worker_endpoints, @@ -944,7 +987,7 @@ class ParameterServerLauncher(object): cmd = [sys.executable, "-u", args.training_script ] + args.training_script_args - self.cmds.append(cmd) + self.cmds["heter_worker"].append(cmd) if idx == 0: logger.info( @@ -957,7 +1000,7 @@ class ParameterServerLauncher(object): if args.log_dir is not None: os.system("mkdir -p {}".format(args.log_dir)) fn = open("%s/heterlog.%d" % (args.log_dir, idx), "w") - self.log_fns.append(fn) + self.log_fns["heter_worker"].append(fn) proc = subprocess.Popen( cmd, env=current_env, stdout=fn, stderr=fn) else: @@ -971,4 +1014,4 @@ class ParameterServerLauncher(object): tp.log_offset = fn.tell() if fn else None tp.cmd = cmd - self.procs.append(tp) + self.procs["heter_worker"].append(tp) -- GitLab