From 24a063f6ac0ba1122b5b6bec524c6ec659197e5f Mon Sep 17 00:00:00 2001 From: gongweibao Date: Fri, 3 Apr 2020 06:46:10 -0500 Subject: [PATCH] Add fleet checkpoint on local fs and remote fs(such as hdfs) for EDL (#22586) --- paddle/fluid/framework/io/CMakeLists.txt | 2 +- paddle/fluid/framework/io/shell.cc | 39 +- paddle/fluid/framework/io/shell.h | 7 +- paddle/fluid/pybind/pybind.cc | 6 +- python/paddle/distributed/cloud_utils.py | 79 ++++ python/paddle/distributed/fs_wrapper.py | 223 +++++++++ python/paddle/distributed/launch.py | 274 +++-------- python/paddle/distributed/utils.py | 424 ++++++++++++++++++ .../incubate/fleet/collective/__init__.py | 204 ++++++++- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../tests/unittests/test_fleet_checkpoint.py | 77 ++++ .../fluid/tests/unittests/test_launch.sh | 6 +- python/requirements.txt | 1 + 13 files changed, 1130 insertions(+), 214 deletions(-) create mode 100644 python/paddle/distributed/cloud_utils.py create mode 100644 python/paddle/distributed/fs_wrapper.py create mode 100644 python/paddle/distributed/utils.py create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py diff --git a/paddle/fluid/framework/io/CMakeLists.txt b/paddle/fluid/framework/io/CMakeLists.txt index 1fb2d3d259a..50183e0bf54 100644 --- a/paddle/fluid/framework/io/CMakeLists.txt +++ b/paddle/fluid/framework/io/CMakeLists.txt @@ -1,4 +1,4 @@ cc_library(fs SRCS fs.cc DEPS string_helper glog boost) -cc_library(shell SRCS shell.cc DEPS string_helper glog) +cc_library(shell SRCS shell.cc DEPS string_helper glog timer) cc_test(test_fs SRCS test_fs.cc DEPS fs shell) diff --git a/paddle/fluid/framework/io/shell.cc b/paddle/fluid/framework/io/shell.cc index 53afa2ce71c..76828286819 100644 --- a/paddle/fluid/framework/io/shell.cc +++ b/paddle/fluid/framework/io/shell.cc @@ -13,6 +13,8 @@ // limitations under the License. 
#include "paddle/fluid/framework/io/shell.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/timer.h" namespace paddle { namespace framework { @@ -296,23 +298,48 @@ std::pair, std::shared_ptr> shell_p2open( #endif } -std::string shell_get_command_output(const std::string& cmd) { +std::string shell_get_command_output(const std::string& cmd, int time_out, + int sleep_inter, bool print_cmd) { #if defined _WIN32 || defined __APPLE__ - return ""; + PADDLE_THROW(platform::errors::Unimplemented( + "This function(shell_get_command_output) is not implemented under _WIN32 " + "or __APPLE__.")); #else int err_no = 0; + platform::Timer timer; do { + if (print_cmd) { + LOG(INFO) << "exec cmd:[" << cmd << "]"; + } err_no = 0; std::shared_ptr pipe = shell_popen(cmd, "r", &err_no); string::LineFileReader reader; - if (reader.getdelim(&*pipe, 0)) { - pipe = nullptr; - if (err_no == 0) { + char* buf = reader.getdelim(&*pipe, 0); + if (err_no == 0) { + if (buf) { return reader.get(); } + return ""; + } + + if (sleep_inter > 0) { + usleep(sleep_inter); } - } while (err_no == -1); + + timer.Pause(); + if (time_out > 0 && timer.ElapsedMS() >= time_out) { + PADDLE_THROW(paddle::platform::errors::ExecutionTimeout( + "shell_get_command_output execute error errno:%d and try until " + "timeout.", + errno)); + return ""; + } + timer.Resume(); + + pipe = nullptr; + } while (err_no); + return ""; #endif } diff --git a/paddle/fluid/framework/io/shell.h b/paddle/fluid/framework/io/shell.h index 0aebe86c4c5..194b1c0edaf 100644 --- a/paddle/fluid/framework/io/shell.h +++ b/paddle/fluid/framework/io/shell.h @@ -65,7 +65,12 @@ inline void shell_execute(const std::string& cmd) { } while (err_no == -1); } -extern std::string shell_get_command_output(const std::string& cmd); +// timeout:ms, default -1 means forever. +// sleep_inter:ms, default -1 means not sleep. +extern std::string shell_get_command_output(const std::string& cmd, + int time_out = -1, + int sleep_inter = -1, + bool print_cmd = false); } // namespace framework } // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 4fce0e5eb55..62866a492cb 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1494,8 +1494,10 @@ All parameter, weight, gradient are variables in Paddle. m.def("is_compiled_with_mkldnn", IsCompiledWithMKLDNN); m.def("is_compiled_with_brpc", IsCompiledWithBrpc); m.def("is_compiled_with_dist", IsCompiledWithDIST); - m.def("run_cmd", [](const std::string &cmd) -> const std::string { - return paddle::framework::shell_get_command_output(cmd); + m.def("run_cmd", [](const std::string &cmd, int time_out = -1, + int sleep_inter = -1) -> const std::string { + return paddle::framework::shell_get_command_output(cmd, time_out, + sleep_inter); }); #ifdef PADDLE_WITH_CUDA m.def("is_float16_supported", [](const platform::CUDAPlace &place) -> bool { diff --git a/python/paddle/distributed/cloud_utils.py b/python/paddle/distributed/cloud_utils.py new file mode 100644 index 00000000000..fcd883d476b --- /dev/null +++ b/python/paddle/distributed/cloud_utils.py @@ -0,0 +1,79 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from utils import get_cluster, logger +import os + + +def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus): + """ + args_node_ips, args_node_ip:string + """ + #you can automatically get ip info while using paddlecloud multi nodes mode. + node_ips = os.getenv("PADDLE_TRAINERS") + assert node_ips is not None, "PADDLE_TRAINERS should not be None" + + node_ip = os.getenv("POD_IP") + assert node_ip is not None, "POD_IP should not be None" + + node_rank = os.getenv("PADDLE_TRAINER_ID") + assert node_rank is not None, "PADDLE_TRAINER_ID should not be None" + + node_ips = node_ips.split(",") + num_nodes = len(node_ips) + node_rank = int(node_rank) + + if node_ip != "127.0.0.1" and node_ip != args_node_ip: + logger.warning("Please NOTE: When using paddlecloud, node_ip is \ +automatically got from POD_IP. Your input node_ip: {} doesn't equals to \ +node_ip: {} from paddlecloud environment.".format(args_node_ip, node_ip)) + + if args_node_ips != "127.0.0.1" and args_node_ips != ",".join(node_ips): + logger.warning( + "Please NOTE: When using paddlecloud, cluster_node_ips is \ +automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\ +Your input cluster_node_ips: {} doesn't equals to IPs: {} from \ +paddlecloud environment.".format(args_node_ips, node_ips)) + + started_port = args_port + print("num_nodes:", num_nodes) + if num_nodes > 1: + try: + paddle_port = int(os.getenv("PADDLE_PORT", "")) + paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", "")) + + if paddle_port_num >= len( + selected_gpus) and paddle_port != args_port: + logger.warning("Use Cloud specified port:{}.".format( + paddle_port)) + started_port = paddle_port + + except Exception as e: + print(e) + pass + + if started_port is None: + started_port = 6170 + + logger.debug("parsed from args:node_ips:{} \ + node_ip:{} node_rank:{} started_port:{}" + .format(node_ips, node_ip, node_rank, started_port)) + + ports = [x for x in range(started_port, started_port + len(selected_gpus))] + cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus) + return cluster, cluster.pods[node_rank] + + +def get_trainers_num(): + return int(os.getenv("PADDLE_TRAINERS_NUM", "1")) diff --git a/python/paddle/distributed/fs_wrapper.py b/python/paddle/distributed/fs_wrapper.py new file mode 100644 index 00000000000..014ef3ef8de --- /dev/null +++ b/python/paddle/distributed/fs_wrapper.py @@ -0,0 +1,223 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import paddle.fluid as fluid +import sys +import abc +import os +from pathlib import PurePosixPath +import shutil + + +class FS(object): + @abc.abstractmethod + def list_dirs(self, fs_path): + pass + + @abc.abstractmethod + def ls_dir(self, fs_path): + pass + + @abc.abstractmethod + def stat(self, fs_path): + pass + + @abc.abstractmethod + def upload(self, local_path, fs_path): + pass + + @abc.abstractmethod + def download(self, fs_path, local_path): + pass + + @abc.abstractmethod + def mkdir(self, fs_path): + pass + + @abc.abstractmethod + def mv(self, fs_src_path, fs_dst_path): + pass + + @abc.abstractmethod + def rmr(self, fs_path): + pass + + @abc.abstractmethod + def rm(self, fs_path): + pass + + @abc.abstractmethod + def delete(self, fs_path): + pass + + @abc.abstractmethod + def need_upload_download(self): + pass + + +class LocalFS(FS): + def list_dirs(self, fs_path): + if not self.stat(fs_path): + return [] + + return [ + f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f) + ] + + def ls_dir(self, fs_path): + return [f for f in os.listdir(fs_path)] + + def stat(self, fs_path): + return os.path.exists(fs_path) + + def mkdir(self, fs_path): + assert not os.path.isfile(fs_path), "{} is already a file".format( + fs_path) + os.system("mkdir -p {}".format(fs_path)) + + def mv(self, fs_src_path, fs_dst_path): + os.rename(fs_src_path, fs_dst_path) + + def rmr(self, fs_path): + shutil.rmtree(fs_path) + + def rm(self, fs_path): + os.remove(fs_path) + + def delete(self, fs_path): + if not self.stat(fs_path): + return + + if os.path.isfile(fs_path): + return self.rm(fs_path) + + return self.rmr(fs_path) + + def need_upload_download(self): + return False + + +class BDFS(FS): + def __init__(self, + hdfs_name, + hdfs_ugi, + time_out=20 * 60 * 1000, + sleep_inter=1000): + self._base_cmd = "hadoop fs -Dfs.default.name=\"{}\" -Dhadoop.job.ugi=\"{}\"".format( + hdfs_name, hdfs_ugi) + self._time_out = time_out + self._sleep_inter = sleep_inter + + def _run_cmd(self, cmd): + ret = fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + if len(ret) <= 0: + return [] + + lines = ret.splitlines() + return lines + + def list_dirs(self, fs_path): + if not self.stat(fs_path): + return [] + + dirs, _ = self.ls_dir(fs_path) + return dirs + + def ls_dir(self, fs_path): + """ + list directory under fs_path, and only give the pure name, not include the fs_path + """ + cmd = "{} -ls {}".format(self._base_cmd, fs_path) + lines = self._run_cmd(cmd) + + dirs = [] + files = [] + for line in lines: + arr = line.split() + if len(arr) != 8: + continue + + if fs_path not in arr[7]: + continue + + p = PurePosixPath(arr[7]) + if arr[0][0] == 'd': + dirs.append(p.name) + else: + files.append(p.name) + + return dirs, files + + def is_dir(self, fs_path): + cmd = "{} -test -d {} ; echo $?".format(self._base_cmd, fs_path) + + test = self._run_cmd(cmd) + if test[0].strip() == "0": + return True + + return False + + def stat(self, fs_path): + cmd = "{} -test -e {} ; echo $?".format(self._base_cmd, fs_path) + + test = self._run_cmd(cmd) + if test[0].strip() == "0": + return True + + return False + + def upload(self, local_path, fs_path): + cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path) + fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + + def download(self, fs_path, local_path): + cmd = "{} -get {} {}/".format(self._base_cmd, fs_path, local_path) + fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + + def mkdir(self, fs_path): + + if not self.stat(fs_path): + cmd = "{} 
-mkdir {}".format(self._base_cmd, fs_path) + fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + + def mv(self, fs_src_path, fs_dst_path): + cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path) + fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + + def rmr(self, fs_path): + if not self.stat(fs_path): + return + + cmd = "{} -rmr {}".format(self._base_cmd, fs_path) + return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + + def rm(self, fs_path): + if not self.stat(fs_path): + return + + cmd = "{} -rm {}".format(self._base_cmd, fs_path) + return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter) + + def delete(self, fs_path): + if not self.stat(fs_path): + return + + is_dir = self.is_dir(fs_path) + if is_dir: + return self.rmr(fs_path) + + return self.rm(fs_path) + + def need_upload_download(self): + return True diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py index cd27b05b63f..1228333f5d3 100644 --- a/python/paddle/distributed/launch.py +++ b/python/paddle/distributed/launch.py @@ -36,7 +36,6 @@ launch a process on each of the given gpu card. """ from __future__ import print_function -import logging import sys from sys import version import subprocess @@ -45,17 +44,11 @@ import time import six import copy from argparse import ArgumentParser, REMAINDER +import paddle import paddle.fluid as fluid -from contextlib import closing -import socket -logger = logging.getLogger() -logger.setLevel(logging.INFO) -log_handler = logging.StreamHandler() -log_format = logging.Formatter( - '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s') -log_handler.setFormatter(log_format) -logger.addHandler(log_handler) +from paddle.distributed.utils import * +import paddle.distributed.cloud_utils as cloud_utils def _print_arguments(args): @@ -65,32 +58,6 @@ def _print_arguments(args): print("------------------------------------------------") -def find_free_ports(num): - def __free_port(): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(('', 0)) - return s.getsockname()[1] - - port_set = set() - step = 0 - while True: - port = __free_port() - if port not in port_set: - port_set.add(port) - - if len(port_set) >= num: - return port_set - - step += 1 - if step > 100: - print( - "can't find avilable port and use the specified static port now!" - ) - return None - - return None - - def _parse_args(): """ Helper function parsing the command line options @@ -146,6 +113,12 @@ POD_IP (current node ip address, not needed for local training) "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training." 
) + parser.add_argument( + "--log_level", + type=int, + default=20, # logging.INFO, details are here:https://docs.python.org/3/library/logging.html#levels + help="Logging level, default is logging.INFO") + parser.add_argument( "--log_dir", type=str, @@ -166,196 +139,97 @@ POD_IP (current node ip address, not needed for local training) return parser.parse_args() -def terminate_procs(procs): - for p in procs: - if p.poll() is None: - p.terminate() +def get_cluster_from_args(args, selected_gpus): + node_ips = [x.strip() for x in args.cluster_node_ips.split(',')] + node_ip = args.node_ip + node_rank = node_ips.index(node_ip) + + logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format( + node_ips, node_ip, node_rank)) + free_ports = None + if not args.use_paddlecloud and len( + node_ips) <= 1 and args.started_port is None: + free_ports = find_free_ports(len(selected_gpus)) + if free_ports is not None: + free_ports = list(free_ports) + else: + free_ports = [ + x + for x in range(args.started_port, args.started_port + len( + selected_gpus)) + ] -def start_procs(args): - """ - """ - default_env = os.environ.copy() + return get_cluster(node_ips, node_ip, free_ports, selected_gpus) - current_node_ip = args.node_ip - node_ips = [x.strip() for x in args.cluster_node_ips.split(',')] - node_id = node_ips.index(current_node_ip) - if args.use_paddlecloud: - trainer_nums = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) - if trainer_nums != 1: - #you can automatically get ip info while using paddlecloud multi nodes mode. - current_node_ip = os.getenv("POD_IP") - assert current_node_ip is not None, "POD_IP should not be None" - node_ips = os.getenv("PADDLE_TRAINERS") - assert node_ips is not None, "PADDLE_TRAINERS should not be None" - node_ips = node_ips.split(",") - node_id = os.getenv("PADDLE_TRAINER_ID") - assert node_id is not None, "PADDLE_TRAINER_ID should not be None" - node_id = int(node_id) - - if args.node_ip != "127.0.0.1" and current_node_ip != args.node_ip: - logger.warning( - "Please NOTE: When using paddlecloud, current_node_ip is \ -automatically got from POD_IP. Your input node_ip: {} doesn't equals to \ -current_node_ip: {} from paddlecloud environment." - .format(args.node_ip, current_node_ip)) - if args.cluster_node_ips != "127.0.0.1" and args.cluster_node_ips != ",".join( - node_ips): - logger.warning( - "Please NOTE: When using paddlecloud, cluster_node_ips is \ -automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\ -Your input cluster_node_ips: {} doesn't equals to IPs: {} from \ -paddlecloud environment.".format(args.cluster_node_ips, node_ips)) - num_nodes = len(node_ips) - - if args.selected_gpus is None: + +def get_gpus(selected_gpus): + if selected_gpus is None: gpus_num = fluid.core.get_cuda_device_count() selected_gpus = [str(x) for x in range(0, gpus_num)] else: cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") if cuda_visible_devices is None or cuda_visible_devices == "": - selected_gpus = [x.strip() for x in args.selected_gpus.split(',')] + selected_gpus = [x.strip() for x in selected_gpus.split(',')] else: # change selected_gpus into relative values # e.g. 
CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7; # therefore selected_gpus=0,1,2,3 cuda_visible_devices_list = cuda_visible_devices.split(',') - for x in args.selected_gpus.split(','): + for x in selected_gpus.split(','): assert x in cuda_visible_devices_list, "Can't find "\ "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ % (x, cuda_visible_devices) selected_gpus = [ cuda_visible_devices_list.index(x.strip()) - for x in args.selected_gpus.split(',') + for x in selected_gpus.split(',') ] - selected_gpus_num = len(selected_gpus) - - if args.use_paddlecloud and num_nodes > 1: - cloud_paddle_port = os.getenv("PADDLE_PORT", "") - cloud_paddle_port_num = os.getenv("PADDLE_PORTS_NUM", "") - if cloud_paddle_port != "" and cloud_paddle_port_num != "": - cloud_paddle_port_num = int(cloud_paddle_port_num) - if cloud_paddle_port_num >= selected_gpus_num: - args.started_port = int(cloud_paddle_port) - logger.warning("Use Cloud specified port:{}.".format( - cloud_paddle_port)) - free_ports = None - if not args.use_paddlecloud and num_nodes <= 1 and args.started_port is None: - free_ports = find_free_ports(selected_gpus_num) - if free_ports is not None: - free_ports = list(free_ports) - args.started_port = free_ports[0] + return selected_gpus - if args.started_port is None: - args.started_port = 6170 - if free_ports is None: - free_ports = [ - x - for x in range(args.started_port, args.started_port + - selected_gpus_num) - ] +def launch(args): + # parse arguments, used for cloud-single-machine and local + selected_gpus = get_gpus(args.selected_gpus) + trainers_num = cloud_utils.get_trainers_num() + logger.debug("parsed from args trainerss_num:{} selected_gpus:{}".format( + trainers_num, selected_gpus)) - trainers_endpoints = "" - for ip in node_ips: - for i in range(0, selected_gpus_num): - if trainers_endpoints != "": - trainers_endpoints += "," - trainers_endpoints += "%s:%d" % (ip, free_ports[i]) + cluster = None + pod = None - nranks = num_nodes * selected_gpus_num + if args.use_paddlecloud and trainers_num != 1: + cluster, pod = cloud_utils.get_cloud_cluster( + args.cluster_node_ips, args.node_ip, args.started_port, + selected_gpus) + logger.info("get cluster from cloud:{}".format(cluster)) + else: + cluster, pod = get_cluster_from_args(args, selected_gpus) + logger.info("get cluster from args:{}".format(cluster)) - if args.print_config: - print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id, - ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes, - ", node_ips:", node_ips, ", nranks:", nranks) - - current_env = copy.copy(default_env) - #paddle broadcast ncclUniqueId use socket, and - #proxy maybe make trainers unreachable, so delete them. - #if we set them to "", grpc will log error message "bad uri" - #so just delete them. 
- current_env.pop("http_proxy", None) - current_env.pop("https_proxy", None) - - procs = [] - log_fns = [] - cmds = [] - ranks = [] - for i in range(0, selected_gpus_num): - rank = (node_id * selected_gpus_num + i) - current_env.update({ - "FLAGS_selected_gpus": "%s" % selected_gpus[i], - "PADDLE_TRAINER_ID": "%d" % rank, - "PADDLE_CURRENT_ENDPOINT": - "%s:%d" % (current_node_ip, free_ports[i]), - "PADDLE_TRAINERS_NUM": "%d" % nranks, - "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints - }) - - cmd = [sys.executable, "-u", args.training_script - ] + args.training_script_args - cmds.append(cmd) - - if args.log_dir is not None: - os.system("mkdir -p {}".format(args.log_dir)) - fn = open("%s/workerlog.%d" % (args.log_dir, i), "w") - log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) - else: - proc = subprocess.Popen(cmd, env=current_env) - - procs.append(proc) - ranks.append(rank) - - try: - alive = True - error = False - error_rank = [] - # wait all process finish or one error - while alive and not error: - alive = False - for rank, p in zip(ranks, procs): - ret = p.poll() - if ret is None: - alive = True - elif ret != 0: - error = True - error_rank.append(rank) - time.sleep(1) - - if error: - terminate_procs(procs) - exit(1) - - except KeyboardInterrupt: - logger.warning("KeyboardInterrupt, exit") - terminate_procs(procs) - raise - except SystemExit: - logger.error( - "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.". - format(nranks, error_rank)) - terminate_procs(procs) - raise - except: - logger.error( - "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.". - format(nranks, error_rank)) - terminate_procs(procs) - raise - finally: - for fn in log_fns: - fn.close() - - -def launch(): + procs = start_local_trainers( + cluster, + pod, + training_script=args.training_script, + training_script_args=args.training_script_args, + log_dir=args.log_dir) + + while True: + alive = watch_local_trainers(procs, cluster.trainers_nranks()) + + if not alive: + logger.info("Local procs complete, POD info:{}".format(pod)) + break + + time.sleep(3) + + +if __name__ == "__main__": args = _parse_args() + + logger = get_logger(args.log_level) + if args.print_config: _print_arguments(args) - start_procs(args) - -if __name__ == "__main__": - launch() + launch(args) diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py new file mode 100644 index 00000000000..b6295e41198 --- /dev/null +++ b/python/paddle/distributed/utils.py @@ -0,0 +1,424 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import functools +import logging +import socket +import time +import os +import signal +import copy +import sys +import subprocess +from contextlib import closing +import socket + +logger = logging.getLogger("root") +logger.propagate = False + + +class Hdfs(object): + def __init__(self): + self.hdfs_ugi = None + self.hdfs_name = None + self.hdfs_path = None + + def is_valid(self): + return self.hdfs_ugi is not None and \ + self.hdfs_name is not None and \ + self.hdfs_path is not None + + def __str__(self): + return "hdfs_ugi:{} hdfs_name:{} hdfs_path{}".format( + self.hdfs_ugi, self.hdfs_name, self.hdfs_path) + + def __eq__(self, n): + return self.hdfs_ugi == n.hdfs_ugi and \ + self.hdfs_name == n.hdfs_name and \ + self.hdfs_path == n.hdfs_path + + def __ne__(self, n): + return not self == n + + +class Cluster(object): + def __init__(self, hdfs): + self.job_server = None + self.pods = [] + self.hdfs = None + self.job_stage_flag = None + + def __str__(self): + return "job_server:{} pods:{} job_stage_flag:{} hdfs:{}".format( + self.job_server, [str(pod) for pod in self.pods], + self.job_stage_flag, self.hdfs) + + def __eq__(self, cluster): + if len(self.pods) != len(cluster.pods): + return False + + for a, b in zip(self.pods, cluster.pods): + if a != b: + return False + + if self.job_stage_flag != cluster.job_stage_flag: + return False + + return True + + def __ne__(self, cluster): + return not self.__eq__(cluster) + + def update_pods(cluster): + self.pods = copy.copy(cluster.pods) + + def trainers_nranks(self): + return len(self.trainers_endpoints()) + + def pods_nranks(self): + return len(self.pods) + + def trainers_endpoints(self): + r = [] + for pod in self.pods: + for t in pod.trainers: + r.append(t.endpoint) + return r + + def pods_endpoints(self): + r = [] + for pod in self.pods: + ep = "{}:{}".format(pod.addr, pod.port) + assert pod.port != None and pod.addr != None, "{} not a valid endpoint".format( + ep) + r.append(ep) + + return r + + def get_pod_by_id(self, pod_id): + for pod in self.pods: + if str(pod_id) == str(pod.id): + return pod + + return None + + +class JobServer(object): + def __init__(self): + self.endpoint = None + + def __str__(self): + return "{}".format(self.endpoint) + + def __eq__(self, j): + return self.endpint == j.endpoint + + def __ne__(self, j): + return not self == j + + +class Trainer(object): + def __init__(self): + self.gpus = [] + self.endpoint = None + self.rank = None + + def __str__(self): + return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint, + self.rank) + + def __eq__(self, t): + if len(self.gpus) != len(t.gpus): + return False + + if self.endpoint != t.endpoint or \ + self.rank != t.rank : + return False + + for a, b in zip(self.gpus, t.gpus): + if a != b: + return False + + return True + + def __ne__(self, t): + return not self == t + + def rank(self): + return self.rank + + +class Pod(object): + def __init__(self): + self.rank = None + self.id = None + self.addr = None + self.port = None + self.trainers = [] + self.gpus = [] + + def __str__(self): + return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format( + self.rank, self.id, self.addr, self.port, self.gpus, + [str(t) for t in self.trainers]) + + def __eq__(self, pod): + if self.rank != pod.rank or \ + self.id != pod.id or \ + self.addr != pod.addr or \ + self.port != pod.port: + logger.debug("pod {} != pod".format(self, pod)) + return False + + if len(self.trainers) != len(pod.trainers): + logger.debug("trainers {} != {}".format(self.trainers, + 
pod.trainers)) + return False + + for i in range(len(self.trainers)): + if self.trainers[i] != pod.trainers[i]: + logger.debug("trainer {} != {}".format(self.trainers[i], + pod.trainers[i])) + return False + + return True + + def __ne__(self, pod): + return not self == pod + + def parse_response(self, res_pods): + pass + + def rank(self): + return self.rank + + def get_visible_gpus(self): + r = "" + for g in self.gpus: + r += "{},".format(g) + + assert r != "", "this pod {} can't see any gpus".format(self) + + r = r[:-1] + return r + + +def get_logger(log_level, name="root"): + logger = logging.getLogger(name) + logger.setLevel(log_level) + + log_handler = logging.StreamHandler() + log_format = logging.Formatter( + '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s') + log_handler.setFormatter(log_format) + logger.addHandler(log_handler) + + return logger + + +def get_cluster(node_ips, node_ip, paddle_ports, selected_gpus): + assert type(paddle_ports) is list, "paddle_ports must be list" + cluster = Cluster(hdfs=None) + trainer_rank = 0 + for node_rank, ip in enumerate(node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + for i in range(len(selected_gpus)): + trainer = Trainer() + trainer.gpus.append(selected_gpus[i]) + trainer.endpoint = "%s:%d" % (ip, paddle_ports[i]) + trainer.rank = trainer_rank + trainer_rank += 1 + + pod.trainers.append(trainer) + cluster.pods.append(pod) + + pod_rank = node_ips.index(node_ip) + return cluster, cluster.pods[pod_rank] + + +def terminate_local_procs(procs): + for p in procs: + if p.proc.poll() is None: + p.proc.terminate() + p.log_fn.close() + logger.debug("terminate process id:{}".format(p.proc.pid)) + + # wait all process terminiated + # time.sleep(3) + + for step in range(0, 50): + alive = False + for p in procs: + if p.proc.poll() is None: # not termniate + os.kill(p.proc.pid, signal.SIGKILL) + alive = True + + if not alive: + logger.info("terminate all the procs") + return + + time.sleep(3) + + logger.fatal("can't kill all process and exit") + exit(1) + + +def get_host_name_ip(): + try: + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + return host_name, host_ip + except: + return None + + +def add_arguments(argname, type, default, help, argparser, **kwargs): + """Add argparse's argument. + Usage: + .. code-block:: python + parser = argparse.ArgumentParser() + add_argument("name", str, "Jonh", "User name.", parser) + args = parser.parse_args() + """ + type = distutils.util.strtobool if type == bool else type + argparser.add_argument( + "--" + argname, + default=default, + type=type, + help=help + ' Default: %(default)s.', + **kwargs) + + +def find_free_ports(num): + def __free_port(): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + port_set = set() + step = 0 + while True: + port = __free_port() + if port not in port_set: + port_set.add(port) + + if len(port_set) >= num: + return port_set + + step += 1 + if step > 100: + print( + "can't find avilable port and use the specified static port now!" + ) + return None + + return None + + +class TrainerProc(object): + def __init__(self): + self.proc = None + self.log_fn = None + self.rank = None + self.cmd = None + + +def start_local_trainers(cluster, + pod, + training_script, + training_script_args, + log_dir=None): + current_env = copy.copy(os.environ.copy()) + #paddle broadcast ncclUniqueId use socket, and + #proxy maybe make trainers unreachable, so delete them. 
+ #if we set them to "", grpc will log error message "bad uri" + #so just delete them. + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + procs = [] + for idx, t in enumerate(pod.trainers): + proc_env = { + "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]), + "PADDLE_TRAINER_ID": "%d" % t.rank, + "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, + "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), + "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) + } + + current_env.update(proc_env) + + logger.debug("trainer proc env:{}".format(current_env)) + + cmd = [sys.executable, "-u", training_script] + training_script_args + + logger.info("start trainer proc:{} env:{}".format(cmd, proc_env)) + + fn = None + if log_dir is not None: + os.system("mkdir -p {}".format(log_dir)) + fn = open("%s/workerlog.%d" % (log_dir, idx), "a") + proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = t.rank + tp.log_fn = fn + tp.cmd = cmd + + procs.append(tp) + + return procs + + +def watch_local_trainers(procs, nranks): + try: + error = False + error_rank = [] + # wait all process finish or one error + alive = False + for p in procs: + ret = p.proc.poll() + if ret is None: + alive = True + elif ret != 0: + error = True + error_rank.append(p.rank) + + if error: + terminate_local_procs(procs) + exit(1) + + except KeyboardInterrupt: + logger.warning("KeyboardInterrupt, exit") + terminate_local_procs(procs) + raise + except SystemExit: + logger.error( + "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.". + format(nranks, error_rank)) + terminate_local_procs(procs) + raise + except: + logger.error( + "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.". 
+ format(nranks, error_rank)) + terminate_local_procs(procs) + raise + + return alive diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py index 5150d108479..d8ec2b598cc 100644 --- a/python/paddle/fluid/incubate/fleet/collective/__init__.py +++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py @@ -26,10 +26,14 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Mode from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid import compiler +from paddle.distributed.fs_wrapper import LocalFS, BDFS import os import sys import six +import json +import re +import shutil class LambConfig(object): @@ -42,6 +46,21 @@ class DistFCConfig(object): pass +class TrainStatus(object): + def __init__(self, epoch_no=-1): + # completed epoch + self._epoch_no = epoch_no + + def next(self): + return self._epoch_no + 1 + + def __eq__(self, t): + return self._epoch_no == t._epoch_no + + def __ne__(self, t): + return not self == t + + class Collective(Fleet): def __init__(self): super(Collective, self).__init__(Mode.COLLECTIVE) @@ -51,6 +70,8 @@ class Collective(Fleet): self._origin_program = None self._transpiled_program = None self.main_program = None + self._checkoint_prefix = "__paddle_fleet_checkpoint__" + self._param_file_name = "_paddle_fleet_param__" def init_worker(self): logging.warn( @@ -103,7 +124,11 @@ class Collective(Fleet): executor, main_program, None, None, export_for_deployment) - def save_persistables(self, executor, dirname, main_program=None): + def save_persistables(self, + executor, + dirname, + main_program=None, + filename=None): """ This function filters out all variables with `persistable==True` from the give `main_program` and then saves these variables to the folder @@ -125,7 +150,182 @@ class Collective(Fleet): "In fleet.save_inference_model() function, main_program " \ "must be as Program type." 
- io.save_persistables(executor, dirname, main_program, None) + io.save_persistables(executor, dirname, main_program, filename=filename) + + def _save_train_status(self, path, train_status): + d = {} + d["epoch_no"] = train_status._epoch_no + + file_name = "{}/fleet_train_status".format(path) + with open(file_name, 'w') as f: + json.dump(d, f) + + def _load_train_status(self, path): + file_name = "{}/fleet_train_status".format(path) + + r = TrainStatus() + if not os.path.isfile(file_name): + return r + + d = {} + with open(file_name, 'r') as f: + d = json.load(f) + + assert "epoch_no" in d, "Can't find epoch_no in dict from train_status file:{}".format( + d) + r._epoch_no = d["epoch_no"] + assert r._epoch_no >= 0, "Data in checkpoint file is not valid:{}".format( + d) + + return r + + def _get_last_checkpoint_no(self, root_path, fs): + """ + only get the first depth + """ + max_no = -1 + d = {} + dirs = fs.list_dirs(root_path) + for dir in dirs: + g = dir.split(".") + if len(g) != 2: + continue + + if g[0] != "__paddle_fleet_checkpoint__": + continue + + try: + n = int(g[1]) + if n > max_no: + max_no = n + except: + continue + + return max_no + + def clean_redundant_check_points(self, + root_path, + fs=LocalFS(), + checkpoint_num=1): + max_no = self._get_last_checkpoint_no(root_path, fs) + if max_no < 0: + return + + if checkpoint_num < 1: + checkpoint_num = 1 + + dirs = fs.list_dirs(root_path) + for dir in dirs: + g = dir.split(".") + if len(g) != 2: + continue + + if g[0] != self._checkoint_prefix: + continue + + try: + n = int(g[1]) + if n <= max_no - checkpoint_num: + path = "{}/{}.{}".format(root_path, self._checkoint_prefix, + n) + fs.rmr(path) + except Exception as e: + print(e) + continue + + def save_check_point(self, + executor, + path, + train_status, + main_program=None, + fs=LocalFS(), + local_cache_path=".cache", + remain_all_checkpoint=True): + """ + This function save persistables and current epoch num to path. + """ + + if main_program == None: + main_program = self._transpiled_program + + if not fs.stat(path): + fs.mkdir(path) + + max_no = self._get_last_checkpoint_no(path, fs=fs) + if max_no < 0: + max_no = -1 + + real_path = "{}/{}.{}".format(path, self._checkoint_prefix, max_no + 1) + tmp_path = "{}.tmp".format(real_path) + saved_path = tmp_path + + local_fs = LocalFS() + + cache_path = None + if fs.need_upload_download(): + cache_path = "{}/{}.{}.saved_cache".format( + local_cache_path, self._checkoint_prefix, max_no + 1) + if not local_fs.stat(cache_path): + local_fs.mkdir(cache_path) + saved_path = cache_path + + self.save_persistables( + executor=executor, + dirname=saved_path, + main_program=main_program, + filename=self._param_file_name) + self._save_train_status(path=saved_path, train_status=train_status) + + if fs.need_upload_download(): + fs.delete(tmp_path) + fs.upload(cache_path, tmp_path) + fs.mv(tmp_path, real_path) + + if not remain_all_checkpoint: + self.clean_redundant_check_points(path) + + def load_check_point(self, + executor, + path, + trainer_id, + main_program=None, + fs=LocalFS(), + local_cache_path=".cache", + ignore_empty=True): + """ + This function load persistables and current epoch num from path. 
+ """ + max_no = self._get_last_checkpoint_no(path, fs) + + if not ignore_empty: + assert max_no >= 0, "Can't find checkpoint" + + if max_no < 0: + return None + + local_fs = LocalFS() + if fs.need_upload_download(): + cache_path = "{}/{}.{}.load_cache.{}".format( + local_cache_path, self._checkoint_prefix, max_no, trainer_id) + if local_fs.stat(cache_path): + local_fs.delete(cache_path) + + real_path = "{}/{}.{}".format(path, self._checkoint_prefix, max_no) + load_path = real_path + if fs.need_upload_download(): + fs.download(real_path, cache_path) + load_path = cache_path + + if main_program == None: + main_program = self._transpiled_program + + io.load_persistables( + executor=executor, + dirname=load_path, + main_program=main_program, + filename=self._param_file_name) + + return self._load_train_status(load_path) fleet = Collective() diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 704a9b4abb8..1fc1f87887f 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -28,6 +28,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo) list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_checkpoint) foreach(TEST_OP ${MIXED_DIST_TEST_OPS}) list(REMOVE_ITEM TEST_OPS ${TEST_OP}) endforeach() @@ -301,6 +302,7 @@ if(WITH_DISTRIBUTE) if(WITH_GPU) # NOTE. test_launch only work in gpu collective mode bash_test_modules(test_launch MODULES test_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + py_test_modules(test_fleet_checkpoint MODULES test_fleet_checkpoint) endif() bash_test_modules(test_launch_ps MODULES test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py b/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py new file mode 100644 index 00000000000..35c2801bde3 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_checkpoint.py @@ -0,0 +1,77 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus +import os +from paddle.distributed.fs_wrapper import LocalFS, BDFS + + +class FleetTest(unittest.TestCase): + def _test_check_point(self, fs, dir_path): + file_name = "persistables" + + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6070" + + role = role_maker.PaddleCloudRoleMaker(is_collective=True) + fleet.init(role) + + image = fluid.data(name='img', shape=[None, 28, 28], dtype='float32') + label = fluid.data(name='label', shape=[None, 1], dtype='int64') + feeder = fluid.DataFeeder( + feed_list=[image, label], place=fluid.CPUPlace()) + predict = fluid.layers.fc(input=image, size=10, act='softmax') + loss = fluid.layers.cross_entropy(input=predict, label=label) + avg_loss = fluid.layers.mean(loss) + optimizer = fluid.optimizer.AdamOptimizer(learning_rate=0.001) + + dist_optimizer = fleet.distributed_optimizer(optimizer) + dist_optimizer.minimize(avg_loss) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + + status = TrainStatus(2) + fleet.save_check_point(exe, dir_path, train_status=status, fs=fs) + n1 = fleet._get_last_checkpoint_no(dir_path, fs=fs) + + status2 = fleet.load_check_point(exe, dir_path, trainer_id=0, fs=fs) + self.assertEqual(status2, status) + + fleet.save_check_point(exe, dir_path, train_status=status, fs=fs) + n2 = fleet._get_last_checkpoint_no(dir_path, fs=fs) + self.assertEqual(n2, n1 + 1) + + fleet.clean_redundant_check_points(dir_path, fs=fs) + + def test_hdfs_check_point(self): + try: + fs = BDFS("xxxx", "xxxx", 1 * 1000, 1 * 1000) + dir_path = "/user/Paddle_Data/gongweibao/edl_test/my_paddle_model" + self._test_check_point(fs, dir_path) + except Exception as e: + print(e) + + def test_local_check_point(self): + fs = LocalFS() + dir_path = "./my_paddle_model" + self._test_check_point(fs, dir_path) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_launch.sh b/python/paddle/fluid/tests/unittests/test_launch.sh index 5c6b0b50ad8..f1bf6395f15 100644 --- a/python/paddle/fluid/tests/unittests/test_launch.sh +++ b/python/paddle/fluid/tests/unittests/test_launch.sh @@ -6,6 +6,7 @@ launch_py=${PADDLE_BINARY_DIR}/python/paddle/distributed/launch.py python ${launch_py} multi_process.py # use paddlecloud +echo "begin test use paddlecloud" cluster_node_ips="10.0.0.1" node_ip="10.0.0.1" export PADDLE_TRAINERS_NUM=2 @@ -14,7 +15,7 @@ export PADDLE_TRAINERS=127.0.0.1,127.0.0.2 export PADDLE_TRAINER_ID=0 export PADDLE_PORT=35019 -export PADDLE_PORTS_NUM=2 +export TRAINER_PORTS_NUM=2 distributed_args="--use_paddlecloud --cluster_node_ips=${cluster_node_ips} --node_ip=${node_ip} --selected_gpus=0,1 --log_dir=testlog" CUDA_VISIBLE_DEVICES=0,1 python ${launch_py} ${distributed_args} multi_process.py @@ -47,8 +48,9 @@ if [ -f $file_1 ]; then rm $file_1 fi + unset PADDLE_PORT -unset PADDLE_PORTS_NUM +unset TRAINER_PORTS_NUM echo "" echo "paddle.distributed.launch async poll process test" diff --git a/python/requirements.txt b/python/requirements.txt index 29bf92607d5..ef7181b4382 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -19,3 +19,4 @@ decorator prettytable objgraph astor +pathlib -- GitLab
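
Usage sketch (for reference; not part of the patch): the snippet below shows how the checkpoint API added here is meant to be driven, modeled on the new test_fleet_checkpoint.py. The toy network, the epoch loop, the ./fleet_ckpt path, and the single-trainer environment values are illustrative assumptions; only the fleet/fs_wrapper calls come from this change.

    import os
    import paddle.fluid as fluid
    import paddle.fluid.incubate.fleet.base.role_maker as role_maker
    from paddle.fluid.incubate.fleet.collective import fleet, TrainStatus
    from paddle.distributed.fs_wrapper import LocalFS  # or BDFS(hdfs_name, hdfs_ugi) for HDFS

    # Single-trainer collective setup, as in the unit test (assumed values).
    os.environ["TRAINING_ROLE"] = "TRAINER"
    os.environ["PADDLE_TRAINER_ID"] = "0"
    os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6070"
    fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))

    # Toy network, only to give save_persistables something to write.
    image = fluid.data(name='img', shape=[None, 28, 28], dtype='float32')
    label = fluid.data(name='label', shape=[None, 1], dtype='int64')
    predict = fluid.layers.fc(input=image, size=10, act='softmax')
    avg_loss = fluid.layers.mean(
        fluid.layers.cross_entropy(input=predict, label=label))
    optimizer = fluid.optimizer.AdamOptimizer(learning_rate=0.001)
    fleet.distributed_optimizer(optimizer).minimize(avg_loss)

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    fs = LocalFS()             # a BDFS instance checkpoints to a remote HDFS path instead
    ckpt_dir = "./fleet_ckpt"  # hypothetical directory

    # Resume from the newest __paddle_fleet_checkpoint__.N if one exists;
    # None means the directory has no checkpoint yet.
    status = fleet.load_check_point(exe, ckpt_dir, trainer_id=0, fs=fs)
    if status is None:
        status = TrainStatus()  # epoch_no == -1, so training starts at epoch 0

    for epoch in range(status.next(), 10):
        # ... run one epoch of training with exe here ...
        fleet.save_check_point(exe, ckpt_dir, train_status=TrainStatus(epoch), fs=fs)

    # Optionally drop all but the most recent checkpoint directory.
    fleet.clean_redundant_check_points(ckpt_dir, fs=fs)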