From b104ea068467168ca6364535209304b61a19937c Mon Sep 17 00:00:00 2001 From: jiaqi <173596896@qq.com> Date: Wed, 14 Aug 2019 10:54:13 +0800 Subject: [PATCH] add get_last_save_xbox_base/get_last_save_xbox (#19122) * add get_last_save_xbox_base/get_last_save_xbox * fix fleet_util bug of load paddle model * add doc string in fleet api --- paddle/fluid/framework/fleet/fleet_wrapper.cc | 5 +- paddle/fluid/framework/fleet/fleet_wrapper.h | 1 + .../fleet/parameter_server/pslib/__init__.py | 47 ++++++- .../fluid/incubate/fleet/utils/fleet_util.py | 129 ++++++++++++++++-- 4 files changed, 161 insertions(+), 21 deletions(-) diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 8d28e1cabfa..b072702c103 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -401,7 +401,9 @@ void FleetWrapper::LoadFromPaddleModel(Scope& scope, const uint64_t table_id, std::vector var_list, std::string model_path, std::string model_proto_file, + std::vector table_var_list, bool load_combine) { +#ifdef PADDLE_WITH_PSLIB // load ProgramDesc from model file auto read_proto_func = [](const std::string& filename) -> ProgramDesc { std::string contents; @@ -467,7 +469,8 @@ void FleetWrapper::LoadFromPaddleModel(Scope& scope, const uint64_t table_id, } } delete old_scope; - PushDenseParamSync(scope, table_id, old_param_list); + PushDenseParamSync(scope, table_id, table_var_list); +#endif } void FleetWrapper::LoadModel(const std::string& path, const int mode) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index e0456906d3c..6bc3c0910b5 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -136,6 +136,7 @@ class FleetWrapper { void LoadFromPaddleModel(Scope& scope, const uint64_t table_id, // NOLINT std::vector var_list, std::string model_path, std::string model_proto_file, + std::vector table_var_list, bool load_combine); // mode = 0, load all feature // mode = 1, laod delta feature, which means load diff diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index b4993dae9db..e0a5082a893 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -170,6 +170,22 @@ class PSLib(Fleet): self._role_maker._finalize() def distributed_optimizer(self, optimizer, strategy={}): + """ + distributed_optimizer + + Args: + optimizer(Optimizer): optimizer + strategy(dict): strategy + + Examples: + .. code-block:: python + + fleet.distributed_optimizer(optimizer) + + Returns: + optimizer(DownpourOptimizer): downpour optimizer + + """ self._optimizer = DownpourOptimizer(optimizer, strategy) return self._optimizer @@ -182,6 +198,20 @@ class PSLib(Fleet): export_for_deployment=True): """ save pserver model called from a worker + + Args: + executor(Executor): fluid executor + dirname(str): save model path + feeded_var_names(list): default None + target_vars(list): default None + main_program(Program): default None + export_for_deployment(bool): default None + + Examples: + .. code-block:: python + + fleet.save_inference_model(dirname="hdfs:/my/path") + """ self._fleet_ptr.save_model(dirname) @@ -332,6 +362,7 @@ class PSLib(Fleet): scope(Scope): Scope object model_proto_file(str): path of program desc proto binary file, can be local or hdfs/afs file + var_names(list): var name list load_combine(bool): load from a file or splited param files default False. @@ -357,11 +388,13 @@ class PSLib(Fleet): mode = kwargs.get("mode", 0) scope = kwargs.get("scope", None) model_proto_file = kwargs.get("model_proto_file", None) + var_names = kwargs.get("var_names", None) load_combine = kwargs.get("load_combine", False) self._role_maker._barrier_worker() if scope is not None and model_proto_file is not None: - self._load_one_table_from_paddle_model( - scope, table_id, model_path, model_proto_file, load_combine) + self._load_one_table_from_paddle_model(scope, table_id, model_path, + model_proto_file, var_names, + load_combine) elif self._role_maker.is_first_worker(): self._fleet_ptr.load_model_one_table(table_id, model_path, mode) self._role_maker._barrier_worker() @@ -371,6 +404,7 @@ class PSLib(Fleet): table_id, model_path, model_proto_file, + var_names=None, load_combine=False): """ load params from paddle model, and push params to pserver @@ -381,6 +415,7 @@ class PSLib(Fleet): model_path(str): path of paddle model, can be local or hdfs/afs file model_proto_file(str): path of program desc proto binary file, can be local or hdfs/afs file + var_names(list): load var names load_combine(bool): load from a file or splited param files """ @@ -415,17 +450,17 @@ class PSLib(Fleet): for i in self._opt_info["fleet_desc"].trainer_param.dense_table: if table_id is not None and table_id != i.table_id: continue - var_list = [var for var in i.dense_variable_name] + table_var_names = [var for var in i.dense_variable_name] skip = False - for var in var_list: + for var in table_var_names: if scope.find_var(var) is None: skip = True break if skip: continue self._fleet_ptr.load_from_paddle_model( - scope, table_id, var_list, model_path, model_proto_file, - load_combine) + scope, table_id, var_names, model_path, model_proto_file, + table_var_names, load_combine) self._role_maker._barrier_worker() def _set_opt_info(self, opt_info): diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 581d8b697d0..88d874676e3 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Fleet Utils""" +"""Fleet Utils.""" import collections import json @@ -310,11 +310,14 @@ class FleetUtil(object): model_path, xbox_base_key, data_path, + hadoop_fs_name, monitor_data={}): xbox_dict = collections.OrderedDict() - xbox_dict["id"] = int(time.time()) + xbox_dict["id"] = str(int(time.time())) xbox_dict["key"] = str(xbox_base_key) - xbox_dict["input"] = model_path.rstrip("/") + "/000" + if model_path.startswith("hdfs:") or model_path.startswith("afs:"): + model_path = model_path[model_path.find(":") + 1:] + xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000" xbox_dict["record_count"] = "111111" xbox_dict["job_name"] = "default_job_name" xbox_dict["ins_tag"] = "feasign" @@ -437,7 +440,7 @@ class FleetUtil(object): hadoop_fs_ugi, monitor_data={}, hadoop_home="$HADOOP_HOME", - donefile_name="xbox_patch_done.txt"): + donefile_name=None): """ write delta donefile or xbox base donefile @@ -451,7 +454,7 @@ class FleetUtil(object): hadoop_fs_ugi(str): hdfs/afs fs ugi monitor_data(dict): metrics hadoop_home(str): hadoop home, default is "$HADOOP_HOME" - donefile_name(str): donefile name, default is "donefile.txt" + donefile_name(str): donefile name, default is None" Examples: .. code-block:: python @@ -478,9 +481,13 @@ class FleetUtil(object): if pass_id != "-1": suffix_name = "/%s/delta-%s/" % (day, pass_id) model_path = output_path.rstrip("/") + suffix_name + if donefile_name is None: + donefile_name = "xbox_patch_done.txt" else: suffix_name = "/%s/base/" % day model_path = output_path.rstrip("/") + suffix_name + if donefile_name is None: + donefile_name = "xbox_base_done.txt" if isinstance(data_path, list): data_path = ",".join(data_path) @@ -488,7 +495,7 @@ class FleetUtil(object): if fleet.worker_index() == 0: donefile_path = output_path + "/" + donefile_name xbox_str = self._get_xbox_str(output_path, day, model_path, \ - xbox_base_key, data_path, monitor_data={}) + xbox_base_key, data_path, hadoop_fs_name, monitor_data={}) configs = { "fs.default.name": hadoop_fs_name, "hadoop.job.ugi": hadoop_fs_ugi @@ -717,14 +724,13 @@ class FleetUtil(object): """ day = str(day) - pass_id = str(pass_id) suffix_name = "/%s/base/" % day model_path = output_path + suffix_name self.rank0_print("going to save_xbox_base_model " + model_path) fleet.save_persistables(None, model_path, mode=2) self.rank0_print("save_xbox_base_model done") - def save_cache_model(self, output_path, day, pass_id): + def save_cache_model(self, output_path, day, pass_id, mode=1): """ save cache model @@ -732,6 +738,7 @@ class FleetUtil(object): output_path(str): output path day(str|int): training day pass_id(str|int): training pass id + mode(str|int): save mode Returns: key_num(int): cache key num @@ -746,10 +753,11 @@ class FleetUtil(object): """ day = str(day) pass_id = str(pass_id) + mode = int(mode) suffix_name = "/%s/delta-%s" % (day, pass_id) model_path = output_path.rstrip("/") + suffix_name self.rank0_print("going to save_cache_model %s" % model_path) - key_num = fleet.save_cache_model(None, model_path, mode=1) + key_num = fleet.save_cache_model(None, model_path, mode=mode) self.rank0_print("save_cache_model done") return key_num @@ -922,6 +930,97 @@ class FleetUtil(object): fleet._role_maker._barrier_worker() + def get_last_save_xbox_base(self, + output_path, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME"): + """ + get last saved base xbox info from xbox_base_done.txt + + Args: + output_path(str): output path + hadoop_fs_name(str): hdfs/afs fs_name + hadoop_fs_ugi(str): hdfs/afs fs_ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + + Returns: + [last_save_day, last_path, xbox_base_key] + last_save_day(int): day of saved model + last_path(str): model path + xbox_base_key(int): xbox key + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + last_save_day, last_path, xbox_base_key = \ + fleet_util.get_last_save_xbox_base("hdfs:/my/path", 20190722, + 88) + + """ + donefile_path = output_path + "/xbox_base_done.txt" + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + if not client.is_file(donefile_path): + return [-1, -1, int(time.time())] + pre_content = client.cat(donefile_path) + last_dict = json.loads(pre_content.split("\n")[-1]) + last_day = int(last_dict["input"].split("/")[-3]) + last_path = "/".join(last_dict["input"].split("/")[:-1]) + xbox_base_key = int(last_dict["key"]) + return [last_day, last_path, xbox_base_key] + + def get_last_save_xbox(self, + output_path, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME"): + """ + get last saved xbox info from xbox_patch_done.txt + + Args: + output_path(str): output path + hadoop_fs_name(str): hdfs/afs fs_name + hadoop_fs_ugi(str): hdfs/afs fs_ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + + Returns: + [last_save_day, last_save_pass, last_path, xbox_base_key] + last_save_day(int): day of saved model + last_save_pass(int): pass id of saved + last_path(str): model path + xbox_base_key(int): xbox key + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + last_save_day, last_save_pass, last_path, xbox_base_key = \ + fleet_util.get_last_save_xbox("hdfs:/my/path", 20190722, 88) + + """ + donefile_path = output_path + "/xbox_patch_done.txt" + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + if not client.is_file(donefile_path): + return [-1, -1, "", int(time.time())] + pre_content = client.cat(donefile_path) + last_dict = json.loads(pre_content.split("\n")[-1]) + last_day = int(last_dict["input"].split("/")[-3]) + last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1]) + last_path = "/".join(last_dict["input"].split("/")[:-1]) + xbox_base_key = int(last_dict["key"]) + return [last_day, last_pass, last_path, xbox_base_key] + def get_last_save_model(self, output_path, hadoop_fs_name, @@ -937,18 +1036,19 @@ class FleetUtil(object): hadoop_home(str): hadoop home, default is "$HADOOP_HOME" Returns: - [last_save_day, last_save_pass, last_path] + [last_save_day, last_save_pass, last_path, xbox_base_key] last_save_day(int): day of saved model last_save_pass(int): pass id of saved last_path(str): model path + xbox_base_key(int): xbox key Examples: .. code-block:: python from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil fleet_util = FleetUtil() - last_save_day, last_save_pass, last_path = \ - fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722, 88) + last_save_day, last_save_pass, last_path, xbox_base_key = \ + fleet_util.get_last_save_model("hdfs:/my/path", 20190722, 88) """ last_save_day = -1 @@ -961,13 +1061,14 @@ class FleetUtil(object): } client = HDFSClient(hadoop_home, configs) if not client.is_file(donefile_path): - return [-1, -1, ""] + return [-1, -1, "", int(time.time())] content = client.cat(donefile_path) content = content.split("\n")[-1].split("\t") last_save_day = int(content[0]) last_save_pass = int(content[3]) last_path = content[2] - return [last_save_day, last_save_pass, last_path] + xbox_base_key = int(content[1]) + return [last_save_day, last_save_pass, last_path, xbox_base_key] def get_online_pass_interval(self, days, hours, split_interval, split_per_pass, is_data_hourly_placed): -- GitLab