From d5a66fd7a06fa16ab7ad6755536ff60c1e235794 Mon Sep 17 00:00:00 2001
From: danleifeng <52735331+danleifeng@users.noreply.github.com>
Date: Mon, 10 Aug 2020 10:26:30 +0800
Subject: [PATCH] 【paddle.fleet】support multi-node cpu training for fleetrun
 (#26011)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* support multi-ps training mode for fleetrun; test=develop
---
 python/paddle/fleet/launch.py                 | 168 +++++++++++++-----
 python/paddle/fleet/launch_utils.py           |  41 ++++-
 .../tests/unittests/test_fleet_launch.sh      |   8 +
 3 files changed, 171 insertions(+), 46 deletions(-)

diff --git a/python/paddle/fleet/launch.py b/python/paddle/fleet/launch.py
index de5e0b66b3e..8b08c916c84 100644
--- a/python/paddle/fleet/launch.py
+++ b/python/paddle/fleet/launch.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """
-paddle.distributed.launch is a module that spawns multiple distributed
+paddle.distributed.launch is a module that spawns multiple distributed
 process on each training node for gpu training and cpu training.
 Usage:
-    In both of single node training or multiple node training, this module
+    In both of single node training or multiple node training, this module
 launch a process on each of the given gpu card or cpu machine.
     GPU training:
     1. for single node training with all visible gpu cards:
@@ -24,11 +24,10 @@ launch a process on each of the given gpu card or cpu machine.
        fleetrun your_training_py (arg1 arg2 and all others)
     2. for single node training with [0,4) cards
        fleetrun --gpus="0,1,2,3" your_training_py (arg1 arg2 and all others)
     3. for multiple node training such as two node:192.168.0.16, 192.168.0.17
        on 192.168.0.16:
-           fleetrun --ips="192.168.0.16,192.168.0.17" --node_ip=192.168.0.16 \
+           fleetrun --ips="192.168.0.16,192.168.0.17" \
               your_training_py (arg1 arg2 and all others)
        on 192.168.0.17:
            fleetrun --ips="192.168.0.16,192.168.0.17" \
-              --node_ip=192.168.0.17 \
               your_training_py (arg1 arg2 and all others)
     CPU training:
     1. for single node training with multi servers and workers:
@@ -96,15 +95,14 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
         "--servers", type=str, default="", help="User defined servers ip:port")
     parser.add_argument(
         "--workers", type=str, default="", help="User defined workers ip:port")
-    parser.add_argument(
-        "--worker_num", type=int, default=2, help="number of workers")
+    parser.add_argument("--worker_num", type=int, help="number of workers")
 
-    parser.add_argument(
-        "--server_num", type=int, default=2, help="number of servers")
+    parser.add_argument("--server_num", type=int, help="number of servers")
 
     parser.add_argument(
         "--log_dir",
         type=str,
+        default="log",
         help="The path for each process's log. If it's not set, the log will be printed to the default pipe."
     )
     #positional
@@ -129,11 +127,11 @@ def get_cluster_from_args(args, gpus):
     _, node_ip = get_host_name_ip()
 
     # node_ip = args.node_ip
-    assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips:{%s}" \
+    assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
         % (node_ip, node_ips)
     node_rank = node_ips.index(node_ip)
 
-    logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format(
+    logger.debug("parsed from args: node_ips:{} node_ip:{} node_rank:{}".format(
         node_ips, node_ip, node_rank))
 
     free_ports = None
@@ -187,8 +185,11 @@ def launch_collective(args):
     cluster = None
     pod = None
 
+    start_port = 6170
+    if os.environ.get('FLAGS_START_PORT') is not None:
+        start_port = int(os.environ.get('FLAGS_START_PORT'))
     if cloud_utils.use_paddlecloud() and trainers_num != 1:
-        cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus)
+        cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus, start_port)
         logger.info("get cluster from cloud:{}".format(cluster))
     else:
         # trainers_num = 1 or not use paddlecloud ips="a,b"
@@ -213,11 +214,78 @@ def launch_ps(args):
-    worker_num = args.worker_num
-    server_num = args.server_num
-    start_port = 6170
-    if os.environ.get('FLAGS_START_PORT') is not None:
-        start_port = os.environ.get('FLAGS_START_PORT')
+    ports = None
+    if args.server_num:
+        server_num = args.server_num
+        ports = get_ports(server_num, 0)
+        server_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
+    else:
+        assert args.servers != "", "In CPU mode, either --server_num or --servers must be set."
+        server_endpoints = args.servers
+    server_endpoints_ips = [
+        x.strip().split(":")[0] for x in server_endpoints.split(",")
+    ]
+    server_endpoints_port = [
+        x.strip().split(":")[1] for x in server_endpoints.split(",")
+    ]
+    server_num = len(server_endpoints_ips)
+
+    if args.worker_num:
+        worker_num = args.worker_num
+        ports = get_ports(worker_num, server_num)
+        worker_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
+    else:
+        assert args.workers != "", "In CPU mode, either --worker_num or --workers must be set."
+        worker_endpoints = args.workers
+    worker_endpoints_ips = [
+        x.strip().split(":")[0] for x in worker_endpoints.split(",")
+    ]
+    worker_endpoints_port = [
+        x.strip().split(":")[1] for x in worker_endpoints.split(",")
+    ]
+    worker_num = len(worker_endpoints_ips)
+    node_ips = list(set(server_endpoints_ips + worker_endpoints_ips))
+
+    # local train
+    if len(set(node_ips)) == 1:
+        current_node_ip = node_ips[0]
+    else:
+        _, current_node_ip = get_host_name_ip()
+
+    assert current_node_ip in node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
+        % (current_node_ip, node_ips)
+    node_rank = node_ips.index(current_node_ip)
+    logger.debug(
+        "parsed from args: node_ips:{} current_node_ip:{} node_rank:{}, server_ports:{}".
+        format(node_ips, current_node_ip, node_rank, server_endpoints_port))
+
+    cluster = Cluster(hdfs=None)
+    server_rank = 0
+    worker_rank = 0
+    for node_rank, ip in enumerate(node_ips):
+        pod = Pod()
+        pod.rank = node_rank
+        pod.addr = ip
+        for i in range(len(server_endpoints_ips)):
+            if ip == server_endpoints_ips[i]:
+                server = Trainer()
+                server.endpoint = "%s:%s" % (ip, server_endpoints_port[i])
+                server.rank = server_rank
+                server_rank += 1
+                pod.servers.append(server)
+        for j in range(len(worker_endpoints_ips)):
+            if ip == worker_endpoints_ips[j]:
+                worker = Trainer()
+                worker.endpoint = "%s:%s" % (ip, worker_endpoints_port[j])
+                worker.rank = worker_rank
+                worker_rank += 1
+                pod.workers.append(worker)
+
+        cluster.pods.append(pod)
+
+    pod_rank = node_ips.index(current_node_ip)
+    pod = cluster.pods[pod_rank]
+
     default_env = os.environ.copy()
     current_env = copy.copy(default_env)
     current_env.pop("http_proxy", None)
@@ -225,68 +293,78 @@ def launch_ps(args):
     procs = []
     cmds = []
     log_fns = []
-    ports = range(start_port, start_port + server_num, 1)
-    default_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
-    user_endpoints = ""
-    if args.servers == "":
-        user_endpoints = default_endpoints
-    else:
-        user_endpoints = args.servers
-    user_endpoints_ips = [x.split(":")[0] for x in user_endpoints.split(",")]
-    user_endpoints_port = [x.split(":")[1] for x in user_endpoints.split(",")]
-    for i in range(server_num):
+    for idx, cur_server in enumerate(pod.servers):
         current_env.update({
-            "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
-            "PADDLE_PORT": user_endpoints_port[i],
+            "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
+            "PADDLE_PORT": cur_server.endpoint.split(":")[1],
             "TRAINING_ROLE": "PSERVER",
             "PADDLE_TRAINERS_NUM": str(worker_num),
-            "POD_IP": user_endpoints_ips[i]
+            "POD_IP": cur_server.endpoint.split(":")[0]
         })
         cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args
         cmds.append(cmd)
+
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
-            fn = open("%s/serverlog.%d" % (args.log_dir, i), "w")
+            fn = open("%s/serverlog.%d" % (args.log_dir, idx), "w")
             log_fns.append(fn)
             proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
         else:
             proc = subprocess.Popen(cmd, env=current_env)
-        procs.append(proc)
 
-    for i in range(worker_num):
+        tp = TrainerProc()
+        tp.proc = proc
+        tp.rank = cur_server.rank
+        tp.local_rank = idx
+        tp.log_fn = fn
+        tp.log_offset = 0 if fn else None
+        tp.cmd = cmd
+
+        procs.append(tp)
+
+    for idx, cur_worker in enumerate(pod.workers):
         current_env.update({
-            "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
+            "PADDLE_PSERVERS_IP_PORT_LIST": server_endpoints,
             "PADDLE_TRAINERS_NUM": str(worker_num),
             "TRAINING_ROLE": "TRAINER",
-            "PADDLE_TRAINER_ID": str(i)
+            "PADDLE_TRAINER_ID": str(cur_worker.rank)
         })
         cmd = [sys.executable, "-u", args.training_script
              ] + args.training_script_args
         cmds.append(cmd)
         if args.log_dir is not None:
             os.system("mkdir -p {}".format(args.log_dir))
-            fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
+            fn = open("%s/workerlog.%d" % (args.log_dir, idx), "w")
             log_fns.append(fn)
             proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
         else:
             proc = subprocess.Popen(cmd, env=current_env)
-        procs.append(proc)
+
+        tp = TrainerProc()
+        tp.proc = proc
+        tp.rank = cur_worker.rank
+        tp.local_rank = idx
+        tp.log_fn = fn
+        tp.log_offset = 0 if fn else None
+        tp.cmd = cmd
+
+        procs.append(tp)
 
     # only wait worker to finish here
     for i, proc in enumerate(procs):
-        if i < server_num:
+        if i < len(pod.servers):
             continue
-        procs[i].wait()
+        procs[i].proc.wait()
         if len(log_fns) > 0:
             log_fns[i].close()
 
     print("all workers exit, going to finish parameter server", file=sys.stderr)
-    for i in range(server_num):
+    for i in range(len(pod.servers)):
         if len(log_fns) > 0:
             log_fns[i].close()
-        procs[i].terminate()
+        procs[i].proc.terminate()
     print("all parameter server are killed", file=sys.stderr)
 
 
@@ -303,11 +381,15 @@ def launch():
         co_arg for co_arg in collective_args
         if co_arg in " ".join(sys.argv[1:-1])
     ]
-    if len(has_ps_args) > 0 or fluid.core.get_cuda_device_count() == 0:
-        logger.info("Run cpu parameter-sever mode.")
+    cuda_device_num = fluid.core.get_cuda_device_count()
+    if len(has_ps_args) > 0 or cuda_device_num == 0:
+        logger.info(
+            "Run parameter-server cpu mode. pserver args:{}, cuda count:{}".
+            format(has_ps_args, cuda_device_num))
         launch_ps(args)
     elif len(has_collective_args) > 0:
-        logger.info("Run gpu collective mode.")
+        logger.info("Run collective gpu mode. gpu args:{}, cuda count:{}".
+                    format(has_collective_args, cuda_device_num))
         launch_collective(args)
     else:
         logger.warning(
diff --git a/python/paddle/fleet/launch_utils.py b/python/paddle/fleet/launch_utils.py
index 040e7254f8c..350d8ae2b44 100644
--- a/python/paddle/fleet/launch_utils.py
+++ b/python/paddle/fleet/launch_utils.py
@@ -142,12 +142,16 @@ class Pod(object):
         self.addr = None
         self.port = None
         self.trainers = []
+        self.servers = []
+        self.workers = []
         self.gpus = []
 
     def __str__(self):
-        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format(
-            self.rank, self.id, self.addr, self.port, self.gpus,
-            [str(t) for t in self.trainers])
+        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{} servers:{} \
+            workers:{}".format(self.rank, self.id, self.addr, self.port,
+                               self.gpus, [str(t) for t in self.trainers],
+                               [str(s) for s in self.servers],
+                               [str(w) for w in self.workers])
 
     def __eq__(self, pod):
         if self.rank != pod.rank or \
@@ -168,6 +172,26 @@ class Pod(object):
                     pod.trainers[i]))
                 return False
 
+        if len(self.servers) != len(pod.servers):
+            logger.debug("servers {} != {}".format(self.servers, pod.servers))
+            return False
+
+        for i in range(len(self.servers)):
+            if self.servers[i] != pod.servers[i]:
+                logger.debug("servers {} != {}".format(self.servers[i],
+                                                       pod.servers[i]))
+                return False
+
+        if len(self.workers) != len(pod.workers):
+            logger.debug("workers {} != {}".format(self.workers, pod.workers))
+            return False
+
+        for i in range(len(self.workers)):
+            if self.workers[i] != pod.workers[i]:
+                logger.debug("workers {} != {}".format(self.workers[i],
+                                                       pod.workers[i]))
+                return False
+
         return True
 
     def __ne__(self, pod):
@@ -303,6 +327,17 @@ def find_free_ports(num):
     return None
 
 
+def get_ports(num, offset):
+    if os.environ.get('FLAGS_START_PORT') is None:
+        ports = find_free_ports(num)
+        if ports is not None:
+            ports = list(ports)
+    else:
+        start_port = int(os.environ.get('FLAGS_START_PORT'))
+        ports = range(start_port + offset, start_port + offset + num, 1)
+    return ports
+
+
 class TrainerProc(object):
     def __init__(self):
         self.proc = None
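
# ---------------------------------------------------------------------------
# Editor's illustration, not part of the patch: a minimal sketch of the port
# layout produced by the new get_ports(num, offset) helper above when
# FLAGS_START_PORT is set. launch_ps calls get_ports(server_num, 0) for the
# servers and get_ports(worker_num, server_num) for the workers, so servers
# take the first block of ports and workers take the block right after it.
# The start port and the counts below are assumptions chosen for the example.
import os

os.environ["FLAGS_START_PORT"] = "6170"
start = int(os.environ["FLAGS_START_PORT"])
server_num, worker_num = 2, 2

server_ports = list(range(start, start + server_num))        # [6170, 6171]
worker_ports = list(range(start + server_num,
                          start + server_num + worker_num))  # [6172, 6173]

server_endpoints = ",".join("127.0.0.1:%d" % p for p in server_ports)
worker_endpoints = ",".join("127.0.0.1:%d" % p for p in worker_ports)
print(server_endpoints)  # 127.0.0.1:6170,127.0.0.1:6171
print(worker_endpoints)  # 127.0.0.1:6172,127.0.0.1:6173
# ---------------------------------------------------------------------------
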
--workers="120.0.0.1:6782,120.0.0.1:6783" fleet_ps_training.py 2> ut.elog + if grep -q "server are killed" ut.elog; then + echo "test pserver launch succeed" + else + echo "test pserver launch failed" + exit -1 + fi } if [[ ${WITH_GPU} == "OFF" ]]; then -- GitLab