Unverified commit 89daf8e6, authored by xujiaqi01, committed by GitHub

cherry-pick to 1.7: save prefix, unit accessor (#23789)

* add save with prefix (#23449)
* add unit accessor (#23703)
* test=develop
Parent 1d11bf1a
@@ -718,6 +718,38 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) {
#endif
}
void FleetWrapper::SaveModelOneTable(const uint64_t table_id,
const std::string& path, const int mode) {
#ifdef PADDLE_WITH_PSLIB
auto ret =
pslib_ptr_->_worker_ptr->save(table_id, path, std::to_string(mode));
ret.wait();
if (ret.get() != 0) {
LOG(ERROR) << "save model of table id: " << table_id
<< ", to path: " << path << " failed";
}
#else
VLOG(0) << "FleetWrapper::SaveModelOneTable does nothing when no pslib";
#endif
}
void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id,
const std::string& path,
const int mode,
const std::string& prefix) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->save(table_id, path, std::to_string(mode),
prefix);
ret.wait();
if (ret.get() != 0) {
LOG(ERROR) << "save model (with prefix) of table id: " << table_id
<< ", to path: " << path << " failed";
}
#else
VLOG(0) << "FleetWrapper::SaveModelOneTablePrefix does nothing when no pslib";
#endif
}
void FleetWrapper::PrintTableStat(const uint64_t table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
...
@@ -195,15 +195,22 @@ class FleetWrapper {
void PrintTableStat(const uint64_t table_id);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
void LoadModel(const std::string& path, const int mode);
// mode = 0, load all feature
// mode = 1, load delta feature, which means load diff
void LoadModelOneTable(const uint64_t table_id, const std::string& path,
const int mode);
// mode = 0, save all feature
// mode = 1, save delta feature, which means save diff
void SaveModel(const std::string& path, const int mode);
// mode = 0, save all feature
// mode = 1, save delta feature, which means save diff
void SaveModelOneTable(const uint64_t table_id, const std::string& path,
const int mode);
// save model with prefix
void SaveModelOneTablePrefix(const uint64_t table_id, const std::string& path,
const int mode, const std::string& prefix);
// get save cache threshold
double GetCacheThreshold(int table_id);
// shuffle cache model between servers
...
@@ -61,5 +61,7 @@ TEST(TEST_FLEET, fleet_1) {
#ifdef PADDLE_WITH_PSLIB
#else
fleet->RunServer("", 0);
fleet->SaveModelOneTable(0, "", 0);
fleet->SaveModelOneTablePrefix(0, "", 0, "");
#endif
}
@@ -78,6 +78,9 @@ void BindFleetWrapper(py::module* m) {
&framework::FleetWrapper::SetClient2ClientConfig)
.def("set_pull_local_thread_num",
&framework::FleetWrapper::SetPullLocalThreadNum)
.def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable)
.def("save_model_one_table_with_prefix",
&framework::FleetWrapper::SaveModelOneTablePrefix)
.def("copy_table", &framework::FleetWrapper::CopyTable) .def("copy_table", &framework::FleetWrapper::CopyTable)
.def("copy_table_by_feasign", .def("copy_table_by_feasign",
&framework::FleetWrapper::CopyTableByFeasign); &framework::FleetWrapper::CopyTableByFeasign);
......
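A minimal Python-side sketch of the two new bindings (table id, paths, mode, and prefix are placeholder values; assumes an initialized PSLib fleet instance):

    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    # save table 0 in checkpoint mode (mode = 0)
    fleet._fleet_ptr.save_model_one_table(0, "hdfs:/user/path/", 0)
    # same table, but parts are named like part-my_prefix-000-00000
    fleet._fleet_ptr.save_model_one_table_with_prefix(
        0, "hdfs:/user/path/", 0, "my_prefix")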
@@ -567,6 +567,88 @@ class PSLib(Fleet):
model_proto_file, table_var_names, load_combine)
self._role_maker._barrier_worker()
def load_model(self, model_dir=None, **kwargs):
"""
load pslib model. There are at least 4 modes, and they are shared by
load one table / save model / save one table:
0: load checkpoint model
1: load delta model (delta means diff, usually for online prediction)
2: load base model (base model filters some feasigns out of the
checkpoint, usually for online prediction)
3: load batch model (do some statistics over the checkpoint, such as
calculating unseen days of each feasign)
Args:
model_dir(str): if you use hdfs, model_dir should start with
'hdfs:'; otherwise it is a local dir
kwargs(dict): user-defined properties.
mode(int): the modes illustrated above, default 0
Examples:
.. code-block:: python
fleet.load_model("afs:/user/path/")
"""
mode = kwargs.get("mode", 0)
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.load_model(model_dir, mode)
self._role_maker._barrier_worker()
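A short usage sketch (path and mode are example values):

    # mode = 1 loads the delta model, e.g. for online prediction
    fleet.load_model("hdfs:/user/path/", mode=1)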
def save_model(self, model_dir=None, **kwargs):
"""
save pslib model; the modes are the same as in load model.
Args:
model_dir(str): if you use hdfs, model_dir should start with
'hdfs:'; otherwise it is a local dir
kwargs(dict): user-defined properties.
mode(int): the modes illustrated above, default 0
Examples:
.. code-block:: python
fleet.save_model("afs:/user/path/")
"""
mode = kwargs.get("mode", 0)
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.save_model(model_dir, mode)
self._role_maker._barrier_worker()
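A short usage sketch (path and mode are example values):

    # mode = 2 saves the base model instead of the full checkpoint
    fleet.save_model("hdfs:/user/path/", mode=2)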
def save_one_table(self, table_id, model_dir, **kwargs):
"""
save one table of pslib model; the modes are the same as in load model.
Args:
table_id(int): table id
model_dir(str): if you use hdfs, model_dir should start with
'hdfs:'; otherwise it is a local dir
kwargs(dict): user-defined properties.
mode(int): the modes illustrated above, default 0
prefix(str): the saved parts can have a prefix,
for example, part-prefix-000-00000
Examples:
.. code-block:: python
fleet.save_one_table("afs:/user/path/")
"""
mode = kwargs.get("mode", 0)
prefix = kwargs.get("prefix", None)
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
if prefix is not None:
self._fleet_ptr.save_model_one_table_with_prefix(
table_id, model_dir, mode, prefix)
else:
self._fleet_ptr.save_model_one_table(table_id, model_dir, mode)
self._role_maker._barrier_worker()
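A usage sketch of the optional prefix (values are placeholders):

    # default part names
    fleet.save_one_table(0, "hdfs:/user/path/", mode=0)
    # parts are named like part-my_prefix-000-00000
    fleet.save_one_table(0, "hdfs:/user/path/", mode=0, prefix="my_prefix")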
def _set_opt_info(self, opt_info):
"""
this function saves the result from DistributedOptimizer.minimize()
...
@@ -81,7 +81,12 @@ class DownpourServer(Server):
'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
'sparse_cache_file_num', 'sparse_beta1_decay_rate', 'sparse_beta2_decay_rate', \
'sparse_ada_epsilon', 'sparse_optimizer', 'sparse_ssd_unseenday_threshold', \
'embed_sparse_optimizer', 'embed_sparse_learning_rate', 'embed_sparse_weight_bounds', \
'embed_sparse_initial_range', 'embed_sparse_initial_g2sum', 'embed_sparse_beta1_decay_rate', \
'embed_sparse_beta2_decay_rate', 'embedx_sparse_optimizer', 'embedx_sparse_learning_rate', \
'embedx_sparse_weight_bounds', 'embedx_sparse_initial_range', 'embedx_sparse_initial_g2sum', \
'embedx_sparse_beta1_decay_rate', 'embedx_sparse_beta2_decay_rate']
for key in strategy:
if key not in support_sparse_key_list:
@@ -112,10 +117,13 @@ class DownpourServer(Server):
# DownpourFeatureValueAccessor: for ctr task, has cvm, embedding and sgd info
# DownpourCtrAccessor : for ctr task, has cvm, slot, embedding and sgd info
# DownpourSparseValueAccessor : for general task, has embedding and sgd info
# DownpourCtrDoubleAccessor : for ctr task, where show/click are stored as double
# DownpourUnitAccessor : for ctr task, has cvm, slot, embedding and sgd info
support_accessor_class = [
'DownpourFeatureValueAccessor', 'DownpourCtrAccessor',
'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor',
'DownpourUnitAccessor'
]
if strategy.get('sparse_accessor_class') is not None:
accessor_class = strategy.get('sparse_accessor_class')
@@ -128,7 +136,9 @@ class DownpourServer(Server):
table.accessor.accessor_class = accessor_class
if accessor_class == 'DownpourFeatureValueAccessor' \
or accessor_class == 'DownpourCtrAccessor' \
or accessor_class == 'DownpourCtrDoubleAccessor':
table.accessor.sparse_sgd_param.learning_rate = strategy.get(
'sparse_learning_rate', 0.05)
table.accessor.sparse_sgd_param.initial_g2sum = strategy.get(
@@ -241,6 +251,12 @@ class DownpourServer(Server):
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
elif accessor_class == 'DownpourUnitAccessor':
self.add_sparse_table_common_config(table, strategy)
self.add_sparse_optimizer(table.accessor.embed_sgd_param,
strategy, "embed_")
self.add_sparse_optimizer(table.accessor.embedx_sgd_param,
strategy, "embedx_")
def add_dense_table(self, table_id, param_var, grad_var, strategy,
sparse_table_names):
@@ -360,6 +376,77 @@ class DownpourServer(Server):
'datanorm_decay_rate', 0.999999)
table.accessor.fea_dim = fea_dim
def add_sparse_optimizer(self, sgd, strategy, prefix):
optimizer_name = strategy.get(prefix + "sparse_optimizer", "adam")
sgd.name = optimizer_name
if optimizer_name == "naive":
sgd.naive.learning_rate = \
strategy.get(prefix + 'sparse_learning_rate', 0.05)
sgd.naive.initial_range = \
strategy.get(prefix + 'sparse_initial_range', 1e-4)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.naive.weight_bounds.extend(bounds)
elif optimizer_name == "adagrad":
sgd.adagrad.learning_rate = \
strategy.get(prefix + 'sparse_learning_rate', 0.05)
sgd.adagrad.initial_range = \
strategy.get(prefix + 'sparse_initial_range', 1e-4)
sgd.adagrad.initial_g2sum = strategy.get(
prefix + 'sparse_initial_g2sum', 3)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adagrad.weight_bounds.extend(bounds)
elif optimizer_name == "adam":
sgd.adam.learning_rate = \
strategy.get(prefix + 'sparse_learning_rate', 0.001)
sgd.adam.initial_range = \
strategy.get(prefix + 'sparse_initial_range', 1e-4)
sgd.adam.beta1_decay_rate = strategy.get(
prefix + 'sparse_beta1_decay_rate', 0.9)
sgd.adam.beta2_decay_rate = strategy.get(
prefix + 'sparse_beta2_decay_rate', 0.999)
sgd.adam.ada_epsilon = strategy.get(prefix + 'sparse_ada_epsilon',
1e-8)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adam.weight_bounds.extend(bounds)
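A worked example of the prefix lookup (illustrative values): with prefix "embed_", the naive branch resolves as in the comments below.

    strategy = {"embed_sparse_optimizer": "naive",
                "embed_sparse_learning_rate": 0.05}
    # add_sparse_optimizer(sgd, strategy, "embed_") then sets:
    #   sgd.name = "naive"
    #   sgd.naive.learning_rate = 0.05       # from embed_sparse_learning_rate
    #   sgd.naive.initial_range = 1e-4       # default, key absent
    #   sgd.naive.weight_bounds = [-10, 10]  # default, key absent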
def add_sparse_table_common_config(self, table, strategy):
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.embedx_threshold = strategy.get(
'sparse_embedx_threshold', 10)
table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get(
'sparse_nonclk_coeff', 0.1)
table.accessor.downpour_accessor_param.click_coeff = strategy.get(
'sparse_click_coeff', 1)
table.accessor.downpour_accessor_param.base_threshold = strategy.get(
'sparse_base_threshold', 1.5)
table.accessor.downpour_accessor_param.delta_threshold = strategy.get(
'sparse_delta_threshold', 0.25)
table.accessor.downpour_accessor_param.delta_keep_days = strategy.get(
'sparse_delta_keep_days', 16)
table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get(
'sparse_delete_after_unseen_days', 30)
table.accessor.downpour_accessor_param.show_click_decay_rate = strategy.get(
'sparse_show_click_decay_rate', 0.98)
table.accessor.downpour_accessor_param.delete_threshold = strategy.get(
'sparse_delete_threshold', 0.8)
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)")
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)")
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
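For reference, a summary of the defaults when strategy is empty (assuming `table` is a downpour table proto as above):

    self.add_sparse_table_common_config(table, {})
    # afterwards: table.accessor.embedx_dim == 8
    #             table.accessor.embedx_threshold == 10
    #             table.accessor.fea_dim == 11  # embedx_dim + 3
    # and two table_accessor_save_param entries (param 1 and 2) share the
    # default xbox converter/deconverter commands.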
def get_desc(self):
"""
Return downpour server program_desc
...
@@ -419,7 +419,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["dump_param"] = strategy.get("dump_param", [])
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class in [
"DownpourCtrAccessor", "DownpourCtrDoubleAccessor",
"DownpourUnitAccessor"
]:
opt_info["dump_slot"] = True
elif server._server.downpour_server_param.downpour_table_param[
...
@@ -21,7 +21,8 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestFleet1(unittest.TestCase):
"""
Test cases for fleet minimize,
and some other fleet api tests.
"""
def setUp(self):
@@ -80,6 +81,25 @@ class TestFleet1(unittest.TestCase):
except:
print("do not support pslib test, skip")
return
try:
# worker should call these methods instead of server
# the following is only for test when with_pslib=off
def test_func():
"""
it is only a test function
"""
return True
fleet._role_maker.is_first_worker = test_func
fleet._role_maker._barrier_worker = test_func
fleet.save_model("./model_000")
fleet.save_one_table(0, "./model_001")
fleet.save_one_table(0, "./model_002", prefix="hahaha")
fleet.load_model("./model_0003")
fleet.load_one_table(0, "./model_004")
except:
print("do not support pslib test, skip")
return
if __name__ == "__main__":
...
@@ -80,6 +80,25 @@ class TestFleet1(unittest.TestCase):
except:
print("do not support pslib test, skip")
return
try:
# worker should call these methods instead of server
# the following is only for test when with_pslib=off
def test_func():
"""
it is only a test function
"""
return True
fleet._role_maker.is_first_worker = test_func
fleet._role_maker._barrier_worker = test_func
fleet.save_model("./model_000")
fleet.save_one_table(0, "./model_001")
fleet.save_one_table(0, "./model_002", prefix="hahaha")
fleet.load_model("./model_0003")
fleet.load_one_table(0, "./model_004")
except:
print("do not support pslib test, skip")
return
if __name__ == "__main__":
...
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test fleet."""
from __future__ import print_function
import os
import unittest
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestFleet1(unittest.TestCase):
"""
Test cases for fleet minimize.
"""
def setUp(self):
"""Set up, set envs."""
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
try:
import netifaces
except:
print("warning: no netifaces, skip test_pslib_1")
return
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
os.environ["PADDLE_TRAINER_ID"] = "0"
role_maker = GeneralRoleMaker()
role_maker.generate_role()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
fleet.init(role_maker)
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program):
show = fluid.layers.data(name="show", shape=[-1, 1], \
dtype="int64", lod_level=1, append_batch_size=False)
emb = fluid.layers.embedding(input=show, size=[1, 1], \
is_sparse=True, is_distributed=True, \
param_attr=fluid.ParamAttr(name="embedding"))
fc = fluid.layers.fc(input=emb, size=1, act=None)
label = fluid.layers.data(name="click", shape=[-1, 1], \
dtype="int64", lod_level=1, append_batch_size=False)
label_cast = fluid.layers.cast(label, dtype='float32')
cost = fluid.layers.log_loss(fc, label_cast)
strategy = {}
strategy["embedding"] = {}
strategy["embedding"]["sparse_accessor_class"] = "DownpourUnitAccessor"
strategy["embedding"]["embed_sparse_optimizer"] = "naive"
try:
adam1 = fluid.optimizer.Adam(learning_rate=0.000005)
adam1 = fleet.distributed_optimizer(adam1, strategy=strategy)
adam1.minimize([cost], [scope])
strategy["embedding"]["embed_sparse_optimizer"] = "adagrad"
adam2 = fluid.optimizer.Adam(learning_rate=0.000005)
adam2 = fleet.distributed_optimizer(adam2, strategy=strategy)
adam2.minimize([cost], [scope])
strategy["embedding"]["embed_sparse_optimizer"] = "adam"
adam3 = fluid.optimizer.Adam(learning_rate=0.000005)
adam3 = fleet.distributed_optimizer(adam3, strategy=strategy)
adam3.minimize([cost], [scope])
except:
print("do not support pslib test, skip")
return
if __name__ == "__main__":
unittest.main()