Unverified commit 1d11bf1a, authored by xujiaqi01, committed by GitHub

cherry-pick 1.7 add clear one table, fix dump slot and no_cvm (#23704)

* fix no_cvm in config_fleet (#22818)
* support get pslib version (#22835)
* add clear one table (#23089)
* fix dump slot in strategy (#23398)
Parent 92cc33c0
......@@ -19,7 +19,7 @@ IF((NOT DEFINED PSLIB_VER) OR (NOT DEFINED PSLIB_URL))
  MESSAGE(STATUS "use pre defined download url")
  SET(PSLIB_VER "0.1.1" CACHE STRING "" FORCE)
  SET(PSLIB_NAME "pslib" CACHE STRING "" FORCE)
  SET(PSLIB_URL "https://pslib.bj.bcebos.com/pslib_37fecf5/pslib.tar.gz" CACHE STRING "" FORCE)
  SET(PSLIB_URL "https://pslib.bj.bcebos.com/pslib.tar.gz" CACHE STRING "" FORCE)
ENDIF()
MESSAGE(STATUS "PSLIB_NAME: ${PSLIB_NAME}, PSLIB_URL: ${PSLIB_URL}")
SET(PSLIB_SOURCE_DIR "${THIRD_PARTY_PATH}/pslib")
......@@ -31,6 +31,7 @@ SET(PSLIB_ROOT ${PSLIB_INSTALL_DIR})
SET(PSLIB_INC_DIR ${PSLIB_ROOT}/include)
SET(PSLIB_LIB_DIR ${PSLIB_ROOT}/lib)
SET(PSLIB_LIB ${PSLIB_LIB_DIR}/libps.so)
SET(PSLIB_VERSION_PY ${PSLIB_DOWNLOAD_DIR}/pslib/version.py)
SET(PSLIB_IOMP_LIB ${PSLIB_LIB_DIR}/libiomp5.so) #todo what is this
SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${PSLIB_ROOT}/lib")
......
......@@ -804,6 +804,15 @@ void FleetWrapper::ClearModel() {
#endif
}

void FleetWrapper::ClearOneTable(const uint64_t table_id) {
#ifdef PADDLE_WITH_PSLIB
  // clear() is asynchronous and returns a future; block until the
  // table has actually been cleared on the servers.
  auto ret = pslib_ptr_->_worker_ptr->clear(table_id);
  ret.wait();
#else
  VLOG(0) << "FleetWrapper::ClearOneTable does nothing when no pslib";
#endif
}

void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
                                    std::vector<std::string> var_list,
                                    float decay, int emb_dim) {
......
......@@ -220,6 +220,8 @@ class FleetWrapper {
  const std::vector<uint64_t>& feasign_list);
  // clear all models, release their memory
  void ClearModel();
  // clear one table
  void ClearOneTable(const uint64_t table_id);
  // shrink sparse table
  void ShrinkSparseTable(int table_id);
  // shrink dense table
......
......@@ -59,6 +59,7 @@ void BindFleetWrapper(py::module* m) {
.def("save_cache", &framework::FleetWrapper::SaveCache)
.def("load_model", &framework::FleetWrapper::LoadModel)
.def("clear_model", &framework::FleetWrapper::ClearModel)
.def("clear_one_table", &framework::FleetWrapper::ClearOneTable)
.def("stop_server", &framework::FleetWrapper::StopServer)
.def("finalize_worker", &framework::FleetWrapper::FinalizeWorker)
.def("gather_servers", &framework::FleetWrapper::GatherServers)
......
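With this binding in place, the C++ method is reachable from Python on the fleet wrapper handle. A minimal sketch of the low-level call (the `_fleet_ptr` attribute is the handle pslib.py itself uses, as the next hunk shows; table id 0 is illustrative):

# Low-level call through the new pybind binding; in normal use prefer
# fleet.clear_one_table(0), which adds worker barriers around this call.
fleet._fleet_ptr.clear_one_table(0)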
......@@ -402,6 +402,23 @@ class PSLib(Fleet):
                                          var_list, decay, emb_dim)
        self._role_maker._barrier_worker()

    def clear_one_table(self, table_id):
        """
        clear_one_table() will be called by user. It clears the given
        table on the parameter servers.

        Args:
            table_id(int): id of the table to clear

        Examples:
            .. code-block:: python

              fleet.clear_one_table(0)

        """
        self._role_maker._barrier_worker()
        if self._role_maker.is_first_worker():
            self._fleet_ptr.clear_one_table(table_id)
        self._role_maker._barrier_worker()

    def clear_model(self):
        """
        clear_model() will be called by user. It clears the sparse model.
......
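Because clear_one_table() barriers all workers and lets only the first worker issue the clear request, every worker can call it at the same point in the program. A minimal usage sketch, assuming fleet has already been initialized with a PSLib role maker and that table 0 exists (both assumptions):

from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

# Safe to call from every worker: internally only the first worker
# sends the clear RPC, and the barriers keep all workers in sync.
fleet.clear_one_table(0)  # table id 0 is illustrative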
......@@ -418,8 +418,13 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "")
opt_info["dump_param"] = strategy.get("dump_param", [])
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class == "DownpourCtrAccessor":
0].accessor.accessor_class in [
"DownpourCtrAccessor", "DownpourCtrDoubleAccessor"
]:
opt_info["dump_slot"] = True
elif server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class == "DownpourSparseValueAccessor":
opt_info["no_cvm"] = True
opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {})
opt_info["copy_table"] = strategy.get("copy_table", {})
opt_info["loss_names"] = strategy.get("loss_names", [])
......
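The branch above derives dump_slot and no_cvm from the accessor class configured on the first downpour table. A self-contained sketch of that mapping (accessor names are taken from this diff; the helper function is illustrative, not part of the codebase):

def derive_flags(accessor_class):
    """Mirror of the branch above: accessor class -> derived opt_info flags."""
    opt_info = {}
    if accessor_class in ["DownpourCtrAccessor", "DownpourCtrDoubleAccessor"]:
        opt_info["dump_slot"] = True
    elif accessor_class == "DownpourSparseValueAccessor":
        opt_info["no_cvm"] = True
    return opt_info

assert derive_flags("DownpourCtrDoubleAccessor") == {"dump_slot": True}
assert derive_flags("DownpourSparseValueAccessor") == {"no_cvm": True}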
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test fleet."""
from __future__ import print_function
import os
import unittest
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

class TestFleet1(unittest.TestCase):
    """
    Test cases for fleet minimize.
    """

    def setUp(self):
        """Set up, set envs."""
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ[
            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"

    def test_pslib_1(self):
        """Test cases for pslib."""
        import paddle.fluid as fluid
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
        from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
        try:
            import netifaces
        except:
            print("warning: no netifaces, skip test_pslib_1")
            return
        os.environ["POD_IP"] = "127.0.0.1"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["TRAINING_ROLE"] = "TRAINER"
        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
        os.environ["PADDLE_TRAINER_ID"] = "0"
        role_maker = GeneralRoleMaker()
        role_maker.generate_role()
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        fleet.init(role_maker)
        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        with fluid.program_guard(train_program, startup_program):
            show = fluid.layers.data(name="show", shape=[-1, 1], \
                dtype="int64", lod_level=1, append_batch_size=False)
            emb = fluid.layers.embedding(input=show, size=[1, 1], \
                is_sparse=True, is_distributed=True, \
                param_attr=fluid.ParamAttr(name="embedding"))
            fc = fluid.layers.fc(input=emb, size=1, act=None)
            label = fluid.layers.data(name="click", shape=[-1, 1], \
                dtype="int64", lod_level=1, append_batch_size=False)
            label_cast = fluid.layers.cast(label, dtype='float32')
            cost = fluid.layers.log_loss(fc, label_cast)
        try:
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            adam = fleet.distributed_optimizer(
                adam,
                strategy={
                    "embedding": {
                        "sparse_accessor_class": "DownpourSparseValueAccessor"
                    }
                })
            adam.minimize([cost], [scope])
            fleet.run_server()
        except:
            print("do not support pslib test, skip")
            return


if __name__ == "__main__":
    unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test fleet."""
from __future__ import print_function
import os
import unittest
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

class TestFleet1(unittest.TestCase):
    """
    Test cases for fleet minimize.
    """

    def setUp(self):
        """Set up, set envs."""
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ[
            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"

    def test_pslib_1(self):
        """Test cases for pslib."""
        import paddle.fluid as fluid
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
        from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
        try:
            import netifaces
        except:
            print("warning: no netifaces, skip test_pslib_1")
            return
        os.environ["POD_IP"] = "127.0.0.1"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["TRAINING_ROLE"] = "TRAINER"
        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
        os.environ["PADDLE_TRAINER_ID"] = "0"
        role_maker = GeneralRoleMaker()
        role_maker.generate_role()
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        fleet.init(role_maker)
        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        with fluid.program_guard(train_program, startup_program):
            show = fluid.layers.data(name="show", shape=[-1, 1], \
                dtype="int64", lod_level=1, append_batch_size=False)
            emb = fluid.layers.embedding(input=show, size=[1, 1], \
                is_sparse=True, is_distributed=True, \
                param_attr=fluid.ParamAttr(name="embedding"))
            fc = fluid.layers.fc(input=emb, size=1, act=None)
            label = fluid.layers.data(name="click", shape=[-1, 1], \
                dtype="int64", lod_level=1, append_batch_size=False)
            label_cast = fluid.layers.cast(label, dtype='float32')
            cost = fluid.layers.log_loss(fc, label_cast)
        try:
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            adam = fleet.distributed_optimizer(
                adam,
                strategy={
                    "embedding": {
                        "sparse_accessor_class": "DownpourCtrAccessor"
                    }
                })
            adam.minimize([cost], [scope])
            fleet.run_server()
        except:
            print("do not support pslib test, skip")
            return


if __name__ == "__main__":
    unittest.main()
......@@ -99,6 +99,7 @@ class TestCloudRoleMaker(unittest.TestCase):
        except:
            print("do not support pslib test, skip")
            return
        fleet.clear_one_table(0)
        from paddle.fluid.incubate.fleet.base.role_maker import \
            MPISymetricRoleMaker
        try:
......
......@@ -195,6 +195,8 @@ else:
    if '${WITH_PSLIB}' == 'ON':
        shutil.copy('${PSLIB_LIB}', libs_path)
        if os.path.exists('${PSLIB_VERSION_PY}'):
            shutil.copy('${PSLIB_VERSION_PY}', '${PADDLE_BINARY_DIR}/python/paddle/fluid/incubate/fleet/parameter_server/pslib/')
        package_data['paddle.libs'] += ['libps' + ext_name]
    if '${WITH_MKLDNN}' == 'ON':
......
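Once version.py is copied next to the installed pslib package, the pslib build version can be read at runtime. A hedged sketch (the module path follows the copy destination above; the attribute name inside version.py depends on the pslib build and is an assumption):

# Sketch only: version.py ships with the pslib build, so its exact
# contents (including the attribute name) are an assumption here.
try:
    from paddle.fluid.incubate.fleet.parameter_server.pslib import version
    print("pslib version:", getattr(version, "version", "unknown"))
except ImportError:
    print("no pslib version module; Paddle was built without WITH_PSLIB")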