From a99bc64c63c1035108255a165b51a3e5b8febf6f Mon Sep 17 00:00:00 2001 From: jiaqi <173596896@qq.com> Date: Thu, 8 Aug 2019 23:36:03 +0800 Subject: [PATCH] add fleet util, add some interface in hdfs util (#18752) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add fleet util (fleet/utils/fleet_util.py): functions for users' convenience * add some interface in hdfs util : hdfs is_file、hdfs cat --- paddle/fluid/pybind/fleet_wrapper_py.cc | 1 + .../fluid/incubate/fleet/utils/fleet_util.py | 1316 +++++++++++++++++ .../paddle/fluid/incubate/fleet/utils/hdfs.py | 42 + python/setup.py.in | 3 +- 4 files changed, 1361 insertions(+), 1 deletion(-) create mode 100644 python/paddle/fluid/incubate/fleet/utils/fleet_util.py diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index 36fc0822e82..92cda13481f 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -43,6 +43,7 @@ void BindFleetWrapper(py::module* m) { py::class_(*m, "Fleet") .def(py::init()) .def("push_dense", &framework::FleetWrapper::PushDenseVarsSync) + .def("pull_dense", &framework::FleetWrapper::PullDenseVarsSync) .def("init_server", &framework::FleetWrapper::InitServer) .def("run_server", &framework::FleetWrapper::RunServer) .def("init_worker", &framework::FleetWrapper::InitWorker) diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py new file mode 100644 index 00000000000..d35382c01f3 --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -0,0 +1,1316 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Fleet Utils""" + +import collections +import json +import logging +import math +import numpy as np +import os +import sys +import time +import paddle.fluid as fluid +from paddle.fluid.log_helper import get_logger +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet +from . import hdfs +from .hdfs import * + +__all__ = ["FleetUtil"] + +_logger = get_logger( + __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') + + +class FleetUtil(object): + """ + FleetUtil provides some common functions for users' convenience. + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.rank0_print("my log") + + """ + + def rank0_print(self, s): + """ + Worker of rank 0 print some log. + + Args: + s(str): string to print + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.rank0_print("my log") + + """ + if fleet.worker_index() != 0: + return + print(s) + sys.stdout.flush() + + def rank0_info(self, s): + """ + Worker of rank 0 print some log info. + + Args: + s(str): string to log + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.rank0_info("my log info") + + """ + if fleet.worker_index() != 0: + return + _logger.info(s) + + def rank0_error(self, s): + """ + Worker of rank 0 print some log error. + + Args: + s(str): string to log + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.rank0_error("my log error") + + """ + if fleet.worker_index() != 0: + return + _logger.error(s) + + def set_zero(self, + var_name, + scope=fluid.global_scope(), + place=fluid.CPUPlace(), + param_type="int64"): + """ + Set tensor of a Variable to zero. + + Args: + var_name(str): name of Variable + scope(Scope): Scope object, default is fluid.global_scope() + place(Place): Place object, default is fluid.CPUPlace() + param_type(str): param data type, default is int64 + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.set_zero(myvar.name, myscope) + + """ + param = scope.var(var_name).get_tensor() + param_array = np.zeros(param._get_dims()).astype(param_type) + param.set(param_array, place) + + def print_global_auc(self, + scope=fluid.global_scope(), + stat_pos="_generated_var_2", + stat_neg="_generated_var_3", + print_prefix=""): + """ + Print global auc of all distributed workers. + + Args: + scope(Scope): Scope object, default is fluid.global_scope() + stat_pos(str): name of auc pos bucket Variable + stat_neg(str): name of auc neg bucket Variable + print_prefix(str): prefix of print auc + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.print_global_auc(myscope, stat_pos=stat_pos.name, + stat_neg=stat_neg.name) + + # below is part of model + emb = my_slot_net(slots, label) # emb can be fc layer of size 1 + similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(\ + emb, min=-15.0, max=15.0), name="similarity_norm")\ + binary_predict = fluid.layers.concat(input=[\ + fluid.layers.elementwise_sub(\ + fluid.layers.ceil(similarity_norm), similarity_norm),\ + similarity_norm], axis=1) + auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \ + stat_neg] = fluid.layers.auc(input=binary_predict,\ + label=label, curve='ROC',\ + num_thresholds=4096) + + """ + auc_value = self.get_global_auc(scope, stat_pos, stat_neg) + self.rank0_print(print_prefix + " global auc = %s" % auc_value) + + def get_global_auc(self, + scope=fluid.global_scope(), + stat_pos="_generated_var_2", + stat_neg="_generated_var_3"): + """ + Get global auc of all distributed workers. + + Args: + scope(Scope): Scope object, default is fluid.global_scope() + stat_pos(str): name of auc pos bucket Variable + stat_neg(str): name of auc neg bucket Variable + + Returns: + auc_value(float), total_ins_num(int) + + Examples: + .. 
code-block:: python

+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              auc_value = fleet_util.get_global_auc(myscope,
+                                                    stat_pos=stat_pos,
+                                                    stat_neg=stat_neg)
+
+        """
+        if scope.find_var(stat_pos) is None or scope.find_var(stat_neg) is None:
+            self.rank0_print("not found auc bucket")
+            return None
+        fleet._role_maker._barrier_worker()
+        # auc pos bucket
+        pos = np.array(scope.find_var(stat_pos).get_tensor())
+        # auc pos bucket shape
+        old_pos_shape = np.array(pos.shape)
+        # reshape to one dim
+        pos = pos.reshape(-1)
+        global_pos = np.copy(pos) * 0
+        # mpi allreduce
+        fleet._role_maker._node_type_comm.Allreduce(pos, global_pos)
+        # reshape to its original shape
+        global_pos = global_pos.reshape(old_pos_shape)
+
+        # auc neg bucket
+        neg = np.array(scope.find_var(stat_neg).get_tensor())
+        old_neg_shape = np.array(neg.shape)
+        neg = neg.reshape(-1)
+        global_neg = np.copy(neg) * 0
+        fleet._role_maker._node_type_comm.Allreduce(neg, global_neg)
+        global_neg = global_neg.reshape(old_neg_shape)
+
+        # calculate auc
+        num_bucket = len(global_pos[0])
+        area = 0.0
+        pos = 0.0
+        neg = 0.0
+        new_pos = 0.0
+        new_neg = 0.0
+        total_ins_num = 0
+        for i in range(num_bucket):
+            index = num_bucket - 1 - i
+            new_pos = pos + global_pos[0][index]
+            total_ins_num += global_pos[0][index]
+            new_neg = neg + global_neg[0][index]
+            total_ins_num += global_neg[0][index]
+            area += (new_neg - neg) * (pos + new_pos) / 2
+            pos = new_pos
+            neg = new_neg
+
+        auc_value = None
+        if pos * neg == 0 or total_ins_num == 0:
+            auc_value = 0.5
+        else:
+            auc_value = area / (pos * neg)
+
+        fleet._role_maker._barrier_worker()
+        return auc_value
+
+    def load_fleet_model_one_table(self, table_id, path):
+        """
+        load pslib model to one table
+
+        Args:
+            table_id(int): id of the table to load the model into
+            path(str): model path
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.load_fleet_model_one_table(1, "hdfs:/my/model/path")
+
+        """
+        fleet.load_one_table(table_id, path)
+
+    def load_fleet_model(self, path, mode=0):
+        """
+        load pslib model
+
+        Args:
+            path(str): model path
+            mode(int): 0 or 1, which means load checkpoint or delta model,
+                       default is 0
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+
+              fleet_util.load_fleet_model("hdfs:/my/model/path")
+
+              fleet_util.load_fleet_model("hdfs:/my/model/path", mode=0)
+
+        """
+        fleet.init_server(path, mode=mode)
+
+    def save_fleet_model(self, path, mode=0):
+        """
+        save pslib model
+
+        Args:
+            path(str): model path
+            mode(int): 0 or 1, which means save checkpoint or delta model,
+                       default is 0
+
+        Examples:
+            .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.save_fleet_model("hdfs:/my/model/path") + + """ + fleet.save_persistables(None, path, mode=mode) + + def _get_xbox_str(self, + output_path, + day, + model_path, + xbox_base_key, + data_path, + monitor_data={}): + xbox_dict = collections.OrderedDict() + xbox_dict["id"] = int(time.time()) + xbox_dict["key"] = str(xbox_base_key) + xbox_dict["input"] = model_path.rstrip("/") + "/000" + xbox_dict["record_count"] = "111111" + xbox_dict["job_name"] = "default_job_name" + xbox_dict["ins_tag"] = "feasign" + xbox_dict["ins_path"] = data_path + job_id_with_host = os.popen("echo -n ${JOB_ID}").read().strip() + instance_id = os.popen("echo -n ${INSTANCE_ID}").read().strip() + start_pos = instance_id.find(job_id_with_host) + end_pos = instance_id.find("--") + if start_pos != -1 and end_pos != -1: + job_id_with_host = instance_id[start_pos:end_pos] + xbox_dict["job_id"] = job_id_with_host + # currently hard code here, set monitor_data empty string + xbox_dict["monitor_data"] = "" + xbox_dict["monitor_path"] = output_path.rstrip("/") + "/monitor/" \ + + day + ".txt" + xbox_dict["mpi_size"] = str(fleet.worker_num()) + return json.dumps(xbox_dict) + + def write_model_donefile(self, + output_path, + day, + pass_id, + xbox_base_key, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME", + donefile_name="donefile.txt"): + """ + write donefile when save model + + Args: + output_path(str): output path + day(str|int): training day + pass_id(str|int): training pass id + xbox_base_key(str|int): xbox base key + hadoop_fs_name(str): hdfs/afs fs name + hadoop_fs_ugi(str): hdfs/afs fs ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + donefile_name(str): donefile name, default is "donefile.txt" + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.write_model_donefile(output_path="hdfs:/my/output", + model_path="hdfs:/my/model", + day=20190723, + pass_id=66, + xbox_base_key=int(time.time()), + hadoop_fs_name="hdfs://xxx", + hadoop_fs_ugi="user,passwd") + + """ + day = str(day) + pass_id = str(pass_id) + xbox_base_key = int(xbox_base_key) + + if pass_id != "-1": + suffix_name = "/%s/%s/" % (day, pass_id) + model_path = output_path.rstrip("/") + suffix_name + else: + suffix_name = "/%s/0/" % day + model_path = output_path.rstrip("/") + suffix_name + + if fleet.worker_index() == 0: + donefile_path = output_path + "/" + donefile_name + content = "%s\t%lu\t%s\t%s\t%d" % (day, xbox_base_key,\ + model_path, pass_id, 0) + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + if client.is_file(donefile_path): + pre_content = client.cat(donefile_path) + pre_content_list = pre_content.split("\n") + day_list = [i.split("\t")[0] for i in pre_content_list] + pass_list = [i.split("\t")[3] for i in pre_content_list] + exist = False + for i in range(len(day_list)): + if int(day) == int(day_list[i]) and \ + int(pass_id) == int(pass_list[i]): + exist = True + break + if not exist: + with open(donefile_name, "w") as f: + f.write(pre_content + "\n") + f.write(content + "\n") + client.delete(donefile_path) + client.upload( + output_path, + donefile_name, + multi_processes=1, + overwrite=False) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + else: + self.rank0_error("not write %s because %s/%s already " + "exists" % (donefile_name, day, pass_id)) + else: + with open(donefile_name, "w") as f: + f.write(content + "\n") + client.upload( + output_path, + donefile_name, + multi_processes=1, + overwrite=False) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + fleet._role_maker._barrier_worker() + + def write_xbox_donefile(self, + output_path, + day, + pass_id, + xbox_base_key, + data_path, + hadoop_fs_name, + hadoop_fs_ugi, + monitor_data={}, + hadoop_home="$HADOOP_HOME", + donefile_name="xbox_patch_done.txt"): + """ + write delta donefile or xbox base donefile + + Args: + output_path(str): output path + day(str|int): training day of model + pass_id(str|int): training pass id of model + xbox_base_key(str|int): xbox base key + data_path(str|list): training data path + hadoop_fs_name(str): hdfs/afs fs name + hadoop_fs_ugi(str): hdfs/afs fs ugi + monitor_data(dict): metrics + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + donefile_name(str): donefile name, default is "donefile.txt" + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.write_xbox_donefile( + output_path="hdfs:/my/output/", + model_path="hdfs:/my/output/20190722/01", + day=20190722, + pass_id=1, + xbox_base_key=int(time.time()), + data_path="hdfs:/my/data/", + hadoop_fs_name="hdfs://xxx", + hadoop_fs_ugi="user,passwd", + monitor_data={} + ) + + """ + day = str(day) + pass_id = str(pass_id) + xbox_base_key = int(xbox_base_key) + + if pass_id != "-1": + suffix_name = "/%s/delta-%s/" % (day, pass_id) + model_path = output_path.rstrip("/") + suffix_name + else: + suffix_name = "/%s/base/" % day + model_path = output_path.rstrip("/") + suffix_name + + if isinstance(data_path, list): + data_path = ",".join(data_path) + + if fleet.worker_index() == 0: + donefile_path = output_path + "/" + donefile_name + xbox_str = self._get_xbox_str(output_path, day, model_path, \ + xbox_base_key, data_path, monitor_data={}) + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + if client.is_file(donefile_path): + pre_content = client.cat(donefile_path) + last_dict = json.loads(pre_content.split("\n")[-1]) + last_day = last_dict["input"].split("/")[-3] + last_pass = last_dict["input"].split("/")[-2].split("-")[-1] + exist = False + if int(day) < int(last_day) or \ + int(day) == int(last_day) and \ + int(pass_id) <= int(last_pass): + exist = True + if not exist: + with open(donefile_name, "w") as f: + f.write(pre_content + "\n") + f.write(xbox_str + "\n") + client.delete(donefile_path) + client.upload( + output_path, + donefile_name, + multi_processes=1, + overwrite=False) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + else: + self.rank0_error("not write %s because %s/%s already " + "exists" % (donefile_name, day, pass_id)) + else: + with open(donefile_name, "w") as f: + f.write(xbox_str + "\n") + client.upload( + output_path, + donefile_name, + multi_processes=1, + overwrite=False) + self.rank0_error("write %s/%s %s succeed" % \ + (day, pass_id, donefile_name)) + fleet._role_maker._barrier_worker() + + def write_cache_donefile(self, + output_path, + day, + pass_id, + key_num, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME", + donefile_name="sparse_cache.meta"): + """ + write cache donefile + + Args: + output_path(str): output path + day(str|int): training day of model + pass_id(str|int): training pass id of model + key_num(str|int): save cache return value + hadoop_fs_name(str): hdfs/afs fs name + hadoop_fs_ugi(str): hdfs/afs fs ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + donefile_name(str): donefile name, default is "sparse_cache.meta" + + Examples: + .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.write_cache_donefile( + output_path="hdfs:/my/output/", + day=20190722, + pass_id=1, + key_num=123456, + hadoop_fs_name="hdfs://xxx", + hadoop_fs_ugi="user,passwd", + ) + + """ + day = str(day) + pass_id = str(pass_id) + key_num = int(key_num) + + if pass_id != "-1": + suffix_name = "/%s/delta-%s/000_cache" % (day, pass_id) + model_path = output_path.rstrip("/") + suffix_name + else: + suffix_name = "/%s/base/000_cache" % day + model_path = output_path.rstrip("/") + suffix_name + + if fleet.worker_index() == 0: + donefile_path = model_path + "/" + donefile_name + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + if client.is_file(donefile_path): + self.rank0_error( \ + "not write because %s already exists" % donefile_path) + else: + meta_str = \ + "file_prefix:part\npart_num:16\nkey_num:%d\n" % key_num + with open(donefile_name, "w") as f: + f.write(meta_str) + client.upload( + model_path, + donefile_name, + multi_processes=1, + overwrite=False) + self.rank0_error("write %s succeed" % donefile_path) + fleet._role_maker._barrier_worker() + + def load_model(self, output_path, day, pass_id): + """ + load pslib model + + Args: + output_path(str): output path + day(str|int): training day + pass_id(str|int): training pass id + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.load_model("hdfs:/my/path", 20190722, 88) + + """ + day = str(day) + pass_id = str(pass_id) + suffix_name = "/%s/%s/" % (day, pass_id) + load_path = output_path + suffix_name + self.rank0_error("going to load_model %s" % load_path) + self.load_fleet_model(load_path) + self.rank0_error("load_model done") + + def save_model(self, output_path, day, pass_id): + """ + save pslib model + + Args: + output_path(str): output path + day(str|int): training day + pass_id(str|int): training pass id + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.save_model("hdfs:/my/path", 20190722, 88) + + """ + day = str(day) + pass_id = str(pass_id) + suffix_name = "/%s/%s/" % (day, pass_id) + model_path = output_path + suffix_name + self.rank0_error("going to save_model %s" % model_path) + self.save_fleet_model(model_path) + self.rank0_error("save_model done") + + def save_batch_model(self, output_path, day): + """ + save batch model + + Args: + output_path(str): output path + day(str|int): training day + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.save_batch_model("hdfs:/my/path", 20190722) + + """ + day = str(day) + suffix_name = "/%s/0/" % day + model_path = output_path + suffix_name + self.rank0_error("going to save_model %s" % model_path) + fleet.save_persistables(None, model_path, mode=3) + self.rank0_error("save_batch_model done") + + def save_delta_model(self, output_path, day, pass_id): + """ + save delta model + + Args: + output_path(str): output path + day(str|int): training day + pass_id(str|int): training pass id + + Examples: + .. 
code-block:: python

+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.save_delta_model("hdfs:/my/path", 20190722, 88)
+
+        """
+        day = str(day)
+        pass_id = str(pass_id)
+        suffix_name = "/%s/delta-%s/" % (day, pass_id)
+        model_path = output_path + suffix_name
+        self.rank0_error("going to save_delta_model %s" % model_path)
+        fleet.save_persistables(None, model_path, mode=1)
+        self.rank0_error("save_delta_model done")
+
+    def save_xbox_base_model(self, output_path, day):
+        """
+        save xbox base model
+
+        Args:
+            output_path(str): output path
+            day(str|int): training day
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722)
+
+        """
+        day = str(day)
+        suffix_name = "/%s/base/" % day
+        model_path = output_path + suffix_name
+        self.rank0_error("going to save_xbox_base_model " + model_path)
+        fleet.save_persistables(None, model_path, mode=2)
+        self.rank0_error("save_xbox_base_model done")
+
+    def save_cache_model(self, output_path, day, pass_id):
+        """
+        save cache model
+
+        Args:
+            output_path(str): output path
+            day(str|int): training day
+            pass_id(str|int): training pass id
+
+        Returns:
+            key_num(int): cache key num
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.save_cache_model("hdfs:/my/path", 20190722, 88)
+
+        """
+        day = str(day)
+        pass_id = str(pass_id)
+        suffix_name = "/%s/delta-%s" % (day, pass_id)
+        model_path = output_path.rstrip("/") + suffix_name
+        self.rank0_error("going to save_cache_model %s" % model_path)
+        key_num = fleet.save_cache_model(None, model_path, mode=0)
+        self.rank0_error("save_cache_model done")
+        return key_num
+
+    def save_cache_base_model(self, output_path, day):
+        """
+        save cache base model
+
+        Args:
+            output_path(str): output path
+            day(str|int): training day
+
+        Returns:
+            key_num(int): cache key num
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.save_cache_base_model("hdfs:/my/path", 20190722)
+
+        """
+        day = str(day)
+        suffix_name = "/%s/base" % day
+        model_path = output_path.rstrip("/") + suffix_name
+        self.rank0_error("going to save_cache_base_model %s" % model_path)
+        key_num = fleet.save_cache_model(None, model_path, mode=0)
+        self.rank0_error("save_cache_base_model done")
+        return key_num
+
+    def pull_all_dense_params(self, scope, program):
+        """
+        pull all dense params in trainer of rank 0
+
+        Args:
+            scope(Scope): fluid Scope
+            program(Program): fluid Program
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.pull_all_dense_params(my_scope, my_program)
+
+        """
+        fleet._role_maker._barrier_worker()
+        if fleet._role_maker.is_first_worker():
+            tables = fleet._dist_desc.trainer_param.dense_table
+            prog_id = str(id(program))
+            prog_conf = fleet._opt_info['program_configs'][prog_id]
+            prog_tables = {}
+            for key in prog_conf:
+                if "dense" not in key:
+                    continue
+                for table_id in prog_conf[key]:
+                    prog_tables[int(table_id)] = 0
+            for table in tables:
+                if int(table.table_id) not in prog_tables:
+                    continue
+                var_name_list = []
+                for i in range(0, len(table.dense_variable_name)):
+                    var_name = table.dense_variable_name[i]
+                    if scope.find_var(var_name) is None:
+                        raise ValueError("var " + var_name +
+                                         " not found in scope " +
+                                         "when pull dense")
+                    var_name_list.append(var_name)
+                fleet._fleet_ptr.pull_dense(scope,
+                                            int(table.table_id), var_name_list)
+        fleet._role_maker._barrier_worker()
+
+    def save_paddle_params(self,
+                           executor,
+                           scope,
+                           program,
+                           model_name,
+                           output_path,
+                           day,
+                           pass_id,
+                           hadoop_fs_name,
+                           hadoop_fs_ugi,
+                           hadoop_home="$HADOOP_HOME",
+                           var_names=None,
+                           save_combine=True):
+        """
+        save paddle model, and upload to hdfs dnn_plugin path
+
+        Args:
+            executor(Executor): fluid Executor
+            scope(Scope): fluid Scope
+            program(Program): fluid Program
+            model_name(str): save model local dir or filename
+            output_path(str): hdfs/afs output path
+            day(str|int): training day
+            pass_id(str|int): training pass
+            hadoop_fs_name(str): hadoop fs name
+            hadoop_fs_ugi(str): hadoop fs ugi
+            hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
+            var_names(list): save persistable var names, default is None
+            save_combine(bool): whether to save in one file or separate files,
+                                default is True
+
+        Examples:
+            .. 
code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.save_paddle_params(exe, + join_scope, + join_program, + "paddle_dense.model.0", + "hdfs:/my/output/path/", + day=20190727, + pass_id=6, + hadoop_fs_name="xxx", + hadoop_fs_ugi="xxx,xxx", + var_names=join_all_var_names) + fleet_util.save_paddle_params(exe, + join_scope, + join_program, + "paddle_dense.model.usr.0", + "hdfs:/my/output/path/", + day=20190727, + pass_id=6, + hadoop_fs_name="xxx", + hadoop_fs_ugi="xxx,xxx", + var_names=join_user_var_names) + fleet_util.save_paddle_params(exe, + join_scope, + join_program, + "paddle_dense.model.item.0", + "hdfs:/my/output/path/", + day=20190727, + pass_id=6, + hadoop_fs_name="xxx", + hadoop_fs_ugi="xxx,xxx", + var_names=join_user_item_names) + + """ + day = str(day) + pass_id = str(pass_id) + # pull dense before save + self.pull_all_dense_params(scope, program) + if fleet.worker_index() == 0: + vars = [program.global_block().var(i) for i in var_names] + with fluid.scope_guard(scope): + if save_combine: + fluid.io.save_vars( + executor, "./", program, vars=vars, filename=model_name) + else: + fluid.io.save_vars(executor, model_name, program, vars=vars) + + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + + if pass_id == "-1": + dest = "%s/%s/base/dnn_plugin/" % (output_path, day) + else: + dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day, + pass_id) + if not client.is_exist(dest): + client.makedirs(dest) + + client.upload(dest, model_name) + + fleet._role_maker._barrier_worker() + + def get_last_save_model(self, + output_path, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME"): + """ + get last saved model info from donefile.txt + + Args: + output_path(str): output path + hadoop_fs_name(str): hdfs/afs fs_name + hadoop_fs_ugi(str): hdfs/afs fs_ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + + Returns: + [last_save_day, last_save_pass, last_path] + last_save_day(int): day of saved model + last_save_pass(int): pass id of saved + last_path(str): model path + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + last_save_day, last_save_pass, last_path = \ + fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722, 88) + + """ + last_save_day = -1 + last_save_pass = -1 + last_path = "" + donefile_path = output_path + "/donefile.txt" + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + if not client.is_file(donefile_path): + return [-1, -1, ""] + content = client.cat(donefile_path) + content = content.split("\n")[-1].split("\t") + last_save_day = int(content[0]) + last_save_pass = int(content[3]) + last_path = content[2] + return [last_save_day, last_save_pass, last_path] + + def get_online_pass_interval(self, days, hours, split_interval, + split_per_pass, is_data_hourly_placed): + """ + get online pass interval + + Args: + days(str): days to train + hours(str): hours to train + split_interval(int|str): split interval + split_per_pass(int}str): split per pass + is_data_hourly_placed(bool): is data hourly placed + + Returns: + online_pass_interval(list) + + Examples: + .. 
code-block:: python

+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              online_pass_interval = fleet_util.get_online_pass_interval(
+                  days="{20190720..20190729}",
+                  hours="{0..23}",
+                  split_interval=5,
+                  split_per_pass=2,
+                  is_data_hourly_placed=False)
+
+        """
+        days = os.popen("echo -n " + days).read().split(" ")
+        hours = os.popen("echo -n " + hours).read().split(" ")
+        split_interval = int(split_interval)
+        split_per_pass = int(split_per_pass)
+        # use integer division so the counts stay ints under Python 3
+        splits_per_day = 24 * 60 // split_interval
+        pass_per_day = splits_per_day // split_per_pass
+        left_train_hour = int(hours[0])
+        right_train_hour = int(hours[-1])
+
+        start = 0
+        split_path = []
+        for i in range(splits_per_day):
+            h = start // 60
+            m = start % 60
+            if h < left_train_hour or h > right_train_hour:
+                start += split_interval
+                continue
+            if is_data_hourly_placed:
+                split_path.append("%02d" % h)
+            else:
+                split_path.append("%02d%02d" % (h, m))
+            start += split_interval
+
+        start = 0
+        online_pass_interval = []
+        for i in range(pass_per_day):
+            online_pass_interval.append([])
+            for j in range(start, start + split_per_pass):
+                online_pass_interval[i].append(split_path[j])
+            start += split_per_pass
+
+        return online_pass_interval
+
+    def get_global_metrics(self,
+                           scope=fluid.global_scope(),
+                           stat_pos_name="_generated_var_2",
+                           stat_neg_name="_generated_var_3",
+                           sqrerr_name="sqrerr",
+                           abserr_name="abserr",
+                           prob_name="prob",
+                           q_name="q",
+                           pos_ins_num_name="pos",
+                           total_ins_num_name="total"):
+        """
+        get global metrics, including auc, bucket_error, mae, rmse,
+        actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.
+
+        Args:
+            scope(Scope): Scope object, default is fluid.global_scope()
+            stat_pos_name(str): name of auc pos bucket Variable
+            stat_neg_name(str): name of auc neg bucket Variable
+            sqrerr_name(str): name of sqrerr Variable
+            abserr_name(str): name of abserr Variable
+            prob_name(str): name of prob Variable
+            q_name(str): name of q Variable
+            pos_ins_num_name(str): name of pos ins num Variable
+            total_ins_num_name(str): name of total ins num Variable
+
+        Returns:
+            [auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,
+            mean_predict_qvalue, total_ins_num]
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              metric_list = fleet_util.get_global_metrics(myscope,
+                                                          stat_pos.name,
+                                                          stat_neg.name,
+                                                          local_sqrerr.name,
+                                                          local_abserr.name,
+                                                          local_prob.name,
+                                                          local_q.name,
+                                                          local_pos_ins.name,
+                                                          local_total_ins.name)
+
+              # below is part of model
+              label = fluid.layers.data(name="click", shape=[-1, 1],\
+                  dtype="int64", lod_level=0, append_batch_size=False)
+              emb = my_slot_net(slots, label) # emb can be fc layer of size 1
+              similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(\
+                  emb, min=-15.0, max=15.0), name="similarity_norm")
+              binary_predict = fluid.layers.concat(input=[\
+                  fluid.layers.elementwise_sub(\
+                      fluid.layers.ceil(similarity_norm), similarity_norm),\
+                  similarity_norm], axis=1)
+              auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
+                  stat_neg] = fluid.layers.auc(input=binary_predict,\
+                                               label=label, curve='ROC',\
+                                               num_thresholds=4096)
+              local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins,\
+                  local_total_ins = fluid.contrib.layers.ctr_metric_bundle(\
+                      similarity_norm, label)
+
+        """
+        if scope.find_var(stat_pos_name) is None or \
+                scope.find_var(stat_neg_name) is None:
+            self.rank0_print("not found auc bucket")
+            return [None] * 9
+        elif scope.find_var(sqrerr_name) is None:
+            self.rank0_print("not found sqrerr_name=%s" % sqrerr_name)
+            return [None] * 9
+        elif scope.find_var(abserr_name) is None:
+            self.rank0_print("not found abserr_name=%s" % abserr_name)
+            return [None] * 9
+        elif scope.find_var(prob_name) is None:
+            self.rank0_print("not found prob_name=%s" % prob_name)
+            return [None] * 9
+        elif scope.find_var(q_name) is None:
+            self.rank0_print("not found q_name=%s" % q_name)
+            return [None] * 9
+        elif scope.find_var(pos_ins_num_name) is None:
+            self.rank0_print("not found pos_ins_num_name=%s" % pos_ins_num_name)
+            return [None] * 9
+        elif scope.find_var(total_ins_num_name) is None:
+            self.rank0_print("not found total_ins_num_name=%s" % \
+                             total_ins_num_name)
+            return [None] * 9
+
+        # barrier worker to ensure all workers finished training
+        fleet._role_maker._barrier_worker()
+
+        # get auc
+        auc = self.get_global_auc(scope, stat_pos_name, stat_neg_name)
+        pos = np.array(scope.find_var(stat_pos_name).get_tensor())
+        # auc pos bucket shape
+        old_pos_shape = np.array(pos.shape)
+        # reshape to one dim
+        pos = pos.reshape(-1)
+        global_pos = np.copy(pos) * 0
+        # mpi allreduce
+        fleet._role_maker._node_type_comm.Allreduce(pos, global_pos)
+        # reshape to its original shape
+        global_pos = global_pos.reshape(old_pos_shape)
+        # auc neg bucket
+        neg = np.array(scope.find_var(stat_neg_name).get_tensor())
+        old_neg_shape = np.array(neg.shape)
+        neg = neg.reshape(-1)
+        global_neg = np.copy(neg) * 0
+        fleet._role_maker._node_type_comm.Allreduce(neg, global_neg)
+        global_neg = global_neg.reshape(old_neg_shape)
+
+        num_bucket = len(global_pos[0])
+
+        def get_metric(name):
+            metric = np.array(scope.find_var(name).get_tensor())
+            old_metric_shape = np.array(metric.shape)
+            metric = metric.reshape(-1)
+            global_metric = np.copy(metric) * 0
+            fleet._role_maker._node_type_comm.Allreduce(metric, global_metric)
+            global_metric = global_metric.reshape(old_metric_shape)
+            return global_metric[0]
+
+        global_sqrerr = get_metric(sqrerr_name)
+        global_abserr = get_metric(abserr_name)
+        global_prob = get_metric(prob_name)
+        global_q_value = get_metric(q_name)
+        # note: the ins_num read from the auc bucket is not the actual value,
+        # so get it from the metric op instead
+        pos_ins_num = get_metric(pos_ins_num_name)
+        total_ins_num = get_metric(total_ins_num_name)
+        neg_ins_num = total_ins_num - pos_ins_num
+
+        mae = global_abserr / total_ins_num
+        rmse = math.sqrt(global_sqrerr / total_ins_num)
+        actual_ctr = pos_ins_num / total_ins_num
+        predicted_ctr = global_prob / total_ins_num
+        mean_predict_qvalue = global_q_value / total_ins_num
+        copc = 0.0
+        if abs(predicted_ctr) > 1e-6:
+            copc = actual_ctr / predicted_ctr
+
+        # calculate bucket error
+        last_ctr = -1.0
+        impression_sum = 0.0
+        ctr_sum = 0.0
+        click_sum = 0.0
+        error_sum = 0.0
+        error_count = 0.0
+        click = 0.0
+        show = 0.0
+        ctr = 0.0
+        adjust_ctr = 0.0
+        relative_error = 0.0
+        # use a separate name for the per-bucket ctr so the global
+        # actual_ctr computed above is not overwritten before it is returned
+        bucket_actual_ctr = 0.0
+        relative_ctr_error = 0.0
+        k_max_span = 0.01
+        k_relative_error_bound = 0.05
+        for i in range(num_bucket):
+            click = global_pos[0][i]
+            show = global_pos[0][i] + global_neg[0][i]
+            ctr = float(i) / num_bucket
+            if abs(ctr - last_ctr) > k_max_span:
+                last_ctr = ctr
+                impression_sum = 0.0
+                ctr_sum = 0.0
+                click_sum = 0.0
+            impression_sum += show
+            ctr_sum += ctr * show
+            click_sum += click
+            if impression_sum == 0:
+                continue
+            adjust_ctr = ctr_sum / impression_sum
+            if adjust_ctr == 0:
+                continue
+            relative_error = \
+                math.sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum))
+            if relative_error < k_relative_error_bound:
+                bucket_actual_ctr = click_sum / impression_sum
+                relative_ctr_error = abs(bucket_actual_ctr / adjust_ctr - 1)
+                error_sum += relative_ctr_error * impression_sum
+                error_count += impression_sum
+                last_ctr = -1
+
+        bucket_error = error_sum / error_count if error_count > 0 else 0.0
+
+        return [
+            auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,
+            mean_predict_qvalue, int(total_ins_num)
+        ]
+
+    def print_global_metrics(self,
+                             scope=fluid.global_scope(),
+                             stat_pos_name="_generated_var_2",
+                             stat_neg_name="_generated_var_3",
+                             sqrerr_name="sqrerr",
+                             abserr_name="abserr",
+                             prob_name="prob",
+                             q_name="q",
+                             pos_ins_num_name="pos",
+                             total_ins_num_name="total",
+                             print_prefix=""):
+        """
+        print global metrics, including auc, bucket_error, mae, rmse,
+        actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.
+
+        Args:
+            scope(Scope): Scope object, default is fluid.global_scope()
+            stat_pos_name(str): name of auc pos bucket Variable
+            stat_neg_name(str): name of auc neg bucket Variable
+            sqrerr_name(str): name of sqrerr Variable
+            abserr_name(str): name of abserr Variable
+            prob_name(str): name of prob Variable
+            q_name(str): name of q Variable
+            pos_ins_num_name(str): name of pos ins num Variable
+            total_ins_num_name(str): name of total ins num Variable
+            print_prefix(str): print prefix
+
+        Examples:
+            .. code-block:: python
+
+              from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
+              fleet_util = FleetUtil()
+              fleet_util.print_global_metrics(myscope,
+                                              stat_pos.name,
+                                              stat_neg.name,
+                                              local_sqrerr.name,
+                                              local_abserr.name,
+                                              local_prob.name,
+                                              local_q.name,
+                                              local_pos_ins.name,
+                                              local_total_ins.name)
+
+              # below is part of model
+              label = fluid.layers.data(name="click", shape=[-1, 1],\
+                  dtype="int64", lod_level=0, append_batch_size=False)
+              emb = my_slot_net(slots, label) # emb can be fc layer of size 1
+              similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(\
+                  emb, min=-15.0, max=15.0), name="similarity_norm")
+              binary_predict = fluid.layers.concat(input=[\
+                  fluid.layers.elementwise_sub(\
+                      fluid.layers.ceil(similarity_norm), similarity_norm),\
+                  similarity_norm], axis=1)
+              auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
+                  stat_neg] = fluid.layers.auc(input=binary_predict,\
+                                               label=label, curve='ROC',\
+                                               num_thresholds=4096)
+              local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins, \
+                  local_total_ins = fluid.contrib.layers.ctr_metric_bundle(\
+                      similarity_norm, label)
+
+        """
+        if scope.find_var(stat_pos_name) is None or \
+                scope.find_var(stat_neg_name) is None:
+            self.rank0_print("not found auc bucket")
+            return
+        elif scope.find_var(sqrerr_name) is None:
+            self.rank0_print("not found sqrerr_name=%s" % sqrerr_name)
+            return
+        elif scope.find_var(abserr_name) is None:
+            self.rank0_print("not found abserr_name=%s" % abserr_name)
+            return
+        elif scope.find_var(prob_name) is None:
+            self.rank0_print("not found prob_name=%s" % prob_name)
+            return
+        elif scope.find_var(q_name) is None:
+            self.rank0_print("not found q_name=%s" % q_name)
+            return
+        elif scope.find_var(pos_ins_num_name) is None:
+            self.rank0_print("not found pos_ins_num_name=%s" % pos_ins_num_name)
+            return
+        elif scope.find_var(total_ins_num_name) is None:
+            self.rank0_print("not found total_ins_num_name=%s" % \
+                             total_ins_num_name)
+            return
+
+        auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,\
+            mean_predict_qvalue, total_ins_num = self.get_global_metrics(\
+            scope, stat_pos_name, stat_neg_name, sqrerr_name, abserr_name,\
+            prob_name, q_name, pos_ins_num_name, total_ins_num_name)
+        self.rank0_print("%s global AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f "
+                         "RMSE=%.6f Actual_CTR=%.6f Predicted_CTR=%.6f "
+                         "COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" %
+                         (print_prefix, auc, bucket_error, mae, rmse,
+                          actual_ctr, predicted_ctr, copc, mean_predict_qvalue,
+                          total_ins_num))
diff --git a/python/paddle/fluid/incubate/fleet/utils/hdfs.py b/python/paddle/fluid/incubate/fleet/utils/hdfs.py
index 5468df42505..3e7390b2788 100644
--- a/python/paddle/fluid/incubate/fleet/utils/hdfs.py
+++ b/python/paddle/fluid/incubate/fleet/utils/hdfs.py
@@ -92,6 +92,22 @@ class HDFSClient(object):
 
         return ret_code, ret_out, ret_err
 
+    def cat(self, hdfs_path=None):
+        if self.is_file(hdfs_path):
+            exist_cmd = ['-cat', hdfs_path]
+            returncode, output, errors = self.__run_hdfs_cmd(
+                exist_cmd, retry_times=1)
+            if returncode != 0:
+                _logger.error("HDFS cat HDFS path: {} failed".format(hdfs_path))
+                return ""
+            else:
+                _logger.info("HDFS cat HDFS path: {} succeed".format(
+                    hdfs_path))
+                return output.strip()
+
+        else:
+            return ""
+
     def is_exist(self, hdfs_path=None):
         """
         whether the remote HDFS path exists
@@ -141,6 +157,32 @@ class HDFSClient(object):
                 hdfs_path))
             return True
 
+    def is_file(self, hdfs_path=None):
+        """
+        whether the remote HDFS path is a file
+
+        Args:
+            hdfs_path(str): the hdfs file path
+
+        Returns:
+            True or False
+        """
+
+        if not self.is_exist(hdfs_path):
+            return False
+
+        dir_cmd = ['-test', '-d', hdfs_path]
+        returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1)
+
+        if returncode == 0:
+            _logger.info("HDFS path: {} is a directory, not a file".format(
+                hdfs_path))
+            return False
+        else:
+            _logger.info("HDFS path: {} is a file".format(
+                hdfs_path))
+            return True
+
     def delete(self, hdfs_path):
         """
         Remove a file or directory from HDFS.
diff --git a/python/setup.py.in b/python/setup.py.in
index 4a7a9323710..3587a3a2fef 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -133,7 +133,8 @@ packages=['paddle',
           'paddle.fluid.incubate.fleet.parameter_server',
           'paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler',
           'paddle.fluid.incubate.fleet.parameter_server.pslib',
-          'paddle.fluid.incubate.fleet.collective']
+          'paddle.fluid.incubate.fleet.collective',
+          'paddle.fluid.incubate.fleet.utils']
 
 with open('@PADDLE_SOURCE_DIR@/python/requirements.txt') as f:
     setup_requires = f.read().splitlines()
-- 
GitLab
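
Below is a minimal usage sketch of the two new HDFSClient interfaces (is_file, cat) together with the donefile helpers in FleetUtil. The fs name, ugi, and paths are placeholders, and a working Hadoop client under $HADOOP_HOME is assumed.

.. code-block:: python

    # Hypothetical cluster settings; substitute real values.
    from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
    from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient

    configs = {
        "fs.default.name": "hdfs://xxx",  # placeholder fs name
        "hadoop.job.ugi": "user,passwd",  # placeholder ugi
    }
    client = HDFSClient("$HADOOP_HOME", configs)

    donefile = "hdfs:/my/output/donefile.txt"  # placeholder path
    # is_file() tells files apart from directories via `hadoop fs -test -d`;
    # cat() returns the file content as a stripped string, or "" on failure.
    if client.is_file(donefile):
        for record in client.cat(donefile).split("\n"):
            print(record.split("\t"))  # day, key, model path, pass id, flag

    # Rank 0 appends one record per saved model; the other workers
    # just hit the barrier inside write_model_donefile().
    fleet_util = FleetUtil()
    fleet_util.write_model_donefile(output_path="hdfs:/my/output",
                                    day=20190723,
                                    pass_id=66,
                                    xbox_base_key=1234567890,
                                    hadoop_fs_name="hdfs://xxx",
                                    hadoop_fs_ugi="user,passwd")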
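
The global AUC in get_global_auc is assembled from per-worker positive/negative instance histograms over score buckets: the workers' buckets are summed with an Allreduce, then the ROC area is accumulated bucket by bucket from the highest score down. The sketch below mirrors that arithmetic in plain NumPy, simulating the Allreduce with a sum over two hypothetical workers; all numbers are made up.

.. code-block:: python

    import numpy as np

    def bucket_auc(global_pos, global_neg):
        # walk buckets from the highest predicted score to the lowest,
        # accumulating trapezoid areas the same way get_global_auc does
        num_bucket = len(global_pos)
        area, pos, neg = 0.0, 0.0, 0.0
        total_ins_num = 0
        for index in reversed(range(num_bucket)):
            new_pos = pos + global_pos[index]
            new_neg = neg + global_neg[index]
            total_ins_num += global_pos[index] + global_neg[index]
            area += (new_neg - neg) * (pos + new_pos) / 2.0
            pos, neg = new_pos, new_neg
        if pos * neg == 0 or total_ins_num == 0:
            return 0.5
        return area / (pos * neg)

    # two hypothetical workers, 8 score buckets each
    worker_pos = np.array([[0., 1, 1, 2, 3, 5, 8, 9],
                           [1., 0, 2, 2, 4, 4, 7, 10]])
    worker_neg = np.array([[9., 8, 6, 5, 3, 2, 1, 0],
                           [10., 7, 7, 4, 3, 2, 1, 1]])
    # stand-in for fleet._role_maker._node_type_comm.Allreduce
    global_pos = worker_pos.sum(axis=0)
    global_neg = worker_neg.sum(axis=0)
    print("global auc = %.6f" % bucket_auc(global_pos, global_neg))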
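
And a quick worked check of the pass-interval arithmetic behind get_online_pass_interval, using the values from its docstring example: 24*60/5 = 288 five-minute splits per day, grouped two per pass into 144 passes.

.. code-block:: python

    # Standalone check of the split/pass bookkeeping in
    # get_online_pass_interval() for split_interval=5, split_per_pass=2.
    split_interval = 5
    split_per_pass = 2
    splits_per_day = 24 * 60 // split_interval       # 288 splits
    pass_per_day = splits_per_day // split_per_pass  # 144 passes

    split_path = ["%02d%02d" % (m // 60, m % 60)
                  for m in range(0, 24 * 60, split_interval)]
    online_pass_interval = [
        split_path[i * split_per_pass:(i + 1) * split_per_pass]
        for i in range(pass_per_day)
    ]
    print(online_pass_interval[0])   # ['0000', '0005']
    print(online_pass_interval[-1])  # ['2350', '2355']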