diff --git a/cmake/external/pslib.cmake b/cmake/external/pslib.cmake index 959386166708537fad3bce7d022b2ee2648aa440..bdfd335172d877d7e294c898dad7e3a554f5531c 100644 --- a/cmake/external/pslib.cmake +++ b/cmake/external/pslib.cmake @@ -19,7 +19,7 @@ IF((NOT DEFINED PSLIB_VER) OR (NOT DEFINED PSLIB_URL)) MESSAGE(STATUS "use pre defined download url") SET(PSLIB_VER "0.1.1" CACHE STRING "" FORCE) SET(PSLIB_NAME "pslib" CACHE STRING "" FORCE) - SET(PSLIB_URL "https://pslib.bj.bcebos.com/pslib_37fecf5/pslib.tar.gz" CACHE STRING "" FORCE) + SET(PSLIB_URL "https://pslib.bj.bcebos.com/pslib.tar.gz" CACHE STRING "" FORCE) ENDIF() MESSAGE(STATUS "PSLIB_NAME: ${PSLIB_NAME}, PSLIB_URL: ${PSLIB_URL}") SET(PSLIB_SOURCE_DIR "${THIRD_PARTY_PATH}/pslib") @@ -31,6 +31,7 @@ SET(PSLIB_ROOT ${PSLIB_INSTALL_DIR}) SET(PSLIB_INC_DIR ${PSLIB_ROOT}/include) SET(PSLIB_LIB_DIR ${PSLIB_ROOT}/lib) SET(PSLIB_LIB ${PSLIB_LIB_DIR}/libps.so) +SET(PSLIB_VERSION_PY ${PSLIB_DOWNLOAD_DIR}/pslib/version.py) SET(PSLIB_IOMP_LIB ${PSLIB_LIB_DIR}/libiomp5.so) #todo what is this SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${PSLIB_ROOT}/lib") diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index fc52e1a4c930bd41571de4416a6b413923f0e94e..371a5507f1fb06a106c5337d38ebcbd8d25658ed 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -804,6 +804,15 @@ void FleetWrapper::ClearModel() { #endif } +void FleetWrapper::ClearOneTable(const uint64_t table_id) { +#ifdef PADDLE_WITH_PSLIB + auto ret = pslib_ptr_->_worker_ptr->clear(table_id); + ret.wait(); +#else + VLOG(0) << "FleetWrapper::ClearOneTable does nothing when no pslib"; +#endif +} + void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope, std::vector var_list, float decay, int emb_dim) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index 5d831f31c7f6a6f7887e2d1f425a34416a6206ce..a54aea034d2fbfe0d867a6fe28eaa676c8ab3c5c 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -220,6 +220,8 @@ class FleetWrapper { const std::vector& feasign_list); // clear all models, release their memory void ClearModel(); + // clear one table + void ClearOneTable(const uint64_t table_id); // shrink sparse table void ShrinkSparseTable(int table_id); // shrink dense table diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index fac6de452aed018f1397c536c40d7a55b5f188b0..3b4505c611b283648f4da1d36f0200bb3e439d8a 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -59,6 +59,7 @@ void BindFleetWrapper(py::module* m) { .def("save_cache", &framework::FleetWrapper::SaveCache) .def("load_model", &framework::FleetWrapper::LoadModel) .def("clear_model", &framework::FleetWrapper::ClearModel) + .def("clear_one_table", &framework::FleetWrapper::ClearOneTable) .def("stop_server", &framework::FleetWrapper::StopServer) .def("finalize_worker", &framework::FleetWrapper::FinalizeWorker) .def("gather_servers", &framework::FleetWrapper::GatherServers) diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 40337110cfe966511050b78e3e463e7653c3caba..27fecf495ade0e8b66ceed83619726ac5d938401 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -402,6 +402,23 @@ class PSLib(Fleet): var_list, decay, emb_dim) self._role_maker._barrier_worker() + def clear_one_table(self, table_id): + """ + clear_one_table() will be called by user. It will clear one table. + + Args: + table_id(int): table id + + Examples: + .. code-block:: python + + fleet.clear_one_table(0) + """ + self._role_maker._barrier_worker() + if self._role_maker.is_first_worker(): + self._fleet_ptr.clear_one_table(table_id) + self._role_maker._barrier_worker() + def clear_model(self): """ clear_model() will be called by user. It will clear sparse model. diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 4507a18ca6a6fd777f76e1ae7bff348c5afbcbc7..c58705a0fad99592e7d6c39772e2aea788145d68 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -418,8 +418,13 @@ class DistributedAdam(DistributedOptimizerImplBase): opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "") opt_info["dump_param"] = strategy.get("dump_param", []) if server._server.downpour_server_param.downpour_table_param[ - 0].accessor.accessor_class == "DownpourCtrAccessor": + 0].accessor.accessor_class in [ + "DownpourCtrAccessor", "DownpourCtrDoubleAccessor" + ]: opt_info["dump_slot"] = True + elif server._server.downpour_server_param.downpour_table_param[ + 0].accessor.accessor_class == "DownpourSparseValueAccessor": + opt_info["no_cvm"] = True opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {}) opt_info["copy_table"] = strategy.get("copy_table", {}) opt_info["loss_names"] = strategy.get("loss_names", []) diff --git a/python/paddle/fluid/tests/unittests/test_fleet.py b/python/paddle/fluid/tests/unittests/test_fleet.py new file mode 100644 index 0000000000000000000000000000000000000000..5f508917ef51ba1f81a6aae528530c0421205585 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet.py @@ -0,0 +1,86 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test fleet.""" + +from __future__ import print_function +import os +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestFleet1(unittest.TestCase): + """ + Test cases for fleet minimize. + """ + + def setUp(self): + """Set up, set envs.""" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + + def test_pslib_1(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_1") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + role_maker = GeneralRoleMaker() + role_maker.generate_role() + place = fluid.CPUPlace() + exe = fluid.Executor(place) + fleet.init(role_maker) + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + emb = fluid.layers.embedding(input=show, size=[1, 1], \ + is_sparse=True, is_distributed=True, \ + param_attr=fluid.ParamAttr(name="embedding")) + fc = fluid.layers.fc(input=emb, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer( + adam, + strategy={ + "embedding": { + "sparse_accessor_class": "DownpourSparseValueAccessor" + } + }) + adam.minimize([cost], [scope]) + fleet.run_server() + except: + print("do not support pslib test, skip") + return + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py new file mode 100644 index 0000000000000000000000000000000000000000..ef655d1999a87ba3a80ff1318e7697bd02217de9 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py @@ -0,0 +1,86 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test fleet.""" + +from __future__ import print_function +import os +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestFleet1(unittest.TestCase): + """ + Test cases for fleet minimize. + """ + + def setUp(self): + """Set up, set envs.""" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + + def test_pslib_1(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_1") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + role_maker = GeneralRoleMaker() + role_maker.generate_role() + place = fluid.CPUPlace() + exe = fluid.Executor(place) + fleet.init(role_maker) + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + emb = fluid.layers.embedding(input=show, size=[1, 1], \ + is_sparse=True, is_distributed=True, \ + param_attr=fluid.ParamAttr(name="embedding")) + fc = fluid.layers.fc(input=emb, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer( + adam, + strategy={ + "embedding": { + "sparse_accessor_class": "DownpourCtrAccessor" + } + }) + adam.minimize([cost], [scope]) + fleet.run_server() + except: + print("do not support pslib test, skip") + return + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py index 7322891338f3b75a9dc04bc5ae5ed6e4515d4869..47aeee95921346fe61c83ff6cc2b4f6a1c7fb07e 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py @@ -99,6 +99,7 @@ class TestCloudRoleMaker(unittest.TestCase): except: print("do not support pslib test, skip") return + fleet.clear_one_table(0) from paddle.fluid.incubate.fleet.base.role_maker import \ MPISymetricRoleMaker try: diff --git a/python/setup.py.in b/python/setup.py.in index 99d1ac6a7eb9cfefa71cd1ddc63d4dbb8385bccb..d9df46078bf5a0d6201922bd03d90be92614a88d 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -195,6 +195,8 @@ else: if '${WITH_PSLIB}' == 'ON': shutil.copy('${PSLIB_LIB}', libs_path) + if os.path.exists('${PSLIB_VERSION_PY}'): + shutil.copy('${PSLIB_VERSION_PY}', '${PADDLE_BINARY_DIR}/python/paddle/fluid/incubate/fleet/parameter_server/pslib/') package_data['paddle.libs'] += ['libps' + ext_name] if '${WITH_MKLDNN}' == 'ON':