From 2ef6188b0b86f95a1e67a56b189668d504e10d0b Mon Sep 17 00:00:00 2001
From: danleifeng <52735331+danleifeng@users.noreply.github.com>
Date: Mon, 5 Jul 2021 14:09:45 +0800
Subject: [PATCH] 【HeterPS】fix hdfs and fleet_util for supporting
 save/load/infer (#33903)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix hdfs and fleet_util for supporting save/load infer;test=develop
---
 .../fluid/framework/device_worker_factory.cc  |   5 -
 .../framework/fleet/heter_ps/CMakeLists.txt   |   4 +-
 .../cudf/concurrent_unordered_map.cuh.h       |   2 +-
 .../framework/fleet/heter_ps/heter_comm_inl.h |   4 +-
 .../fluid/framework/fleet/ps_gpu_wrapper.cc   |   6 +-
 python/paddle/distributed/fleet/utils/fs.py   | 185 ++++++++++++++++--
 .../fleet/parameter_server/pslib/node.py      |   2 +
 .../fluid/incubate/fleet/utils/fleet_util.py  |  42 +---
 .../fluid/tests/unittests/hdfs_test_utils.py  |  44 ++++-
 .../fluid/tests/unittests/test_hdfs3.py       |   1 +
 10 files changed, 234 insertions(+), 61 deletions(-)

diff --git a/paddle/fluid/framework/device_worker_factory.cc b/paddle/fluid/framework/device_worker_factory.cc
index fb2323d96e2..b6f87811bbd 100644
--- a/paddle/fluid/framework/device_worker_factory.cc
+++ b/paddle/fluid/framework/device_worker_factory.cc
@@ -69,11 +69,6 @@ REGISTER_DEVICE_WORKER_CLASS(DownpourWorkerOpt);
 REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker);
 #endif
-#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
-    (defined PADDLE_WITH_PSLIB)
-REGISTER_DEVICE_WORKER_CLASS(HeterBoxWorker);
-#endif
-
 #if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
     (defined PADDLE_WITH_PSLIB)
 REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker);
 #endif
diff --git a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
index 67c44368b7a..939b5e3099a 100644
--- a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
+++ b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
@@ -8,11 +8,11 @@ IF(WITH_GPU)
     SET(HETERPS_DEPS ${HETERPS_DEPS} ${RPC_DEPS})
   endif()
   nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS ${HETERPS_DEPS})
-  nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
+  nv_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
   nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
 ENDIF()
 IF(WITH_ROCM)
   hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
-  hip_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
+  hip_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
   hip_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
 ENDIF()
diff --git a/paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h b/paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h
index c5647f2cdcf..8b04d703c88 100644
--- a/paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h
+++ b/paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h
@@ -765,7 +765,7 @@ x.second );
   unsigned long long get_num_collisions() const { return m_collisions; }
 
   void print() {
-    for (size_type i = 0; i < 10; ++i) {
+    for (size_type i = 0; i < 5; ++i) {
       std::cout << i << ": " << m_hashtbl_values[i].first << ","
                 << m_hashtbl_values[i].second << std::endl;
     }
diff --git a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h
index 1b4205e3c38..a2e09b7e081 100644
--- a/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h
+++ b/paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h
@@ -115,7 +115,7 @@ void HeterComm::init_path() {
   path_.resize(total_gpu);
 
   if (!topo_aware_) {
-    VLOG(1) << "init path without topo aware";
+    VLOG(3) << "init path without topo aware";
     for (int i = 0; i < total_gpu; ++i) {
       path_[i].resize(total_gpu);
       for (int j = 0; j < total_gpu; ++j) {
@@ -130,7 +130,7 @@
       }
     }
   } else {
-    VLOG(1) << "init path with topo aware";
+    VLOG(3) << "init path with topo aware";
     for (int i = 0; i < total_gpu; ++i) {
       path_[i].resize(total_gpu);
       for (int j = 0; j < total_gpu; ++j) {
diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
index 67ff6b6acae..0766a3151c8 100644
--- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
+++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -68,8 +68,6 @@ void PSGPUWrapper::BuildTask(std::shared_ptr gpu_task,
   thread_keys_.resize(thread_keys_thread_num_);
   for (int i = 0; i < thread_keys_thread_num_; i++) {
     thread_keys_[i].resize(thread_keys_shard_num_);
-    for (int j = 0; j < thread_keys_shard_num_; j++) {
-    }
   }
   const std::deque& vec_data = input_channel->GetData();
   size_t total_len = vec_data.size();
@@ -255,7 +253,7 @@
       }
     }
 #endif
-    VLOG(1) << "GpuPs build hbmps done";
+    VLOG(3) << "GpuPs build hbmps done";
     device_mutex[dev]->unlock();
   }
@@ -295,7 +293,7 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
   HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
   HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
   auto build_func = [this, &gpu_task, &feature_keys_count](int i) {
-    std::cout << "building table: " << i << std::endl;
+    VLOG(3) << "building table: " << i;
     this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
                              gpu_task->device_values_[i].data(),
                              feature_keys_count[i], 500000, 2);
diff --git a/python/paddle/distributed/fleet/utils/fs.py b/python/paddle/distributed/fleet/utils/fs.py
index f9cedba7773..fb518f62a12 100644
--- a/python/paddle/distributed/fleet/utils/fs.py
+++ b/python/paddle/distributed/fleet/utils/fs.py
@@ -111,6 +111,10 @@ class FS(object):
     def touch(self, fs_path, exist_ok=True):
         raise NotImplementedError
 
+    @abc.abstractmethod
+    def cat(self, fs_path=None):
+        raise NotImplementedError
+
 
 class LocalFS(FS):
     """
@@ -676,14 +680,35 @@ class HDFSClient(FS):
         return True
 
+    def upload_dir(self, local_dir, dest_dir, overwrite=False):
+        """
+        Upload a local directory to HDFS.
+
+        Args:
+            local_dir(str): The local directory.
+            dest_dir(str): The HDFS destination directory.
+            overwrite(bool): Whether to overwrite an existing directory. Default is False.
+
+        Returns:
+            None
+        """
+        local_dir = local_dir.rstrip("/")
+        dest_dir = dest_dir.rstrip("/")
+        local_basename = os.path.basename(local_dir)
+        if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
+            self.delete(dest_dir + "/" + local_basename)
+        if not self.is_exist(dest_dir):
+            self.mkdirs(dest_dir)
+        self._try_upload(local_dir, dest_dir)
+
     # can't retry
-    def upload(self, local_path, fs_path):
+    def upload(self, local_path, fs_path, multi_processes=1, overwrite=False):
         """
         Upload the local path to remote HDFS.
 
         Args:
             local_path(str): The local path.
             fs_path(str): The HDFS path.
+            multi_processes(int): The number of processes used to upload data concurrently. Default is 1.
+            overwrite(bool): Whether to overwrite the existing file on HDFS. Default is False.
 
         Examples:
 
             .. code-block:: text
 
                 from paddle.distributed.fleet.utils import HDFSClient
 
                 hadoop_home = "/home/client/hadoop-client/hadoop/"
                 configs = {
                     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                     "hadoop.job.ugi": "hello,hello123"
                 }
 
@@ -700,21 +725,67 @@
                 client = HDFSClient(hadoop_home, configs)
                 client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")
         """
-        if self.is_exist(fs_path):
-            raise FSFileExistsError("{} exists".format(fs_path))
+
+        def __subprocess_upload(hdfs_path_single, datas):
+            for data in datas:
+                self._try_upload(data, hdfs_path_single)
+
+        def get_local_files(path):
+            """
+            Get the file list under a local path.
+
+            Args:
+                path(str): The local path.
+
+            Returns:
+                A list of local files.
+            """
+            rlist = []
+
+            if not os.path.exists(path):
+                return rlist
+
+            if os.path.isdir(path):
+                for file in os.listdir(path):
+                    t = os.path.join(path, file)
+                    rlist.append(t)
+            else:
+                rlist.append(path)
+            return rlist
 
         local = LocalFS()
         if not local.is_exist(local_path):
             raise FSFileNotExistsError("{} not exists".format(local_path))
 
+        # upload a whole directory
+        if local.is_dir(local_path):
+            self.upload_dir(local_path, fs_path, overwrite=overwrite)
+            return
+        # upload files
+        all_files = get_local_files(local_path)
+        if not all_files:
+            print("there is nothing to upload, exiting")
+            return
+
+        if self.is_exist(fs_path) and overwrite:
+            self.delete(fs_path)
+            self.mkdirs(fs_path)
+
+        procs = []
+        for i in range(multi_processes):
+            process_datas = self._split_files(all_files, i, multi_processes)
+            p = multiprocessing.Process(
+                target=__subprocess_upload, args=(fs_path, process_datas))
+            procs.append(p)
+            p.start()
-        return self._try_upload(local_path, fs_path)
+        # wait for all the upload processes to finish
+        for proc in procs:
+            proc.join()
 
     @_handle_errors()
     def _try_upload(self, local_path, fs_path):
         cmd = "put {} {}".format(local_path, fs_path)
         ret = 0
         try:
-            ret, lines = self._run_cmd(cmd)
+            ret, _ = self._run_cmd(cmd)
             if ret != 0:
                 raise ExecuteError(cmd)
         except Exception as e:
@@ -722,13 +793,15 @@
             raise e
 
     # can't retry
-    def download(self, fs_path, local_path):
+    def download(self, fs_path, local_path, multi_processes=1, overwrite=False):
         """
         Download remote HDFS path to the local.
 
         Args:
             fs_path(str): The HDFS path.
             local_path(str): The local path.
+            multi_processes(int): The number of processes used to download data concurrently. Default is 1.
+            overwrite(bool): Whether to overwrite. Default is False.
 
         Examples:
 
             .. code-block:: text
 
                 from paddle.distributed.fleet.utils import HDFSClient
 
                 hadoop_home = "/home/client/hadoop-client/hadoop/"
                 configs = {
                     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                     "hadoop.job.ugi": "hello,hello123"
                 }
 
@@ -745,17 +818,43 @@
                 client = HDFSClient(hadoop_home, configs)
                 client.download("hdfs:/test_hdfs_client", "./")
         """
+
+        def __subprocess_download(local_path, datas):
+            """
+            Download the given HDFS files to a local path.
+            Args:
+                local_path(str): The local file path.
+                datas(list): The list of HDFS file paths.
+            """
+            for data in datas:
+                self._try_download(data, local_path)
+
         if not self.is_exist(fs_path):
             raise FSFileNotExistsError("{} not exists".format(fs_path))
-
-        return self._try_download(fs_path, local_path)
+        # download a single file
+        if self.is_file(fs_path):
+            return self._try_download(fs_path, local_path)
+        # download a directory
+        _, all_files = self.ls_dir(fs_path)
+
+        procs = []
+        for i in range(multi_processes):
+            process_datas = self._split_files(all_files, i, multi_processes)
+            p = multiprocessing.Process(
+                target=__subprocess_download, args=(local_path, process_datas))
+            procs.append(p)
+            p.start()
+
+        # wait for all the download processes to finish
+        for proc in procs:
+            proc.join()
 
     @_handle_errors()
     def _try_download(self, fs_path, local_path):
         cmd = "get {} {}".format(fs_path, local_path)
         ret = 0
         try:
-            ret, lines = self._run_cmd(cmd)
+            ret, _ = self._run_cmd(cmd)
             if ret != 0:
                 raise ExecuteError(cmd)
         except Exception as e:
@@ -803,7 +902,7 @@
 
         if out_hdfs and not self.is_exist(fs_path):
             cmd = "mkdir -p {}".format(fs_path)
-            ret, lines = self._run_cmd(cmd)
+            ret, _ = self._run_cmd(cmd)
             if ret != 0:
                 raise ExecuteError(cmd)
 
@@ -939,7 +1038,71 @@
         cmd = "touchz {}".format(fs_path)
         ret, _ = self._run_cmd(cmd)
         if ret != 0:
-            raise ExecuteError
+            raise ExecuteError(cmd)
 
     def need_upload_download(self):
         return True
+
+    def cat(self, fs_path=None):
+        """
+        Cat a remote HDFS file.
+
+        Args:
+            fs_path(str): The HDFS file path.
+
+        Returns:
+            The file content as a string.
+
+        Examples:
+
+            .. code-block:: text
+
+                from paddle.distributed.fleet.utils import HDFSClient
+
+                hadoop_home = "/home/client/hadoop-client/hadoop/"
+                configs = {
+                    "fs.default.name": "hdfs://xxx.hadoop.com:54310",
+                    "hadoop.job.ugi": "hello,hello123"
+                }
+
+                client = HDFSClient(hadoop_home, configs)
+                client.cat("hdfs:/test_hdfs_client")
+        """
+        if self.is_file(fs_path):
+            output = self._try_cat(fs_path)
+            return "\n".join(output)
+        else:
+            return ""
+
+    @_handle_errors()
+    def _try_cat(self, fs_path):
+        cmd = "cat {}".format(fs_path)
+        ret, output = self._run_cmd(cmd)
+        if ret != 0:
+            raise ExecuteError(cmd)
+        return output
+
+    def _split_files(self, files, trainer_id, trainers):
+        """
+        Split the file list among trainers.
+
+        Args:
+            files(list): The file list.
+            trainer_id(int): The trainer's MPI rank id.
+            trainers(int): The total number of trainers.
+
+        Returns:
+            filelist(list): The file list assigned to the current trainer.
+        """
+        remainder = len(files) % trainers
+        blocksize = len(files) // trainers
+
+        blocks = [blocksize] * trainers
+        for i in range(remainder):
+            blocks[i] += 1
+
+        trainer_files = [[]] * trainers
+        begin = 0
+        for i in range(trainers):
+            trainer_files[i] = files[begin:begin + blocks[i]]
+            begin += blocks[i]
+
+        return trainer_files[trainer_id]
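Note on the fs.py changes above: the new `HDFSClient._split_files` helper deals the file list out so that the first `len(files) % trainers` workers each receive one extra file, and both `upload` and `download` use it to build one shard per worker process. A minimal standalone sketch of that behaviour (the file names are hypothetical, and `split_files` below is a local stand-in for the method, not the method itself):

    # Sketch of the sharding used by HDFSClient._split_files:
    # the first (len(files) % trainers) workers get one extra file.
    def split_files(files, trainer_id, trainers):
        remainder = len(files) % trainers
        blocksize = len(files) // trainers
        blocks = [blocksize + 1 if i < remainder else blocksize
                  for i in range(trainers)]
        begin = 0
        shards = []
        for size in blocks:
            shards.append(files[begin:begin + size])
            begin += size
        return shards[trainer_id]

    files = ["f0", "f1", "f2", "f3", "f4", "f5", "f6"]  # hypothetical names
    print([split_files(files, i, 3) for i in range(3)])
    # [['f0', 'f1', 'f2'], ['f3', 'f4'], ['f5', 'f6']]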
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
index 6fdca1c77a1..8dfe9c32cd9 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
@@ -13,6 +13,8 @@
 """Defination of Server and Worker."""
 
 from . import ps_pb2 as pslib
+# NOTE: in Python 3, reduce is no longer a builtin; import it from functools
+from functools import reduce
 
 
 class Server(object):
diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py
index d02be8af4b1..47f912c8715 100644
--- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py
+++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py
@@ -435,11 +435,7 @@ class FleetUtil(object):
                         f.write(pre_content + "\n")
                         f.write(content + "\n")
                     client.delete(donefile_path)
-                    client.upload(
-                        output_path,
-                        donefile_name,
-                        multi_processes=1,
-                        overwrite=False)
+                    client.upload(donefile_name, output_path)
                     self.rank0_error("write %s/%s %s succeed" % \
                                      (day, pass_id, donefile_name))
                 else:
@@ -448,11 +444,7 @@
             else:
                 with open(donefile_name, "w") as f:
                     f.write(content + "\n")
-                client.upload(
-                    output_path,
-                    donefile_name,
-                    multi_processes=1,
-                    overwrite=False)
+                client.upload(donefile_name, output_path)
                 self.rank0_error("write %s/%s %s succeed" % \
                                  (day, pass_id, donefile_name))
         fleet._role_maker._barrier_worker()
@@ -547,11 +539,7 @@
                         f.write(pre_content + "\n")
                         f.write(xbox_str + "\n")
                     client.delete(donefile_path)
-                    client.upload(
-                        output_path,
-                        donefile_name,
-                        multi_processes=1,
-                        overwrite=False)
+                    client.upload(donefile_name, output_path)
                     self.rank0_error("write %s/%s %s succeed" % \
                                      (day, pass_id, donefile_name))
                 else:
@@ -560,11 +548,7 @@
             else:
                 with open(donefile_name, "w") as f:
                     f.write(xbox_str + "\n")
-                client.upload(
-                    output_path,
-                    donefile_name,
-                    multi_processes=1,
-                    overwrite=False)
+                client.upload(donefile_name, output_path)
                 self.rank0_error("write %s/%s %s succeed" % \
                                  (day, pass_id, donefile_name))
         fleet._role_maker._barrier_worker()
@@ -638,11 +622,7 @@
                        % (file_num, key_num)
             with open(donefile_name, "w") as f:
                 f.write(meta_str)
-            client.upload(
-                model_path,
-                donefile_name,
-                multi_processes=1,
-                overwrite=False)
+            client.upload(donefile_name, model_path)
             self.rank0_error("write %s succeed" % donefile_path)
         fleet._role_maker._barrier_worker()
@@ -962,7 +942,7 @@
 
         if not client.is_exist(dest):
             client.makedirs(dest)
 
-        client.upload(dest, model_name)
+        client.upload(model_name, dest, multi_processes=5, overwrite=True)
 
         fleet._role_maker._barrier_worker()
@@ -1059,12 +1039,8 @@
         dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day, pass_id)
         if not client.is_exist(dest):
-            client.makedirs(dest)
-
-        if os.path.isdir(model_name):
-            client.upload_dir(dest, model_name)
-        else:
-            client.upload(dest, model_name)
+            client.mkdirs(dest)
+        client.upload(model_name, dest, multi_processes=5, overwrite=True)
 
         fleet._role_maker._barrier_worker()
@@ -1248,7 +1224,7 @@
         start = 0
         split_path = []
         for i in range(splits_per_day):
-            h = start / 60
+            h = start // 60
             m = start % 60
             if h < left_train_hour or h > right_train_hour:
                 start += split_interval
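A quick aside on the `h = start // 60` change above: in Python 3 the `/` operator always yields a float, so the old `start / 60` no longer produced the integer hour the surrounding path logic expects; floor division restores it. A two-line illustration (the value 150 is arbitrary):

    start = 150
    print(start / 60, start // 60)  # 2.5 2 -> only the second form is an integer hour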
os.path.abspath("./test_uolpad_dir") + file1 = os.path.abspath("./test_upload_dir/file1") + file2 = os.path.abspath("./test_upload_dir/file2") + + local = LocalFS() + local.mkdirs(src_file) + local.touch(file1) + local.touch(file2) + + fs.upload(src_file, dst_file) + + self.assertTrue(fs.is_exist(dst_file)) + fs.delete(dst_file) + local.delete(src_file) + def _test_try_download(self, fs): src_file = os.path.abspath("./test_try_download.src") dst_file = os.path.abspath("./test_try_download.dst") @@ -152,15 +170,35 @@ class FSTestBase(unittest.TestCase): pass local = LocalFS() - local.touch(src_file) - fs.delete(dst_file) + fs.touch(src_file) + local.delete(dst_file) assert fs.need_upload_download() - self.assertFalse(fs.is_exist(dst_file)) + fs.download(src_file, dst_file) + + self.assertTrue(local.is_exist(dst_file)) + local.delete(dst_file) + fs.delete(src_file) + + def _test_download_dir(self, fs): + src_file = os.path.abspath("./test_download_dir_src") + dst_file = os.path.abspath("./test_download_dir_dst") + file1 = os.path.abspath("./test_download_dir_src/file1") + file2 = os.path.abspath("./test_download_dir_src/file2") fs.delete(dst_file) fs.delete(src_file) + fs.mkdirs(src_file) + fs.touch(file1) + fs.touch(file2) + + fs.download(src_file, dst_file) + self.assertTrue(local.is_exist(dst_file)) + local = LocalFS() + local.delete(dst_file) + fs.delete(src_file) + def _test_mkdirs(self, fs): dir_name = "./test_mkdir" fs.mkdirs(dir_name) diff --git a/python/paddle/fluid/tests/unittests/test_hdfs3.py b/python/paddle/fluid/tests/unittests/test_hdfs3.py index 218bf12ca60..d214768b2e3 100644 --- a/python/paddle/fluid/tests/unittests/test_hdfs3.py +++ b/python/paddle/fluid/tests/unittests/test_hdfs3.py @@ -38,6 +38,7 @@ class FSTest3(FSTestBase): self._test_try_download(fs) self._test_upload(fs) + self._test_upload_dir(fs) self._test_download(fs) def test_local(self): -- GitLab