diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 4507a18ca6a6fd777f76e1ae7bff348c5afbcbc7..83a47c3d9ef45c82edc2a1d215cce200a824a58d 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -420,6 +420,9 @@ class DistributedAdam(DistributedOptimizerImplBase): if server._server.downpour_server_param.downpour_table_param[ 0].accessor.accessor_class == "DownpourCtrAccessor": opt_info["dump_slot"] = True + elif server._server.downpour_server_param.downpour_table_param[ + 0].accessor.accessor_class == "DownpourSparseValueAccessor": + opt_info["no_cvm"] = True opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {}) opt_info["copy_table"] = strategy.get("copy_table", {}) opt_info["loss_names"] = strategy.get("loss_names", []) diff --git a/python/paddle/fluid/tests/unittests/test_fleet.py b/python/paddle/fluid/tests/unittests/test_fleet.py new file mode 100644 index 0000000000000000000000000000000000000000..5f508917ef51ba1f81a6aae528530c0421205585 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet.py @@ -0,0 +1,86 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test fleet.""" + +from __future__ import print_function +import os +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestFleet1(unittest.TestCase): + """ + Test cases for fleet minimize. + """ + + def setUp(self): + """Set up, set envs.""" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + + def test_pslib_1(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_1") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + role_maker = GeneralRoleMaker() + role_maker.generate_role() + place = fluid.CPUPlace() + exe = fluid.Executor(place) + fleet.init(role_maker) + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + emb = fluid.layers.embedding(input=show, size=[1, 1], \ + is_sparse=True, is_distributed=True, \ + param_attr=fluid.ParamAttr(name="embedding")) + fc = fluid.layers.fc(input=emb, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer( + adam, + strategy={ + "embedding": { + "sparse_accessor_class": "DownpourSparseValueAccessor" + } + }) + adam.minimize([cost], [scope]) + fleet.run_server() + except: + print("do not support pslib test, skip") + return + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py new file mode 100644 index 0000000000000000000000000000000000000000..ef655d1999a87ba3a80ff1318e7697bd02217de9 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py @@ -0,0 +1,86 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test fleet.""" + +from __future__ import print_function +import os +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestFleet1(unittest.TestCase): + """ + Test cases for fleet minimize. + """ + + def setUp(self): + """Set up, set envs.""" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + + def test_pslib_1(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_1") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + role_maker = GeneralRoleMaker() + role_maker.generate_role() + place = fluid.CPUPlace() + exe = fluid.Executor(place) + fleet.init(role_maker) + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + emb = fluid.layers.embedding(input=show, size=[1, 1], \ + is_sparse=True, is_distributed=True, \ + param_attr=fluid.ParamAttr(name="embedding")) + fc = fluid.layers.fc(input=emb, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer( + adam, + strategy={ + "embedding": { + "sparse_accessor_class": "DownpourCtrAccessor" + } + }) + adam.minimize([cost], [scope]) + fleet.run_server() + except: + print("do not support pslib test, skip") + return + + +if __name__ == "__main__": + unittest.main()