Unverified · Commit 30a1213b authored by danleifeng, committed by GitHub

【GPUPS】add afsclient and gpupsutil (#41324)

* add gpupsutil and afsclient; test=develop
Parent 2d6b71a2
...@@ -72,6 +72,18 @@ int AfsWrapper::download(const std::string& local_file,
                         const std::string& afs_file) {
  return afs_handler_.download_file(local_file, afs_file);
}
int AfsWrapper::touchz(const std::string& path) {
return afs_handler_.touchz(path);
}
std::string AfsWrapper::cat(const std::string& path) {
return afs_handler_.cat(path);
}
int AfsWrapper::mv(const std::string& old_path, const std::string& dest_path) {
return afs_handler_.mv(old_path, dest_path);
}
#endif
std::shared_ptr<PSGPUWrapper> PSGPUWrapper::s_instance_ = NULL;
...@@ -84,7 +96,7 @@ void PSGPUWrapper::InitAfsApi(const std::string& fs_name,
  int ret = afs_handler_.init(fs_name.c_str(), fs_user.c_str(), pass_wd.c_str(),
                              conf.c_str());
  if (ret != 0) {
-    LOG(ERROR) << "AFS Init Error";
+    VLOG(0) << "AFS Init Error";
  }
  use_afs_api_ = 1;
}
......
...@@ -71,6 +71,10 @@ class AfsWrapper {
  int download(const std::string& local_file, const std::string& afs_file);
int touchz(const std::string& path);
std::string cat(const std::string& path);
int mv(const std::string& old_path, const std::string& dest_path);
 private:
  paddle::ps::AfsApiWrapper afs_handler_;
};
......
...@@ -81,6 +81,12 @@ void BindAfsWrapper(py::module* m) {
      .def("upload", &framework::AfsWrapper::upload,
           py::call_guard<py::gil_scoped_release>())
      .def("remove", &framework::AfsWrapper::remove,
py::call_guard<py::gil_scoped_release>())
.def("touchz", &framework::AfsWrapper::touchz,
py::call_guard<py::gil_scoped_release>())
.def("cat", &framework::AfsWrapper::cat,
py::call_guard<py::gil_scoped_release>())
.def("mv", &framework::AfsWrapper::mv,
           py::call_guard<py::gil_scoped_release>());
}
#endif
......
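The three new bindings are exposed on core.AfsWrapper, so they can be called directly from Python once the handler is initialized. A minimal sketch (the fs name, credentials and paths below are placeholders):

    from paddle.fluid import core

    fs = core.AfsWrapper()
    fs.init("afs://xxx.afs.com:9902", "user", "passwd", "./client.conf")
    fs.touchz("afs:/my/dir/_SUCCESS")       # create an empty marker file
    text = fs.cat("afs:/my/dir/_SUCCESS")   # read the file content as a string
    fs.mv("afs:/my/dir/_SUCCESS", "afs:/my/dir/_DONE")  # rename within the filesystem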
...@@ -1145,3 +1145,420 @@ class HDFSClient(FS):
            file_list.append({'path': file_path, 'size': file_size})
        return file_list
class AFSClient(FS):
"""
    A tool for AFS, implemented on top of core.AfsWrapper.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.ls_dir("hdfs:/test_hdfs_client")
"""
def __init__(
self,
time_out=5 * 60 * 1000, # ms
sleep_inter=1000): # ms
self._fs = core.AfsWrapper()
self._time_out = time_out
def init(self, fs_name, fs_user, fs_passwd, fs_conf):
self._fs.init(fs_name, fs_user, fs_passwd, fs_conf)
def list_dirs(self, fs_path):
"""
        Only list directories under `fs_path`.
Args:
fs_path(str): The HDFS file path.
Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname2, ...].
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
subdirs = client.list_dirs("hdfs:/test_hdfs_client")
"""
if not self.is_exist(fs_path):
return []
# TODO:fengdanlei
dirs, files = self._ls_dir(fs_path)
return dirs
def ls_dir(self, fs_path):
"""
        List directories and files under `fs_path`.
Args:
fs_path(str): The HDFS file path.
Returns:
Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname2, ...], [filename1, filename2, ...]).
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")
"""
if not self.is_exist(fs_path):
return [], []
return self._ls_dir(fs_path)
def _ls_dir(self, fs_path):
files = self._fs.list(fs_path)
dirs = [fs_path]
return dirs, files
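        # NOTE: AfsWrapper.list returns the entries under fs_path; the "dirs"
        # part of the result only echoes the queried path itself, so ls_dir
        # yields ([fs_path], [entry, ...]).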
def is_dir(self, fs_path):
"""
Whether the remote HDFS path is a directory.
Args:
fs_path(str): The HDFS file path.
Returns:
Bool: Return true if the path exists and it's a directory, otherwise return false.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                ret = client.is_dir("hdfs:/test_hdfs_client")
"""
if not self.is_exist(fs_path):
return False
return self._is_dir(fs_path)
def _is_dir(self, fs_path):
list_path = self._fs.list(fs_path)
if (len(list_path)) > 0:
return True
else:
return False
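    # NOTE: _is_dir treats a path as a directory iff listing it returns at
    # least one entry, so an empty remote directory will be classified as a
    # file by is_file()/is_dir().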
def is_file(self, fs_path):
"""
Whether the remote HDFS path is a file.
Args:
fs_path(str): The HDFS file path.
Returns:
Bool: Return true if the path exists and it's a file, otherwise return false.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
ret = client.is_file("hdfs:/test_hdfs_client")
"""
if not self.is_exist(fs_path):
return False
return not self._is_dir(fs_path)
def is_exist(self, fs_path):
"""
Whether the remote HDFS path exists.
Args:
fs_path(str): The hdfs file path.
Returns:
            Bool: Whether it is a file or a directory; return true if the path exists,
            otherwise return false.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
ret = client.is_exist("hdfs:/test_hdfs_client")
"""
return self._fs.exist(fs_path)
def upload_dir(self, local_dir, dest_dir, overwrite=False):
"""
        Upload a local directory to HDFS.
        Args:
            local_dir(str): the local directory path
            dest_dir(str): the destination directory on HDFS
            overwrite(bool): whether to overwrite an existing destination
        Returns:
            None
"""
local_dir = local_dir.rstrip("/")
dest_dir = dest_dir.rstrip("/")
local_basename = os.path.basename(local_dir)
if self.is_exist(dest_dir + "/" + local_basename) and overwrite:
self.delete(dest_dir + "/" + local_basename)
if not self.is_exist(dest_dir):
self.mkdirs(dest_dir)
self._fs.upload(local_dir, dest_dir)
# can't retry
def upload(self, local_path, fs_path, multi_processes=1, overwrite=False):
"""
Upload the local path to remote HDFS.
Args:
local_path(str): The local path.
fs_path(str): The HDFS path.
            multi_processes(int|1): the number of processes used to upload data at the same time, default=1
            overwrite(bool|False): whether to overwrite the file on HDFS
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")
"""
local = LocalFS()
if not local.is_exist(local_path):
raise FSFileNotExistsError("{} not exists".format(local_path))
self._fs.upload(local_path, fs_path)
def download(self, fs_path, local_path, multi_processes=1, overwrite=False):
"""
Download remote HDFS path to the local.
Args:
fs_path(str): The HDFS path.
local_path(str): The local path.
            multi_processes(int|1): the number of processes used to download data at the same time, default=1
            overwrite(bool): whether to overwrite the local file
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.download("hdfs:/test_hdfs_client", "./")
"""
def __subprocess_download(local_path, datas):
"""
download file from HDFS
Args:
local_path(str): the local file path
                datas(list): the list of HDFS file paths
"""
for data in datas:
self._fs.download(local_path, data)
if not self.is_exist(fs_path):
raise FSFileNotExistsError("{} not exits".format(fs_path))
# download file
if self.is_file(fs_path):
return self._fs.download(local_path, fs_path)
# download dir
_, all_filenames = self.ls_dir(fs_path)
all_files = [fs_path + i for i in all_filenames]
procs = []
for i in range(multi_processes):
process_datas = self._split_files(all_files, i, multi_processes)
p = multiprocessing.Process(
target=__subprocess_download, args=(local_path, process_datas))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
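        # Usage sketch (paths are placeholders): downloading a directory fans
        # the file list out over `multi_processes` workers via _split_files:
        #   client = AFSClient()
        #   client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
        #   client.download("hdfs:/test_hdfs_client", "./local_dir", multi_processes=4)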
def mkdirs(self, fs_path):
"""
Create a remote HDFS directory.
Args:
fs_path(str): The HDFS directory path.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.mkdirs("hdfs:/test_hdfs_client")
"""
if self.is_exist(fs_path):
return
self._fs.mkdir(fs_path)
def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
"""
Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .
Args:
fs_src_path(str): Name of the file or directory, that's needed to be moved.
fs_dst_path(str): Name of the file or directory to which to move to.
overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path`. When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, the program will throw an Exception.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")
"""
if overwrite and self.is_exist(fs_dst_path):
self.delete(fs_dst_path)
if test_exists:
if not self.is_exist(fs_src_path):
raise FSFileNotExistsError("{} is not exists".format(
fs_src_path))
if self.is_exist(fs_dst_path):
raise FSFileExistsError("{} exists already".format(fs_dst_path))
self._fs.mv(fs_src_path, fs_dst_path)
def delete(self, fs_path):
"""
Delete a remote HDFS path, whether it's a file or directory.
Args:
fs_path(str): The HDFS file path.
Examples:
.. code-block:: text
                from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.delete("hdfs:/test_hdfs_client")
"""
if not self.is_exist(fs_path):
return
self._fs.remove(fs_path)
def touch(self, fs_path, exist_ok=True):
"""
Create a remote HDFS file.
Args:
fs_path(str): The HDFS file path.
exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
program will throw an Exception. Default is true.
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.touch("hdfs:/test_hdfs_client")
"""
if self.is_exist(fs_path):
if exist_ok:
return
raise FSFileExistsError
return self._fs.touchz(fs_path)
def need_upload_download(self):
return True
def cat(self, fs_path=None):
"""
Cat a remote HDFS file.
Args:
fs_path(str): The HDFS file path.
Returns:
file content
Examples:
.. code-block:: text
from paddle.distributed.fleet.utils import AFSClient
client = AFSClient()
client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
client.cat("hdfs:/test_hdfs_client")
"""
if self.is_file(fs_path):
return self._fs.cat(fs_path)
else:
return ""
def _split_files(self, files, trainer_id, trainers):
"""
        Split the file list evenly across trainers.
Args:
files(list): file list
trainer_id(int): trainer mpi rank id
trainers(int): all trainers num
Returns:
            filelist(list): file list of the current trainer
"""
remainder = len(files) % trainers
blocksize = len(files) // trainers
blocks = [blocksize] * trainers
for i in range(remainder):
blocks[i] += 1
trainer_files = [[]] * trainers
begin = 0
for i in range(trainers):
trainer_files[i] = files[begin:begin + blocks[i]]
begin += blocks[i]
return trainer_files[trainer_id]
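_split_files hands out len(files) items as evenly as possible: every trainer gets len(files) // trainers files and the first len(files) % trainers trainers get one extra. A standalone sketch of the same arithmetic (file names are made up):

    files = ["a", "b", "c", "d", "e"]    # 5 files
    trainers = 2
    remainder = len(files) % trainers    # 1
    blocksize = len(files) // trainers   # 2
    blocks = [blocksize] * trainers      # [2, 2]
    for i in range(remainder):
        blocks[i] += 1                   # [3, 2]
    # trainer 0 receives ["a", "b", "c"], trainer 1 receives ["d", "e"]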
...@@ -25,11 +25,11 @@ import time
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid.log_helper import get_logger
-from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient
+from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient, AFSClient
from . import utils
OpRole = core.op_proto_and_checker_maker.OpRole
-__all__ = ["FleetUtil"]
+__all__ = ["FleetUtil", "GPUPSUtil"]
_logger = get_logger(
    __name__, logging.INFO, fmt='%(asctime)s %(levelname)s: %(message)s')
...@@ -1721,3 +1721,470 @@ class FleetUtil(object):
        else:
            return [start_list[heter_index], end_list[heter_index], send_list[heter_index], \
                    recv_list[heter_index], program_list[heter_index]]
class GPUPSUtil(FleetUtil):
"""
GPUPSUtil provides some common functions for users' convenience.
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
fleet_util = GPUPSUtil()
fleet_util.rank0_print("my log")
"""
def __init__(self, fs_client=None):
super(GPUPSUtil, self).__init__("pslib")
self._afs = fs_client
# self._afs = fs_client._fs
def init(self, fs_name, fs_user, fs_passwd, fs_conf):
r"""
init for fs config
Args:
fs_name(str): fs name
fs_user(str): fs user
fs_passwd(str): fs password
fs_conf(str): fs and afs conf path
Returns:
None
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
fleet_util = GPUPSUtil()
                fleet_util.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./afs.conf")
"""
self._afs.init(fs_name, fs_user, fs_passwd, fs_conf)
def set_fsclient(self, fs_client):
r"""
set fs_client for fs config
Args:
fs_client(AFSClient): fs_client object
Returns:
None
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
"""
self._afs = fs_client
def get_last_save_xbox_base(self, output_path):
r"""
get last saved base xbox info from xbox_base_done.txt
Args:
output_path(str): output path
Returns:
[last_save_day, last_path, xbox_base_key]
last_save_day(int): day of saved model
last_path(str): model path
xbox_base_key(int): xbox key
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
last_save_day, last_path, xbox_base_key = \
fleet_util.get_last_save_xbox_base("hdfs:/my/path")
"""
donefile_path = output_path + "/xbox_base_done.txt"
if not self._afs.is_file(donefile_path):
return [-1, -1, int(time.time())]
self._afs.download(donefile_path, "./xbox_base_done.txt")
# pre_content = self._afs.cat(donefile_path)
pre_content = ""
with open("xbox_base_done.txt", "r") as f:
pre_content = f.read()
pre_content = pre_content.strip()
last_dict = json.loads(pre_content.split("\n")[-1])
last_day = int(last_dict["input"].split("/")[-3])
last_path = "/".join(last_dict["input"].split("/")[:-1])
xbox_base_key = int(last_dict["key"])
return [last_day, last_path, xbox_base_key]
def get_last_save_xbox(self, output_path):
r"""
get last saved xbox info from xbox_patch_done.txt
Args:
output_path(str): output path
Returns:
[last_save_day, last_save_pass, last_path, xbox_base_key]
last_save_day(int): day of saved model
last_save_pass(int): pass id of saved
last_path(str): model path
xbox_base_key(int): xbox key
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
last_save_day, last_save_pass, last_path, xbox_base_key = \
fleet_util.get_last_save_xbox("hdfs:/my/path")
"""
donefile_path = output_path + "/xbox_patch_done.txt"
if not self._afs.is_file(donefile_path):
return [-1, -1, "", int(time.time())]
self._afs.download(donefile_path, "xbox_patch_done.txt")
pre_content = ""
with open("xbox_patch_done.txt", "r") as f:
pre_content = f.read()
pre_content = pre_content.strip()
last_dict = json.loads(pre_content.split("\n")[-1])
last_day = int(last_dict["input"].split("/")[-3])
last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1])
last_path = "/".join(last_dict["input"].split("/")[:-1])
xbox_base_key = int(last_dict["key"])
os.remove("xbox_patch_done.txt")
return [last_day, last_pass, last_path, xbox_base_key]
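    # Each line of xbox_patch_done.txt (and of xbox_base_done.txt above) is a
    # JSON record whose "input" field ends with <day>/<delta-pass or base>/000,
    # which is how day, pass id and model path are recovered. A small parsing
    # sketch with made-up values:
    #   line = '{"key": "1563700000", "input": "hdfs://xxx/path/20190722/delta-1/000"}'
    #   d = json.loads(line)
    #   int(d["input"].split("/")[-3])                 # 20190722
    #   int(d["input"].split("/")[-2].split("-")[-1])  # 1
    #   "/".join(d["input"].split("/")[:-1])           # .../20190722/delta-1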
def get_last_save_model(self, output_path):
r"""
get last saved model info from donefile.txt
Args:
output_path(str): output path
Returns:
[last_save_day, last_save_pass, last_path, xbox_base_key]
last_save_day(int): day of saved model
last_save_pass(int): pass id of saved
last_path(str): model path
xbox_base_key(int): xbox key
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
last_save_day, last_save_pass, last_path, xbox_base_key = \
fleet_util.get_last_save_model("hdfs:/my/path")
"""
last_save_day = -1
last_save_pass = -1
last_path = ""
donefile_path = output_path + "/donefile.txt"
if not self._afs.is_file(donefile_path):
return [-1, -1, "", int(time.time())]
self._afs.download(donefile_path, "./donefile.txt")
content = ""
with open("donefile.txt", "r") as f:
content = f.read()
content = content.strip().split("\n")[-1].split("\t")
last_save_day = int(content[0])
last_save_pass = int(content[3])
last_path = content[2]
xbox_base_key = int(content[1])
os.remove("donefile.txt")
return [last_save_day, last_save_pass, last_path, xbox_base_key]
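    # donefile.txt is tab-separated; write_model_donefile below emits
    # day \t xbox_base_key \t model_path \t pass_id \t 0 per line. A parsing
    # sketch with made-up values:
    #   line = "20190722\t1563700000\thdfs:/my/output/20190722/66/\t66\t0"
    #   fields = line.strip().split("\t")
    #   int(fields[0])   # day       -> 20190722
    #   int(fields[1])   # xbox key  -> 1563700000
    #   fields[2]        # model path
    #   int(fields[3])   # pass id   -> 66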
def write_model_donefile(self,
output_path,
day,
pass_id,
xbox_base_key,
donefile_name="donefile.txt"):
"""
        Write the donefile after saving a model.
Args:
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
xbox_base_key(str|int): xbox base key
donefile_name(str): donefile name, default is "donefile.txt"
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
                fleet_util.write_model_donefile(output_path="hdfs:/my/output",
                                                day=20190723,
                                                pass_id=66,
                                                xbox_base_key=int(time.time()))
"""
day = str(day)
pass_id = str(pass_id)
xbox_base_key = int(xbox_base_key)
if pass_id != "-1":
suffix_name = "/%s/%s/" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name
else:
suffix_name = "/%s/0/" % day
model_path = output_path.rstrip("/") + suffix_name
if fleet.worker_index() == 0:
donefile_path = output_path + "/" + donefile_name
content = "%s\t%lu\t%s\t%s\t%d" % (day, xbox_base_key,\
model_path, pass_id, 0)
if self._afs.is_file(donefile_path):
self._afs.download(donefile_path, donefile_name)
pre_content = ""
with open(donefile_name, "r") as f:
pre_content = f.read()
pre_content_list = pre_content.strip().split("\n")
day_list = [i.split("\t")[0] for i in pre_content_list]
pass_list = [i.split("\t")[3] for i in pre_content_list]
os.remove(donefile_name)
exist = False
for i in range(len(day_list)):
if int(day) == int(day_list[i]) and \
int(pass_id) == int(pass_list[i]):
exist = True
break
if not exist:
with open(donefile_name, "w") as f:
f.write(pre_content.strip() + "\n")
f.write(content + "\n")
self._afs.delete(donefile_path)
self._afs.upload(donefile_name, donefile_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
else:
self.rank0_error("not write %s because %s/%s already "
"exists" % (donefile_name, day, pass_id))
else:
with open(donefile_name, "w") as f:
f.write(content + "\n")
self._afs.upload(donefile_name, donefile_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
def write_xbox_donefile(self,
output_path,
day,
pass_id,
xbox_base_key,
data_path,
hadoop_fs_name,
hadoop_fs_ugi,
monitor_data={},
hadoop_home="$HADOOP_HOME",
donefile_name=None):
"""
write delta donefile or xbox base donefile
Args:
output_path(str): output path
day(str|int): training day of model
pass_id(str|int): training pass id of model
xbox_base_key(str|int): xbox base key
            data_path(str|list): training data path
            hadoop_fs_name(str): hadoop fs name
            hadoop_fs_ugi(str): hadoop fs ugi
            monitor_data(dict): metrics
            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            donefile_name(str): donefile name, default is None
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
                fleet_util.write_xbox_donefile(
                    output_path="hdfs:/my/output/",
                    day=20190722,
                    pass_id=1,
                    xbox_base_key=int(time.time()),
                    data_path="hdfs:/my/data/",
                    hadoop_fs_name="hdfs://xxx.hadoop.com:54310",
                    hadoop_fs_ugi="hello,hello123",
                    monitor_data={})
"""
day = str(day)
pass_id = str(pass_id)
xbox_base_key = int(xbox_base_key)
mode = None
if pass_id != "-1":
mode = "patch"
suffix_name = "/%s/delta-%s/" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name
if donefile_name is None:
donefile_name = "xbox_patch_done.txt"
else:
mode = "base"
suffix_name = "/%s/base/" % day
model_path = output_path.rstrip("/") + suffix_name
if donefile_name is None:
donefile_name = "xbox_base_done.txt"
if isinstance(data_path, list):
data_path = ",".join(data_path)
if fleet.worker_index() == 0:
donefile_path = output_path + "/" + donefile_name
xbox_str = self._get_xbox_str(output_path, day, model_path, \
xbox_base_key, data_path, hadoop_fs_name, monitor_data={},
mode=mode)
if self._afs.is_exist(donefile_path):
self.rank0_info("exist %s succeed" % (donefile_path))
self._afs.download(donefile_path, donefile_name)
pre_content = ""
with open(donefile_name, "r") as f:
pre_content = f.read()
last_dict = json.loads(pre_content.strip().split("\n")[-1])
last_day = last_dict["input"].split("/")[-3]
last_pass = last_dict["input"].split("/")[-2].split("-")[-1]
os.remove(donefile_name)
self.rank0_info("remove %s succeed" % (donefile_name))
exist = False
if int(day) < int(last_day) or \
int(day) == int(last_day) and \
int(pass_id) <= int(last_pass):
exist = True
if not exist:
with open(donefile_name, "w") as f:
f.write(pre_content.strip() + "\n")
f.write(xbox_str + "\n")
self._afs.delete(donefile_path)
self._afs.upload(donefile_name, donefile_path)
self.rank0_info("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
else:
self.rank0_info("not write %s because %s/%s already "
"exists" % (donefile_name, day, pass_id))
else:
with open(donefile_name, "w") as f:
f.write(xbox_str + "\n")
self._afs.upload(donefile_name, donefile_path)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
def write_cache_donefile(self,
output_path,
day,
pass_id,
key_num,
donefile_name="sparse_cache.meta",
**kwargs):
"""
write cache donefile
Args:
output_path(str): output path
day(str|int): training day of model
pass_id(str|int): training pass id of model
key_num(str|int): save cache return value
donefile_name(str): donefile name, default is "sparse_cache.meta"
kwargs(dict): user defined properties
file_num(int): cache file num
table_id(int): cache table id
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import GPUPSUtil
from paddle.distributed.fleet.utils.fs import AFSClient
hdfs_client = AFSClient()
fleet_util = GPUPSUtil()
fleet_util.set_fsclient(hdfs_client)
fleet_util.write_cache_donefile(
output_path="hdfs:/my/output/",
day=20190722,
pass_id=1,
key_num=123456)
"""
day = str(day)
pass_id = str(pass_id)
key_num = int(key_num)
file_num = kwargs.get("file_num", 16)
table_id = kwargs.get("table_id", 0)
if pass_id != "-1":
suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id)
model_path = output_path.rstrip("/") + suffix_name
else:
suffix_name = "/%s/base/%03d_cache" % (day, table_id)
model_path = output_path.rstrip("/") + suffix_name
if fleet.worker_index() == 0:
donefile_path = model_path + "/" + donefile_name
if self._afs.is_file(donefile_path):
self.rank0_error( \
"not write because %s already exists" % donefile_path)
else:
meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \
% (file_num, key_num)
with open(donefile_name, "w") as f:
f.write(meta_str)
self._afs.upload(donefile_name, donefile_path)
self.rank0_error("write %s succeed" % donefile_path)
def _get_xbox_str(self,
output_path,
day,
model_path,
xbox_base_key,
data_path,
hadoop_fs_name,
monitor_data={},
mode="patch"):
xbox_dict = collections.OrderedDict()
if mode == "base":
xbox_dict["id"] = str(xbox_base_key)
elif mode == "patch":
xbox_dict["id"] = str(int(time.time()))
else:
print("warning: unknown mode %s, set it to patch" % mode)
mode = "patch"
xbox_dict["id"] = str(int(time.time()))
xbox_dict["key"] = str(xbox_base_key)
if model_path.startswith("hdfs:") or model_path.startswith("afs:"):
model_path = model_path[model_path.find(":") + 1:]
xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000"
xbox_dict["record_count"] = "111111"
xbox_dict["partition_type"] = "2"
xbox_dict["job_name"] = "default_job_name"
xbox_dict["ins_tag"] = "feasign"
xbox_dict["ins_path"] = data_path
xbox_dict["job_id"] = os.environ.get("PADDLE_JOB_ID", "")
# currently hard code here, set monitor_data empty string
xbox_dict["monitor_data"] = ""
xbox_dict["monitor_path"] = output_path.rstrip("/") + "/monitor/" \
+ day + ".txt"
xbox_dict["mpi_size"] = str(fleet.worker_num())
return json.dumps(xbox_dict)
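    # Illustrative output of _get_xbox_str (one JSON line; all values made up):
    #   {"id": "1563800000", "key": "1563700000",
    #    "input": "hdfs://xxx.hadoop.com:54310/my/output/20190722/delta-1/000",
    #    "record_count": "111111", "partition_type": "2",
    #    "job_name": "default_job_name", "ins_tag": "feasign",
    #    "ins_path": "hdfs:/my/data/", "job_id": "", "monitor_data": "",
    #    "monitor_path": "hdfs:/my/output/monitor/20190722.txt", "mpi_size": "2"}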