From 500a8bc20ceef5a9903890d1771698f6ce7feeba Mon Sep 17 00:00:00 2001 From: wangzhen38 <41941775+wangzhen38@users.noreply.github.com> Date: Wed, 22 Feb 2023 10:31:53 +0800 Subject: [PATCH] [RM FLUID] rm ps mode (#50704) --- .../incubate/fleet/parameter_server/mode.py | 29 - .../fleet/parameter_server/pslib/node.py | 803 ------------ .../pslib/optimizer_factory.py | 1077 ----------------- .../collective/fleet/test_recv_save_op.py | 2 +- .../tests/unittests/test_communicator_geo.py | 2 +- .../fluid/transpiler/geo_sgd_transpiler.py | 2 +- .../fleet/parameter_server/ir/public.py | 2 +- .../fleet/parameter_server/pslib/.gitignore | 0 .../pslib/optimizer_factory.py | 2 +- 9 files changed, 5 insertions(+), 1914 deletions(-) delete mode 100644 python/paddle/fluid/incubate/fleet/parameter_server/mode.py delete mode 100644 python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py delete mode 100644 python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py rename python/paddle/{fluid => }/incubate/fleet/parameter_server/pslib/.gitignore (100%) diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/mode.py b/python/paddle/fluid/incubate/fleet/parameter_server/mode.py deleted file mode 100644 index 623e919ba35..00000000000 --- a/python/paddle/fluid/incubate/fleet/parameter_server/mode.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class PSMode: - """ - There are various mode for fleet, each of them is designed for different model. - """ - - TRANSPILER = 1 - PSLIB = 2 - - -class DistributedMode: - SYNC = 0 - ASYNC = 1 - HALF_ASYNC = 2 - GEO = 3 diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py deleted file mode 100644 index 73fcd18bdb7..00000000000 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py +++ /dev/null @@ -1,803 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -"""Defination of Server and Worker.""" - -from . import ps_pb2 as pslib - -# NOTE: reduce removed in fuctools in python3 -from functools import reduce - - -class Server: - """ - A Server basic class - it's a base class, does not have implementation - """ - - def __init__(self): - pass - - -class Worker: - """ - A Worker basic class. 
- it's a base class, does not have implementation - """ - - def __init__(self): - pass - - -class DownpourServer(Server): - """ - DownpourServer class is used to generate server program_desc - Args: - server: it is pslib.ServerParameter() - Examples: - server = DownpourServer() - """ - - def __init__(self): - self._server = pslib.ServerParameter() - self._server.downpour_server_param.service_param.server_class = ( - "DownpourBrpcPsServer" - ) - self._server.downpour_server_param.service_param.client_class = ( - "DownpourBrpcPsClient" - ) - self._server.downpour_server_param.service_param.service_class = ( - "DownpourPsService" - ) - self._server.downpour_server_param.service_param.start_server_port = 0 - self._server.downpour_server_param.service_param.server_thread_num = 12 - - def add_sparse_table(self, table_id, strategy): - """ - Args: - table_id(int): id of sparse params table - strategy(dict): the config dict. - Returns: - return None - """ - - for table in self._server.downpour_server_param.downpour_table_param: - if table.table_id == table_id: - if table.type == pslib.PS_SPARSE_TABLE: - return - else: - raise ValueError( - "expect table %s type=%s, but actual type=%s" - % (table_id, pslib.PS_SPARSE_TABLE, table.type) - ) - if strategy is None: - strategy = dict() - table = self._server.downpour_server_param.downpour_table_param.add() - table.table_id = table_id - table.type = pslib.PS_SPARSE_TABLE - - support_sparse_key_list = [ - 'sparse_table_class', - 'sparse_compress_in_save', - 'sparse_shard_num', - 'sparse_accessor_class', - 'sparse_learning_rate', - 'sparse_initial_g2sum', - 'sparse_initial_range', - 'sparse_weight_bounds', - 'sparse_embedx_dim', - 'sparse_embedx_threshold', - 'sparse_nonclk_coeff', - 'sparse_click_coeff', - 'sparse_base_threshold', - 'sparse_delta_threshold', - 'sparse_delta_keep_days', - 'sparse_delete_after_unseen_days', - 'sparse_show_click_decay_rate', - 'sparse_delete_threshold', - 'sparse_converter', - 'sparse_deconverter', - 'sparse_enable_cache', - 'sparse_cache_rate', - 'sparse_cache_file_num', - 'sparse_beta1_decay_rate', - 'sparse_beta2_decay_rate', - 'sparse_ada_epsilon', - 'sparse_optimizer', - 'sparse_ssd_unseenday_threshold', - 'embed_sparse_optimizer', - 'embed_sparse_learning_rate', - 'embed_sparse_weight_bounds', - 'embed_sparse_initial_range', - 'embed_sparse_initial_g2sum', - 'embed_sparse_beta1_decay_rate', - 'embed_sparse_beta2_decay_rate', - 'embedx_sparse_optimizer', - 'embedx_sparse_learning_rate', - 'embedx_sparse_weight_bounds', - 'embedx_sparse_initial_range', - 'embedx_sparse_initial_g2sum', - 'embedx_sparse_beta1_decay_rate', - 'embedx_sparse_beta2_decay_rate', - ] - - for key in strategy: - if key not in support_sparse_key_list: - raise ValueError("strategy key '%s' not support" % (key)) - - support_table_calss = ['DownpourSparseTable', 'DownpourSparseSSDTable'] - if strategy.get('sparse_table_class') is not None: - table_class = strategy.get('sparse_table_class') - if table_class not in support_table_calss: - raise ValueError( - "support sparse_table_class: [ 'DownpourSparseTable', 'DownpourSparseSSDTable'], \ - but actual %s" - % (table_class) - ) - else: - table_class = 'DownpourSparseTable' - - table.table_class = table_class - - if ( - table_class == 'DownpourSparseTable' - or table_class == 'DownpourSparseSSDTable' - ): - table.enable_sparse_table_cache = strategy.get( - 'sparse_enable_cache', True - ) - table.sparse_table_cache_rate = strategy.get( - 'sparse_cache_rate', 0.00055 - ) - table.sparse_table_cache_file_num = 
strategy.get( - 'sparse_cache_file_num', 16 - ) - table.compress_in_save = strategy.get( - 'sparse_compress_in_save', True - ) - table.shard_num = strategy.get('sparse_shard_num', 1000) - # DownpourFeatureValueAccessor: for ctr task, has cvm, embedding and sgd info - # DownpourCtrAccessor : for ctr task, has cvm, slot, embedding and sgd info - # DownpourSparseValueAccessor : for general task, has embedding and sgd info - # DownpourCtrDoubleAccessor : for ctr task, which show clk are in double - # DownpourUnitAccessor : for ctr task, has cvm, slot, embedding and sgd info - - support_accessor_class = [ - 'DownpourFeatureValueAccessor', - 'DownpourCtrAccessor', - 'DownpourCtrDymfAccessor', - 'DownpourSparseValueAccessor', - 'DownpourCtrDoubleAccessor', - 'DownpourUnitAccessor', - 'DownpourDoubleUnitAccessor', - ] - if strategy.get('sparse_accessor_class') is not None: - accessor_class = strategy.get('sparse_accessor_class') - if accessor_class not in support_accessor_class: - raise ValueError( - "support sparse_accessor_class: ['DownpourFeatureValueAccessor', 'DownpourCtrAccessor', 'DownpourCtrDymfAccessor', \ - 'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor'], \ - but actual %s" - % (accessor_class) - ) - else: - accessor_class = 'DownpourCtrAccessor' - - table.accessor.accessor_class = accessor_class - - if ( - accessor_class == 'DownpourFeatureValueAccessor' - or accessor_class == 'DownpourCtrAccessor' - or accessor_class == 'DownpourCtrDymfAccessor' - or accessor_class == 'DownpourCtrDoubleAccessor' - ): - table.accessor.sparse_sgd_param.learning_rate = strategy.get( - 'sparse_learning_rate', 0.05 - ) - table.accessor.sparse_sgd_param.initial_g2sum = strategy.get( - 'sparse_initial_g2sum', 3 - ) - table.accessor.sparse_sgd_param.initial_range = strategy.get( - 'sparse_initial_range', 1e-4 - ) - if strategy.get('sparse_weight_bounds') is None: - table.accessor.sparse_sgd_param.weight_bounds.extend( - [-10, 10] - ) - else: - table.accessor.sparse_sgd_param.weight_bounds.extend( - strategy.get('sparse_weight_bounds') - ) - table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8) - table.accessor.embedx_threshold = strategy.get( - 'sparse_embedx_threshold', 10 - ) - table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3 - table.accessor.downpour_accessor_param.nonclk_coeff = ( - strategy.get('sparse_nonclk_coeff', 0.1) - ) - table.accessor.downpour_accessor_param.click_coeff = ( - strategy.get('sparse_click_coeff', 1) - ) - table.accessor.downpour_accessor_param.base_threshold = ( - strategy.get('sparse_base_threshold', 1.5) - ) - table.accessor.downpour_accessor_param.delta_threshold = ( - strategy.get('sparse_delta_threshold', 0.25) - ) - table.accessor.downpour_accessor_param.delta_keep_days = ( - strategy.get('sparse_delta_keep_days', 16) - ) - table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get( - 'sparse_delete_after_unseen_days', 30 - ) - table.accessor.downpour_accessor_param.ssd_unseenday_threshold = strategy.get( - 'sparse_ssd_unseenday_threshold', 1 - ) - table.accessor.downpour_accessor_param.show_click_decay_rate = ( - strategy.get('sparse_show_click_decay_rate', 0.98) - ) - table.accessor.downpour_accessor_param.delete_threshold = ( - strategy.get('sparse_delete_threshold', 0.8) - ) - converter = strategy.get( - 'sparse_converter', - "(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)", - ) - deconverter = strategy.get( - 'sparse_deconverter', - "(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)", - ) - - table1 
= table.accessor.table_accessor_save_param.add() - table1.param = 1 - table1.converter = converter - table1.deconverter = deconverter - - table2 = table.accessor.table_accessor_save_param.add() - table2.param = 2 - table2.converter = converter - table2.deconverter = deconverter - elif accessor_class == 'DownpourSparseValueAccessor': - optimizer_name = strategy.get("sparse_optimizer", "adam") - table.accessor.sparse_commonsgd_param.name = optimizer_name - table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8) - table.accessor.fea_dim = int(table.accessor.embedx_dim) - if optimizer_name == "naive": - table.accessor.sparse_commonsgd_param.naive.learning_rate = strategy.get( - 'sparse_learning_rate', 0.05 - ) - table.accessor.sparse_commonsgd_param.naive.initial_range = strategy.get( - 'sparse_initial_range', 1e-4 - ) - if strategy.get('sparse_weight_bounds') is None: - table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend( - [-10, 10] - ) - else: - table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend( - strategy.get('sparse_weight_bounds') - ) - elif optimizer_name == "adagrad": - table.accessor.sparse_commonsgd_param.adagrad.learning_rate = strategy.get( - 'sparse_learning_rate', 0.05 - ) - table.accessor.sparse_commonsgd_param.adagrad.initial_range = strategy.get( - 'sparse_initial_range', 1e-4 - ) - table.accessor.sparse_commonsgd_param.adagrad.initial_g2sum = strategy.get( - 'sparse_initial_g2sum', 3 - ) - if strategy.get('sparse_weight_bounds') is None: - table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend( - [-10, 10] - ) - else: - table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend( - strategy.get('sparse_weight_bounds') - ) - elif optimizer_name == "adam": - table.accessor.sparse_commonsgd_param.adam.learning_rate = ( - strategy.get('sparse_learning_rate', 0.001) - ) - table.accessor.sparse_commonsgd_param.adam.initial_range = ( - strategy.get('sparse_initial_range', 1e-4) - ) - table.accessor.sparse_commonsgd_param.adam.beta1_decay_rate = strategy.get( - 'sparse_beta1_decay_rate', 0.9 - ) - table.accessor.sparse_commonsgd_param.adam.beta2_decay_rate = strategy.get( - 'sparse_beta2_decay_rate', 0.999 - ) - table.accessor.sparse_commonsgd_param.adam.ada_epsilon = ( - strategy.get('sparse_ada_epsilon', 1e-8) - ) - if strategy.get('sparse_weight_bounds') is None: - table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend( - [-10, 10] - ) - else: - table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend( - strategy.get('sparse_weight_bounds') - ) - converter = strategy.get( - 'sparse_converter', - "(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)", - ) - deconverter = strategy.get( - 'sparse_deconverter', - "(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)", - ) - - table1 = table.accessor.table_accessor_save_param.add() - table1.param = 1 - table1.converter = converter - table1.deconverter = deconverter - - table2 = table.accessor.table_accessor_save_param.add() - table2.param = 2 - table2.converter = converter - table2.deconverter = deconverter - elif ( - accessor_class == 'DownpourUnitAccessor' - or accessor_class == 'DownpourDoubleUnitAccessor' - ): - self.add_sparse_table_common_config(table, strategy) - self.add_sparse_optimizer( - table.accessor.embed_sgd_param, strategy, "embed_" - ) - self.add_sparse_optimizer( - table.accessor.embedx_sgd_param, strategy, "embedx_" - ) - - def add_dense_table( - self, table_id, param_var, grad_var, strategy, sparse_table_names - ): - """ - Args: - 
table_id(int): id of sparse params table - param_var(list): param vars - grad_var(list): param grad vars - strategy(dict): the dense config dict - sparse_table_names(list): sparse table names - Returns: - return None - """ - fea_dim = 0 - dense_param_vars = [] - for p in param_var: - if p.name not in sparse_table_names: - dense_param_vars.append(p) - - for param in dense_param_vars: - fea_dim += reduce(lambda x, y: x * y, param.shape, 1) - - for table in self._server.downpour_server_param.downpour_table_param: - if table.table_id == table_id: - if table.type == pslib.PS_DENSE_TABLE: - table.accessor.fea_dim = fea_dim - return - else: - raise ValueError( - "expect table %s type=%s, but actual type=%s" - % (table_id, pslib.PS_DENSE_TABLE, table.type) - ) - - if strategy is None: - strategy = dict() - table = self._server.downpour_server_param.downpour_table_param.add() - table.table_id = table_id - support_dense_key_list = [ - 'dense_table_class', - 'dense_compress_in_save', - 'dense_accessor_class', - 'dense_optimizer', - 'dense_learning_rate', - 'dense_avg_decay', - 'dense_ada_decay', - 'dense_ada_epsilon', - 'dense_mom_decay', - 'dense_naive_lr', - ] - - for key in strategy: - if key not in support_dense_key_list: - raise ValueError("strategy key '%s' not support" % (key)) - - table.table_class = strategy.get( - 'dense_table_class', "DownpourDenseTable" - ) - table.type = pslib.PS_DENSE_TABLE - table.compress_in_save = strategy.get('dense_compress_in_save', True) - table.accessor.accessor_class = strategy.get( - 'dense_accessor_class', "DownpourDenseValueAccessor" - ) - table.accessor.dense_sgd_param.name = strategy.get( - 'dense_optimizer', "adam" - ) - table.accessor.dense_sgd_param.adam.learning_rate = strategy.get( - 'dense_learning_rate', 5e-06 - ) - table.accessor.dense_sgd_param.adam.avg_decay_rate = strategy.get( - 'dense_avg_decay', 0.999993 - ) - table.accessor.dense_sgd_param.adam.ada_decay_rate = strategy.get( - 'dense_ada_decay', 0.9999 - ) - table.accessor.dense_sgd_param.adam.ada_epsilon = strategy.get( - 'dense_ada_epsilon', 1e-8 - ) - table.accessor.dense_sgd_param.adam.mom_decay_rate = strategy.get( - 'dense_mom_decay', 0.99 - ) - table.accessor.dense_sgd_param.naive.learning_rate = strategy.get( - 'dense_naive_lr', 0.0002 - ) - table.accessor.fea_dim = fea_dim - - def add_data_norm_table( - self, - table_id, - learning_rate, - param_var, - grad_var, - strategy, - sparse_table_names, - ): - """ - Args: - table_id(int): id of datanorm table - learning_rate(float): the learning rate used to update parameters - param_var(list): param vars - grad_var(list): param grad vars - strategy(dict): the datanorm config dict - sparse_table_names(list): sparse table names - Returns: - return None - """ - fea_dim = 0 - dense_param_vars = [] - for p in param_var: - if p.name not in sparse_table_names: - dense_param_vars.append(p) - - for param in dense_param_vars: - fea_dim += reduce(lambda x, y: x * y, param.shape, 1) - - for table in self._server.downpour_server_param.downpour_table_param: - if table.table_id == table_id: - if table.type == pslib.PS_DENSE_TABLE: - table.accessor.fea_dim = fea_dim - return - else: - raise ValueError( - "expect table %s type=%s, but actual type=%s" - % (table_id, pslib.PS_DENSE_TABLE, table.type) - ) - if strategy is None: - strategy = dict() - - support_datanorm_key_list = [ - 'datanorm_table_class', - 'datanorm_compress_in_save', - 'datanorm_accessor_class', - 'datanorm_operation', - 'datanorm_decay_rate', - ] - - for key in strategy: - if key not in 
support_datanorm_key_list: - raise ValueError("strategy key '%s' not support" % (key)) - - table = self._server.downpour_server_param.downpour_table_param.add() - table.table_id = table_id - table.table_class = strategy.get( - 'datanorm_table_class', 'DownpourDenseTable' - ) - table.type = pslib.PS_DENSE_TABLE - table.compress_in_save = strategy.get('datanorm_compress_in_save', True) - table.accessor.accessor_class = strategy.get( - 'datanorm_accessor_class', 'DownpourDenseValueAccessor' - ) - table.accessor.dense_sgd_param.name = strategy.get( - 'datanorm_operation', 'summary' - ) - table.accessor.dense_sgd_param.summary.summary_decay_rate = ( - strategy.get('datanorm_decay_rate', 0.999999) - ) - table.accessor.fea_dim = fea_dim - - def add_sparse_optimizer(self, sgd, strategy, prefix): - optimizer_name = strategy.get(prefix + "sparse_optimizer", "adagrad") - sgd.name = optimizer_name - if optimizer_name == "naive": - sgd.naive.learning_rate = strategy.get( - prefix + 'sparse_learning_rate', 0.05 - ) - sgd.naive.initial_range = strategy.get( - prefix + 'sparse_initial_range', 1e-4 - ) - bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10]) - sgd.naive.weight_bounds.extend(bounds) - elif optimizer_name == "adagrad": - sgd.adagrad.learning_rate = strategy.get( - prefix + 'sparse_learning_rate', 0.05 - ) - sgd.adagrad.initial_range = strategy.get( - prefix + 'sparse_initial_range', 1e-4 - ) - if prefix == "embed_": - sgd.adagrad.initial_range = 0 - sgd.adagrad.initial_g2sum = strategy.get( - prefix + 'sparse_initial_g2sum', 3 - ) - bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10]) - sgd.adagrad.weight_bounds.extend(bounds) - elif optimizer_name == "std_adagrad": - sgd.adagrad.learning_rate = strategy.get( - prefix + 'sparse_learning_rate', 0.05 - ) - sgd.adagrad.initial_range = strategy.get( - prefix + 'sparse_initial_range', 1e-4 - ) - if prefix == "embed_": - sgd.adagrad.initial_range = 0 - sgd.adagrad.initial_g2sum = strategy.get( - prefix + 'sparse_initial_g2sum', 3 - ) - bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10]) - sgd.adagrad.weight_bounds.extend(bounds) - elif optimizer_name == "adam": - sgd.adam.learning_rate = strategy.get( - prefix + 'sparse_learning_rate', 0.001 - ) - sgd.adam.initial_range = strategy.get( - prefix + 'sparse_initial_range', 1e-4 - ) - sgd.adam.beta1_decay_rate = strategy.get( - prefix + 'sparse_beta1_decay_rate', 0.9 - ) - sgd.adam.beta2_decay_rate = strategy.get( - prefix + 'sparse_beta2_decay_rate', 0.999 - ) - sgd.adam.ada_epsilon = strategy.get( - prefix + 'sparse_ada_epsilon', 1e-8 - ) - bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10]) - sgd.adam.weight_bounds.extend(bounds) - - def add_sparse_table_common_config(self, table, strategy): - table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8) - table.accessor.embedx_threshold = strategy.get( - 'sparse_embedx_threshold', 10 - ) - table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3 - table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get( - 'sparse_nonclk_coeff', 0.1 - ) - table.accessor.downpour_accessor_param.click_coeff = strategy.get( - 'sparse_click_coeff', 1 - ) - table.accessor.downpour_accessor_param.base_threshold = strategy.get( - 'sparse_base_threshold', 1.5 - ) - table.accessor.downpour_accessor_param.delta_threshold = strategy.get( - 'sparse_delta_threshold', 0.25 - ) - table.accessor.downpour_accessor_param.delta_keep_days = strategy.get( - 'sparse_delta_keep_days', 16 - ) - 
table.accessor.downpour_accessor_param.delete_after_unseen_days = ( - strategy.get('sparse_delete_after_unseen_days', 30) - ) - table.accessor.downpour_accessor_param.show_click_decay_rate = ( - strategy.get('sparse_show_click_decay_rate', 0.98) - ) - table.accessor.downpour_accessor_param.delete_threshold = strategy.get( - 'sparse_delete_threshold', 0.8 - ) - converter = strategy.get( - 'sparse_converter', - "(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)", - ) - deconverter = strategy.get( - 'sparse_deconverter', - "(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)", - ) - - table1 = table.accessor.table_accessor_save_param.add() - table1.param = 1 - table1.converter = converter - table1.deconverter = deconverter - - table2 = table.accessor.table_accessor_save_param.add() - table2.param = 2 - table2.converter = converter - table2.deconverter = deconverter - - def get_desc(self): - """ - Return downpour server program_desc - """ - return self._server - - -class DownpourWorker(Worker): - """ - DownpourWorker class is used to generate worker program_desc - Args: - window (int): push params frequency - worker: it is pslib.DownpourTrainerParameter - Examples: - worker = DownpourWorker(1) - """ - - def __init__(self, window): - self.window = window - self._worker = pslib.DownpourTrainerParameter() - - def add_sparse_table( - self, table_id, slot_key_vars, slot_value_vars, slot_value_grads=None - ): - """ - Args: - table_id(int): id of sparse params table - slot_key_vars(list): slot key id - slot_value_vars(list): slot key value after embedding - slot_value_grads(list): grad of all params, default is None - Returns: - return None - """ - if slot_value_grads is None: - slot_value_grad_names = [ - var.name + "@GRAD" for var in slot_value_vars - ] - else: - value_to_key = {} - for i in range(len(slot_key_vars)): - value_to_key[slot_value_vars[i].name] = slot_key_vars[i] - slot_value_grad_names = [] - all_grad_names = [var.name for var in slot_value_grads] - for var in slot_value_vars: - if var.name + "@GRAD" in all_grad_names: - slot_value_grad_names.append(var.name + "@GRAD") - sorted_slot_value_vars = [ - i - for i in slot_value_vars - if i.name + "@GRAD" in slot_value_grad_names - ] - sorted_slot_value_vars += [ - i - for i in slot_value_vars - if i.name + "@GRAD" not in slot_value_grad_names - ] - sorted_slot_key_vars = [ - value_to_key[v.name] for v in sorted_slot_value_vars - ] - - target_table = None - for table in self._worker.sparse_table: - if table.table_id == table_id: - keys = table.slot_key - key_names = [var.name for var in sorted_slot_key_vars] - for key_name in key_names: - if key_name not in keys: - raise ValueError( - "sparse table %s slot_key error" % table_id - ) - target_table = table - break - - table = target_table - if table is not None: - self._worker.sparse_table.remove(table) - table = self._worker.sparse_table.add() - table.table_id = table_id - table.slot_key.extend([var.name for var in sorted_slot_key_vars]) - table.slot_value.extend([var.name for var in sorted_slot_value_vars]) - table.slot_gradient.extend(slot_value_grad_names) - - def add_dense_table( - self, - table_id, - learning_rate, - param_vars, - grad_vars, - dense_start_table_id, - sparse_table_names, - ): - r""" - Args: - table_id(int): id of sparse params table - learning_rate(float): the learning rate used to update parameters. \ - Can be a float value - param_vars(list): all dense param. it is a list. - grad_vars(list): all dense grad parm it is a list. 
- dense_start_table_id(int): dense table start index - sparse_table_names(list): sparse table names - Returns: - return None - """ - sparse_table_name_grad = [] - for name in sparse_table_names: - sparse_table_name_grad.append(name + "@GRAD") - - dense_param_name = [] - for p in param_vars: - if p.name not in sparse_table_names: - dense_param_name.append(p.name) - - dense_grad_name = [] - for g in grad_vars: - if g.name not in sparse_table_name_grad: - dense_grad_name.append(g.name) - - dense_param_name.sort() - dense_grad_name.sort() - - for table in self._worker.dense_table: - if table.table_id == table_id: - desc_dense_param_name = list(table.dense_variable_name) - desc_dense_param_name.sort() - - if dense_param_name == desc_dense_param_name: - desc_dense_grad_name = list( - table.dense_gradient_variable_name - ) - desc_dense_grad_name.sort() - if dense_grad_name == desc_dense_grad_name: - return - else: - raise ValueError( - "dense table %s dense_gradient_variable_name " - "error" % table_id - ) - else: - raise ValueError( - "dense table %s dense_variable_name error" % table_id - ) - - table = self._worker.dense_table.add() - table.table_id = table_id - - # def cmp_fc(x, y): - # if x.startswith("fc_") and y.startswith("fc_"): - # index_x = x.find('.') - # index_y = y.find('.') - # if index_x > 0 and index_y > 0: - # num_x = x[3:index_x] - # num_y = y[3:index_y] - # if num_x.isdigit() and num_y.isdigit(): - # if int(num_x) < int(num_y): - # return -1 - # if int(num_x) > int(num_y): - # return 1 - # if x[index_x + 1] == 'w' and y[index_y + 1] == 'b': - # return -1 - # if x[index_x + 1] == 'b' and y[index_y + 1] == 'w': - # return 1 - # if x < y: - # return -1 - # else: - # return 1 - - # table.dense_variable_name.extend(sorted(dense_param_name, cmp_fc)) - # table.dense_gradient_variable_name.extend( - # sorted(dense_grad_name, cmp_fc)) - table.dense_variable_name.extend(dense_param_name) - table.dense_gradient_variable_name.extend(dense_grad_name) - - def get_desc(self): - """ - Return downpour worker program_desc - """ - return self._worker diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py deleted file mode 100644 index 49a771b3d0e..00000000000 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ /dev/null @@ -1,1077 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""Optimizer Factory.""" - -__all__ = ["DistributedAdam", "FLEET_GLOBAL_DICT"] -import paddle -from paddle.framework import core -from paddle.distributed.distribute_lookup_table import ( - find_distributed_lookup_table, -) -from paddle.distributed.distribute_lookup_table import ( - find_distributed_lookup_table_inputs, -) -from paddle.distributed.distribute_lookup_table import ( - find_distributed_lookup_table_outputs, -) -from google.protobuf import text_format -from collections import OrderedDict -import copy -from .node import DownpourWorker, DownpourServer -from . import ps_pb2 as pslib -import os -import logging - -OpRole = core.op_proto_and_checker_maker.OpRole -# this dict is for store info about pull/push sparse ops. -FLEET_GLOBAL_DICT = { - # global settings - "enable": False, - "emb_to_table": {}, - "emb_to_accessor": {}, - "emb_to_size": {}, - # current embedding settings - "cur_sparse_id": 0, - "cur_accessor": "", - "click_name": "", - "scale_sparse_grad": None, -} - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) - - -class DistributedOptimizerImplBase: - """ - DistributedOptimizerImplBase - base class of optimizers - """ - - def __init__(self, optimizer): - self._optimizer = optimizer - self._learning_rate = optimizer._learning_rate - self._regularization = optimizer.regularization - - def minimize( - self, - losses, - startup_program=None, - parameter_list=None, - no_grad_set=None, - ): - """ - Args: - losses(Variable): loss variable defined by user - startup_program(Program): startup program that defined by user - parameter_list(str list): parameter names defined by users - no_grad_set(set): a set of variables that is defined by users - so that these variables do not need gradient computation - """ - pass - - -class DistributedAdam(DistributedOptimizerImplBase): - """ - DistributedAdam - adam optimizer in distributed training - """ - - def __init__(self, optimizer): - # todo(guru4elephant): add more optimizers here as argument - # todo(guru4elephant): make learning_rate as a variable - super().__init__(optimizer) - self._window = 1 - self.type = "downpour" - self.data_norm_name = [ - ".batch_size", - ".batch_square_sum", - ".batch_sum", - ".batch_size@GRAD", - ".batch_square_sum@GRAD", - ".batch_sum@GRAD", - ] - self.supported_embedding_types = [ - "lookup_table", - "pull_sparse", - "pull_sparse_v2", - "pull_box_sparse", - "pull_gpups_sparse", - ] - self.supported_embedding_grad_types = [ - "lookup_table_grad", - "push_sparse", - "push_sparse_v2", - ] - op_maker = core.op_proto_and_checker_maker - self.op_role_key = op_maker.kOpRoleAttrName() - - def _find_distributed_lookup_table_inputs(self, program, table_names): - """ - Find input variable of distribute lookup table in program. - We could support multi-distribute table now. 
- Args: - program(Program): given program, locate distributed lookup table - table_name(str): given table names that is found beforehand - Returns: - inputs - """ - local_vars = program.current_block().vars - inputs_dict = dict() - for table_name in table_names: - inputs_dict[table_name] = [] - - for op in program.global_block().ops: - if op.type in self.supported_embedding_types: - if op.input("W")[0] in table_names: - inputs_dict[op.input("W")[0]].extend( - [local_vars[name] for name in op.input("Ids")] - ) - return inputs_dict - - def _find_distributed_lookup_table_outputs(self, program, table_names): - """ - Find output variable of distribute lookup table in program. - We could support multi-distribute table now. - Args: - programs(Program): given program, locate distributed lookup table - table_name(str): given table name that is found beforehand - Returns: - outputs - """ - local_vars = program.current_block().vars - outputs_dict = dict() - for table_name in table_names: - outputs_dict[table_name] = [] - - for op in program.global_block().ops: - if op.type in self.supported_embedding_types: - if op.input("W")[0] in table_names: - outputs_dict[op.input("W")[0]].extend( - [local_vars[name] for name in op.output("Out")] - ) - return outputs_dict - - def _find_distributed_lookup_table_grads(self, program, table_names): - local_vars = program.current_block().vars - grads_dict = dict() - for table_name in table_names: - grads_dict[table_name] = [] - - for op in program.global_block().ops: - if op.type in self.supported_embedding_grad_types: - if op.input("W")[0] in table_names: - grads_dict[op.input("W")[0]].extend( - [local_vars[name] for name in op.input("Out@GRAD")] - ) - return grads_dict - - def _is_optimizer_op(self, op): - return self.op_role_key in op.attr_names and int( - op.all_attrs()[self.op_role_key] - ) & int(OpRole.Optimize) - - def _remove_optimize_op_for_embedding(self, loss, table_name): - """ - find multi-sparse-table - """ - table_name = [name + "@GRAD" for name in table_name] - need_remove_op_index = [] - block = loss.block.program.global_block() - for ids, op in list(enumerate(block.ops)): - if self._is_optimizer_op(op): - if op.input("Grad")[0] in table_name: - need_remove_op_index.append(ids) - - need_remove_op_index.sort(reverse=True) - for index in need_remove_op_index: - block._remove_op(index) - - def _find_multi_distributed_lookup_table(self, losses): - """ - find multi-sparse-table - """ - table_names = set() - cnt = 0 - tmp_list = [] - ret_list = [] - for loss in losses: - for op in loss.block.program.global_block().ops: - if op.type in self.supported_embedding_types: - if op.attr('is_distributed') is True: - table_name = op.input("W")[0] - if table_name not in table_names: - table_names.add(table_name) - tmp_list.append([table_name, cnt]) - cnt += 1 - tmp_list.sort(key=lambda k: k[1]) - for x in tmp_list: - ret_list.append(x[0]) - return ret_list - - def _if_last_block(self, op, _equal_dict): - # for conditional_block op - cond_str = op.input('Cond')[0] - bool_test = False - if cond_str.startswith('equal'): - bool_test = True - vars_ = op.input('Input') - equal_keys = _equal_dict.keys() - for var_cond in vars_: - if var_cond in equal_keys: - if bool_test: - print("the conditional block is error") - return False - return True - - def _generte_cond_para_map( - self, op, _fill_value_dict, _equal_fill_dict, _now_program, _all_params - ): - # generate cond value to parameter map recursively - cond_str = op.input('Cond')[0] - vars_ = op.input('Input') - - if 
self._if_last_block(op, _equal_fill_dict): - vars_ = op.input('Input') - cond_key = "" - if cond_str.startswith('equal'): - cond_key = int(_fill_value_dict[_equal_fill_dict[cond_str]]) - else: - cond_key = -1 - p_list = [] - for var_cond in vars_: - if var_cond in _all_params: - p_list.append(var_cond) - - self._cond_params[cond_key] = p_list - self._other_params.extend(p_list) - else: - ops_cond = _now_program.block(int(op.attr('sub_block').id)).ops - for op in ops_cond: - if op.type == 'conditional_block': - self._generte_cond_para_map( - op, - _fill_value_dict, - _equal_fill_dict, - _now_program, - _all_params, - ) - - def _has_conditional_block(self, loss): - now_program = loss.block.program - root_block = now_program.block(0) - ops_ = root_block.ops - for op in ops_: - if op.type == 'conditional_block': - return True - return False - - def _check_params_grads(self, params, grads): - if len(params) != len(grads): - raise ValueError( - "params size != grads size, %s vs %s" - % (len(params), len(grads)) - ) - - pname2grad = dict() - for i in range(len(params)): - pname = params[i].name - gname = grads[i].name - if pname != gname[:-5]: - raise ValueError(" params != grads , %s vs %s" % (pname, gname)) - pname2grad[pname] = grads[i] - - return pname2grad - - def _generate_multi_dense_table( - self, - params, - grads, - cond_params, - other_params, - sparse_table_names, - dense_table_id=0, - ): - # generate multi dense table by cond value - pname2grad = self._check_params_grads(params, grads) - root_params_list = [] - root_grads_list = [] - dense_tables = [] - for i, p in enumerate(params): - if p.name not in other_params and p.name not in sparse_table_names: - root_params_list.append(p) - root_grads_list.append(grads[i]) - if len(root_params_list) > 0: - dense_tables.append(dense_table_id) - dense_table_id += 1 - lists_params = [[] for i in range(len(cond_params.keys()))] - lists_grads = [[] for i in range(len(cond_params.keys()))] - - key_id = 0 - name2key = dict() - cond2denseid = dict() - for key, value in cond_params.items(): - cond2denseid[key] = dense_table_id - dense_tables.append(dense_table_id) - dense_table_id += 1 - for v in value: - name2key[v] = key_id - key_id += 1 - - for p in params: - if p.name in other_params: - lists_params[name2key[p.name]].append(p) - lists_grads[name2key[p.name]].append(pname2grad[p.name]) - - return ( - dense_tables, - cond2denseid, - lists_params, - lists_grads, - root_params_list, - root_grads_list, - ) - - def _gen_distributed_emb_to_size_dict(self, program): - d_size = dict() - local_vars = program.current_block().vars - - for op in program.global_block().ops: - if op.type in self.supported_embedding_types: - if op.attr('is_distributed') is True: - table_name = op.input("W")[0] - emb_size = local_vars[table_name].shape[-1] - if d_size.get(table_name) is None: - d_size[table_name] = emb_size - elif d_size[table_name] != emb_size: - raise ValueError( - "embedding size error: %s vs %s" - % (emb_size, d_size[table_name]) - ) - - return d_size - - def _check_config_fleet_with_program_op( - self, strategy, table_name, emb_to_size - ): - if strategy.get(table_name) is None: - strategy[table_name] = dict() - st = strategy[table_name] - - accessor = "DownpourCtrAccessor" - if st.get("sparse_accessor_class") is not None: - accessor = st["sparse_accessor_class"] - - # set sparse_embedx_dim in the strategy according to accessor and use_cvm config - if ( - accessor == "DownpourFeatureValueAccessor" - or accessor == "DownpourCtrAccessor" - or accessor == 
"DownpourCtrDymfAccessor" - or accessor == "DownpourDoubleUnitAccessor" - or accessor == "DownpourUnitAccessor" - ): - if ( - st.get("sparse_embedx_dim") is not None - and strategy.get("use_cvm") == True - and st["sparse_embedx_dim"] != emb_to_size[table_name] - 3 - ): - raise ValueError( - "fleet config sparse_embedx_dim=%s not" - " equal to embedding dim - 3 = %s" - % (st["sparse_embedx_dim"], emb_to_size[table_name] - 3) - ) - if ( - st.get("sparse_embedx_dim") is not None - and strategy.get("use_cvm") == False - and st["sparse_embedx_dim"] != emb_to_size[table_name] - 1 - ): - raise ValueError( - "fleet config sparse_embedx_dim=%s not" - " equal to embedding dim - 1 = %s" - % (st["sparse_embedx_dim"], emb_to_size[table_name] - 1) - ) - if ( - st.get("sparse_embedx_dim") is None - and strategy.get("use_cvm") == True - ): - logger.warning( - "sparse embedding dim for table name '{}' is: {}, while sparse_embedx_dim " - "with same sparse table name is not set in config_fleet.py. " - "Hence automatically set sparse_embedx_dim = {} - 3.".format( - table_name, - emb_to_size[table_name], - emb_to_size[table_name], - ) - ) - st["sparse_embedx_dim"] = emb_to_size[table_name] - 3 - if ( - st.get("sparse_embedx_dim") is None - and strategy.get("use_cvm") == False - ): - logger.warning( - "sparse embedding dim for table name '{}' is: {}, while sparse_embedx_dim " - "with same sparse table name is not set in config_fleet.py. " - "Hence automatically set sparse_embedx_dim = {} - 1.".format( - table_name, - emb_to_size[table_name], - emb_to_size[table_name], - ) - ) - st["sparse_embedx_dim"] = emb_to_size[table_name] - 1 - elif accessor == "DownpourSparseValueAccessor": - if ( - st.get("sparse_embedx_dim") is not None - and st["sparse_embedx_dim"] != emb_to_size[table_name] - ): - raise ValueError( - "fleet config sparse_embedx_dim=%s not" - " equal to embedding dim = %s" - % (st["sparse_embedx_dim"], emb_to_size[table_name]) - ) - if st.get("sparse_embedx_dim") is None: - logger.warning( - "sparse embedding dim for table name '{}' is: {}, while sparse_embedx_dim " - "with same sparse table name is not set in config_fleet.py. 
" - "Hence automatically set sparse_embedx_dim = {}.".format( - table_name, - emb_to_size[table_name], - emb_to_size[table_name], - ) - ) - st["sparse_embedx_dim"] = emb_to_size[table_name] - - return strategy - - def _minimize( - self, - losses, - startup_program=None, - parameter_list=None, - no_grad_set=None, - strategy={}, - ): - """ - DownpounSGD is a distributed optimizer so - that user can call minimize to generate backward - operators and optimization operators within minimize function - Args: - loss(Variable): loss variable defined by user - startup_program(Program): startup program that defined by user - parameter_list(str list): parameter names defined by users - no_grad_set(set): a set of variables that is defined by users - so that these variables do not need gradient computation - strategy(dict): user-defined properties - Returns: - [optimize_ops, grads_and_weights] - """ - # sparse table names of each program - prog_id_to_sparse_table = OrderedDict() - # inputs_dict and outputs_dict of sparse tables of each program - prog_id_to_inputs_dict = OrderedDict() - prog_id_to_outputs_dict = OrderedDict() - # related to PSParameter - ps_param = pslib.PSParameter() - # related to ServerParameter - server = DownpourServer() - # program to worker (related to DownpourTrainerParameter) - prog_id_to_worker = OrderedDict() - # param_grads of each program - prog_id_to_param_grads = OrderedDict() - # sparse_grads of each program - prog_id_to_sparse_grads = OrderedDict() - # unique program set - program_id_set = set() - - sparse_table_to_index = OrderedDict() - sparse_table_index = 0 - for num in range(len(losses)): - loss = losses[num] - parameters = None - if parameter_list is not None: - parameters = parameter_list[num] - prog_id = str(id(loss.block.program)) - # param_grads of program - params_grads = sorted( - paddle.static.append_backward(loss, parameters, no_grad_set), - key=lambda x: x[0].name, - ) - - flag_use_ps_gpu = strategy.get("use_ps_gpu", False) - if flag_use_ps_gpu: - if not isinstance(startup_program, list): - startup_program = [startup_program] - optimizer = copy.deepcopy(self._optimizer) - optimize_ops = optimizer.apply_optimize( - loss, - startup_program=startup_program[num], - params_grads=params_grads, - ) - embedding_table = self._find_multi_distributed_lookup_table( - [loss] - ) - self._remove_optimize_op_for_embedding(loss, embedding_table) - # has condition_block op means multi-task - flag_multi_task = self._has_conditional_block(loss) - if flag_multi_task: - self._cond_params = dict() - self._other_params = [] - now_program = loss.block.program - root_block = now_program.block(0) - all_params = [] - for par in root_block.all_parameters(): - all_params.append(par.name) - - ops_ = root_block.ops - fill_value_dict = dict() - equal_fill_dict = dict() - for op in ops_: - # conditional_block op must has fill_constant and equal op - if op.type == 'fill_constant': - fill_value_dict[op.output('Out')[0]] = op.attr('value') - if op.type == 'equal': - equal_fill_dict[op.output('Out')[0]] = op.input('Y')[0] - if op.type == 'conditional_block': - self._generte_cond_para_map( - op, - fill_value_dict, - equal_fill_dict, - now_program, - all_params, - ) - - if prog_id not in program_id_set: - program_id_set.add(prog_id) - sparse_table = self._find_multi_distributed_lookup_table([loss]) - prog_id_to_sparse_table[prog_id] = sparse_table - - # get sparse_table_to_index - for tn in sparse_table: - if sparse_table_to_index.get(tn) is None: - sparse_table_to_index[tn] = sparse_table_index 
- sparse_table_index += 1 - - # get {table_name: emb_size} dict from program ops - emb_to_size = self._gen_distributed_emb_to_size_dict( - loss.block.program - ) - - # get inputs_dict - inputs_dict = self._find_distributed_lookup_table_inputs( - loss.block.program, sparse_table - ) - prog_id_to_inputs_dict[prog_id] = inputs_dict - # get outputs_dict - outputs_dict = self._find_distributed_lookup_table_outputs( - loss.block.program, sparse_table - ) - prog_id_to_outputs_dict[prog_id] = outputs_dict - - prog_id_to_worker[prog_id] = DownpourWorker(self._window) - - grads_dict = self._find_distributed_lookup_table_grads( - loss.block.program, sparse_table - ) - prog_id_to_sparse_grads[prog_id] = grads_dict - - if prog_id not in prog_id_to_param_grads: - prog_id_to_param_grads[prog_id] = [] - prog_id_to_param_grads[prog_id].append(params_grads) - - # if strategy.get("parallel_compute") - - # if user specify a fleet_desc.prototxt file, then load the file - # instead of creating default fleet_desc.prototxt. - # user can specify server_param or trainer_param or fs_client_param. - if strategy.get("fleet_desc_file") is not None: - fleet_desc_file = strategy["fleet_desc_file"] - with open(fleet_desc_file) as f: - text_format.Merge(f.read(), ps_param) - server.get_desc().CopyFrom(ps_param.server_param) - if len(ps_param.trainer_param) == 1: - for k in prog_id_to_worker: - prog_id_to_worker[k].get_desc().CopyFrom( - ps_param.trainer_param[0] - ) - else: - if len(ps_param.trainer_param) != len(prog_id_to_worker): - raise ValueError( - "trainer param size != program size, %s vs %s" - % (len(ps_param.trainer_param), len(prog_id_to_worker)) - ) - idx = 0 - # prog_id_to_worker is OrderedDict - for k in prog_id_to_worker: - prog_id_to_worker[k].get_desc().CopyFrom( - ps_param.trainer_param[idx] - ) - idx += 1 - - # check config in op defination and fleet config - if FLEET_GLOBAL_DICT["enable"]: - one_slot = None - strategy["device_worker"] = "Hogwild" - emb_to_table = FLEET_GLOBAL_DICT["emb_to_table"] - emb_to_accessor = FLEET_GLOBAL_DICT["emb_to_accessor"] - emb_to_size = FLEET_GLOBAL_DICT["emb_to_size"] - if len(sparse_table_to_index) != len(emb_to_table): - raise ValueError( - "sparse tables from program != sparse tables from op: %s " - "vs %s" % (len(sparse_table_to_index), len(emb_to_table)) - ) - for key in sparse_table_to_index: - if ( - key not in emb_to_table - or sparse_table_to_index[key] != emb_to_table[key] - ): - print("sparse_table_to_index ", sparse_table_to_index) - print("emb_to_table ", emb_to_table) - raise ValueError("key error: %s" % key) - if strategy.get(key) is None: - strategy[key] = dict() - st = strategy[key] - - accessor = None - if st.get("sparse_accessor_class") is not None: - accessor = st["sparse_accessor_class"] - tables = ( - server.get_desc().downpour_server_param.downpour_table_param - ) - for table in tables: - if table.table_id == sparse_table_to_index[key]: - accessor = table.accessor.accessor_class - break - - for loss in losses: - for op in loss.block.program.global_block().ops: - if op.type in self.supported_embedding_types: - if accessor is not None and op.has_attr( - "AccessorClass" - ): - op._set_attr("AccessorClass", accessor) - if one_slot is None: - one_slot = ( - loss.block.program.global_block().var( - op.input("Ids")[0] - ) - ) - - # if accessor is None, use default accessor in op definition - if accessor is None: - accessor = emb_to_accessor[key] - # set sparse_embedx_dim in strategy, - # user do not have to set it in config_fleet - if ( - accessor == 
"DownpourFeatureValueAccessor" - or accessor == "DownpourCtrDymfAccessor" - or accessor == "DownpourCtrAccessor" - or accessor == "DownpourDoubleUnitAccessor" - or accessor == "DownpourUnitAccessor" - ): - if ( - st.get("sparse_embedx_dim") is not None - and st["sparse_embedx_dim"] != emb_to_size[key] - 3 - ): - raise ValueError( - "fleet config sparse_embedx_dim=%s not" - " equal to embedding size - 3 = %s" - % (st["sparse_embedx_dim"], emb_to_size[key] - 3) - ) - st["sparse_embedx_dim"] = emb_to_size[key] - 3 - elif accessor == "DownpourSparseValueAccessor": - if ( - st.get("sparse_embedx_dim") is not None - and st["sparse_embedx_dim"] != emb_to_size[key] - ): - raise ValueError( - "fleet config sparse_embedx_dim=%s not" - " equal to embedding size = %s" - % (st["sparse_embedx_dim"], emb_to_size[key]) - ) - st["sparse_embedx_dim"] = emb_to_size[key] - - # ServerParameter add all sparse tables - for tn in sparse_table_to_index: - sparse_table_index = sparse_table_to_index[tn] - st = self._check_config_fleet_with_program_op( - strategy, tn, emb_to_size - ) - if st.get(tn) is not None: - server.add_sparse_table(sparse_table_index, st[tn]) - else: - server.add_sparse_table(sparse_table_index, None) - - # each DownpourTrainerParameter add its own sparse tables - program_id_set.clear() - for loss in losses: - prog_id = str(id(loss.block.program)) - if prog_id not in program_id_set: - program_id_set.add(prog_id) - worker = prog_id_to_worker[prog_id] - inputs_dict = prog_id_to_inputs_dict[prog_id] - outputs_dict = prog_id_to_outputs_dict[prog_id] - for tn in prog_id_to_sparse_table[prog_id]: - sparse_table_index = sparse_table_to_index[tn] - grads_dict = prog_id_to_sparse_grads[prog_id] - worker.add_sparse_table( - sparse_table_index, - inputs_dict[tn], - outputs_dict[tn], - grads_dict[tn], - ) - - dense_start_table_id = len(sparse_table_to_index) - dense_table_index = len(sparse_table_to_index) - program_configs = {} - # ServerParameter add all dense tables - # each DownpourTrainerParameter add its own dense tables - program_id_set.clear() - for loss_index in range(len(losses)): - program_id = str(id(losses[loss_index].block.program)) - if program_id not in program_id_set: - program_id_set.add(program_id) - worker = prog_id_to_worker[program_id] - sparse_table_names = prog_id_to_sparse_table[program_id] - sparse_table_index = [ - sparse_table_to_index[i] for i in sparse_table_names - ] - - program_configs[program_id] = { - "pull_sparse": [t_index for t_index in sparse_table_index], - "push_sparse": [t_index for t_index in sparse_table_index], - } - - params_grads = prog_id_to_param_grads[program_id] - for pg in params_grads: - params = [] - grads = [] - data_norm_params = [] - data_norm_grads = [] - for i in pg: - is_data_norm_data = False - for data_norm_name in self.data_norm_name: - if i[0].name.endswith(data_norm_name): - is_data_norm_data = True - data_norm_params.append(i[0]) - if not is_data_norm_data: - params.append(i[0]) - - for i in pg: - is_data_norm_data = False - for data_norm_grad in self.data_norm_name: - if i[0].name.endswith(data_norm_grad): - is_data_norm_data = True - data_norm_grads.append(i[1]) - if not is_data_norm_data: - grads.append(i[1]) - # for new dense table - multi_task_dense_tables_push = [] - multi_task_dense_tables_pull = [] - if flag_multi_task: - ( - dense_tables, - cond2denseid, - lists_params, - lists_grads, - root_params_list, - root_grads_list, - ) = self._generate_multi_dense_table( - params, - grads, - self._cond_params, - self._other_params, - 
sparse_table_names, - dense_table_index, - ) - program_configs[program_id][ - 'cond2denseid' - ] = cond2denseid - multi_task_dense_tables_push = dense_tables - multi_task_dense_tables_pull = dense_tables[:] - - if strategy.get('dense_table') is not None: - if flag_multi_task: - server_dense_table_index = dense_table_index - if len(root_params_list) > 0: - server.add_dense_table( - server_dense_table_index, - root_params_list, - root_grads_list, - strategy['dense_table'], - sparse_table_names, - ) - server_dense_table_index += 1 - - for i in range(len(lists_params)): - server.add_dense_table( - server_dense_table_index, - lists_params[i], - lists_grads[i], - strategy['dense_table'], - sparse_table_names, - ) - server_dense_table_index += 1 - else: - server.add_dense_table( - dense_table_index, - params, - grads, - strategy['dense_table'], - sparse_table_names, - ) - - else: - server.add_dense_table( - dense_table_index, - params, - grads, - None, - sparse_table_names, - ) - - if flag_multi_task: - - if len(root_params_list) > 0: - worker.add_dense_table( - dense_table_index, - self._learning_rate, - root_params_list, - root_grads_list, - dense_start_table_id, - sparse_table_names, - ) - dense_table_index += 1 - - for i in range(len(lists_params)): - worker.add_dense_table( - dense_table_index, - self._learning_rate, - lists_params[i], - lists_grads[i], - dense_start_table_id, - sparse_table_names, - ) - dense_table_index += 1 - - dense_table_index -= 1 - else: - worker.add_dense_table( - dense_table_index, - self._learning_rate, - params, - grads, - dense_start_table_id, - sparse_table_names, - ) - - if FLEET_GLOBAL_DICT["enable"]: - cur_prog = losses[loss_index].block.program - cur_prog.global_block().append_op( - type="push_dense", - inputs={"Ids": one_slot}, - attrs={ - "InputNames": [i.name for i in grads], - "TableId": dense_table_index, - "ScaleDataNorm": strategy.get( - "scale_datanorm", -1 - ), - }, - ) - - if ( - "pull_dense" in program_configs[program_id] - and "push_dense" in program_configs[program_id] - and len(program_configs[program_id]["pull_dense"]) > 0 - ): - if flag_multi_task: - program_configs[program_id]["pull_dense"].extend( - multi_task_dense_tables_pull - ) - program_configs[program_id]["push_dense"].extend( - multi_task_dense_tables_push - ) - else: - program_configs[program_id]["pull_dense"].extend( - [dense_table_index] - ) - program_configs[program_id]["push_dense"].extend( - [dense_table_index] - ) - else: - if flag_multi_task: - program_configs[program_id][ - "pull_dense" - ] = multi_task_dense_tables_pull - program_configs[program_id][ - "push_dense" - ] = multi_task_dense_tables_push - else: - program_configs[program_id]["pull_dense"] = [ - dense_table_index - ] - program_configs[program_id]["push_dense"] = [ - dense_table_index - ] - - if len(data_norm_params) != 0 and len(data_norm_grads) != 0: - dense_table_index += 1 - if strategy.get('datanorm_table') is not None: - server.add_data_norm_table( - dense_table_index, - self._learning_rate, - data_norm_params, - data_norm_grads, - strategy['datanorm_table'], - sparse_table_names, - ) - else: - server.add_data_norm_table( - dense_table_index, - self._learning_rate, - data_norm_params, - data_norm_grads, - None, - sparse_table_names, - ) - - worker.add_dense_table( - dense_table_index, - self._learning_rate, - data_norm_params, - data_norm_grads, - dense_start_table_id, - sparse_table_names, - ) - - if FLEET_GLOBAL_DICT["enable"]: - cur_prog = losses[loss_index].block.program - 
cur_prog.global_block().append_op( - type="push_dense", - inputs={"Ids": one_slot}, - attrs={ - "InputNames": [ - i.name for i in data_norm_grads - ], - "TableId": dense_table_index, - "ScaleDataNorm": strategy.get( - "scale_datanorm", -1 - ), - }, - ) - - program_configs[program_id]["pull_dense"].extend( - [dense_table_index] - ) - program_configs[program_id]["push_dense"].extend( - [dense_table_index] - ) - dense_table_index += 1 - - # Todo(guru4elephant): figure out how to support more sparse parameters - # currently only support lookup_table - worker_skipped_ops = ["lookup_table", "lookup_table_grad"] - if len(worker.get_desc().skip_op) == 0: - worker.get_desc().skip_op.extend(worker_skipped_ops) - - ps_param.server_param.CopyFrom(server.get_desc()) - # prog_id_to_worker is OrderedDict - if len(ps_param.trainer_param) == 0: - for k in prog_id_to_worker: - tp = ps_param.trainer_param.add() - tp.CopyFrom(prog_id_to_worker[k].get_desc()) - - if strategy.get("fs_uri") is not None: - ps_param.fs_client_param.uri = strategy["fs_uri"] - elif ps_param.fs_client_param.uri == "": - ps_param.fs_client_param.uri = "hdfs://your_hdfs_uri" - if strategy.get("fs_user") is not None: - ps_param.fs_client_param.user = strategy["fs_user"] - elif ps_param.fs_client_param.user == "": - ps_param.fs_client_param.user = "your_hdfs_user" - if strategy.get("fs_passwd") is not None: - ps_param.fs_client_param.passwd = strategy["fs_passwd"] - elif ps_param.fs_client_param.passwd == "": - ps_param.fs_client_param.passwd = "your_hdfs_passwd" - if strategy.get("fs_hadoop_bin") is not None: - ps_param.fs_client_param.hadoop_bin = strategy["fs_hadoop_bin"] - elif ps_param.fs_client_param.hadoop_bin == "": - ps_param.fs_client_param.hadoop_bin = "$HADOOP_HOME/bin/hadoop" - - opt_info = {} - opt_info["program_id_to_worker"] = prog_id_to_worker - opt_info["program_configs"] = program_configs - opt_info["trainer"] = strategy.get("trainer", "DistMultiTrainer") - opt_info["device_worker"] = strategy.get("device_worker", "DownpourSGD") - opt_info["optimizer"] = "DownpourSGD" - opt_info["fleet_desc"] = ps_param - opt_info["worker_skipped_ops"] = worker_skipped_ops - opt_info["use_cvm"] = strategy.get("use_cvm", False) - opt_info["no_cvm"] = strategy.get("no_cvm", False) - opt_info["scale_sparse_gradient_with_batch_size"] = strategy.get( - "scale_sparse_gradient_with_batch_size", True - ) - opt_info["worker_class"] = strategy.get( - "worker_class", "DownpourWorker" - ) - opt_info["stat_var_names"] = strategy.get("stat_var_names", []) - opt_info["local_tables"] = strategy.get("local_tables", []) - opt_info["async_tables"] = strategy.get("async_tables", []) - opt_info["async_tables"] = strategy.get("async_tables", []) - opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1) - opt_info["check_nan_var_names"] = strategy.get( - "check_nan_var_names", [] - ) - opt_info["dump_slot"] = False - opt_info["dump_converter"] = "" - opt_info["dump_fields"] = strategy.get("dump_fields", []) - opt_info["dump_file_num"] = strategy.get("dump_file_num", 16) - opt_info["user_define_dump_filename"] = strategy.get( - "user_define_dump_filename", "" - ) - opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "") - opt_info["dump_param"] = strategy.get("dump_param", []) - gpus_env = os.getenv("FLAGS_selected_gpus", "0") - opt_info["worker_places"] = [int(s) for s in gpus_env.split(",")] - opt_info["use_ps_gpu"] = strategy.get("use_ps_gpu", False) - if server._server.downpour_server_param.downpour_table_param[ - 0 - 
].accessor.accessor_class in [ - "DownpourCtrAccessor", - "DownpourCtrDoubleAccessor", - "DownpourUnitAccessor", - "DownpourDoubleUnitAccessor", - "DownpourCtrDymfAccessor", - ]: - opt_info["dump_slot"] = True - elif ( - server._server.downpour_server_param.downpour_table_param[ - 0 - ].accessor.accessor_class - == "DownpourSparseValueAccessor" - ): - opt_info["no_cvm"] = True - opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {}) - opt_info["copy_table"] = strategy.get("copy_table", {}) - opt_info["loss_names"] = strategy.get("loss_names", []) - - for loss in losses: - loss.block.program._fleet_opt = opt_info - - param_grads_list = [] - for loss in losses: - prog_id = str(id(loss.block.program)) - param_grads_list.append(prog_id_to_param_grads[prog_id]) - return None, param_grads_list, opt_info diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_recv_save_op.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_recv_save_op.py index 184efab6e97..39487aa6212 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/test_recv_save_op.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_recv_save_op.py @@ -25,8 +25,8 @@ from dist_test_utils import remove_ps_flag import paddle.fluid as fluid import paddle.fluid.core as core from paddle.fluid.framework import Program, program_guard -from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode from paddle.fluid.op import Operator +from paddle.incubate.fleet.parameter_server.mode import DistributedMode def run_pserver(pserver_id): diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py index 023782edb81..4641f6bd5a6 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py @@ -140,7 +140,7 @@ import paddle.fluid as fluid from paddle.distributed.communicator import Communicator import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode +from paddle.incubate.fleet.parameter_server.mode import DistributedMode import paddle.distributed.fleet as fleet from test_communicator_geo import TestCommunicatorGeoEnd2End diff --git a/python/paddle/fluid/transpiler/geo_sgd_transpiler.py b/python/paddle/fluid/transpiler/geo_sgd_transpiler.py index f6c584eea6f..8b6cbd8055a 100644 --- a/python/paddle/fluid/transpiler/geo_sgd_transpiler.py +++ b/python/paddle/fluid/transpiler/geo_sgd_transpiler.py @@ -49,7 +49,7 @@ from .distribute_transpiler import ( same_or_split_var, ServerRuntimeConfig, ) -from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode +from paddle.incubate.fleet.parameter_server.mode import DistributedMode from paddle.distributed.distribute_lookup_table import ( find_distributed_lookup_table, ) diff --git a/python/paddle/incubate/fleet/parameter_server/ir/public.py b/python/paddle/incubate/fleet/parameter_server/ir/public.py index 7e7067f84d5..9b58396d74e 100755 --- a/python/paddle/incubate/fleet/parameter_server/ir/public.py +++ b/python/paddle/incubate/fleet/parameter_server/ir/public.py @@ -19,10 +19,10 @@ import warnings from functools import reduce import paddle -from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode from paddle.framework import core from paddle.incubate.fleet.parameter_server.ir import vars_metatools from 
paddle.incubate.fleet.parameter_server.ir.ps_dispatcher import RoundRobin +from paddle.incubate.fleet.parameter_server.mode import DistributedMode OP_NAME_SCOPE = "op_namescope" CLIP_OP_NAME_SCOPE = "gradient_clip" diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/.gitignore b/python/paddle/incubate/fleet/parameter_server/pslib/.gitignore similarity index 100% rename from python/paddle/fluid/incubate/fleet/parameter_server/pslib/.gitignore rename to python/paddle/incubate/fleet/parameter_server/pslib/.gitignore diff --git a/python/paddle/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 3a480a36a45..2a764225321 100644 --- a/python/paddle/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -28,7 +28,7 @@ from . import ps_pb2 as pslib from .node import DownpourServer, DownpourWorker OpRole = core.op_proto_and_checker_maker.OpRole -# this dict is for store info about pull/push sparse ops. +# this dict is for storing info about pull/push sparse ops. FLEET_GLOBAL_DICT = { # global settings "enable": False, -- GitLab
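
The in-tree hunks above repoint `DistributedMode` imports from the removed `paddle.fluid.incubate.fleet.parameter_server.mode` to `paddle.incubate.fleet.parameter_server.mode` (see `test_recv_save_op.py`, `test_communicator_geo.py`, and `geo_sgd_transpiler.py`). Below is a minimal sketch of the same migration for downstream code; it assumes the relocated module exposes the same constants (SYNC, ASYNC, HALF_ASYNC, GEO) that the deleted `mode.py` defined, and the helper function is purely illustrative, not taken from this patch.

# Hypothetical downstream snippet updated for the post-#50704 import path.
# Old location (removed by this patch):
#   from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
from paddle.incubate.fleet.parameter_server.mode import DistributedMode


def choose_communicator_mode(run_geo: bool, run_sync: bool) -> int:
    # Illustrative mapping only; the constant values (SYNC=0, ASYNC=1,
    # HALF_ASYNC=2, GEO=3) mirror the deleted mode.py shown in this patch.
    if run_geo:
        return DistributedMode.GEO
    if run_sync:
        return DistributedMode.SYNC
    return DistributedMode.ASYNC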