未验证 提交 b104ea06 编写于 作者: J jiaqi 提交者: GitHub

add get_last_save_xbox_base/get_last_save_xbox (#19122)

* add get_last_save_xbox_base/get_last_save_xbox
* fix fleet_util bug of load paddle model
* add doc string in fleet api
上级 bfd514c7
...@@ -401,7 +401,9 @@ void FleetWrapper::LoadFromPaddleModel(Scope& scope, const uint64_t table_id, ...@@ -401,7 +401,9 @@ void FleetWrapper::LoadFromPaddleModel(Scope& scope, const uint64_t table_id,
std::vector<std::string> var_list, std::vector<std::string> var_list,
std::string model_path, std::string model_path,
std::string model_proto_file, std::string model_proto_file,
std::vector<std::string> table_var_list,
bool load_combine) { bool load_combine) {
#ifdef PADDLE_WITH_PSLIB
// load ProgramDesc from model file // load ProgramDesc from model file
auto read_proto_func = [](const std::string& filename) -> ProgramDesc { auto read_proto_func = [](const std::string& filename) -> ProgramDesc {
std::string contents; std::string contents;
...@@ -467,7 +469,8 @@ void FleetWrapper::LoadFromPaddleModel(Scope& scope, const uint64_t table_id, ...@@ -467,7 +469,8 @@ void FleetWrapper::LoadFromPaddleModel(Scope& scope, const uint64_t table_id,
} }
} }
delete old_scope; delete old_scope;
PushDenseParamSync(scope, table_id, old_param_list); PushDenseParamSync(scope, table_id, table_var_list);
#endif
} }
void FleetWrapper::LoadModel(const std::string& path, const int mode) { void FleetWrapper::LoadModel(const std::string& path, const int mode) {
......
...@@ -136,6 +136,7 @@ class FleetWrapper { ...@@ -136,6 +136,7 @@ class FleetWrapper {
void LoadFromPaddleModel(Scope& scope, const uint64_t table_id, // NOLINT void LoadFromPaddleModel(Scope& scope, const uint64_t table_id, // NOLINT
std::vector<std::string> var_list, std::vector<std::string> var_list,
std::string model_path, std::string model_proto_file, std::string model_path, std::string model_proto_file,
std::vector<std::string> table_var_list,
bool load_combine); bool load_combine);
// mode = 0, load all feature // mode = 0, load all feature
// mode = 1, load delta feature, which means load diff // mode = 1, load delta feature, which means load diff
......
...@@ -170,6 +170,22 @@ class PSLib(Fleet): ...@@ -170,6 +170,22 @@ class PSLib(Fleet):
self._role_maker._finalize() self._role_maker._finalize()
def distributed_optimizer(self, optimizer, strategy={}): def distributed_optimizer(self, optimizer, strategy={}):
"""
distributed_optimizer
Args:
optimizer(Optimizer): optimizer
strategy(dict): strategy
Examples:
.. code-block:: python
fleet.distributed_optimizer(optimizer)
Returns:
optimizer(DownpourOptimizer): downpour optimizer
"""
self._optimizer = DownpourOptimizer(optimizer, strategy) self._optimizer = DownpourOptimizer(optimizer, strategy)
return self._optimizer return self._optimizer
...@@ -182,6 +198,20 @@ class PSLib(Fleet): ...@@ -182,6 +198,20 @@ class PSLib(Fleet):
export_for_deployment=True): export_for_deployment=True):
""" """
save pserver model called from a worker save pserver model called from a worker
Args:
executor(Executor): fluid executor
dirname(str): save model path
feeded_var_names(list): default None
target_vars(list): default None
main_program(Program): default None
export_for_deployment(bool): default True
Examples:
.. code-block:: python
fleet.save_inference_model(dirname="hdfs:/my/path")
""" """
self._fleet_ptr.save_model(dirname) self._fleet_ptr.save_model(dirname)
...@@ -332,6 +362,7 @@ class PSLib(Fleet): ...@@ -332,6 +362,7 @@ class PSLib(Fleet):
scope(Scope): Scope object scope(Scope): Scope object
model_proto_file(str): path of program desc proto binary model_proto_file(str): path of program desc proto binary
file, can be local or hdfs/afs file file, can be local or hdfs/afs file
var_names(list): var name list
load_combine(bool): load from a file or splited param files load_combine(bool): load from a file or splited param files
default False. default False.
...@@ -357,11 +388,13 @@ class PSLib(Fleet): ...@@ -357,11 +388,13 @@ class PSLib(Fleet):
mode = kwargs.get("mode", 0) mode = kwargs.get("mode", 0)
scope = kwargs.get("scope", None) scope = kwargs.get("scope", None)
model_proto_file = kwargs.get("model_proto_file", None) model_proto_file = kwargs.get("model_proto_file", None)
var_names = kwargs.get("var_names", None)
load_combine = kwargs.get("load_combine", False) load_combine = kwargs.get("load_combine", False)
self._role_maker._barrier_worker() self._role_maker._barrier_worker()
if scope is not None and model_proto_file is not None: if scope is not None and model_proto_file is not None:
self._load_one_table_from_paddle_model( self._load_one_table_from_paddle_model(scope, table_id, model_path,
scope, table_id, model_path, model_proto_file, load_combine) model_proto_file, var_names,
load_combine)
elif self._role_maker.is_first_worker(): elif self._role_maker.is_first_worker():
self._fleet_ptr.load_model_one_table(table_id, model_path, mode) self._fleet_ptr.load_model_one_table(table_id, model_path, mode)
self._role_maker._barrier_worker() self._role_maker._barrier_worker()
...@@ -371,6 +404,7 @@ class PSLib(Fleet): ...@@ -371,6 +404,7 @@ class PSLib(Fleet):
table_id, table_id,
model_path, model_path,
model_proto_file, model_proto_file,
var_names=None,
load_combine=False): load_combine=False):
""" """
load params from paddle model, and push params to pserver load params from paddle model, and push params to pserver
...@@ -381,6 +415,7 @@ class PSLib(Fleet): ...@@ -381,6 +415,7 @@ class PSLib(Fleet):
model_path(str): path of paddle model, can be local or hdfs/afs file model_path(str): path of paddle model, can be local or hdfs/afs file
model_proto_file(str): path of program desc proto binary file, model_proto_file(str): path of program desc proto binary file,
can be local or hdfs/afs file can be local or hdfs/afs file
var_names(list): load var names
load_combine(bool): load from a file or splited param files load_combine(bool): load from a file or splited param files
""" """
...@@ -415,17 +450,17 @@ class PSLib(Fleet): ...@@ -415,17 +450,17 @@ class PSLib(Fleet):
for i in self._opt_info["fleet_desc"].trainer_param.dense_table: for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
if table_id is not None and table_id != i.table_id: if table_id is not None and table_id != i.table_id:
continue continue
var_list = [var for var in i.dense_variable_name] table_var_names = [var for var in i.dense_variable_name]
skip = False skip = False
for var in var_list: for var in table_var_names:
if scope.find_var(var) is None: if scope.find_var(var) is None:
skip = True skip = True
break break
if skip: if skip:
continue continue
self._fleet_ptr.load_from_paddle_model( self._fleet_ptr.load_from_paddle_model(
scope, table_id, var_list, model_path, model_proto_file, scope, table_id, var_names, model_path, model_proto_file,
load_combine) table_var_names, load_combine)
self._role_maker._barrier_worker() self._role_maker._barrier_worker()
def _set_opt_info(self, opt_info): def _set_opt_info(self, opt_info):
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""Fleet Utils""" """Fleet Utils."""
import collections import collections
import json import json
...@@ -310,11 +310,14 @@ class FleetUtil(object): ...@@ -310,11 +310,14 @@ class FleetUtil(object):
model_path, model_path,
xbox_base_key, xbox_base_key,
data_path, data_path,
hadoop_fs_name,
monitor_data={}): monitor_data={}):
xbox_dict = collections.OrderedDict() xbox_dict = collections.OrderedDict()
xbox_dict["id"] = int(time.time()) xbox_dict["id"] = str(int(time.time()))
xbox_dict["key"] = str(xbox_base_key) xbox_dict["key"] = str(xbox_base_key)
xbox_dict["input"] = model_path.rstrip("/") + "/000" if model_path.startswith("hdfs:") or model_path.startswith("afs:"):
model_path = model_path[model_path.find(":") + 1:]
xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000"
xbox_dict["record_count"] = "111111" xbox_dict["record_count"] = "111111"
xbox_dict["job_name"] = "default_job_name" xbox_dict["job_name"] = "default_job_name"
xbox_dict["ins_tag"] = "feasign" xbox_dict["ins_tag"] = "feasign"
...@@ -437,7 +440,7 @@ class FleetUtil(object): ...@@ -437,7 +440,7 @@ class FleetUtil(object):
hadoop_fs_ugi, hadoop_fs_ugi,
monitor_data={}, monitor_data={},
hadoop_home="$HADOOP_HOME", hadoop_home="$HADOOP_HOME",
donefile_name="xbox_patch_done.txt"): donefile_name=None):
""" """
write delta donefile or xbox base donefile write delta donefile or xbox base donefile
...@@ -451,7 +454,7 @@ class FleetUtil(object): ...@@ -451,7 +454,7 @@ class FleetUtil(object):
hadoop_fs_ugi(str): hdfs/afs fs ugi hadoop_fs_ugi(str): hdfs/afs fs ugi
monitor_data(dict): metrics monitor_data(dict): metrics
hadoop_home(str): hadoop home, default is "$HADOOP_HOME" hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
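donefile_name(str): donefile name, default is "donefile.txt" donefile_name(str): donefile name, default is None, which means "xbox_patch_done.txt" for a delta (pass_id != "-1") and "xbox_base_done.txt" for a base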
Examples: Examples:
.. code-block:: python .. code-block:: python
...@@ -478,9 +481,13 @@ class FleetUtil(object): ...@@ -478,9 +481,13 @@ class FleetUtil(object):
if pass_id != "-1": if pass_id != "-1":
suffix_name = "/%s/delta-%s/" % (day, pass_id) suffix_name = "/%s/delta-%s/" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name model_path = output_path.rstrip("/") + suffix_name
if donefile_name is None:
donefile_name = "xbox_patch_done.txt"
else: else:
suffix_name = "/%s/base/" % day suffix_name = "/%s/base/" % day
model_path = output_path.rstrip("/") + suffix_name model_path = output_path.rstrip("/") + suffix_name
if donefile_name is None:
donefile_name = "xbox_base_done.txt"
if isinstance(data_path, list): if isinstance(data_path, list):
data_path = ",".join(data_path) data_path = ",".join(data_path)
...@@ -488,7 +495,7 @@ class FleetUtil(object): ...@@ -488,7 +495,7 @@ class FleetUtil(object):
if fleet.worker_index() == 0: if fleet.worker_index() == 0:
donefile_path = output_path + "/" + donefile_name donefile_path = output_path + "/" + donefile_name
xbox_str = self._get_xbox_str(output_path, day, model_path, \ xbox_str = self._get_xbox_str(output_path, day, model_path, \
xbox_base_key, data_path, monitor_data={}) xbox_base_key, data_path, hadoop_fs_name, monitor_data={})
configs = { configs = {
"fs.default.name": hadoop_fs_name, "fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi "hadoop.job.ugi": hadoop_fs_ugi
...@@ -717,14 +724,13 @@ class FleetUtil(object): ...@@ -717,14 +724,13 @@ class FleetUtil(object):
""" """
day = str(day) day = str(day)
pass_id = str(pass_id)
suffix_name = "/%s/base/" % day suffix_name = "/%s/base/" % day
model_path = output_path + suffix_name model_path = output_path + suffix_name
self.rank0_print("going to save_xbox_base_model " + model_path) self.rank0_print("going to save_xbox_base_model " + model_path)
fleet.save_persistables(None, model_path, mode=2) fleet.save_persistables(None, model_path, mode=2)
self.rank0_print("save_xbox_base_model done") self.rank0_print("save_xbox_base_model done")
def save_cache_model(self, output_path, day, pass_id): def save_cache_model(self, output_path, day, pass_id, mode=1):
""" """
save cache model save cache model
...@@ -732,6 +738,7 @@ class FleetUtil(object): ...@@ -732,6 +738,7 @@ class FleetUtil(object):
output_path(str): output path output_path(str): output path
day(str|int): training day day(str|int): training day
pass_id(str|int): training pass id pass_id(str|int): training pass id
mode(str|int): save mode
Returns: Returns:
key_num(int): cache key num key_num(int): cache key num
...@@ -746,10 +753,11 @@ class FleetUtil(object): ...@@ -746,10 +753,11 @@ class FleetUtil(object):
""" """
day = str(day) day = str(day)
pass_id = str(pass_id) pass_id = str(pass_id)
mode = int(mode)
suffix_name = "/%s/delta-%s" % (day, pass_id) suffix_name = "/%s/delta-%s" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name model_path = output_path.rstrip("/") + suffix_name
self.rank0_print("going to save_cache_model %s" % model_path) self.rank0_print("going to save_cache_model %s" % model_path)
key_num = fleet.save_cache_model(None, model_path, mode=1) key_num = fleet.save_cache_model(None, model_path, mode=mode)
self.rank0_print("save_cache_model done") self.rank0_print("save_cache_model done")
return key_num return key_num
...@@ -922,6 +930,97 @@ class FleetUtil(object): ...@@ -922,6 +930,97 @@ class FleetUtil(object):
fleet._role_maker._barrier_worker() fleet._role_maker._barrier_worker()
def get_last_save_xbox_base(self,
                            output_path,
                            hadoop_fs_name,
                            hadoop_fs_ugi,
                            hadoop_home="$HADOOP_HOME"):
    """
    Get info about the last saved xbox base model by reading the last
    record of ``<output_path>/xbox_base_done.txt``.

    Args:
        output_path(str): output path
        hadoop_fs_name(str): hdfs/afs fs_name
        hadoop_fs_ugi(str): hdfs/afs fs_ugi
        hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

    Returns:
        [last_save_day, last_path, xbox_base_key]
        last_save_day(int): day of saved model
        last_path(str): model path
        xbox_base_key(int): xbox key

        If the donefile does not exist, returns
        [-1, -1, int(time.time())].

    Examples:
        .. code-block:: python

          from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
          fleet_util = FleetUtil()
          last_save_day, last_path, xbox_base_key = (
              fleet_util.get_last_save_xbox_base(
                  output_path="hdfs:/my/path",
                  hadoop_fs_name="hdfs://xxx",
                  hadoop_fs_ugi="user,passwd"))

    """
    donefile_path = output_path + "/xbox_base_done.txt"
    configs = {
        "fs.default.name": hadoop_fs_name,
        "hadoop.job.ugi": hadoop_fs_ugi
    }
    client = HDFSClient(hadoop_home, configs)
    if not client.is_file(donefile_path):
        return [-1, -1, int(time.time())]
    # Strip trailing whitespace/newlines first: a donefile that ends with
    # "\n" would otherwise give an empty last record and break json.loads.
    pre_content = client.cat(donefile_path).rstrip()
    last_dict = json.loads(pre_content.split("\n")[-1])
    # "input" is indexed from the end as .../<day>/<model dir>/<last part>.
    input_parts = last_dict["input"].split("/")
    last_day = int(input_parts[-3])
    last_path = "/".join(input_parts[:-1])
    xbox_base_key = int(last_dict["key"])
    return [last_day, last_path, xbox_base_key]
def get_last_save_xbox(self,
                       output_path,
                       hadoop_fs_name,
                       hadoop_fs_ugi,
                       hadoop_home="$HADOOP_HOME"):
    """
    Get info about the last saved xbox delta model by reading the last
    record of ``<output_path>/xbox_patch_done.txt``.

    Args:
        output_path(str): output path
        hadoop_fs_name(str): hdfs/afs fs_name
        hadoop_fs_ugi(str): hdfs/afs fs_ugi
        hadoop_home(str): hadoop home, default is "$HADOOP_HOME"

    Returns:
        [last_save_day, last_save_pass, last_path, xbox_base_key]
        last_save_day(int): day of saved model
        last_save_pass(int): pass id of saved
        last_path(str): model path
        xbox_base_key(int): xbox key

        If the donefile does not exist, returns
        [-1, -1, "", int(time.time())].

    Examples:
        .. code-block:: python

          from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
          fleet_util = FleetUtil()
          last_save_day, last_save_pass, last_path, xbox_base_key = (
              fleet_util.get_last_save_xbox(
                  output_path="hdfs:/my/path",
                  hadoop_fs_name="hdfs://xxx",
                  hadoop_fs_ugi="user,passwd"))

    """
    donefile_path = output_path + "/xbox_patch_done.txt"
    configs = {
        "fs.default.name": hadoop_fs_name,
        "hadoop.job.ugi": hadoop_fs_ugi
    }
    client = HDFSClient(hadoop_home, configs)
    if not client.is_file(donefile_path):
        return [-1, -1, "", int(time.time())]
    # Strip trailing whitespace/newlines first: a donefile that ends with
    # "\n" would otherwise give an empty last record and break json.loads.
    pre_content = client.cat(donefile_path).rstrip()
    last_dict = json.loads(pre_content.split("\n")[-1])
    # "input" is indexed from the end as .../<day>/<dir>-<pass>/<last part>;
    # the pass id is the text after the last "-" of the model dir.
    input_parts = last_dict["input"].split("/")
    last_day = int(input_parts[-3])
    last_pass = int(input_parts[-2].split("-")[-1])
    last_path = "/".join(input_parts[:-1])
    xbox_base_key = int(last_dict["key"])
    return [last_day, last_pass, last_path, xbox_base_key]
def get_last_save_model(self, def get_last_save_model(self,
output_path, output_path,
hadoop_fs_name, hadoop_fs_name,
...@@ -937,18 +1036,19 @@ class FleetUtil(object): ...@@ -937,18 +1036,19 @@ class FleetUtil(object):
hadoop_home(str): hadoop home, default is "$HADOOP_HOME" hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
Returns: Returns:
[last_save_day, last_save_pass, last_path] [last_save_day, last_save_pass, last_path, xbox_base_key]
last_save_day(int): day of saved model last_save_day(int): day of saved model
last_save_pass(int): pass id of saved last_save_pass(int): pass id of saved
last_path(str): model path last_path(str): model path
xbox_base_key(int): xbox key
Examples: Examples:
.. code-block:: python .. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil() fleet_util = FleetUtil()
last_save_day, last_save_pass, last_path = \ last_save_day, last_save_pass, last_path, xbox_base_key = \
fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722, 88) fleet_util.get_last_save_model("hdfs:/my/path", 20190722, 88)
""" """
last_save_day = -1 last_save_day = -1
...@@ -961,13 +1061,14 @@ class FleetUtil(object): ...@@ -961,13 +1061,14 @@ class FleetUtil(object):
} }
client = HDFSClient(hadoop_home, configs) client = HDFSClient(hadoop_home, configs)
if not client.is_file(donefile_path): if not client.is_file(donefile_path):
return [-1, -1, ""] return [-1, -1, "", int(time.time())]
content = client.cat(donefile_path) content = client.cat(donefile_path)
content = content.split("\n")[-1].split("\t") content = content.split("\n")[-1].split("\t")
last_save_day = int(content[0]) last_save_day = int(content[0])
last_save_pass = int(content[3]) last_save_pass = int(content[3])
last_path = content[2] last_path = content[2]
return [last_save_day, last_save_pass, last_path] xbox_base_key = int(content[1])
return [last_save_day, last_save_pass, last_path, xbox_base_key]
def get_online_pass_interval(self, days, hours, split_interval, def get_online_pass_interval(self, days, hours, split_interval,
split_per_pass, is_data_hourly_placed): split_per_pass, is_data_hourly_placed):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册