From 4a3c4b8fa49f6a564564ff0e74977aae19aa5522 Mon Sep 17 00:00:00 2001
From: zhang wenhui
Date: Fri, 23 Aug 2019 14:59:26 +0800
Subject: [PATCH] add fleet_desc config feature & multi_sparse table,
 test=develop (#18827)

---
 python/paddle/fluid/device_worker.py          |  44 ++--
 .../fleet/parameter_server/pslib/__init__.py  |   2 +-
 .../fleet/parameter_server/pslib/node.py      | 243 +++++++++++++-----
 .../pslib/optimizer_factory.py                | 114 ++++++--
 .../fluid/tests/unittests/test_downpoursgd.py | 148 +++++++++++
 5 files changed, 451 insertions(+), 100 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_downpoursgd.py

diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py
index 80989d5804..5eb5f3ec87 100644
--- a/python/paddle/fluid/device_worker.py
+++ b/python/paddle/fluid/device_worker.py
@@ -146,27 +146,29 @@ class DownpourSGD(DeviceWorker):
             dense_table.dense_value_name.extend(i.dense_variable_name)
             dense_table.table_id = \
                 i.table_id
-        sparse_table = downpour.sparse_table.add()
-        sparse_table.table_id = \
-            self._fleet_desc.trainer_param.sparse_table[0].table_id
-        sparse_table.sparse_key_name.extend(
-            self._fleet_desc.trainer_param.sparse_table[0].slot_key)
-        sparse_table.sparse_value_name.extend(
-            self._fleet_desc.trainer_param.sparse_table[0].slot_value)
-        sparse_table.sparse_grad_name.extend(
-            self._fleet_desc.trainer_param.sparse_table[0].slot_gradient)
-        if opt_info["use_cvm"]:
-            sparse_table.emb_dim = \
-                self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
-                    0].accessor.fea_dim
-            sparse_table.fea_dim = sparse_table.emb_dim
-        else:
-            sparse_table.emb_dim = \
-                self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
-                    0].accessor.fea_dim - 2
-            sparse_table.fea_dim = sparse_table.emb_dim + 2
-        # TODO(guru4elephant): hard code here, need to improve
-        sparse_table.label_var_name = "click"
+        sparse_len = len(self._fleet_desc.trainer_param.sparse_table)
+        for i in range(sparse_len):
+            sparse_table = downpour.sparse_table.add()
+            sparse_table.table_id = \
+                self._fleet_desc.trainer_param.sparse_table[i].table_id
+            sparse_table.sparse_key_name.extend(
+                self._fleet_desc.trainer_param.sparse_table[i].slot_key)
+            sparse_table.sparse_value_name.extend(
+                self._fleet_desc.trainer_param.sparse_table[i].slot_value)
+            sparse_table.sparse_grad_name.extend(
+                self._fleet_desc.trainer_param.sparse_table[i].slot_gradient)
+            if opt_info["use_cvm"]:
+                sparse_table.emb_dim = \
+                    self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
+                        i].accessor.fea_dim
+                sparse_table.fea_dim = sparse_table.emb_dim
+            else:
+                sparse_table.emb_dim = \
+                    self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
+                        i].accessor.fea_dim - 2
+                sparse_table.fea_dim = sparse_table.emb_dim + 2
+            # TODO(guru4elephant): hard code here, need to improve
+            sparse_table.label_var_name = "click"

         for i in self._fleet_desc.trainer_param.dense_table:
             if i.table_id in dense_table_set:
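Note on the hunk above: it replaces the single hard-coded sparse table with a loop over every entry in trainer_param.sparse_table, pairing trainer-side table i with server-side downpour_table_param[i]. The sketch below is not part of the patch (sparse_dims is a hypothetical helper); it restates the CVM dimension arithmetic the loop encodes, assuming the accessor fea_dim of 11 that the server defaults in node.py produce:

    # Sketch only -- illustrates the CVM dimension arithmetic above.
    def sparse_dims(accessor_fea_dim, use_cvm):
        if use_cvm:
            emb_dim = accessor_fea_dim       # show/click columns stay in the embedding
            fea_dim = emb_dim
        else:
            emb_dim = accessor_fea_dim - 2   # strip the two CVM columns (show, click)
            fea_dim = emb_dim + 2
        return emb_dim, fea_dim

    assert sparse_dims(11, use_cvm=True) == (11, 11)
    assert sparse_dims(11, use_cvm=False) == (9, 11)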
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index e0a5082a89..4791e85ac8 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -13,7 +13,7 @@
 import os
 import sys
-from optimizer_factory import *
+from .optimizer_factory import *
 from google.protobuf import text_format
 import paddle.fluid as fluid
 from paddle.fluid.framework import Program

diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
index d126caee61..413f689f93 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and

-import ps_pb2 as pslib
+from . import ps_pb2 as pslib


 class Server(object):
@@ -43,25 +43,21 @@ class DownpourServer(Server):

     def __init__(self):
         self._server = pslib.ServerParameter()
-        self._server.downpour_server_param.service_param.start_server_port = 0
         self._server.downpour_server_param.service_param.server_class = "DownpourBrpcPsServer"
         self._server.downpour_server_param.service_param.client_class = "DownpourBrpcPsClient"
         self._server.downpour_server_param.service_param.service_class = "DownpourPsService"
         self._server.downpour_server_param.service_param.start_server_port = 0
         self._server.downpour_server_param.service_param.server_thread_num = 12

-    def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
-                         slot_value_var):
+    def add_sparse_table(self, table_id, strategy):
         """
         Args:
             table_id(int): id of sparse params table
-            learning_rate(float): the learning rate used to update parameters. \
-                Can be a float value
-            slot_key_vars(string): slot key id
-            slot_value_var(string): slot key value after embedding
+            strategy(dict): the sparse table config dict; supported keys are
+                listed in support_sparse_key_list below.
         Returns:
             return None
         """
+
         for table in self._server.downpour_server_param.downpour_table_param:
             if table.table_id == table_id:
                 if table.type == pslib.PS_SPARSE_TABLE:
@@ -69,38 +65,100 @@ class DownpourServer(Server):
                     return
                 else:
                     raise ValueError("expect table %s type=%s, but actual type=%s" \
                         %(table_id, pslib.PS_SPARSE_TABLE, table.type))
+        if strategy is None:
+            strategy = dict()
         table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
-        table.table_class = "DownpourSparseTable"
         table.type = pslib.PS_SPARSE_TABLE
-        table.compress_in_save = True
-        table.shard_num = 1000
-        table.accessor.accessor_class = "DownpourCtrAccessor"
-        table.accessor.sparse_sgd_param.learning_rate = 0.05
-        table.accessor.sparse_sgd_param.initial_g2sum = 3
-        table.accessor.sparse_sgd_param.initial_range = 1e-4
-        table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10])
-
-        table.accessor.embedx_dim = 8
-        table.accessor.embedx_threshold = 10
-        table.accessor.fea_dim = 11
-        table.accessor.downpour_accessor_param.nonclk_coeff = 0.1
-        table.accessor.downpour_accessor_param.click_coeff = 1
-        table.accessor.downpour_accessor_param.base_threshold = 1.5
-        table.accessor.downpour_accessor_param.delta_threshold = 0.25
-        table.accessor.downpour_accessor_param.delta_keep_days = 30
-        table.accessor.downpour_accessor_param.delete_after_unseen_days = 30
-        table.accessor.downpour_accessor_param.show_click_decay_rate = 0.98
-        table.accessor.downpour_accessor_param.delete_threshold = 0.8
-
-    def add_dense_table(self, table_id, learning_rate, param_var, grad_var):
+
+        support_sparse_key_list = ['sparse_table_class', 'sparse_compress_in_save', 'sparse_shard_num', \
+            'sparse_accessor_class', 'sparse_learning_rate', 'sparse_initial_g2sum', 'sparse_initial_range', \
+            'sparse_weight_bounds', 'sparse_embedx_dim', 'sparse_embedx_threshold', 'sparse_nonclk_coeff', \
+            'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \
+            'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold']
+
+        for key in strategy:
+            if key not in support_sparse_key_list:
+                raise ValueError("strategy key '%s' not supported" % (key))
+
+        support_table_class = ['DownpourSparseTable']
+        if strategy.get('sparse_table_class') is not None:
+            table_class = strategy.get('sparse_table_class')
+            if table_class not in support_table_class:
+                raise ValueError(
+                    "support sparse_table_class: [ 'DownpourSparseTable' ], \
+                    but actual %s" % (table_class))
+        else:
+            table_class = 'DownpourSparseTable'
+
+        table.table_class = table_class
+
+        if table_class == 'DownpourSparseTable':
+            table.compress_in_save = strategy.get('sparse_compress_in_save',
+                                                  True)
+            table.shard_num = strategy.get('sparse_shard_num', 1000)
+
+            support_accessor_class = [
+                'DownpourFeatureValueAccessor', 'DownpourCtrAccessor'
+            ]
+            if strategy.get('sparse_accessor_class') is not None:
+                accessor_class = strategy.get('sparse_accessor_class')
+                if accessor_class not in support_accessor_class:
+                    raise ValueError(
+                        "support sparse_accessor_class: ['DownpourFeatureValueAccessor', 'DownpourCtrAccessor'], \
+                        but actual %s" % (accessor_class))
+            else:
+                accessor_class = 'DownpourCtrAccessor'
+
+            table.accessor.accessor_class = accessor_class
+
+            if accessor_class == 'DownpourFeatureValueAccessor' or accessor_class == 'DownpourCtrAccessor':
+                table.accessor.sparse_sgd_param.learning_rate = strategy.get(
+                    'sparse_learning_rate', 0.05)
+                table.accessor.sparse_sgd_param.initial_g2sum = strategy.get(
+                    'sparse_initial_g2sum', 3)
+                table.accessor.sparse_sgd_param.initial_range = strategy.get(
+                    'sparse_initial_range', 1e-4)
+                if strategy.get('sparse_weight_bounds') is None:
+                    table.accessor.sparse_sgd_param.weight_bounds.extend(
+                        [-10, 10])
+                else:
+                    table.accessor.sparse_sgd_param.weight_bounds.extend(
+                        strategy.get('sparse_weight_bounds'))
+                table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
+                table.accessor.embedx_threshold = strategy.get(
+                    'sparse_embedx_threshold', 10)
+                table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
+                table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get(
+                    'sparse_nonclk_coeff', 0.1)
+                table.accessor.downpour_accessor_param.click_coeff = strategy.get(
+                    'sparse_click_coeff', 1)
+                table.accessor.downpour_accessor_param.base_threshold = strategy.get(
+                    'sparse_base_threshold', 1.5)
+                table.accessor.downpour_accessor_param.delta_threshold = strategy.get(
+                    'sparse_delta_threshold', 0.25)
+                table.accessor.downpour_accessor_param.delta_keep_days = strategy.get(
+                    'sparse_delta_keep_days', 16)
+                table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get(
+                    'sparse_delete_after_unseen_days', 30)
+                table.accessor.downpour_accessor_param.show_click_decay_rate = strategy.get(
+                    'sparse_show_click_decay_rate', 0.98)
+                table.accessor.downpour_accessor_param.delete_threshold = strategy.get(
+                    'sparse_delete_threshold', 0.8)
+                table1 = table.accessor.table_accessor_save_param.add()
+                table1.param = 1
+                table1.converter = "(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)"
+                table1.deconverter = "(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
+                table2 = table.accessor.table_accessor_save_param.add()
+                table2.param = 2
+                table2.converter = "(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)"
+                table2.deconverter = "(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
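The strategy dict validated above is plain key/value configuration; any key outside support_sparse_key_list raises ValueError. A minimal caller-side sketch (not part of the patch; the values simply echo the code's defaults and are illustrative, not recommendations):

    # Sketch only -- a strategy dict for DownpourServer.add_sparse_table().
    sparse_strategy = {
        'sparse_table_class': 'DownpourSparseTable',
        'sparse_accessor_class': 'DownpourCtrAccessor',
        'sparse_learning_rate': 0.05,
        'sparse_initial_g2sum': 3,
        'sparse_initial_range': 1e-4,
        'sparse_weight_bounds': [-10, 10],
        'sparse_embedx_dim': 8,
        'sparse_embedx_threshold': 10,
        'sparse_shard_num': 1000,
    }
    server.add_sparse_table(0, sparse_strategy)  # 'server': a DownpourServer

Note that fea_dim is derived as embedx_dim + 3 rather than taken from the dict, so it is deliberately not a strategy key.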
+    def add_dense_table(self, table_id, param_var, grad_var, strategy):
         """
         Args:
-            table_id(int): id of sparse params table
-            learning_rate(float): the learning rate used to update parameters. \
-                Can be a float value
-            param_var(list): all dense param. it is a list.
-            grad_var(list): all dense grad parm it is a list.
+            table_id(int): id of dense params table
+            param_var(list): all dense params, as a list
+            grad_var(list): all dense grads, as a list
+            strategy(dict): the dense config dict.
         Returns:
             return None
         """
@@ -117,29 +175,47 @@ class DownpourServer(Server):
         else:
             raise ValueError("expect table %s type=%s, but actual type=%s" \
                 %(table_id, pslib.PS_DENSE_TABLE, table.type))
+
+        if strategy is None:
+            strategy = dict()
         table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
-        table.table_class = "DownpourDenseTable"
+        support_dense_key_list = ['dense_table_class', 'dense_compress_in_save', 'dense_accessor_class', \
+            'dense_optimizer', 'dense_learning_rate', 'dense_avg_decay', 'dense_ada_decay', \
+            'dense_ada_epsilon', 'dense_mom_decay', 'dense_naive_lr']
+
+        for key in strategy:
+            if key not in support_dense_key_list:
+                raise ValueError("strategy key '%s' not supported" % (key))
+
+        table.table_class = strategy.get('dense_table_class',
+                                         "DownpourDenseTable")
         table.type = pslib.PS_DENSE_TABLE
-        table.compress_in_save = True
-        table.accessor.accessor_class = "DownpourDenseValueAccessor"
-        table.accessor.dense_sgd_param.name = "adam"
-        table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
-        table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993
-        table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999
-        table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
-        table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
-        table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
+        table.compress_in_save = strategy.get('dense_compress_in_save', True)
+        table.accessor.accessor_class = strategy.get(
+            'dense_accessor_class', "DownpourDenseValueAccessor")
+        table.accessor.dense_sgd_param.name = strategy.get('dense_optimizer',
+                                                           "adam")
+        table.accessor.dense_sgd_param.adam.learning_rate = strategy.get(
+            'dense_learning_rate', 5e-06)
+        table.accessor.dense_sgd_param.adam.avg_decay_rate = strategy.get(
+            'dense_avg_decay', 0.999993)
+        table.accessor.dense_sgd_param.adam.ada_decay_rate = strategy.get(
+            'dense_ada_decay', 0.9999)
+        table.accessor.dense_sgd_param.adam.ada_epsilon = strategy.get(
+            'dense_ada_epsilon', 1e-8)
+        table.accessor.dense_sgd_param.adam.mom_decay_rate = strategy.get(
+            'dense_mom_decay', 0.99)
+        table.accessor.dense_sgd_param.naive.learning_rate = strategy.get(
+            'dense_naive_lr', 0.0002)
         table.accessor.fea_dim = fea_dim
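The dense table follows the same pattern: unknown keys raise, known keys override the defaults. A sketch (not part of the patch; params/grads stand for hypothetical variable lists, values echo the defaults above):

    # Sketch only -- a strategy dict for DownpourServer.add_dense_table().
    dense_strategy = {
        'dense_table_class': 'DownpourDenseTable',
        'dense_accessor_class': 'DownpourDenseValueAccessor',
        'dense_optimizer': 'adam',
        'dense_learning_rate': 5e-06,
        'dense_avg_decay': 0.999993,
        'dense_ada_decay': 0.9999,
        'dense_ada_epsilon': 1e-8,
        'dense_mom_decay': 0.99,
        'dense_naive_lr': 0.0002,
    }
    server.add_dense_table(1, params, grads, dense_strategy)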

-    def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var):
+    def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var,
+                            strategy):
         """
         Args:
-            table_id(int): id of sparse params table
-            learning_rate(float): the learning rate used to update parameters. \
-                Can be a float value
-            param_var(list): all dense param. it is a list.
-            grad_var(list): all dense grad parm it is a list.
+            table_id(int): id of datanorm table
+            learning_rate(float): the learning rate used to update parameters
+            param_var(list): all datanorm params, as a list
+            grad_var(list): all datanorm grads, as a list
+            strategy(dict): the datanorm config dict.
         Returns:
             return None
         """
@@ -156,14 +232,28 @@ class DownpourServer(Server):
         else:
             raise ValueError("expect table %s type=%s, but actual type=%s" \
                 %(table_id, pslib.PS_DENSE_TABLE, table.type))
+        if strategy is None:
+            strategy = dict()
+
+        support_datanorm_key_list = ['datanorm_table_class', 'datanorm_compress_in_save',\
+            'datanorm_accessor_class', 'datanorm_operation', 'datanorm_decay_rate']
+
+        for key in strategy:
+            if key not in support_datanorm_key_list:
+                raise ValueError("strategy key '%s' not supported" % (key))
+
         table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
-        table.table_class = "DownpourDenseDoubleTable"
+        table.table_class = strategy.get('datanorm_table_class',
+                                         "DownpourDenseDoubleTable")
         table.type = pslib.PS_DENSE_TABLE
-        table.compress_in_save = True
-        table.accessor.accessor_class = "DownpourDenseValueDoubleAccessor"
-        table.accessor.dense_sgd_param.name = "summarydouble"
-        table.accessor.dense_sgd_param.summary.summary_decay_rate = 0.999999
+        table.compress_in_save = strategy.get('datanorm_compress_in_save', True)
+        table.accessor.accessor_class = strategy.get(
+            'datanorm_accessor_class', "DownpourDenseValueDoubleAccessor")
+        table.accessor.dense_sgd_param.name = strategy.get('datanorm_operation',
+                                                           "summarydouble")
+        table.accessor.dense_sgd_param.summary.summary_decay_rate = strategy.get(
+            'datanorm_decay_rate', 0.999999)
         table.accessor.fea_dim = fea_dim

     def get_desc(self):
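And the datanorm (summary) table, for completeness. A sketch (not part of the patch; learning_rate/params/grads are hypothetical placeholders, values echo the defaults above):

    # Sketch only -- a strategy dict for DownpourServer.add_data_norm_table().
    datanorm_strategy = {
        'datanorm_table_class': 'DownpourDenseDoubleTable',
        'datanorm_accessor_class': 'DownpourDenseValueDoubleAccessor',
        'datanorm_operation': 'summarydouble',
        'datanorm_decay_rate': 0.999999,
    }
    server.add_data_norm_table(2, 5e-06, params, grads, datanorm_strategy)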
@@ -187,13 +277,10 @@ class DownpourWorker(Worker):
         self.window = window
         self._worker = pslib.DownpourTrainerParameter()

-    def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
-                         slot_value_vars):
+    def add_sparse_table(self, table_id, slot_key_vars, slot_value_vars):
         """
         Args:
             table_id(int): id of sparse params table
-            learning_rate(float): the learning rate used to update parameters. \
-                Can be a float value
             slot_key_vars(string): slot key id
             slot_value_var(string): slot key value after embedding
         Returns:
@@ -201,7 +288,26 @@ class DownpourWorker(Worker):
         """
         for table in self._worker.sparse_table:
             if table.table_id == table_id:
-                return
+                if [var.name for var in slot_key_vars
+                    ] == self._worker.sparse_table[table_id].slot_key:
+                    if [var.name for var in slot_value_vars
+                        ] == self._worker.sparse_table[table_id].slot_value:
+                        if [
+                                var.name + "@GRAD" for var in slot_value_vars
+                        ] == self._worker.sparse_table[table_id].slot_gradient:
+                            return
+                        else:
+                            raise ValueError(
+                                "sparse table %s slot_gradient error" %
+                                table_id)
+
+                    else:
+                        raise ValueError("sparse table %s slot_value error" %
+                                         table_id)
+                else:
+                    raise ValueError("sparse table %s slot_key error" %
+                                     table_id)
+
         table = self._worker.sparse_table.add()
         table.table_id = table_id
         table.slot_key.extend([var.name for var in slot_key_vars])
@@ -209,7 +315,8 @@ class DownpourWorker(Worker):
         table.slot_value.extend([var.name for var in slot_value_vars])
         table.slot_gradient.extend(
             [var.name + "@GRAD" for var in slot_value_vars])

-    def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars):
+    def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars,
+                        dense_start_table_id):
         """
         Args:
-            table_id(int): id of sparse params table
+            table_id(int): id of dense params table
@@ -222,7 +329,19 @@ class DownpourWorker(Worker):
         """
         for table in self._worker.dense_table:
             if table.table_id == table_id:
-                return
+                if list(filter(lambda x: x.find("embedding") == -1, [p.name for p in param_vars])) ==\
+                    self._worker.dense_table[table_id - dense_start_table_id].dense_variable_name:
+                    if list(filter(lambda x: x.find("embedding") == -1, [g.name for g in grad_vars])) ==\
+                        self._worker.dense_table[table_id - dense_start_table_id].dense_gradient_variable_name:
+                        return
+                    else:
+                        raise ValueError(
+                            "dense table %s dense_gradient_variable_name error"
+                            % table_id)
+                else:
+                    raise ValueError(
+                        "dense table %s dense_variable_name error" % table_id)
+
         table = self._worker.dense_table.add()
         table.table_id = table_id
         table.dense_variable_name.extend(
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index 0363ff3761..6f1439fc91 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -13,13 +13,13 @@
 # limitations under the License.

 __all__ = ["DistributedAdam"]
-import ps_pb2 as pslib
 import paddle.fluid as fluid
 from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
 from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs
 from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs
 from google.protobuf import text_format
 from .node import DownpourWorker, DownpourServer
+from . import ps_pb2 as pslib


 class DistributedOptimizerImplBase(object):
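The next hunk adds three discovery helpers that replace the old single-table find_distributed_lookup_table path. A rough sketch (not part of the patch) of what they yield for a program with two distributed embeddings, using hypothetical parameter names and `opt` standing for a DistributedAdam instance:

    # Sketch only -- two distributed lookup tables in one program.
    import paddle.fluid as fluid

    ids = fluid.layers.data(name='ids', shape=[1], dtype='int64')
    emb_a = fluid.layers.embedding(
        input=ids, size=[100, 8], is_distributed=True,
        param_attr=fluid.ParamAttr(name='emb_a'))
    emb_b = fluid.layers.embedding(
        input=ids, size=[100, 8], is_distributed=True,
        param_attr=fluid.ParamAttr(name='emb_b'))
    loss = fluid.layers.mean(fluid.layers.fc(input=[emb_a, emb_b], size=1))

    # With prog = loss.block.program and names = ['emb_a', 'emb_b']:
    #   opt._find_multi_distributed_lookup_table([loss])           -> ['emb_a', 'emb_b']
    #   opt._find_distributed_lookup_table_inputs(prog, names)['emb_a']  -> [ids variable]
    #   opt._find_distributed_lookup_table_outputs(prog, names)['emb_b'] -> [emb_b variable]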
@@ -48,6 +48,63 @@ class DistributedAdam(DistributedOptimizerImplBase):
             ".batch_size@GRAD", ".batch_square_sum@GRAD", ".batch_sum@GRAD"
         ]

+    def _find_distributed_lookup_table_inputs(self, program, table_names):
+        """
+        Find the input variables of the distributed lookup tables in a program.
+        Multiple distributed tables are supported now.
+        Args:
+            program(Program): given program, locate distributed lookup tables
+            table_names(list): table names found beforehand
+        Returns:
+            inputs_dict: dict mapping each table name to its input variables
+        """
+        local_vars = program.current_block().vars
+        inputs_dict = dict()
+        for table_name in table_names:
+            inputs_dict[table_name] = []
+
+        for op in program.global_block().ops:
+            if op.type == "lookup_table":
+                if op.input("W")[0] in table_names:
+                    inputs_dict[op.input("W")[0]].extend(
+                        [local_vars[name] for name in op.input("Ids")])
+        return inputs_dict
+
+    def _find_distributed_lookup_table_outputs(self, program, table_names):
+        """
+        Find the output variables of the distributed lookup tables in a program.
+        Multiple distributed tables are supported now.
+        Args:
+            program(Program): given program, locate distributed lookup tables
+            table_names(list): table names found beforehand
+        Returns:
+            outputs_dict: dict mapping each table name to its output variables
+        """
+        local_vars = program.current_block().vars
+        outputs_dict = dict()
+        for table_name in table_names:
+            outputs_dict[table_name] = []
+
+        for op in program.global_block().ops:
+            if op.type == "lookup_table":
+                if op.input("W")[0] in table_names:
+                    outputs_dict[op.input("W")[0]].extend(
+                        [local_vars[name] for name in op.output("Out")])
+        return outputs_dict
+
+    def _find_multi_distributed_lookup_table(self, losses):
+        """
+        Find the names of all distributed lookup tables used by the given losses.
+        """
+        table_names = set()
+        for loss in losses:
+            for op in loss.block.program.global_block().ops:
+                if op.type == "lookup_table":
+                    if op.attr('is_distributed') is True:
+                        table_name = op.input("W")[0]
+                        table_names.add(table_name)
+        return list(table_names)
+
     def _minimize(self,
                   losses,
                   startup_program=None,
@@ -69,10 +126,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
             [optimize_ops, grads_and_weights]
         """
-        table_name = find_distributed_lookup_table(losses[0].block.program)
+        table_name = self._find_multi_distributed_lookup_table(losses)
         prefetch_slots = find_distributed_lookup_table_inputs(
+            losses[0].block.program, table_name[0])
+        inputs_dict = self._find_distributed_lookup_table_inputs(
             losses[0].block.program, table_name)
         prefetch_slots_emb = find_distributed_lookup_table_outputs(
+            losses[0].block.program, table_name[0])
+
+        outputs_dict = self._find_distributed_lookup_table_outputs(
             losses[0].block.program, table_name)

         ps_param = pslib.PSParameter()
@@ -87,20 +149,29 @@ class DistributedAdam(DistributedOptimizerImplBase):
             text_format.Merge(f.read(), ps_param)
         server.get_desc().CopyFrom(ps_param.server_param)
         worker.get_desc().CopyFrom(ps_param.trainer_param)
+
         sparse_table_index = 0
-        server.add_sparse_table(sparse_table_index, self._learning_rate,
-                                prefetch_slots, prefetch_slots_emb)
-        worker.add_sparse_table(sparse_table_index, self._learning_rate,
-                                prefetch_slots, prefetch_slots_emb)
-        dense_table_index = 1
+        for tn in table_name:
+            if strategy.get(tn) is not None:
+                server.add_sparse_table(sparse_table_index, strategy[tn])
+            else:
+                server.add_sparse_table(sparse_table_index, None)
+            worker.add_sparse_table(sparse_table_index, inputs_dict[tn],
+                                    outputs_dict[tn])
+            sparse_table_index += 1
+
+        dense_start_table_id = sparse_table_index
+        dense_table_index = sparse_table_index
         program_configs = {}
         param_grads_list = []
         for loss_index in range(len(losses)):
             program_id = str(id(losses[loss_index].block.program))
             program_configs[program_id] = {
-                "pull_sparse": [sparse_table_index],
-                "push_sparse": [sparse_table_index]
+                "pull_sparse":
+                [t_index for t_index in range(sparse_table_index)],
+                "push_sparse":
+                [t_index for t_index in range(sparse_table_index)]
             }
params_grads = sorted( @@ -128,19 +199,30 @@ class DistributedAdam(DistributedOptimizerImplBase): data_norm_grads.append(i[1]) if not is_data_norm_data: grads.append(i[1]) - server.add_dense_table(dense_table_index, self._learning_rate, - params, grads) + if strategy.get('dense_table') is not None: + server.add_dense_table(dense_table_index, params, grads, + strategy['dense_table']) + else: + server.add_dense_table(dense_table_index, params, grads, None) worker.add_dense_table(dense_table_index, self._learning_rate, - params, grads) + params, grads, dense_start_table_id) program_configs[program_id]["pull_dense"] = [dense_table_index] program_configs[program_id]["push_dense"] = [dense_table_index] if len(data_norm_params) != 0 and len(data_norm_grads) != 0: dense_table_index += 1 - server.add_data_norm_table(dense_table_index, - self._learning_rate, - data_norm_params, data_norm_grads) + if strategy.get('datanorm_table') is not None: + server.add_data_norm_table( + dense_table_index, self._learning_rate, + data_norm_params, data_norm_grads, + strategy['datanorm_table']) + else: + server.add_data_norm_table( + dense_table_index, self._learning_rate, + data_norm_params, data_norm_grads, None) + worker.add_dense_table(dense_table_index, self._learning_rate, - data_norm_params, data_norm_grads) + data_norm_params, data_norm_grads, + dense_start_table_id) program_configs[program_id]["pull_dense"].extend( [dense_table_index]) program_configs[program_id]["push_dense"].extend( diff --git a/python/paddle/fluid/tests/unittests/test_downpoursgd.py b/python/paddle/fluid/tests/unittests/test_downpoursgd.py new file mode 100644 index 0000000000..3ab6219c78 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_downpoursgd.py @@ -0,0 +1,148 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from __future__ import print_function
+
+import paddle
+import paddle.fluid as fluid
+import os
+import signal
+import subprocess
+import time
+import unittest
+import sys
+from op_test import OpTest
+from paddle.fluid.trainer_desc import DistMultiTrainer
+from paddle.fluid.device_worker import DownpourSGD
+from google.protobuf import text_format
+import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib
+
+
+class TestDownpourSGD(OpTest):
+    def setUp(self):
+        pass
+
+    def test_device_work_use_cvm(self):
+        if sys.platform == 'win32':
+            pass
+        else:
+            print(sys.platform)
+            cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt"
+            os.system(cmd)
+            x = fluid.layers.data(name='x', shape=[1], dtype='float32')
+            x_emb = fluid.layers.embedding(
+                input=x, size=[1, 2], is_distributed=True)
+            y_predict = fluid.layers.fc(input=x_emb, size=1, act=None)
+            y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+            cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+            avg_cost = fluid.layers.mean(cost)
+
+            ps_param = pslib.PSParameter()
+            with open("fleet_desc.prototxt") as f:
+                text_format.Merge(f.read(), ps_param)
+            fleet_desc = ps_param
+            exe = fluid.Executor(fluid.CPUPlace())
+            exe.run(fluid.default_startup_program())
+
+            opt_info = {}
+            main_program = fluid.default_main_program()
+            program_id = str(id(avg_cost.block.program))
+            program_configs = {}
+            program_configs[program_id] = {
+                "pull_sparse": [0],
+                "push_sparse": [0]
+            }
+            program_configs[program_id]["pull_dense"] = [1]
+            program_configs[program_id]["push_dense"] = [1]
+
+            worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
+            opt_info["program_configs"] = program_configs
+            opt_info["trainer"] = "DistMultiTrainer"
+            opt_info["device_worker"] = "DownpourSGD"
+            opt_info["optimizer"] = "DownpourSGD"
+            opt_info["fleet_desc"] = ps_param
+            opt_info["worker_skipped_ops"] = worker_skipped_ops
+            opt_info["use_cvm"] = True
+            opt_info["scale_datanorm"] = -1
+            opt_info["dump_slot"] = False
+
+            main_program._fleet_opt = opt_info
+            trainer = DistMultiTrainer()
+            trainer._set_program(main_program)
+            device_worker = DownpourSGD()
+            device_worker._set_fleet_desc(fleet_desc)
+            trainer._set_device_worker(device_worker)
+            trainer._set_fleet_desc(fleet_desc)
+            trainer._gen_trainer_desc()
+            cmd = "rm fleet_desc.prototxt*"
+            os.system(cmd)
+
+    def test_device_work(self):
+        if sys.platform == 'win32':
+            pass
+        else:
+            print(sys.platform)
+            cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt"
+            os.system(cmd)
+            x = fluid.layers.data(name='x', shape=[1], dtype='float32')
+            x_emb = fluid.layers.embedding(
+                input=x, size=[1, 2], is_distributed=True)
+            y_predict = fluid.layers.fc(input=x_emb, size=1, act=None)
+            y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+            cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+            avg_cost = fluid.layers.mean(cost)
+
+            ps_param = pslib.PSParameter()
+            with open("fleet_desc.prototxt") as f:
+                text_format.Merge(f.read(), ps_param)
+            fleet_desc = ps_param
+            exe = fluid.Executor(fluid.CPUPlace())
+            exe.run(fluid.default_startup_program())
+
+            opt_info = {}
+            main_program = fluid.default_main_program()
+            program_id = str(id(avg_cost.block.program))
+            program_configs = {}
+            program_configs[program_id] = {
+                "pull_sparse": [0],
+                "push_sparse": [0]
+            }
+            program_configs[program_id]["pull_dense"] = [1]
+            program_configs[program_id]["push_dense"] = [1]
+
+            worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
+            opt_info["program_configs"] = program_configs
+            opt_info["trainer"] = "DistMultiTrainer"
+            opt_info["device_worker"] = "DownpourSGD"
+            opt_info["optimizer"] = "DownpourSGD"
+            opt_info["fleet_desc"] = ps_param
+            opt_info["worker_skipped_ops"] = worker_skipped_ops
+            opt_info["use_cvm"] = False
+            opt_info["scale_datanorm"] = -1
+            opt_info["dump_slot"] = False
+
+            main_program._fleet_opt = opt_info
+            trainer = DistMultiTrainer()
+            trainer._set_program(main_program)
+            device_worker = DownpourSGD()
+            device_worker._set_fleet_desc(fleet_desc)
+            trainer._set_device_worker(device_worker)
+            trainer._set_fleet_desc(fleet_desc)
+            trainer._gen_trainer_desc()
+            cmd = "rm fleet_desc.prototxt*"
+            os.system(cmd)
+
+
+if __name__ == "__main__":
+    unittest.main()
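Taken together, the pieces above let a user configure every table through one strategy dict: _minimize reads one entry per distributed embedding parameter name, plus the reserved 'dense_table' and 'datanorm_table' keys. A minimal end-to-end sketch under those assumptions (not part of the patch; 'embedding_0.w_0' is a hypothetical parameter name, and the fleet initialization / role-maker setup is omitted):

    # Sketch only -- per-table configuration flowing into DistributedAdam._minimize.
    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    strategy = {
        'embedding_0.w_0': {               # key: name of a distributed embedding param
            'sparse_accessor_class': 'DownpourCtrAccessor',
            'sparse_learning_rate': 0.05,
        },
        'dense_table': {'dense_optimizer': 'adam', 'dense_learning_rate': 5e-06},
        'datanorm_table': {'datanorm_decay_rate': 0.999999},
    }
    adam = fluid.optimizer.Adam(learning_rate=5e-06)
    adam = fleet.distributed_optimizer(adam, strategy=strategy)
    adam.minimize([avg_cost])              # avg_cost: a loss built as in the test above

Tables with no strategy entry fall back to the defaults in node.py, which preserves the pre-patch behavior.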