From 34369944f5cef577ed615f86aaed51e6d20152ce Mon Sep 17 00:00:00 2001
From: jiaqi <173596896@qq.com>
Date: Wed, 15 May 2019 13:59:25 +0800
Subject: [PATCH] support config file, cvm, load, save, shrink (#17319)

* support config file, cvm, load, save, shrink

test=develop

* fix error of worker_num & add table.compress_in_save

test=develop

* fix code style

test=develop

* fix save model bug

test=develop
---
 .../fluid/incubate/fleet/base/role_maker.py   |   4 +-
 .../fleet/parameter_server/pslib/__init__.py  | 107 ++++++++++++++++--
 .../fleet/parameter_server/pslib/node.py      |  50 ++++++--
 .../pslib/optimizer_factory.py                |  17 ++-
 4 files changed, 159 insertions(+), 19 deletions(-)

diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index 5371252213..0c1c44cc15 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -269,8 +269,8 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         """
         if not self._role_is_generated:
             # TODO(guru4elephant): only allow to be called once
-            self._worker_endpoints = self._get_ips()
-            self._server_endpoints = self._get_ips()
+            self._worker_endpoints = self._get_ips()[1::2]
+            self._server_endpoints = self._get_ips()[::2]
 
             if 0 == self._get_rank() % self._proc_per_node % 2:
                 self._node_type = 0
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index ec066187c2..9684a087a4 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -106,14 +106,33 @@ class PSLib(Fleet):
             raise NameError(
                 "You should run DistributedOptimizer.minimize() first")
 
-    def init_server(self, model_dir=None):
-        pass
+    def init_server(self, model_dir=None, **kwargs):
+        """
+        init_server() will be called by user. It will load model from model_dir.
+
+        Args:
+            model_dir(str): load model path, can be a local or hdfs/afs path.
+            kwargs: user-defined attributes, currently supports the following:
+                mode(int): load model mode.
+                    0 is for loading the whole model,
+                    1 is for loading the delta model (load diff),
+                    default is 0.
+
+        Example:
+            >>> fleet.init_server("/your/path/to/model", mode = 0)
+
+        """
+        mode = kwargs.get("mode", 0)
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.load_model(model_dir, mode)
+        self._role_maker._barrier_worker()
 
     def run_server(self):
         """
         init_pserver(): will be called by user. When a user knows current process
         is_worker(), he/she should call init_pserver() to initialize global
         information about parameter server
-        """
+        """
         if self._opt_info:
             if "fleet_desc" in self._opt_info:
                 self._dist_desc_str = text_format.MessageToString(
@@ -162,7 +181,7 @@ class PSLib(Fleet):
         self._role_maker._barrier_all()
         self._role_maker._finalize()
 
-    def distributed_optimizer(self, optimizer, strategy=None):
+    def distributed_optimizer(self, optimizer, strategy={}):
         self._optimizer = DownpourOptimizer(optimizer, strategy)
         return self._optimizer
 
@@ -177,8 +196,81 @@ class PSLib(Fleet):
         """
         self._fleet_ptr.save_model(dirname)
 
-    def save_persistables(self, dirname, main_program=None):
-        self._fleet_ptr.save_model(dirname)
+    def save_persistables(self, dirname, main_program=None, **kwargs):
+        """
+        save persistable parameters,
+        when using fleet, it will save sparse and dense feature
+
+        Args:
+            dirname(str): save path. It can be hdfs/afs path or local path
+            main_program(Program): fluid program, default None
+            kwargs: user-defined properties, currently supports the following:
+                mode(int): 0 means save all pserver model,
+                           1 means save delta pserver model (save diff),
+                           2 means save xbox base,
+                           3 means save batch model.
+
+        Example:
+            >>> fleet.save_persistables(dirname="/your/path/to/model", mode = 0)
+
+        """
+        mode = kwargs.get("mode", 0)
+        self._fleet_ptr.client_flush()
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.save_model(dirname, mode)
+        self._role_maker._barrier_worker()
+
+    def shrink_sparse_table(self):
+        """
+        shrink cvm of all sparse embeddings in pserver; the decay rate
+        is defined as "show_click_decay_rate" in fleet_desc.prototxt
+
+        Example:
+            >>> fleet.shrink_sparse_table()
+
+        """
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            for i in self._opt_info["fleet_desc"].trainer_param.sparse_table:
+                self._fleet_ptr.shrink_sparse_table(i.table_id)
+        self._role_maker._barrier_worker()
+
+    def shrink_dense_table(self, decay, scope=None, table_id=None):
+        """
+        shrink all dense params in pserver by multiplying by decay
+
+        Args:
+            decay(float): the decay rate, usually in the range (0, 1)
+            scope(Scope): Scope object, default is fluid.global_scope()
+            table_id(int): table id of the dense table to shrink. None means shrink all,
+                           you should specify it when using multiple scopes,
+                           default is None.
+
+        Example:
+            >>> fleet.shrink_dense_table(0.98, myscope1, 1)
+            >>> fleet.shrink_dense_table(0.98, myscope1, 2)
+            >>> fleet.shrink_dense_table(0.98, myscope2, 3)
+
+        """
+        if scope is None:
+            scope = fluid.global_scope()
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
+                if table_id is not None and table_id != i.table_id:
+                    continue
+                var_list = [var for var in i.dense_variable_name]
+                skip = False
+                for var in var_list:
+                    if scope.find_var(var) is None:
+                        skip = True
+                        break
+                if skip:
+                    continue
+                self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
+                                                   decay)
+        self._role_maker._barrier_worker()
 
     def _set_opt_info(self, opt_info):
         """
@@ -273,7 +365,8 @@ class DownpourOptimizer(DistributedOptimizer):
                 losses,
                 startup_programs,
                 parameter_list,
-                no_grad_set)
+                no_grad_set,
+                self._strategy)
 
         fleet._set_opt_info(opt_info)
 
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
index 7a1925a95f..6701155808 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
@@ -62,10 +62,18 @@ class DownpourServer(Server):
         Returns:
             return None
         """
+        for table in self._server.downpour_server_param.downpour_table_param:
+            if table.table_id == table_id:
+                if table.type == pslib.PS_SPARSE_TABLE:
+                    return
+                else:
+                    raise ValueError("expect table %s type=%s, but actual type=%s" \
+                        %(table_id, pslib.PS_SPARSE_TABLE, table.type))
         table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
         table.table_class = "DownpourSparseTable"
         table.type = pslib.PS_SPARSE_TABLE
+        table.compress_in_save = True
         table.accessor.accessor_class = "DownpourFeatureValueAccessor"
         table.accessor.sparse_sgd_param.learning_rate = learning_rate
         table.accessor.sparse_sgd_param.initial_g2sum = 3
@@ -94,10 +102,24 @@ class DownpourServer(Server):
         Returns:
             return None
         """
+        fea_dim = 0
+        for param in filter(lambda x: x.name.find("embedding") == -1,
+                            param_var):
+            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
+
+        for table in self._server.downpour_server_param.downpour_table_param:
+            if table.table_id == table_id:
+                if table.type == pslib.PS_DENSE_TABLE:
+                    table.accessor.fea_dim = fea_dim
+                    return
+                else:
+                    raise ValueError("expect table %s type=%s, but actual type=%s" \
+                        %(table_id, pslib.PS_DENSE_TABLE, table.type))
         table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
         table.table_class = "DownpourDenseTable"
         table.type = pslib.PS_DENSE_TABLE
+        table.compress_in_save = True
         table.accessor.accessor_class = "DownpourDenseValueAccessor"
         table.accessor.dense_sgd_param.name = "adam"
         table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
@@ -106,10 +128,6 @@ class DownpourServer(Server):
         table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
         table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
         table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
-        fea_dim = 0
-        for param in filter(lambda x: x.name.find("embedding") == -1,
-                            param_var):
-            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
         table.accessor.fea_dim = fea_dim
 
     def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var):
@@ -123,17 +141,27 @@
         Returns:
             return None
         """
+        fea_dim = 0
+        for param in filter(lambda x: x.name.find("embedding") == -1,
+                            param_var):
+            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
+
+        for table in self._server.downpour_server_param.downpour_table_param:
+            if table.table_id == table_id:
+                if table.type == pslib.PS_DENSE_TABLE:
+                    table.accessor.fea_dim = fea_dim
+                    return
+                else:
+                    raise ValueError("expect table %s type=%s, but actual type=%s" \
+                        %(table_id, pslib.PS_DENSE_TABLE, table.type))
         table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
         table.table_class = "DownpourDenseTable"
         table.type = pslib.PS_DENSE_TABLE
+        table.compress_in_save = True
         table.accessor.accessor_class = "DownpourDenseValueAccessor"
         table.accessor.dense_sgd_param.name = "summary"
         table.accessor.dense_sgd_param.summary.summary_decay_rate = 0.999999
-        fea_dim = 0
-        for param in filter(lambda x: x.name.find("embedding") == -1,
-                            param_var):
-            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
         table.accessor.fea_dim = fea_dim
 
     def get_desc(self):
@@ -169,6 +197,9 @@ class DownpourWorker(Worker):
         Returns:
             return None
         """
+        for table in self._worker.sparse_table:
+            if table.table_id == table_id:
+                return
         table = self._worker.sparse_table.add()
         table.table_id = table_id
         table.slot_key.extend([var.name for var in slot_key_vars])
@@ -187,6 +218,9 @@ class DownpourWorker(Worker):
         Returns:
             return None
         """
+        for table in self._worker.dense_table:
+            if table.table_id == table_id:
+                return
         table = self._worker.dense_table.add()
         table.table_id = table_id
         table.dense_variable_name.extend(
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index 31f964a0e3..895fb6889c 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -52,7 +52,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
                   losses,
                   startup_program=None,
                   parameter_list=None,
-                  no_grad_set=None):
+                  no_grad_set=None,
+                  strategy={}):
         """
         DownpourSGD is a distributed optimizer
         so that user can call minimize to generate backward
         with current optimizer
@@ -63,6 +64,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
             startup_program(Program): startup program that defined by user
             parameter_list(str list): parameter names defined by users
             no_grad_set(set): a set of variables that is defined by users
             so that these variables do not need gradient computation
+            strategy(dict): user-defined properties
         Returns:
             [optimize_ops, grads_and_weights]
         """
@@ -76,6 +78,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
         ps_param = pslib.PSParameter()
         server = DownpourServer()
         worker = DownpourWorker(self._window)
+        # if user specifies a fleet_desc.prototxt file, then load the file
+        # instead of creating default fleet_desc.prototxt.
+        # user can specify server_param or trainer_param or fs_client_param.
+        if strategy.get("fleet_desc_file") is not None:
+            fleet_desc_file = strategy["fleet_desc_file"]
+            with open(fleet_desc_file) as f:
+                text_format.Merge(f.read(), ps_param)
+            server.get_desc().CopyFrom(ps_param.server_param)
+            worker.get_desc().CopyFrom(ps_param.trainer_param)
         sparse_table_index = 0
         server.add_sparse_table(sparse_table_index, self._learning_rate,
                                 prefetch_slots, prefetch_slots_emb)
@@ -140,7 +151,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
         # Todo(guru4elephant): figure out how to support more sparse parameters
         # currently only support lookup_table
         worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
-        ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
+        if len(ps_param.trainer_param.skip_op) == 0:
+            ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
 
         opt_info = {}
         opt_info["program_configs"] = program_configs
@@ -149,6 +161,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["optimizer"] = "DownpourSGD"
         opt_info["fleet_desc"] = ps_param
         opt_info["worker_skipped_ops"] = worker_skipped_ops
+        opt_info["use_cvm"] = strategy.get("use_cvm", False)
 
         for loss in losses:
             loss.block.program._fleet_opt = opt_info
-- 
GitLab
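
For illustration only, here is a rough sketch of how the strategy dict this patch threads through distributed_optimizer() and DistributedAdam._minimize() might be used from user code. The toy CTR network, table sizes, learning rate and the fleet_desc.prototxt path are assumptions, not part of the patch; only the "fleet_desc_file" and "use_cvm" keys are actually read by the patched code.

    # sketch, assuming the pslib fleet singleton and a sparse lookup_table model
    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    slots = fluid.layers.data(name="slots", shape=[1], dtype="int64", lod_level=1)
    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
    # pslib distributes lookup_table (embedding) parameters as sparse tables
    emb = fluid.layers.embedding(
        input=slots, size=[100001, 9], is_sparse=True, is_distributed=True)
    bow = fluid.layers.sequence_pool(input=emb, pool_type="sum")
    predict = fluid.layers.fc(input=bow, size=2, act="softmax")
    cost = fluid.layers.cross_entropy(input=predict, label=label)
    avg_cost = fluid.layers.mean(x=cost)

    strategy = {
        # optional: read server_param/trainer_param/fs_client_param from a
        # prototxt instead of building the default fleet_desc
        "fleet_desc_file": "fleet_desc.prototxt",
        # forwarded into opt_info["use_cvm"]; defaults to False when absent
        "use_cvm": True,
    }
    adam = fluid.optimizer.Adam(learning_rate=0.000005)
    adam = fleet.distributed_optimizer(adam, strategy)
    # the pslib optimizer iterates over a list of losses
    adam.minimize([avg_cost])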
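Likewise, a hedged sketch of the load/save flow enabled by the new init_server(model_dir, mode=...) and save_persistables(dirname, mode=...); the paths are placeholders, and the surrounding role-maker initialization and training loop follow the usual fleet workflow and are omitted here.

    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    if fleet.is_server():
        # mode=0 loads the whole model, mode=1 loads a delta (diff) model
        fleet.init_server("/your/path/to/model", mode=0)
        fleet.run_server()
    if fleet.is_worker():
        fleet.init_worker()
        # ... feed data and run training passes here ...
        # mode: 0 all, 1 delta, 2 xbox base, 3 batch model (see docstring above)
        fleet.save_persistables(dirname="/your/path/to/model", mode=0)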
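Finally, a short sketch of the new shrink calls; the decay value, scope and table id below are illustrative only and mirror the docstring examples.

    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    # sparse tables decay by "show_click_decay_rate" from fleet_desc.prototxt
    fleet.shrink_sparse_table()
    # dense tables are multiplied by an explicit decay factor; pass the scope
    # holding the table's variables, and a table_id when several scopes are used
    fleet.shrink_dense_table(0.98, scope=fluid.global_scope(), table_id=1)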