未验证 提交 2ef6188b 编写于 作者: D danleifeng 提交者: GitHub

【HeterPS】fix hdfs and fleet_util for supporting save/load/infer (#33903)

* fix hdfs and fleet_util for supporting save/load infer;test=develop
上级 70100e4f
......@@ -69,11 +69,6 @@ REGISTER_DEVICE_WORKER_CLASS(DownpourWorkerOpt);
REGISTER_DEVICE_WORKER_CLASS(HeterCpuWorker);
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
REGISTER_DEVICE_WORKER_CLASS(HeterBoxWorker);
#endif
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
(defined PADDLE_WITH_PSLIB)
REGISTER_DEVICE_WORKER_CLASS(PSGPUWorker);
......
......@@ -8,11 +8,11 @@ IF(WITH_GPU)
SET(HETERPS_DEPS ${HETERPS_DEPS} ${RPC_DEPS})
endif()
nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS ${HETERPS_DEPS})
nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
nv_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
ENDIF()
IF(WITH_ROCM)
hip_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
hip_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
hip_test(test_heter_comm SRCS feature_value.h DEPS heter_comm)
hip_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
ENDIF()
......@@ -765,7 +765,7 @@ x.second );
unsigned long long get_num_collisions() const { return m_collisions; }
void print() {
for (size_type i = 0; i < 10; ++i) {
for (size_type i = 0; i < 5; ++i) {
std::cout << i << ": " << m_hashtbl_values[i].first << ","
<< m_hashtbl_values[i].second << std::endl;
}
......
......@@ -115,7 +115,7 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
path_.resize(total_gpu);
if (!topo_aware_) {
VLOG(1) << "init path without topo aware";
VLOG(3) << "init path without topo aware";
for (int i = 0; i < total_gpu; ++i) {
path_[i].resize(total_gpu);
for (int j = 0; j < total_gpu; ++j) {
......@@ -130,7 +130,7 @@ void HeterComm<KeyType, ValType, GradType>::init_path() {
}
}
} else {
VLOG(1) << "init path with topo aware";
VLOG(3) << "init path with topo aware";
for (int i = 0; i < total_gpu; ++i) {
path_[i].resize(total_gpu);
for (int j = 0; j < total_gpu; ++j) {
......
......@@ -68,8 +68,6 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
thread_keys_.resize(thread_keys_thread_num_);
for (int i = 0; i < thread_keys_thread_num_; i++) {
thread_keys_[i].resize(thread_keys_shard_num_);
for (int j = 0; j < thread_keys_shard_num_; j++) {
}
}
const std::deque<Record>& vec_data = input_channel->GetData();
size_t total_len = vec_data.size();
......@@ -255,7 +253,7 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task,
}
}
#endif
VLOG(1) << "GpuPs build hbmps done";
VLOG(3) << "GpuPs build hbmps done";
device_mutex[dev]->unlock();
}
......@@ -295,7 +293,7 @@ void PSGPUWrapper::BuildGPUPS(uint64_t table_id, int feature_dim) {
HeterPs_ = HeterPsBase::get_instance(size_max, resource_);
HeterPs_->set_nccl_comm_and_size(inner_comms_, inter_comms_, node_size_);
auto build_func = [this, &gpu_task, &feature_keys_count](int i) {
std::cout << "building table: " << i << std::endl;
VLOG(3) << "building table: " << i;
this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
gpu_task->device_values_[i].data(),
feature_keys_count[i], 500000, 2);
......
......@@ -111,6 +111,10 @@ class FS(object):
def touch(self, fs_path, exist_ok=True):
raise NotImplementedError
@abc.abstractmethod
def cat(self, fs_path=None):
raise NotImplementedError
class LocalFS(FS):
"""
......@@ -676,14 +680,35 @@ class HDFSClient(FS):
return True
def upload_dir(self, local_dir, dest_dir, overwrite=False):
"""
upload dir to hdfs
Args:
local_dir(str): local dir
dest_dir(str): hdfs dest dir
overwrite(bool): is overwrite
Returns:
return code
"""
local_dir = local_dir.rstrip("/")
dest_dir = dest_dir.rstrip("/")
local_basename = os.path.basename(local_dir)
if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
self.delete(dest_dir + "/" + local_basename)
if not self.is_exist(dest_dir):
self.mkdirs(dest_dir)
self._try_upload(local_dir, dest_dir)
# can't retry
def upload(self, local_path, fs_path):
def upload(self, local_path, fs_path, multi_processes=1, overwrite=False):
"""
Upload the local path to remote HDFS.
Args:
local_path(str): The local path.
fs_path(str): The HDFS path.
multi_processes(int|1): the upload data process at the same time, default=5
overwrite(bool|False): will overwrite file on HDFS or not
Examples:
......@@ -700,21 +725,67 @@ class HDFSClient(FS):
client = HDFSClient(hadoop_home, configs)
client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")
"""
if self.is_exist(fs_path):
raise FSFileExistsError("{} exists".format(fs_path))
def __subprocess_upload(hdfs_path_single, datas):
for data in datas:
self._try_upload(data, hdfs_path_single)
def get_local_files(path):
"""
get local files
Args:
path(str): local path
Returns:
list of local files
"""
rlist = []
if not os.path.exists(path):
return rlist
if os.path.isdir(path):
for file in os.listdir(path):
t = os.path.join(path, file)
rlist.append(t)
else:
rlist.append(path)
return rlist
local = LocalFS()
if not local.is_exist(local_path):
raise FSFileNotExistsError("{} not exists".format(local_path))
# upload_dir
if local.is_dir(local_path):
self.upload_dir(local_path, fs_path, overwrite=overwrite)
return
# upload files
all_files = get_local_files(local_path)
if not all_files:
print("there are nothing need to upload, function exit")
return
if self.is_exist(fs_path) and overwrite:
self.delete(fs_path)
self.mkdirs(fs_path)
procs = []
for i in range(multi_processes):
process_datas = self._split_files(all_files, i, multi_processes)
p = multiprocessing.Process(
target=__subprocess_upload, args=(fs_path, process_datas))
procs.append(p)
p.start()
return self._try_upload(local_path, fs_path)
# complete the processes
for proc in procs:
proc.join()
@_handle_errors()
def _try_upload(self, local_path, fs_path):
cmd = "put {} {}".format(local_path, fs_path)
ret = 0
try:
ret, lines = self._run_cmd(cmd)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
......@@ -722,13 +793,15 @@ class HDFSClient(FS):
raise e
# can't retry
def download(self, fs_path, local_path):
def download(self, fs_path, local_path, multi_processes=1, overwrite=False):
"""
Download remote HDFS path to the local.
Args:
fs_path(str): The HDFS path.
local_path(str): The local path.
multi_processes(int|1): the download data process at the same time, default=1
overwrite(bool): is overwrite
Examples:
......@@ -745,17 +818,43 @@ class HDFSClient(FS):
client = HDFSClient(hadoop_home, configs)
client.download("hdfs:/test_hdfs_client", "./")
"""
def __subprocess_download(local_path, datas):
"""
download file from HDFS
Args:
local_path(str): the local file path
datas(str): the hdfs file path list
"""
for data in datas:
self._try_download(data, local_path)
if not self.is_exist(fs_path):
raise FSFileNotExistsError("{} not exits".format(fs_path))
# download file
if self.is_file(fs_path):
return self._try_download(fs_path, local_path)
# download dir
_, all_files = self.ls_dir(fs_path)
procs = []
for i in range(multi_processes):
process_datas = self._split_files(all_files, i, multi_processes)
p = multiprocessing.Process(
target=__subprocess_download, args=(local_path, process_datas))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
@_handle_errors()
def _try_download(self, fs_path, local_path):
cmd = "get {} {}".format(fs_path, local_path)
ret = 0
try:
ret, lines = self._run_cmd(cmd)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
except Exception as e:
......@@ -803,7 +902,7 @@ class HDFSClient(FS):
if out_hdfs and not self.is_exist(fs_path):
cmd = "mkdir -p {}".format(fs_path)
ret, lines = self._run_cmd(cmd)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
......@@ -939,7 +1038,71 @@ class HDFSClient(FS):
cmd = "touchz {}".format(fs_path)
ret, _ = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError
raise ExecuteError(cmd)
def need_upload_download(self):
return True
def cat(self, fs_path=None):
"""
Cat a remote HDFS file.
Args:
fs_path(str): The HDFS file path.
Returns:
file content
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.cat("hdfs:/test_hdfs_client")
"""
if self.is_file(fs_path):
output = self._try_cat(fs_path)
return "\n".join(output)
else:
return ""
@_handle_errors()
def _try_cat(self, fs_path):
cmd = "cat {}".format(fs_path)
ret, output = self._run_cmd(cmd)
if ret != 0:
raise ExecuteError(cmd)
return output
def _split_files(self, files, trainer_id, trainers):
"""
split file list
Args:
files(list): file list
trainer_id(int): trainer mpi rank id
trainers(int): all trainers num
Returns:
fileist(list): file list of current trainer
"""
remainder = len(files) % trainers
blocksize = len(files) // trainers
blocks = [blocksize] * trainers
for i in range(remainder):
blocks[i] += 1
trainer_files = [[]] * trainers
begin = 0
for i in range(trainers):
trainer_files[i] = files[begin:begin + blocks[i]]
begin += blocks[i]
return trainer_files[trainer_id]
......@@ -13,6 +13,8 @@
"""Defination of Server and Worker."""
from . import ps_pb2 as pslib
# NOTE: reduce removed in fuctools in python3
from functools import reduce
class Server(object):
......
......@@ -435,11 +435,7 @@ class FleetUtil(object):
f.write(pre_content + "\n")
f.write(content + "\n")
client.delete(donefile_path)
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
client.upload(donefile_name, output_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
else:
......@@ -448,11 +444,7 @@ class FleetUtil(object):
else:
with open(donefile_name, "w") as f:
f.write(content + "\n")
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
client.upload(donefile_name, output_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
fleet._role_maker._barrier_worker()
......@@ -547,11 +539,7 @@ class FleetUtil(object):
f.write(pre_content + "\n")
f.write(xbox_str + "\n")
client.delete(donefile_path)
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
client.upload(donefile_name, output_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
else:
......@@ -560,11 +548,7 @@ class FleetUtil(object):
else:
with open(donefile_name, "w") as f:
f.write(xbox_str + "\n")
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
client.upload(donefile_name, output_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
fleet._role_maker._barrier_worker()
......@@ -638,11 +622,7 @@ class FleetUtil(object):
% (file_num, key_num)
with open(donefile_name, "w") as f:
f.write(meta_str)
client.upload(
model_path,
donefile_name,
multi_processes=1,
overwrite=False)
client.upload(donefile_name, model_path)
self.rank0_error("write %s succeed" % donefile_path)
fleet._role_maker._barrier_worker()
......@@ -962,7 +942,7 @@ class FleetUtil(object):
if not client.is_exist(dest):
client.makedirs(dest)
client.upload(dest, model_name)
client.upload(model_name, dest, multi_processes=5, overwrite=True)
fleet._role_maker._barrier_worker()
......@@ -1059,12 +1039,8 @@ class FleetUtil(object):
dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day,
pass_id)
if not client.is_exist(dest):
client.makedirs(dest)
if os.path.isdir(model_name):
client.upload_dir(dest, model_name)
else:
client.upload(dest, model_name)
client.mkdirs(dest)
client.upload(model_name, dest, multi_processes=5, overwrite=True)
fleet._role_maker._barrier_worker()
......@@ -1248,7 +1224,7 @@ class FleetUtil(object):
start = 0
split_path = []
for i in range(splits_per_day):
h = start / 60
h = start // 60
m = start % 60
if h < left_train_hour or h > right_train_hour:
start += split_interval
......
......@@ -110,6 +110,24 @@ class FSTestBase(unittest.TestCase):
fs.delete(dst_file)
fs.delete(src_file)
def _test_upload_dir(self, fs):
# upload dir
src_file = os.path.abspath("./test_upload_dir")
dst_file = os.path.abspath("./test_uolpad_dir")
file1 = os.path.abspath("./test_upload_dir/file1")
file2 = os.path.abspath("./test_upload_dir/file2")
local = LocalFS()
local.mkdirs(src_file)
local.touch(file1)
local.touch(file2)
fs.upload(src_file, dst_file)
self.assertTrue(fs.is_exist(dst_file))
fs.delete(dst_file)
local.delete(src_file)
def _test_try_download(self, fs):
src_file = os.path.abspath("./test_try_download.src")
dst_file = os.path.abspath("./test_try_download.dst")
......@@ -152,15 +170,35 @@ class FSTestBase(unittest.TestCase):
pass
local = LocalFS()
local.touch(src_file)
fs.delete(dst_file)
fs.touch(src_file)
local.delete(dst_file)
assert fs.need_upload_download()
self.assertFalse(fs.is_exist(dst_file))
fs.download(src_file, dst_file)
self.assertTrue(local.is_exist(dst_file))
local.delete(dst_file)
fs.delete(src_file)
def _test_download_dir(self, fs):
src_file = os.path.abspath("./test_download_dir_src")
dst_file = os.path.abspath("./test_download_dir_dst")
file1 = os.path.abspath("./test_download_dir_src/file1")
file2 = os.path.abspath("./test_download_dir_src/file2")
fs.delete(dst_file)
fs.delete(src_file)
fs.mkdirs(src_file)
fs.touch(file1)
fs.touch(file2)
fs.download(src_file, dst_file)
self.assertTrue(local.is_exist(dst_file))
local = LocalFS()
local.delete(dst_file)
fs.delete(src_file)
def _test_mkdirs(self, fs):
dir_name = "./test_mkdir"
fs.mkdirs(dir_name)
......
......@@ -38,6 +38,7 @@ class FSTest3(FSTestBase):
self._test_try_download(fs)
self._test_upload(fs)
self._test_upload_dir(fs)
self._test_download(fs)
def test_local(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册