From 3c65cc1bbd2eae58b6eec73a94fe0648d8854a53 Mon Sep 17 00:00:00 2001
From: dongdaxiang
Date: Tue, 19 Mar 2019 16:54:56 +0800
Subject: [PATCH] add documentation for role_maker, fleet parameters and
 data_generator

---
 paddle/fluid/framework/data_set.cc                 |   4 +-
 .../fluid/incubate/data_generator/__init__.py      |   6 +
 .../fluid/incubate/fleet/base/role_maker.py        | 149 +++++++++++++++---
 .../fleet/parameter_server/__init__.py             | 114 +++++++++++++-
 4 files changed, 246 insertions(+), 27 deletions(-)

diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index e7128869d..755c858bc 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -47,7 +47,7 @@ template <typename T>
 void DatasetImpl<T>::SetThreadNum(int thread_num) {
   int file_cnt = filelist_.size();
   if (file_cnt != 0 && thread_num > file_cnt) {
-    VLOG(1) << "DataSet thread num = " << thread_num
+    VLOG(3) << "DataSet thread num = " << thread_num
             << ", file num = " << file_cnt
             << ". Changing DataSet thread num = " << file_cnt;
     thread_num = file_cnt;
@@ -178,7 +178,7 @@ void DatasetImpl<T>::DestroyReaders() {
     t.join();
   }
   std::vector<std::shared_ptr<paddle::framework::DataFeed>>().swap(readers_);
-  LOG(WARNING) << "readers size: " << readers_.size();
+  VLOG(3) << "readers size: " << readers_.size();
 }
 
 template
diff --git a/python/paddle/fluid/incubate/data_generator/__init__.py b/python/paddle/fluid/incubate/data_generator/__init__.py
index ad16e1a13..75fda01c1 100644
--- a/python/paddle/fluid/incubate/data_generator/__init__.py
+++ b/python/paddle/fluid/incubate/data_generator/__init__.py
@@ -19,6 +19,12 @@ __all__ = ['MultiSlotDataGenerator']
 
 
 class DataGenerator(object):
+    """
+    DataGenerator is a general base class for users to inherit from.
+    A user who wants to define his/her own python processing logic
+    with paddle.fluid.dataset should inherit this class.
+    """
+
     def __init__(self):
         self._proto_info = None
         self.batch_size_ = 32
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index 0867b7f65..9f57b9a2e 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -11,36 +11,68 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import sys
 
 
 class RoleMakerBase(object):
+    """
+    RoleMakerBase is a base class for assigning a role to the current process
+    in distributed training.
+    A paddle developer can implement RoleMakerBase to design a role maker
+    for worker or pserver assignment.
+    """
+
     def __init__(self):
         self.role_maker_name_ = ""
         self.trainer_endpoints_ = []
         self.pserver_endpoints_ = []
+        self.role_is_generated_ = False
 
     def is_worker(self):
+        """
+        return whether the current process is a worker
+        """
         raise NotImplementedError("Please implement this method in child class")
 
     def is_server(self):
+        """
+        return whether the current process is a server
+        """
         raise NotImplementedError("Please implement this method in child class")
 
     def get_local_ip(self):
+        """
+        return the local ip of the current node
+        """
         import socket
         self.ip_ = socket.gethostbyname(socket.gethostname())
         return self.ip_
 
     def get_trainer_endpoints(self):
+        """
+        return the trainer endpoints
+        """
         return self.trainer_endpoints_
 
     def get_pserver_endpoints(self):
+        """
+        return the pserver endpoints
+        """
         return self.pserver_endpoints_
 
     def generate_role(self):
+        """
+        generate_role() should be called to identify the current process's role
+        """
         raise NotImplementedError("Please implement this method in child class")
 
 
 class MPIRoleMaker(RoleMakerBase):
+    """
+    MPIRoleMaker is an MPI-API based role maker, the counterpart of K8SRoleMaker.
+    mpi4py will be used if a developer inherits from MPIRoleMaker.
+    """
+
     def __init__(self):
         from mpi4py import MPI
         self.comm_ = MPI.COMM_WORLD
@@ -48,26 +80,44 @@ class MPIRoleMaker(RoleMakerBase):
         self.ips_ = None
 
     def get_rank(self):
+        """
+        return the rank of the current process
+        """
         self.rank_ = self.comm_.Get_rank()
         return self.rank_
 
     def get_size(self):
+        """
+        return the total number of processes in the current job
+        """
         self.size_ = self.comm_.Get_size()
         return self.size_
 
     def all_gather(self, obj):
+        """
+        all_gather(obj) will call MPI's allgather function
+        """
         self.barrier_all()
         return self.comm_.allgather(obj)
 
     def barrier_all(self):
+        """
+        barrier_all() will call MPI's barrier function on the global communicator
+        """
         self.comm_.barrier()
 
     def get_ips(self):
+        """
+        collect the ip list of the current distributed job
+        """
         if self.ips_ == None:
             self.ips_ = self.comm_.allgather(self.get_local_ip())
         return self.ips_
 
     def finalize(self):
+        """
+        finalize the current MPI instance.
+        """
         self.comm_.finalize()
 
 
@@ -83,44 +133,99 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         self.node_type_ = None
         self.proc_per_node_ = 2
 
+    def _check_role_generation(self):
+        if not self.role_is_generated_:
+            sys.stderr.write("generate_role() should be called first")
+            sys.exit(-1)
+            return False
+        return True
+
     def is_first_worker(self):
-        return self.is_worker() and 0 == self.worker_index()
+        """
+        return whether the current process is the first worker assigned by the role maker
+        """
+        if self._check_role_generation():
+            return self.is_worker() and 0 == self.worker_index()
+        return False
 
     def is_worker(self):
-        return self.node_type_ == 1
+        """
+        return whether the current process is a worker assigned by the role maker
+        """
+        if self._check_role_generation():
+            return self.node_type_ == 1
+        return False
 
     def is_server(self):
-        return self.node_type_ == 0
+        """
+        return whether the current process is a server assigned by the role maker
+        """
+        if self._check_role_generation():
+            return self.node_type_ == 0
+        return False
 
     def worker_num(self):
-        if self.is_worker():
-            return self.get_size()
+        """
+        return the number of workers in the current job
+        """
+        if self._check_role_generation():
+            if self.is_worker():
+                return self.get_size()
+        return 0
 
     def server_num(self):
-        if self.is_server():
-            return self.get_size()
+        """
+        return the number of servers in the current job
+        """
+        if self._check_role_generation():
+            if self.is_server():
+                return self.get_size()
+        return 0
 
     def worker_index(self):
-        return self.rank_ / self.proc_per_node_
+        """
+        return the index of the current worker
+        """
+        if self._check_role_generation():
+            return self.rank_ / self.proc_per_node_
+        return 0
 
     def server_index(self):
-        return self.rank_ / self.proc_per_node_
+        """
+        return the index of the current server
+        """
+        if self._check_role_generation():
+            return self.rank_ / self.proc_per_node_
+        return 0
 
     def barrier_worker(self):
-        if self.is_worker():
-            self.node_type_comm_.barrier()
+        """
+        barrier all workers in the current distributed job
+        """
+        if self._check_role_generation():
+            if self.is_worker():
+                self.node_type_comm_.barrier()
 
     def barrier_server(self):
-        if self.is_server():
-            self.node_type_comm_.barrier()
+        """
+        barrier all servers in the current distributed job
+        """
+        if self._check_role_generation():
+            if self.is_server():
+                self.node_type_comm_.barrier()
 
     def generate_role(self):
-        # TODO(guru4elephant): only allow to be called once
-        self.trainer_endpoints_ = self.get_ips()
-        self.pserver_endpoints_ = self.get_ips()
-
-        if 0 == self.get_rank() % self.proc_per_node_ % 2:
-            self.node_type_ = 0
-        else:
-            self.node_type_ = 1
-        self.node_type_comm_ = self.comm_.Split(self.node_type_)
+        """
+        generate the current process's role
+        """
+        if not self.role_is_generated_:
+            # TODO(guru4elephant): only allow to be called once
+            self.trainer_endpoints_ = self.get_ips()
+            self.pserver_endpoints_ = self.get_ips()
+
+            if 0 == self.get_rank() % self.proc_per_node_ % 2:
+                self.node_type_ = 0
+            else:
+                self.node_type_ = 1
+            self.node_type_comm_ = self.comm_.Split(self.node_type_)
+            self.role_is_generated_ = True
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
index cee409cde..d8efba432 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
@@ -22,7 +22,44 @@ import paddle.fluid as fluid
 
 class Fleet(object):
     """
-
+    Fleet in Python. Fleet is used in distributed training. It is designed as a singleton instance
+    in C++. A Fleet() object will be initialized automatically when a user imports this package as
+    fleet. The general interfaces Fleet supports are:
+    init(): should be called only once in a user's python script. init() will initialize
+        FleetWrapper in CPP, and it will also initialize a RoleMaker which is used for identifying
+        the current node's role, e.g. worker, server, etc.
+    stop(): will be called after a user finishes his/her training task. The Fleet instance will be
+        destroyed when stop() is called.
+    init_pserver(): will be called by the user. When a user knows the current process is_server(),
+        he/she should call init_pserver() to initialize global information about the parameter server.
+    init_worker(): will be called by the user. When a user knows the current process is_worker(),
+        he/she should call init_worker() to initialize global information about the worker and
+        connect the worker with the pserver.
+    get_worker_num(): return the number of the current task's worker nodes
+    get_server_num(): return the number of the current task's pserver nodes
+    is_worker(): return whether the current process is a worker
+    is_server(): return whether the current process is a server
+    init_pserver_model(): initialize model parameters in pserver, called from a worker node
+    save_pserver_model(): save model parameters in pserver, called from a worker node
+
+    Example:
+
+        .. code-block:: python
+
+            import paddle.fluid.incubate.fleet.parameter_server as fleet
+            from my_model import bow_net
+            model = bow_net()
+            fleet.init()
+            sgd_optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.0001)
+            sgd_optimizer = fleet.DistributedOptimizer(sgd_optimizer)
+            sgd_optimizer.minimize(model.loss)
+            exe = paddle.fluid.Executor(paddle.fluid.CPUPlace())
+            if fleet.is_worker():
+                exe.run(paddle.fluid.default_startup_program())
+                fleet.init_worker()  # init_worker() should be called before training
+                # do other things like training
+            elif fleet.is_server():
+                fleet.init_pserver()
+            fleet.stop()
     """
 
     def __init__(self):
@@ -35,6 +72,11 @@ class Fleet(object):
         # TODO(guru4elephant)
         # this is a temporary solution
         # we will support more configurable RoleMaker for users in the future
+        """
+        init() should be called only once in a user's python script. init() will initialize
+        FleetWrapper in CPP, and it will also initialize a RoleMaker which is used for identifying
+        the current node's role, e.g. worker, server, etc.
+        """
         if not self.is_initialized_:
             self.role_maker_ = MPISymetricRoleMaker()
             self.role_maker_.generate_role()
@@ -42,6 +84,10 @@ class Fleet(object):
             self.is_initialized_ = True
 
     def stop(self):
+        """
+        stop() will be called after a user finishes his/her training task. The Fleet instance will be
+        destroyed when stop() is called.
+        """
         self.role_maker_.barrier_worker()
         if self.role_maker_.is_first_worker():
             self._fleet_ptr.stop_server()
@@ -50,6 +96,10 @@ class Fleet(object):
         self.role_maker_.finalize()
 
     def init_pserver(self):
+        """
+        init_pserver() will be called by the user. When a user knows the current process is_server(),
+        he/she should call init_pserver() to initialize global information about the parameter server.
+        """
         if self._opt_info:
             if "fleet_desc" in self._opt_info:
                 self._dist_desc_str = text_format.MessageToString(
@@ -73,6 +123,11 @@ class Fleet(object):
             sys.exit(-1)
 
     def init_worker(self):
+        """
+        init_worker() will be called by the user. When a user knows the current process is_worker(),
+        he/she should call init_worker() to initialize global information about the worker and
+        connect the worker with the pserver.
+        """
         if self._opt_info:
             if "fleet_desc" in self._opt_info:
                 self._dist_desc_str = text_format.MessageToString(
@@ -93,30 +148,61 @@ class Fleet(object):
             sys.exit(-1)
 
     def get_worker_num(self):
+        """
+        return the number of workers in the current job
+        """
         return self.role_maker_.worker_num()
 
     def get_server_num(self):
+        """
+        return the number of servers in the current job
+        """
         return self.role_maker_.server_num()
 
     def is_worker(self):
+        """
+        return whether the current node is a worker
+        """
        return self.role_maker_.is_worker()
 
     def is_server(self):
+        """
+        return whether the current node is a pserver
+        """
         return self.role_maker_.is_server()
 
     def init_pserver_model(self):
+        """
+        initialize model parameters in pserver, called from a worker node
+        """
         if self.role_maker_.is_first_worker():
             self._fleet_ptr.init_model()
         self.role_maker_.barrier_worker()
 
     def save_pserver_model(self, save_path):
+        """
+        save model parameters in pserver, called from a worker node
+        """
         self._fleet_ptr.save_model(save_path)
 
     def _set_opt_info(self, opt_info):
+        """
+        this function saves the result from DistributedOptimizer.minimize()
+        """
         self._opt_info = opt_info
 
 
 class DistributedOptimizer(object):
+    """
+    DistributedOptimizer is a wrapper for paddle.fluid.optimizer.
+    A user should pass a paddle.fluid.optimizer instance to DistributedOptimizer;
+    only the minimize() function is implemented.
+    DistributedOptimizer is the starting point for a user who wants to
+    run distributed training. The optimize information will be stored in the
+    Fleet() instance, which holds the global information about the current
+    distributed training job.
+    """
+
     def __init__(self, optimizer, dist_config={}):
         super(DistributedOptimizer, self).__init__()
         self._optimizer = optimizer
@@ -136,16 +222,38 @@ class DistributedOptimizer(object):
                  parameter_list=None,
                  no_grad_set=None,
                  callbacks=None):
-        pass
+        """
+        Currently, the backward function cannot be called through DistributedOptimizer
+        """
+        raise NotImplementedError()
 
     def apply_gradients(self, params_grads):
-        pass
+        """
+        Currently, the apply_gradients function cannot be called through DistributedOptimizer
+        """
+        raise NotImplementedError()
 
     def minimize(self,
                  loss,
                  startup_program=None,
                  parameter_list=None,
                  no_grad_set=None):
+        """
+        minimize a program through loss; loss can be a list in DistributedOptimizer
+        Args:
+            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
+            startup_program (Program): startup_program for initializing parameters
+                in `parameter_list`.
+            parameter_list (list): list of Variables to update.
+            no_grad_set (set|None): set of Variables that should be ignored.
+        Returns:
+            tuple: (optimize_ops, params_grads), which are the list of appended operators
+            and the list of (param, grad) Variable pairs for optimization.
+        Note that in parameter server mode, a worker will not get anything about optimize_ops,
+        because optimizer algorithms run on the pserver side. We will make this usable in the pserver
+        process, but currently the optimization part is written into Fleet(). A user does not
+        need to care about how to start up a pserver node.
+        """
         optimize_ops, param_grads, opt_info = \
             self._distributed_optimizer.minimize(
                 loss,
-- 
GitLab
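Note: the data_generator hunk above documents DataGenerator as a base class to inherit from, but it is truncated before the processing interface. As a minimal sketch, assuming the generate_sample()/run_from_stdin() interface that MultiSlotDataGenerator exposes in this package, and with made-up slot names ("words", "label") and input format, a user-defined generator could look like this:

```python
import paddle.fluid.incubate.data_generator as dg


class MyDataGenerator(dg.MultiSlotDataGenerator):
    def generate_sample(self, line):
        # generate_sample() receives one raw input line and returns a
        # generator that yields one or more samples; for the multi-slot
        # format, each sample is a tuple of (slot_name, value_list) pairs.
        def reader():
            # assumed input format: space-separated word ids, label last
            fields = line.strip().split()
            words = [int(x) for x in fields[:-1]]
            label = [int(fields[-1])]
            yield ("words", words), ("label", label)

        return reader


if __name__ == "__main__":
    my_data_generator = MyDataGenerator()
    # read raw lines from stdin so the script can run as a pipe command,
    # e.g. `cat train.txt | python my_data_generator.py`
    my_data_generator.run_from_stdin()
```

Such a script is typically hooked into training through the dataset's pipe command (e.g. dataset.set_pipe_command("python my_data_generator.py")); that wiring is an assumption here and is not shown in this patch.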