From 15ce8e21d83709cbd60f0097dba14ab8d8cf73b2 Mon Sep 17 00:00:00 2001
From: xujiaqi01 <173596896@qq.com>
Date: Mon, 13 Apr 2020 11:20:06 +0800
Subject: [PATCH] add unit accessor (#23703)

* add unit accessor in fleet, support DownpourUnitAccessor

* test=develop
---
 .../fleet/parameter_server/pslib/node.py      | 92 +++++++++++++++++-
 .../pslib/optimizer_factory.py                |  3 +-
 .../unittests/test_fleet_unitaccessor.py      | 94 +++++++++++++++++++
 3 files changed, 185 insertions(+), 4 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_unitaccessor.py

diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
index abba45f3eab..6febedc8e18 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
@@ -81,7 +81,12 @@ class DownpourServer(Server):
             'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
             'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
             'sparse_cache_file_num', 'sparse_beta1_decay_rate', 'sparse_beta2_decay_rate', \
-            'sparse_ada_epsilon', 'sparse_optimizer', 'sparse_ssd_unseenday_threshold']
+            'sparse_ada_epsilon', 'sparse_optimizer', 'sparse_ssd_unseenday_threshold', \
+            'embed_sparse_optimizer', 'embed_sparse_learning_rate', 'embed_sparse_weight_bounds', \
+            'embed_sparse_initial_range', 'embed_sparse_initial_g2sum', 'embed_sparse_beta1_decay_rate', \
+            'embed_sparse_beta2_decay_rate', 'embedx_sparse_optimizer', 'embedx_sparse_learning_rate', \
+            'embedx_sparse_weight_bounds', 'embedx_sparse_initial_range', 'embedx_sparse_initial_g2sum', \
+            'embedx_sparse_beta1_decay_rate', 'embedx_sparse_beta2_decay_rate']
 
         for key in strategy:
             if key not in support_sparse_key_list:
@@ -113,10 +118,12 @@ class DownpourServer(Server):
         # DownpourFeatureValueAccessor : for ctr task, has cvm, embedding and sgd info
         # DownpourCtrAccessor : for ctr task, has cvm, slot, embedding and sgd info
         # DownpourSparseValueAccessor : for general task, has embedding and sgd info
         # DownpourCtrDoubleAccessor : for ctr task, which show clk are in double
+        # DownpourUnitAccessor : for ctr task, has cvm, slot, embedding and sgd info
         support_accessor_class = [
             'DownpourFeatureValueAccessor', 'DownpourCtrAccessor',
-            'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor'
+            'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor',
+            'DownpourUnitAccessor'
         ]
         if strategy.get('sparse_accessor_class') is not None:
             accessor_class = strategy.get('sparse_accessor_class')
@@ -130,7 +137,9 @@ class DownpourServer(Server):
 
         table.accessor.accessor_class = accessor_class
 
-        if accessor_class == 'DownpourFeatureValueAccessor' or accessor_class == 'DownpourCtrAccessor' or accessor_class == 'DownpourCtrDoubleAccessor':
+        if accessor_class == 'DownpourFeatureValueAccessor' \
+                or accessor_class == 'DownpourCtrAccessor' \
+                or accessor_class == 'DownpourCtrDoubleAccessor':
             table.accessor.sparse_sgd_param.learning_rate = strategy.get(
                 'sparse_learning_rate', 0.05)
             table.accessor.sparse_sgd_param.initial_g2sum = strategy.get(
@@ -245,6 +254,12 @@ class DownpourServer(Server):
             table2.param = 2
             table2.converter = converter
             table2.deconverter = deconverter
+        elif accessor_class == 'DownpourUnitAccessor':
+            self.add_sparse_table_common_config(table, strategy)
+            self.add_sparse_optimizer(table.accessor.embed_sgd_param,
+                                      strategy, "embed_")
+            self.add_sparse_optimizer(table.accessor.embedx_sgd_param,
+                                      strategy, "embedx_")
 
     def add_dense_table(self, table_id, param_var, grad_var, strategy,
                         sparse_table_names):
@@ -364,6 +379,77 @@ class DownpourServer(Server):
                 'datanorm_decay_rate', 0.999999)
             table.accessor.fea_dim = fea_dim
 
