From 30a1213b79b87b1cdb105f8d3adfa714bc6bc524 Mon Sep 17 00:00:00 2001 From: danleifeng <52735331+danleifeng@users.noreply.github.com> Date: Fri, 15 Apr 2022 14:06:21 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90GPUPS=E3=80=91add=20afsclient=20and=20?= =?UTF-8?q?gpupsutil=20(#41324)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add gpupsutil and afsclient; test=develop --- .../fluid/framework/fleet/ps_gpu_wrapper.cc | 14 +- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 4 + paddle/fluid/pybind/ps_gpu_wrapper_py.cc | 6 + python/paddle/distributed/fleet/utils/fs.py | 417 ++++++++++++++++ .../fluid/incubate/fleet/utils/fleet_util.py | 471 +++++++++++++++++- 5 files changed, 909 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc index 115ec4d0102..5e1a08f33e3 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc @@ -72,6 +72,18 @@ int AfsWrapper::download(const std::string& local_file, const std::string& afs_file) { return afs_handler_.download_file(local_file, afs_file); } + +int AfsWrapper::touchz(const std::string& path) { + return afs_handler_.touchz(path); +} + +std::string AfsWrapper::cat(const std::string& path) { + return afs_handler_.cat(path); +} + +int AfsWrapper::mv(const std::string& old_path, const std::string& dest_path) { + return afs_handler_.mv(old_path, dest_path); +} #endif std::shared_ptr PSGPUWrapper::s_instance_ = NULL; @@ -84,7 +96,7 @@ void PSGPUWrapper::InitAfsApi(const std::string& fs_name, int ret = afs_handler_.init(fs_name.c_str(), fs_user.c_str(), pass_wd.c_str(), conf.c_str()); if (ret != 0) { - LOG(ERROR) << "AFS Init Error"; + VLOG(0) << "AFS Init Error"; } use_afs_api_ = 1; } diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index b7060764863..c5f674d8b47 100755 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -71,6 +71,10 @@ class AfsWrapper { int download(const std::string& local_file, const std::string& afs_file); + int touchz(const std::string& path); + std::string cat(const std::string& path); + int mv(const std::string& old_path, const std::string& dest_path); + private: paddle::ps::AfsApiWrapper afs_handler_; }; diff --git a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc index 79529fca7d1..42703fc17bd 100644 --- a/paddle/fluid/pybind/ps_gpu_wrapper_py.cc +++ b/paddle/fluid/pybind/ps_gpu_wrapper_py.cc @@ -81,6 +81,12 @@ void BindAfsWrapper(py::module* m) { .def("upload", &framework::AfsWrapper::upload, py::call_guard()) .def("remove", &framework::AfsWrapper::remove, + py::call_guard()) + .def("touchz", &framework::AfsWrapper::touchz, + py::call_guard()) + .def("cat", &framework::AfsWrapper::cat, + py::call_guard()) + .def("mv", &framework::AfsWrapper::mv, py::call_guard()); } #endif diff --git a/python/paddle/distributed/fleet/utils/fs.py b/python/paddle/distributed/fleet/utils/fs.py index 8895a529526..fab7b4ff4ce 100644 --- a/python/paddle/distributed/fleet/utils/fs.py +++ b/python/paddle/distributed/fleet/utils/fs.py @@ -1145,3 +1145,420 @@ class HDFSClient(FS): file_list.append({'path': file_path, 'size': file_size}) return file_list + + +class AFSClient(FS): + """ + A tool of AFS. Use AfsWrapper. + + Examples: + + .. 
code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.ls_dir("hdfs:/test_hdfs_client") + """ + + def __init__( + self, + time_out=5 * 60 * 1000, # ms + sleep_inter=1000): # ms + self._fs = core.AfsWrapper() + self._time_out = time_out + + def init(self, fs_name, fs_user, fs_passwd, fs_conf): + self._fs.init(fs_name, fs_user, fs_passwd, fs_conf) + + def list_dirs(self, fs_path): + """ + Only list directorys under `fs_path` . + + Args: + fs_path(str): The HDFS file path. + + Returns: + List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...]. + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + subdirs = client.list_dirs("hdfs:/test_hdfs_client") + """ + if not self.is_exist(fs_path): + return [] + # TODO:fengdanlei + dirs, files = self._ls_dir(fs_path) + return dirs + + def ls_dir(self, fs_path): + """ + List directorys and files under `fs_path` . + + Args: + fs_path(str): The HDFS file path. + + Returns: + Tuple: Return a 2-tuple, the first element is the list of all its subdirectories, + and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]). + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + subdirs, files = client.ls_dir("hdfs:/test_hdfs_client") + """ + if not self.is_exist(fs_path): + return [], [] + + return self._ls_dir(fs_path) + + def _ls_dir(self, fs_path): + + files = self._fs.list(fs_path) + dirs = [fs_path] + return dirs, files + + def is_dir(self, fs_path): + """ + Whether the remote HDFS path is a directory. + + Args: + fs_path(str): The HDFS file path. + + Returns: + Bool: Return true if the path exists and it's a directory, otherwise return false. + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + ret = client.is_file("hdfs:/test_hdfs_client") + """ + if not self.is_exist(fs_path): + return False + + return self._is_dir(fs_path) + + def _is_dir(self, fs_path): + list_path = self._fs.list(fs_path) + if (len(list_path)) > 0: + return True + else: + return False + + def is_file(self, fs_path): + """ + Whether the remote HDFS path is a file. + + Args: + fs_path(str): The HDFS file path. + + Returns: + Bool: Return true if the path exists and it's a file, otherwise return false. + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + ret = client.is_file("hdfs:/test_hdfs_client") + """ + if not self.is_exist(fs_path): + return False + + return not self._is_dir(fs_path) + + def is_exist(self, fs_path): + """ + Whether the remote HDFS path exists. + + Args: + fs_path(str): The hdfs file path. + + Returns: + Bool: Whether it's is file or directory, return true if the path exists, + otherwise return false. + + Examples: + + .. 
code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + ret = client.is_exist("hdfs:/test_hdfs_client") + """ + return self._fs.exist(fs_path) + + def upload_dir(self, local_dir, dest_dir, overwrite=False): + """ + upload dir to hdfs + Args: + local_dir(str): local dir + dest_dir(str): hdfs dest dir + overwrite(bool): is overwrite + Returns: + return code + """ + local_dir = local_dir.rstrip("/") + dest_dir = dest_dir.rstrip("/") + local_basename = os.path.basename(local_dir) + if self.is_exist(dest_dir + "/" + local_basename) and overwrite: + self.delete(dest_dir + "/" + local_basename) + if not self.is_exist(dest_dir): + self.mkdirs(dest_dir) + self._fs.upload(local_dir, dest_dir) + + # can't retry + def upload(self, local_path, fs_path, multi_processes=1, overwrite=False): + """ + Upload the local path to remote HDFS. + + Args: + local_path(str): The local path. + fs_path(str): The HDFS path. + multi_processes(int|1): the upload data process at the same time, default=5 + overwrite(bool|False): will overwrite file on HDFS or not + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.upload("test_hdfs_client", "hdfs:/test_hdfs_client") + """ + + local = LocalFS() + if not local.is_exist(local_path): + raise FSFileNotExistsError("{} not exists".format(local_path)) + + self._fs.upload(local_path, fs_path) + + def download(self, fs_path, local_path, multi_processes=1, overwrite=False): + """ + Download remote HDFS path to the local. + + Args: + fs_path(str): The HDFS path. + local_path(str): The local path. + multi_processes(int|1): the download data process at the same time, default=1 + overwrite(bool): is overwrite + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.download("hdfs:/test_hdfs_client", "./") + """ + + def __subprocess_download(local_path, datas): + """ + download file from HDFS + Args: + local_path(str): the local file path + datas(str): the hdfs file path list + """ + for data in datas: + self._fs.download(local_path, data) + + if not self.is_exist(fs_path): + raise FSFileNotExistsError("{} not exits".format(fs_path)) + # download file + if self.is_file(fs_path): + return self._fs.download(local_path, fs_path) + # download dir + _, all_filenames = self.ls_dir(fs_path) + all_files = [fs_path + i for i in all_filenames] + procs = [] + for i in range(multi_processes): + process_datas = self._split_files(all_files, i, multi_processes) + p = multiprocessing.Process( + target=__subprocess_download, args=(local_path, process_datas)) + procs.append(p) + p.start() + + # complete the processes + for proc in procs: + proc.join() + + def mkdirs(self, fs_path): + """ + Create a remote HDFS directory. + + Args: + fs_path(str): The HDFS directory path. + + Examples: + + .. 
code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.mkdirs("hdfs:/test_hdfs_client") + """ + if self.is_exist(fs_path): + return + self._fs.mkdir(fs_path) + + def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True): + """ + Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` . + + Args: + fs_src_path(str): Name of the file or directory, that's needed to be moved. + fs_dst_path(str): Name of the file or directory to which to move to. + overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False. + test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Excetption. + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2") + """ + if overwrite and self.is_exist(fs_dst_path): + self.delete(fs_dst_path) + + if test_exists: + if not self.is_exist(fs_src_path): + raise FSFileNotExistsError("{} is not exists".format( + fs_src_path)) + + if self.is_exist(fs_dst_path): + raise FSFileExistsError("{} exists already".format(fs_dst_path)) + + self._fs.mv(fs_src_path, fs_dst_path) + + def delete(self, fs_path): + """ + Delete a remote HDFS path, whether it's a file or directory. + + Args: + fs_path(str): The HDFS file path. + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import HDFSClient + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.delete("hdfs:/test_hdfs_client") + """ + if not self.is_exist(fs_path): + return + self._fs.remove(fs_path) + + def touch(self, fs_path, exist_ok=True): + """ + Create a remote HDFS file. + + Args: + fs_path(str): The HDFS file path. + exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false, + program will throw an Exception. Default is true. + + Examples: + + .. code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.touch("hdfs:/test_hdfs_client") + """ + if self.is_exist(fs_path): + if exist_ok: + return + raise FSFileExistsError + + return self._fs.touchz(fs_path) + + def need_upload_download(self): + return True + + def cat(self, fs_path=None): + """ + Cat a remote HDFS file. + + Args: + fs_path(str): The HDFS file path. + + Returns: + file content + + Examples: + + .. 
code-block:: text + + from paddle.distributed.fleet.utils import AFSClient + + client = AFSClient() + client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf") + client.cat("hdfs:/test_hdfs_client") + """ + if self.is_file(fs_path): + return self._fs.cat(fs_path) + else: + return "" + + def _split_files(self, files, trainer_id, trainers): + """ + split file list + Args: + files(list): file list + trainer_id(int): trainer mpi rank id + trainers(int): all trainers num + Returns: + fileist(list): file list of current trainer + """ + remainder = len(files) % trainers + blocksize = len(files) // trainers + + blocks = [blocksize] * trainers + for i in range(remainder): + blocks[i] += 1 + + trainer_files = [[]] * trainers + begin = 0 + for i in range(trainers): + trainer_files[i] = files[begin:begin + blocks[i]] + begin += blocks[i] + + return trainer_files[trainer_id] diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 47f912c8715..5fc8fbd0116 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -25,11 +25,11 @@ import time import paddle.fluid as fluid from paddle.fluid import core from paddle.fluid.log_helper import get_logger -from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient +from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient, AFSClient from . import utils OpRole = core.op_proto_and_checker_maker.OpRole -__all__ = ["FleetUtil"] +__all__ = ["FleetUtil", "GPUPSUtil"] _logger = get_logger( __name__, logging.INFO, fmt='%(asctime)s %(levelname)s: %(message)s') @@ -1721,3 +1721,470 @@ class FleetUtil(object): else: return [start_list[heter_index], end_list[heter_index], send_list[heter_index], \ recv_list[heter_index], program_list[heter_index]] + + +class GPUPSUtil(FleetUtil): + """ + GPUPSUtil provides some common functions for users' convenience. + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + fleet_util = GPUPSUtil() + fleet_util.rank0_print("my log") + """ + + def __init__(self, fs_client=None): + super(GPUPSUtil, self).__init__("pslib") + self._afs = fs_client + # self._afs = fs_client._fs + + def init(self, fs_name, fs_user, fs_passwd, fs_conf): + r""" + init for fs config + + Args: + fs_name(str): fs name + fs_user(str): fs user + fs_passwd(str): fs password + fs_conf(str): fs and afs conf path + + Returns: + None + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + fleet_util = GPUPSUtil() + fleet_util.init(20190722, 88, 88, "./afs.conf") + """ + self._afs.init(fs_name, fs_user, fs_passwd, fs_conf) + + def set_fsclient(self, fs_client): + r""" + set fs_client for fs config + + Args: + fs_client(AFSClient): fs_client object + + Returns: + None + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + """ + self._afs = fs_client + + def get_last_save_xbox_base(self, output_path): + r""" + get last saved base xbox info from xbox_base_done.txt + + Args: + output_path(str): output path + + Returns: + [last_save_day, last_path, xbox_base_key] + last_save_day(int): day of saved model + last_path(str): model path + xbox_base_key(int): xbox key + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + last_save_day, last_path, xbox_base_key = \ + fleet_util.get_last_save_xbox_base("hdfs:/my/path") + + """ + donefile_path = output_path + "/xbox_base_done.txt" + + if not self._afs.is_file(donefile_path): + return [-1, -1, int(time.time())] + self._afs.download(donefile_path, "./xbox_base_done.txt") + # pre_content = self._afs.cat(donefile_path) + pre_content = "" + with open("xbox_base_done.txt", "r") as f: + pre_content = f.read() + pre_content = pre_content.strip() + last_dict = json.loads(pre_content.split("\n")[-1]) + last_day = int(last_dict["input"].split("/")[-3]) + last_path = "/".join(last_dict["input"].split("/")[:-1]) + xbox_base_key = int(last_dict["key"]) + return [last_day, last_path, xbox_base_key] + + def get_last_save_xbox(self, output_path): + r""" + get last saved xbox info from xbox_patch_done.txt + + Args: + output_path(str): output path + + Returns: + [last_save_day, last_save_pass, last_path, xbox_base_key] + last_save_day(int): day of saved model + last_save_pass(int): pass id of saved + last_path(str): model path + xbox_base_key(int): xbox key + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + last_save_day, last_save_pass, last_path, xbox_base_key = \ + fleet_util.get_last_save_xbox("hdfs:/my/path") + + """ + donefile_path = output_path + "/xbox_patch_done.txt" + + if not self._afs.is_file(donefile_path): + return [-1, -1, "", int(time.time())] + self._afs.download(donefile_path, "xbox_patch_done.txt") + pre_content = "" + with open("xbox_patch_done.txt", "r") as f: + pre_content = f.read() + pre_content = pre_content.strip() + last_dict = json.loads(pre_content.split("\n")[-1]) + last_day = int(last_dict["input"].split("/")[-3]) + last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1]) + last_path = "/".join(last_dict["input"].split("/")[:-1]) + xbox_base_key = int(last_dict["key"]) + os.remove("xbox_patch_done.txt") + return [last_day, last_pass, last_path, xbox_base_key] + + def get_last_save_model(self, output_path): + r""" + get last saved model info from donefile.txt + + Args: + output_path(str): output path + + Returns: + [last_save_day, last_save_pass, last_path, xbox_base_key] + last_save_day(int): day of saved model + last_save_pass(int): pass id of saved + last_path(str): model path + xbox_base_key(int): xbox key + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + last_save_day, last_save_pass, last_path, xbox_base_key = \ + fleet_util.get_last_save_model("hdfs:/my/path") + + """ + last_save_day = -1 + last_save_pass = -1 + last_path = "" + donefile_path = output_path + "/donefile.txt" + if not self._afs.is_file(donefile_path): + return [-1, -1, "", int(time.time())] + self._afs.download(donefile_path, "./donefile.txt") + content = "" + with open("donefile.txt", "r") as f: + content = f.read() + content = content.strip().split("\n")[-1].split("\t") + last_save_day = int(content[0]) + last_save_pass = int(content[3]) + last_path = content[2] + xbox_base_key = int(content[1]) + os.remove("donefile.txt") + return [last_save_day, last_save_pass, last_path, xbox_base_key] + + def write_model_donefile(self, + output_path, + day, + pass_id, + xbox_base_key, + donefile_name="donefile.txt"): + """ + write donefile when save model + + Args: + output_path(str): output path + day(str|int): training day + pass_id(str|int): training pass id + xbox_base_key(str|int): xbox base key + donefile_name(str): donefile name, default is "donefile.txt" + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + fleet_util.write_model_donefile(output_path="hdfs:/my/output", + model_path="hdfs:/my/model", + day=20190723, + pass_id=66, + xbox_base_key=int(time.time())) + + """ + day = str(day) + pass_id = str(pass_id) + xbox_base_key = int(xbox_base_key) + + if pass_id != "-1": + suffix_name = "/%s/%s/" % (day, pass_id) + model_path = output_path.rstrip("/") + suffix_name + else: + suffix_name = "/%s/0/" % day + model_path = output_path.rstrip("/") + suffix_name + + if fleet.worker_index() == 0: + donefile_path = output_path + "/" + donefile_name + content = "%s\t%lu\t%s\t%s\t%d" % (day, xbox_base_key,\ + model_path, pass_id, 0) + if self._afs.is_file(donefile_path): + self._afs.download(donefile_path, donefile_name) + pre_content = "" + with open(donefile_name, "r") as f: + pre_content = f.read() + pre_content_list = pre_content.strip().split("\n") + day_list = [i.split("\t")[0] for i in pre_content_list] + pass_list = [i.split("\t")[3] for i in pre_content_list] + os.remove(donefile_name) + exist = False + for i in range(len(day_list)): + if int(day) == int(day_list[i]) and \ + int(pass_id) == int(pass_list[i]): + exist = True + break + if not exist: + with open(donefile_name, "w") as f: + f.write(pre_content.strip() + "\n") + f.write(content + "\n") + self._afs.delete(donefile_path) + self._afs.upload(donefile_name, donefile_path) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + else: + self.rank0_error("not write %s because %s/%s already " + "exists" % (donefile_name, day, pass_id)) + else: + with open(donefile_name, "w") as f: + f.write(content + "\n") + self._afs.upload(donefile_name, donefile_path) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + + def write_xbox_donefile(self, + output_path, + day, + pass_id, + xbox_base_key, + data_path, + hadoop_fs_name, + hadoop_fs_ugi, + monitor_data={}, + hadoop_home="$HADOOP_HOME", + donefile_name=None): + """ + write 
delta donefile or xbox base donefile + + Args: + output_path(str): output path + day(str|int): training day of model + pass_id(str|int): training pass id of model + xbox_base_key(str|int): xbox base key + data_path(str|list): training data path + monitor_data(dict): metrics + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + donefile_name(str): donefile name, default is None" + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + fleet_util.write_xbox_donefile( + output_path="hdfs:/my/output/", + model_path="hdfs:/my/output/20190722/01", + day=20190722, + pass_id=1, + xbox_base_key=int(time.time()), + data_path="hdfs:/my/data/", + monitor_data={}) + + """ + day = str(day) + pass_id = str(pass_id) + xbox_base_key = int(xbox_base_key) + mode = None + if pass_id != "-1": + mode = "patch" + suffix_name = "/%s/delta-%s/" % (day, pass_id) + model_path = output_path.rstrip("/") + suffix_name + if donefile_name is None: + donefile_name = "xbox_patch_done.txt" + else: + mode = "base" + suffix_name = "/%s/base/" % day + model_path = output_path.rstrip("/") + suffix_name + if donefile_name is None: + donefile_name = "xbox_base_done.txt" + + if isinstance(data_path, list): + data_path = ",".join(data_path) + if fleet.worker_index() == 0: + donefile_path = output_path + "/" + donefile_name + xbox_str = self._get_xbox_str(output_path, day, model_path, \ + xbox_base_key, data_path, hadoop_fs_name, monitor_data={}, + mode=mode) + + if self._afs.is_exist(donefile_path): + self.rank0_info("exist %s succeed" % (donefile_path)) + self._afs.download(donefile_path, donefile_name) + pre_content = "" + with open(donefile_name, "r") as f: + pre_content = f.read() + last_dict = json.loads(pre_content.strip().split("\n")[-1]) + last_day = last_dict["input"].split("/")[-3] + last_pass = last_dict["input"].split("/")[-2].split("-")[-1] + + os.remove(donefile_name) + self.rank0_info("remove %s succeed" % (donefile_name)) + exist = False + if int(day) < int(last_day) or \ + int(day) == int(last_day) and \ + int(pass_id) <= int(last_pass): + exist = True + if not exist: + with open(donefile_name, "w") as f: + f.write(pre_content.strip() + "\n") + f.write(xbox_str + "\n") + self._afs.delete(donefile_path) + self._afs.upload(donefile_name, donefile_path) + self.rank0_info("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + else: + self.rank0_info("not write %s because %s/%s already " + "exists" % (donefile_name, day, pass_id)) + else: + with open(donefile_name, "w") as f: + f.write(xbox_str + "\n") + self._afs.upload(donefile_name, donefile_path) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + + def write_cache_donefile(self, + output_path, + day, + pass_id, + key_num, + donefile_name="sparse_cache.meta", + **kwargs): + """ + write cache donefile + + Args: + output_path(str): output path + day(str|int): training day of model + pass_id(str|int): training pass id of model + key_num(str|int): save cache return value + donefile_name(str): donefile name, default is "sparse_cache.meta" + kwargs(dict): user defined properties + file_num(int): cache file num + table_id(int): cache table id + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil + from paddle.distributed.fleet.utils.fs import AFSClient + hdfs_client = AFSClient() + fleet_util = GPUPSUtil() + fleet_util.set_fsclient(hdfs_client) + fleet_util.write_cache_donefile( + output_path="hdfs:/my/output/", + day=20190722, + pass_id=1, + key_num=123456) + + """ + day = str(day) + pass_id = str(pass_id) + key_num = int(key_num) + file_num = kwargs.get("file_num", 16) + table_id = kwargs.get("table_id", 0) + + if pass_id != "-1": + suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id) + model_path = output_path.rstrip("/") + suffix_name + else: + suffix_name = "/%s/base/%03d_cache" % (day, table_id) + model_path = output_path.rstrip("/") + suffix_name + + if fleet.worker_index() == 0: + donefile_path = model_path + "/" + donefile_name + + if self._afs.is_file(donefile_path): + self.rank0_error( \ + "not write because %s already exists" % donefile_path) + else: + meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \ + % (file_num, key_num) + with open(donefile_name, "w") as f: + f.write(meta_str) + self._afs.upload(donefile_name, donefile_path) + self.rank0_error("write %s succeed" % donefile_path) + + def _get_xbox_str(self, + output_path, + day, + model_path, + xbox_base_key, + data_path, + hadoop_fs_name, + monitor_data={}, + mode="patch"): + xbox_dict = collections.OrderedDict() + if mode == "base": + xbox_dict["id"] = str(xbox_base_key) + elif mode == "patch": + xbox_dict["id"] = str(int(time.time())) + else: + print("warning: unknown mode %s, set it to patch" % mode) + mode = "patch" + xbox_dict["id"] = str(int(time.time())) + xbox_dict["key"] = str(xbox_base_key) + if model_path.startswith("hdfs:") or model_path.startswith("afs:"): + model_path = model_path[model_path.find(":") + 1:] + xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000" + xbox_dict["record_count"] = "111111" + xbox_dict["partition_type"] = "2" + xbox_dict["job_name"] = "default_job_name" + xbox_dict["ins_tag"] = "feasign" + xbox_dict["ins_path"] = data_path + xbox_dict["job_id"] = os.environ.get("PADDLE_JOB_ID", "") + # currently hard code here, set monitor_data empty string + xbox_dict["monitor_data"] = "" + xbox_dict["monitor_path"] = output_path.rstrip("/") + "/monitor/" \ + + day + ".txt" + xbox_dict["mpi_size"] = str(fleet.worker_num()) + return json.dumps(xbox_dict) -- GitLab
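
Note for reviewers: a minimal end-to-end sketch of how the pieces added in this patch are intended to fit together, using only the APIs introduced above (AFSClient.init, GPUPSUtil.set_fsclient, get_last_save_model, write_model_donefile). The cluster address, credentials and paths are placeholders, and the snippet assumes a pslib/GPUPS job where fleet is already initialized, so that only worker 0 actually writes the donefile.

```python
# Usage sketch only; placeholder cluster address, credentials and paths.
import time

from paddle.distributed.fleet.utils.fs import AFSClient
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil

# AFSClient (added in python/paddle/distributed/fleet/utils/fs.py) wraps core.AfsWrapper.
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")

# GPUPSUtil delegates all filesystem access to the injected client.
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(client)

# Resume bookkeeping from the last donefile; returns [-1, -1, "", now] if none exists.
output_path = "hdfs:/my/output"
last_day, last_pass, last_path, xbox_base_key = \
    fleet_util.get_last_save_model(output_path)

# ... train and save the model for (day, pass_id) ...

# Record the new checkpoint; only worker 0 appends to donefile.txt.
fleet_util.write_model_donefile(
    output_path=output_path,
    day=20190722,
    pass_id=1,
    xbox_base_key=xbox_base_key)
```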
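The donefile readers (get_last_save_xbox / get_last_save_xbox_base) assume every line of the donefile is the JSON record emitted by _get_xbox_str. A hedged sketch of one such record and of how the day, pass id and model path are recovered from its "input" field; the field values are illustrative, not real job output.

```python
import json

# Illustrative record, shaped like _get_xbox_str's output for a delta (patch) save;
# record_count, partition_type, job_name and ins_tag are the hard-coded defaults.
line = json.dumps({
    "id": "1658448000",
    "key": "1658400000",
    "input": "hdfs://xxx.hadoop.com:54310/my/output/20190722/delta-1/000",
    "record_count": "111111",
    "partition_type": "2",
    "job_name": "default_job_name",
    "ins_tag": "feasign",
    "ins_path": "hdfs:/my/data/",
    "job_id": "",
    "monitor_data": "",
    "monitor_path": "/my/output/monitor/20190722.txt",
    "mpi_size": "4",
})

# get_last_save_xbox parses the last line of the downloaded donefile like this:
last_dict = json.loads(line.strip().split("\n")[-1])
last_day = int(last_dict["input"].split("/")[-3])                   # 20190722
last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1])   # 1
last_path = "/".join(last_dict["input"].split("/")[:-1])
xbox_base_key = int(last_dict["key"])
```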
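AFSClient.download spreads the remote file list over multi_processes download processes via _split_files, which gives every process len(files) // trainers files and hands the remainder, one file each, to the first processes. A standalone sketch of the equivalent contiguous split (not the patch's exact code), with a small worked example:

```python
def split_files(files, trainer_id, trainers):
    # Contiguous-block split: each worker gets len(files) // trainers files,
    # and the first len(files) % trainers workers get one extra file.
    remainder = len(files) % trainers
    blocksize = len(files) // trainers
    blocks = [blocksize + 1 if i < remainder else blocksize
              for i in range(trainers)]
    begin = sum(blocks[:trainer_id])
    return files[begin:begin + blocks[trainer_id]]

# 5 files over 2 download processes -> process 0 gets 3 files, process 1 gets 2.
files = ["part-%05d" % i for i in range(5)]
assert split_files(files, 0, 2) == ["part-00000", "part-00001", "part-00002"]
assert split_files(files, 1, 2) == ["part-00003", "part-00004"]
```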