From a4bd3a691bd29a21756b3a1844a8f3a24fd1a667 Mon Sep 17 00:00:00 2001
From: liuyuhui
Date: Thu, 24 Sep 2020 19:52:26 +0800
Subject: [PATCH] fix selected_gpus bug for local_cluster (#205)

* fix selected_gpus bug for local_cluster

* fix bugs in single-machine multi-card training for local_cluster

* fix python -m config.yaml -b backend.yaml

* re-add selected_gpus_num config for single train mode

* fix bug in submit for paddlecloud k8s

Co-authored-by: wuzhihua <35824027+fuyinno4@users.noreply.github.com>
---
 core/engine/cluster/cluster.py |  40 +++-
 core/engine/cluster_utils.py   | 324 +++++++++++++++++++++++++++++++++
 core/engine/local_cluster.py   |  93 +++++++---
 run.py                         |   1 +
 4 files changed, 426 insertions(+), 32 deletions(-)
 create mode 100644 core/engine/cluster_utils.py

diff --git a/core/engine/cluster/cluster.py b/core/engine/cluster/cluster.py
index a64e99e3..385738e0 100644
--- a/core/engine/cluster/cluster.py
+++ b/core/engine/cluster/cluster.py
@@ -19,10 +19,16 @@ import copy
 import os
 import subprocess
 import warnings
+import sys
+import logging
 
 from paddlerec.core.engine.engine import Engine
 from paddlerec.core.factory import TrainerFactory
 from paddlerec.core.utils import envs
+import paddlerec.core.engine.cluster_utils as cluster_utils
+
+logger = logging.getLogger("root")
+logger.propagate = False
 
 
 class ClusterEngine(Engine):
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
                 self.backend))
 
     def start_worker_procs(self):
-        trainer = TrainerFactory.create(self.trainer)
-        trainer.run()
+        if (envs.get_runtime_environ("fleet_mode") == "COLLECTIVE"):
+            #trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
+            cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+            if cuda_visible_devices is None or cuda_visible_devices == "":
+                selected_gpus = range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))
+            else:
+                # change selected_gpus into relative values
+                # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
+                # therefore selected_gpus=0,1,2,3
+                cuda_visible_devices_list = cuda_visible_devices.split(',')
+                for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT"))):
+                    assert str(x) in cuda_visible_devices_list, "Can't find "\
+                        "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
+                        % (x, cuda_visible_devices)
+                selected_gpus = [cuda_visible_devices_list.index(str(x)) for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))]
+            print("selected_gpus:{}".format(selected_gpus))
+
+            factory = "paddlerec.core.factory"
+            cmd = [sys.executable, "-u", "-m", factory, self.trainer]
+            logs_dir = envs.get_runtime_environ("log_dir")
+            print("use_paddlecloud_flag:{}".format(
+                cluster_utils.use_paddlecloud()))
+            if cluster_utils.use_paddlecloud():
+                cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
+                logger.info("get cluster from cloud:{}".format(cluster))
+                procs = cluster_utils.start_local_trainers(
+                    cluster, pod, cmd, log_dir=logs_dir)
+                print("cluster:{}".format(cluster))
+                print("pod:{}".format(pod))
+        else:
+            trainer = TrainerFactory.create(self.trainer)
+            trainer.run()
 
     def start_master_procs(self):
         if self.backend == "PADDLECLOUD":
diff --git a/core/engine/cluster_utils.py b/core/engine/cluster_utils.py
new file mode 100644
index 00000000..10c11773
--- /dev/null
+++ b/core/engine/cluster_utils.py
@@ -0,0 +1,324 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import logging
+import socket
+import time
+import os
+import signal
+import copy
+import sys
+import subprocess
+from contextlib import closing
+import socket
+
+logger = logging.getLogger("root")
+logger.propagate = False
+
+
+class Cluster(object):
+    def __init__(self, hdfs):
+        self.job_server = None
+        self.pods = []
+        self.hdfs = hdfs
+        self.job_stage_flag = None
+
+    def __str__(self):
+        return "job_server:{} pods:{} job_stage_flag:{} hdfs:{}".format(
+            self.job_server, [str(pod) for pod in self.pods],
+            self.job_stage_flag, self.hdfs)
+
+    def __eq__(self, cluster):
+        if len(self.pods) != len(cluster.pods):
+            return False
+
+        for a, b in zip(self.pods, cluster.pods):
+            if a != b:
+                return False
+
+        if self.job_stage_flag != cluster.job_stage_flag:
+            return False
+
+        return True
+
+    def __ne__(self, cluster):
+        return not self.__eq__(cluster)
+
+    def update_pods(self, cluster):
+        self.pods = copy.copy(cluster.pods)
+
+    def trainers_nranks(self):
+        return len(self.trainers_endpoints())
+
+    def pods_nranks(self):
+        return len(self.pods)
+
+    def trainers_endpoints(self):
+        r = []
+        for pod in self.pods:
+            for t in pod.trainers:
+                r.append(t.endpoint)
+        return r
+
+    def pods_endpoints(self):
+        r = []
+        for pod in self.pods:
+            ep = "{}:{}".format(pod.addr, pod.port)
+            assert pod.port is not None and pod.addr is not None, \
+                "{} is not a valid endpoint".format(ep)
+            r.append(ep)
+
+        return r
+
+    def get_pod_by_id(self, pod_id):
+        for pod in self.pods:
+            if str(pod_id) == str(pod.id):
+                return pod
+
+        return None
+
+
+class JobServer(object):
+    def __init__(self):
+        self.endpoint = None
+
+    def __str__(self):
+        return "{}".format(self.endpoint)
+
+    def __eq__(self, j):
+        return self.endpoint == j.endpoint
+
+    def __ne__(self, j):
+        return not self == j
+
+
+class Trainer(object):
+    def __init__(self):
+        self.gpus = []
+        self.endpoint = None
+        self.rank = None
+
+    def __str__(self):
+        return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint,
+                                                   self.rank)
+
+    def __eq__(self, t):
+        if len(self.gpus) != len(t.gpus):
+            return False
+
+        if self.endpoint != t.endpoint or \
+                self.rank != t.rank:
+            return False
+
+        for a, b in zip(self.gpus, t.gpus):
+            if a != b:
+                return False
+
+        return True
+
+    def __ne__(self, t):
+        return not self == t
+
+    def rank(self):
+        return self.rank
+
+
+class Pod(object):
+    def __init__(self):
+        self.rank = None
+        self.id = None
+        self.addr = None
+        self.port = None
+        self.trainers = []
+        self.gpus = []
+
+    def __str__(self):
+        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format(
+            self.rank, self.id, self.addr, self.port, self.gpus,
+            [str(t) for t in self.trainers])
+
+    def __eq__(self, pod):
+        if self.rank != pod.rank or \
+                self.id != pod.id or \
+                self.addr != pod.addr or \
+                self.port != pod.port:
+            logger.debug("pod {} != pod {}".format(self, pod))
+            return False
+
+        if len(self.trainers) != len(pod.trainers):
+            logger.debug("trainers {} != {}".format(self.trainers,
+                                                    pod.trainers))
+            return False
+
+        for i in range(len(self.trainers)):
+            if self.trainers[i] != pod.trainers[i]:
+                logger.debug("trainer {} != {}".format(self.trainers[i],
{}".format(self.trainers[i], + pod.trainers[i])) + return False + + return True + + def __ne__(self, pod): + return not self == pod + + def parse_response(self, res_pods): + pass + + def rank(self): + return self.rank + + def get_visible_gpus(self): + r = "" + for g in self.gpus: + r += "{},".format(g) + + assert r != "", "this pod {} can't see any gpus".format(self) + + r = r[:-1] + return r + + +def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus): + assert type(paddle_ports) is list, "paddle_ports must be list" + cluster = Cluster(hdfs=None) + trainer_rank = 0 + for node_rank, ip in enumerate(node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + for i in range(len(selected_gpus)): + trainer = Trainer() + trainer.gpus.append(selected_gpus[i]) + trainer.endpoint = "%s:%d" % (ip, paddle_ports[i]) + trainer.rank = trainer_rank + trainer_rank += 1 + + pod.trainers.append(trainer) + cluster.pods.append(pod) + + pod_rank = node_ips.index(node_ip) + return cluster, cluster.pods[pod_rank] + + +def get_cloud_cluster(selected_gpus, args_port=None): + #you can automatically get ip info while using paddlecloud multi nodes mode. + node_ips = os.getenv("PADDLE_TRAINERS") + assert node_ips is not None, "PADDLE_TRAINERS should not be None" + print("node_ips:{}".format(node_ips)) + node_ip = os.getenv("POD_IP") + assert node_ip is not None, "POD_IP should not be None" + print("node_ip:{}".format(node_ip)) + node_rank = os.getenv("PADDLE_TRAINER_ID") + assert node_rank is not None, "PADDLE_TRAINER_ID should not be None" + print("node_rank:{}".format(node_rank)) + node_ips = node_ips.split(",") + num_nodes = len(node_ips) + node_rank = int(node_rank) + + started_port = args_port + print("num_nodes:", num_nodes) + if num_nodes > 1: + try: + paddle_port = int(os.getenv("PADDLE_PORT", "")) + paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", "")) + + if paddle_port_num >= len( + selected_gpus) and paddle_port != args_port: + logger.warning("Use Cloud specified port:{}.".format( + paddle_port)) + started_port = paddle_port + + except Exception as e: + print(e) + pass + + if started_port is None: + started_port = 6170 + + logger.debug("parsed from args:node_ips:{} \ + node_ip:{} node_rank:{} started_port:{}" + .format(node_ips, node_ip, node_rank, started_port)) + + ports = [x for x in range(started_port, started_port + len(selected_gpus))] + cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus) + return cluster, cluster.pods[node_rank] + + +def use_paddlecloud(): + node_ips = os.getenv("PADDLE_TRAINERS", None) + node_ip = os.getenv("POD_IP", None) + node_rank = os.getenv("PADDLE_TRAINER_ID", None) + if node_ips is None or node_ip is None or node_rank is None: + return False + else: + return True + + +class TrainerProc(object): + def __init__(self): + self.proc = None + self.log_fn = None + self.log_offset = None + self.rank = None + self.local_rank = None + self.cmd = None + + +def start_local_trainers(cluster, pod, cmd, log_dir=None): + current_env = copy.copy(os.environ.copy()) + #paddle broadcast ncclUniqueId use socket, and + #proxy maybe make trainers unreachable, so delete them. + #if we set them to "", grpc will log error message "bad uri" + #so just delete them. 
+    current_env.pop("http_proxy", None)
+    current_env.pop("https_proxy", None)
+
+    procs = []
+    for idx, t in enumerate(pod.trainers):
+        proc_env = {
+            "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
+            "PADDLE_TRAINER_ID": "%d" % t.rank,
+            "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
+            "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
+            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
+        }
+
+        current_env.update(proc_env)
+
+        logger.debug("trainer proc env:{}".format(current_env))
+
+        # cmd = [sys.executable, "-u", training_script]
+
+        logger.info("start trainer proc:{} env:{}".format(cmd, proc_env))
+
+        fn = None
+        if log_dir is not None:
+            os.system("mkdir -p {}".format(log_dir))
+            fn = open("%s/workerlog.%d" % (log_dir, idx), "a")
+            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
+        else:
+            proc = subprocess.Popen(cmd, env=current_env)
+
+        tp = TrainerProc()
+        tp.proc = proc
+        tp.rank = t.rank
+        tp.local_rank = idx
+        tp.log_fn = fn
+        tp.log_offset = fn.tell() if fn else None
+        tp.cmd = cmd
+
+        procs.append(proc)
+
+    return procs
diff --git a/core/engine/local_cluster.py b/core/engine/local_cluster.py
index 88f21ef8..b6ff736e 100755
--- a/core/engine/local_cluster.py
+++ b/core/engine/local_cluster.py
@@ -19,9 +19,14 @@ import copy
 import os
 import sys
 import subprocess
+import logging
 
 from paddlerec.core.engine.engine import Engine
 from paddlerec.core.utils import envs
+import paddlerec.core.engine.cluster_utils as cluster_utils
+
+logger = logging.getLogger("root")
+logger.propagate = False
 
 
 class LocalClusterEngine(Engine):
@@ -97,42 +102,70 @@ class LocalClusterEngine(Engine):
                         stderr=fn,
                         cwd=os.getcwd())
                     procs.append(proc)
+
         elif fleet_mode.upper() == "COLLECTIVE":
-            selected_gpus = self.envs["selected_gpus"].split(",")
+            cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+            if cuda_visible_devices is None or cuda_visible_devices == "":
+                selected_gpus = [
+                    x.strip() for x in self.envs["selected_gpus"].split(",")
+                ]
+            else:
+                # change selected_gpus into relative values
+                # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
+                # therefore selected_gpus=0,1,2,3
+                cuda_visible_devices_list = cuda_visible_devices.split(',')
+                for x in self.envs["selected_gpus"].split(","):
+                    assert x.strip() in cuda_visible_devices_list, "Can't find "\
+                        "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
+                        % (x, cuda_visible_devices)
+                selected_gpus = [
+                    cuda_visible_devices_list.index(x.strip())
+                    for x in self.envs["selected_gpus"].split(",")
+                ]
             selected_gpus_num = len(selected_gpus)
-            for i in range(selected_gpus_num - 1):
-                while True:
-                    new_port = envs.find_free_port()
-                    if new_port not in ports:
-                        ports.append(new_port)
-                        break
-            user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
-
             factory = "paddlerec.core.factory"
             cmd = [sys.executable, "-u", "-m", factory, self.trainer]
-            for i in range(selected_gpus_num):
-                current_env.update({
-                    "PADDLE_TRAINER_ENDPOINTS": user_endpoints,
-                    "PADDLE_CURRENT_ENDPOINTS": user_endpoints[i],
-                    "PADDLE_TRAINERS_NUM": str(worker_num),
-                    "TRAINING_ROLE": "TRAINER",
-                    "PADDLE_TRAINER_ID": str(i),
-                    "FLAGS_selected_gpus": str(selected_gpus[i]),
-                    "PADDLEREC_GPU_NUMS": str(selected_gpus_num)
-                })
-
-                os.system("mkdir -p {}".format(logs_dir))
-                fn = open("%s/worker.%d" % (logs_dir, i), "w")
-                log_fns.append(fn)
-                proc = subprocess.Popen(
-                    cmd,
-                    env=current_env,
-                    stdout=fn,
-                    stderr=fn,
-                    cwd=os.getcwd())
-                procs.append(proc)
+            print("use_paddlecloud_flag:{}".format(
+                cluster_utils.use_paddlecloud()))
+            if cluster_utils.use_paddlecloud():
+                cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
+                logger.info("get cluster from cloud:{}".format(cluster))
+                procs = cluster_utils.start_local_trainers(
+                    cluster, pod, cmd, log_dir=logs_dir)
+
+            else:
+                # trainers_num == 1, or not running on paddlecloud (ips="a,b")
+                for i in range(selected_gpus_num - 1):
+                    while True:
+                        new_port = envs.find_free_port()
+                        if new_port not in ports:
+                            ports.append(new_port)
+                            break
+                user_endpoints_list = ["127.0.0.1:" + str(x) for x in ports]
+                user_endpoints = ",".join(user_endpoints_list)
+                for i in range(selected_gpus_num):
+                    current_env.update({
+                        "PADDLE_TRAINER_ENDPOINTS": user_endpoints,
+                        "PADDLE_CURRENT_ENDPOINTS": user_endpoints_list[i],
+                        "PADDLE_TRAINERS_NUM": str(worker_num),
+                        "TRAINING_ROLE": "TRAINER",
+                        "PADDLE_TRAINER_ID": str(i),
+                        "FLAGS_selected_gpus": str(selected_gpus[i]),
+                        "PADDLEREC_GPU_NUMS": str(selected_gpus_num)
+                    })
+
+                    os.system("mkdir -p {}".format(logs_dir))
+                    fn = open("%s/worker.%d" % (logs_dir, i), "w")
+                    log_fns.append(fn)
+                    proc = subprocess.Popen(
+                        cmd,
+                        env=current_env,
+                        stdout=fn,
+                        stderr=fn,
+                        cwd=os.getcwd())
+                    procs.append(proc)
 
         # only wait worker to finish here
         for i, proc in enumerate(procs):
diff --git a/run.py b/run.py
index c916ecd0..8d21c43f 100755
--- a/run.py
+++ b/run.py
@@ -348,6 +348,7 @@ def cluster_engine(args):
         cluster_envs["fleet_mode"] = fleet_mode
         cluster_envs["engine_role"] = "WORKER"
 
+        cluster_envs["log_dir"] = "logs"
         cluster_envs["train.trainer.trainer"] = trainer
         cluster_envs["train.trainer.engine"] = "cluster"
         cluster_envs["train.trainer.executor_mode"] = executor_mode
-- 
GitLab
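Note: both cluster.py and local_cluster.py apply the same rule — physical GPU
ids from selected_gpus are translated into indices relative to
CUDA_VISIBLE_DEVICES before being exported as FLAGS_selected_gpus. A minimal
standalone sketch of that rule follows; the helper name remap_to_relative is
hypothetical (the patch inlines the logic rather than factoring it out):

    import os

    def remap_to_relative(selected_gpus):
        """Map physical GPU ids to indices within CUDA_VISIBLE_DEVICES."""
        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        if not cuda_visible_devices:
            # no device mask set: physical ids are usable as-is
            return [int(g) for g in selected_gpus]
        visible = [d.strip() for d in cuda_visible_devices.split(",")]
        for g in selected_gpus:
            assert str(g).strip() in visible, \
                "Can't find your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]." \
                % (g, cuda_visible_devices)
        # each trainer later sees FLAGS_selected_gpus as an index into the mask
        return [visible.index(str(g).strip()) for g in selected_gpus]

    if __name__ == "__main__":
        os.environ["CUDA_VISIBLE_DEVICES"] = "4,5,6,7"
        print(remap_to_relative([4, 5, 6, 7]))  # -> [0, 1, 2, 3]

This is why the original one-liner was wrong twice over: it compared the int
loop variable against string tokens (so the assert could never hold), and it
built selected_gpus from only the last leaked loop value instead of one index
per selected device.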