+    def add_sparse_optimizer(self, sgd, strategy, prefix):
+        optimizer_name = strategy.get(prefix + "sparse_optimizer", "adam")
+        sgd.name = optimizer_name
+        if optimizer_name == "naive":
+            sgd.naive.learning_rate = \
+                strategy.get(prefix + 'sparse_learning_rate', 0.05)
+            sgd.naive.initial_range = \
+                strategy.get(prefix + 'sparse_initial_range', 1e-4)
+            bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
+            sgd.naive.weight_bounds.extend(bounds)
+        elif optimizer_name == "adagrad":
+            sgd.adagrad.learning_rate = \
+                strategy.get(prefix + 'sparse_learning_rate', 0.05)
+            sgd.adagrad.initial_range = \
+                strategy.get(prefix + 'sparse_initial_range', 1e-4)
+            sgd.adagrad.initial_g2sum = strategy.get(
+                prefix + 'sparse_initial_g2sum', 3)
+            bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
+            sgd.adagrad.weight_bounds.extend(bounds)
+        elif optimizer_name == "adam":
+            sgd.adam.learning_rate = \
+                strategy.get(prefix + 'sparse_learning_rate', 0.001)
+            sgd.adam.initial_range = \
+                strategy.get(prefix + 'sparse_initial_range', 1e-4)
+            sgd.adam.beta1_decay_rate = strategy.get(
+                prefix + 'sparse_beta1_decay_rate', 0.9)
+            sgd.adam.beta2_decay_rate = strategy.get(
+                prefix + 'sparse_beta2_decay_rate', 0.999)
+            sgd.adam.ada_epsilon = strategy.get(prefix + 'sparse_ada_epsilon',
+                                                1e-8)
+            bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
+            sgd.adam.weight_bounds.extend(bounds)
+
+    def add_sparse_table_common_config(self, table, strategy):
+        table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
+        table.accessor.embedx_threshold = strategy.get(
+            'sparse_embedx_threshold', 10)
+        table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
+        table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get(
+            'sparse_nonclk_coeff', 0.1)
+        table.accessor.downpour_accessor_param.click_coeff = strategy.get(
+            'sparse_click_coeff', 1)
+        table.accessor.downpour_accessor_param.base_threshold = strategy.get(
+            'sparse_base_threshold', 1.5)
+        table.accessor.downpour_accessor_param.delta_threshold = strategy.get(
+            'sparse_delta_threshold', 0.25)
+        table.accessor.downpour_accessor_param.delta_keep_days = strategy.get(
+            'sparse_delta_keep_days', 16)
+        table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get(
+            'sparse_delete_after_unseen_days', 30)
+        table.accessor.downpour_accessor_param.show_click_decay_rate = strategy.get(
+            'sparse_show_click_decay_rate', 0.98)
+        table.accessor.downpour_accessor_param.delete_threshold = strategy.get(
+            'sparse_delete_threshold', 0.8)
+        converter = strategy.get(
+            'sparse_converter',
+            "(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)")
+        deconverter = strategy.get(
+            'sparse_deconverter',
+            "(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)")
+
+        table1 = table.accessor.table_accessor_save_param.add()
+        table1.param = 1
+        table1.converter = converter
+        table1.deconverter = deconverter
+
+        table2 = table.accessor.table_accessor_save_param.add()
+        table2.param = 2
+        table2.converter = converter
+        table2.deconverter = deconverter
+
     def get_desc(self):
         """
         Return downpour server program_desc
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index d7437f9c47e..c0be2ca66ca 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -531,7 +531,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
             opt_info["dump_param"] = strategy.get("dump_param", [])
             if server._server.downpour_server_param.downpour_table_param[
                     0].accessor.accessor_class in [
-                        "DownpourCtrAccessor", "DownpourCtrDoubleAccessor"
+                        "DownpourCtrAccessor", "DownpourCtrDoubleAccessor",
+                        "DownpourUnitAccessor"
                     ]:
                 opt_info["dump_slot"] = True
             elif server._server.downpour_server_param.downpour_table_param[
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_unitaccessor.py b/python/paddle/fluid/tests/unittests/test_fleet_unitaccessor.py
new file mode 100644
index 00000000000..8e71ccf9289
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_fleet_unitaccessor.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test fleet."""
+
+from __future__ import print_function
+import os
+import unittest
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+
+
+class TestFleet1(unittest.TestCase):
+    """
+    Test cases for fleet minimize.
+    """
+
+    def setUp(self):
+        """Set up, set envs."""
+        os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ[
+            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
+
+    def test_pslib_1(self):
+        """Test cases for pslib."""
+        import paddle.fluid as fluid
+        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+        from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
+        from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
+        try:
+            import netifaces
+        except:
+            print("warning: no netifaces, skip test_pslib_1")
+            return
+        os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_PORT"] = "36001"
+        os.environ["TRAINING_ROLE"] = "TRAINER"
+        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
+        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
+        os.environ["PADDLE_TRAINER_ID"] = "0"
+        role_maker = GeneralRoleMaker()
+        role_maker.generate_role()
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        fleet.init(role_maker)
+        train_program = fluid.Program()
+        startup_program = fluid.Program()
+        scope = fluid.Scope()
+        with fluid.program_guard(train_program, startup_program):
+            show = fluid.layers.data(name="show", shape=[-1, 1], \
+                dtype="int64", lod_level=1, append_batch_size=False)
+            emb = fluid.layers.embedding(input=show, size=[1, 1], \
+                is_sparse=True, is_distributed=True, \
+                param_attr=fluid.ParamAttr(name="embedding"))
+            fc = fluid.layers.fc(input=emb, size=1, act=None)
+            label = fluid.layers.data(name="click", shape=[-1, 1], \
+                dtype="int64", lod_level=1, append_batch_size=False)
+            label_cast = fluid.layers.cast(label, dtype='float32')
+            cost = fluid.layers.log_loss(fc, label_cast)
+
+        strategy = {}
+        strategy["embedding"] = {}
+        strategy["embedding"]["sparse_accessor_class"] = "DownpourUnitAccessor"
+        strategy["embedding"]["embed_sparse_optimizer"] = "naive"
+        try:
+            adam1 = fluid.optimizer.Adam(learning_rate=0.000005)
+            adam1 = fleet.distributed_optimizer(adam1, strategy=strategy)
+            adam1.minimize([cost], [scope])
+
+            strategy["embedding"]["embed_sparse_optimizer"] = "adagrad"
+            adam2 = fluid.optimizer.Adam(learning_rate=0.000005)
+            adam2 = fleet.distributed_optimizer(adam2, strategy=strategy)
+            adam2.minimize([cost], [scope])
+
+            strategy["embedding"]["embed_sparse_optimizer"] = "adam"
+            adam3 = fluid.optimizer.Adam(learning_rate=0.000005)
+            adam3 = fleet.distributed_optimizer(adam3, strategy=strategy)
+            adam3.minimize([cost], [scope])
+        except:
+            print("do not support pslib test, skip")
+            return
+
+
+if __name__ == "__main__":
+    unittest.main()
-- 
GitLab