diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
index c8177842efa9e6c9085e6733678a23a3eb704619..f2f72b0f505fd43607f5104e39f5167f55fa432e 100644
--- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py
+++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py
@@ -15,18 +15,21 @@
 from __future__ import print_function
 import abc
-import sys

 from enum import Enum

 from paddle.fluid.optimizer import SGD
+from paddle.fluid.executor import Executor

-from role_maker import RoleMakerBase, Role
+from role_maker import RoleMakerBase
 from role_maker import MPISymetricRoleMaker
 from role_maker import UserDefinedRoleMaker


 class Mode(Enum):
+    """
+    There are various modes for fleet; each mode is designed for a different distributed training scenario.
+    """
     TRANSPILER = 1,
     PSLIB = 2,
     COLLECTIVE = 3
@@ -46,17 +49,11 @@ class Fleet(object):

     def __init__(self, mode):
         assert isinstance(mode, Mode)
-        self.is_initialized = False
-        self.mode = mode
-        self.workers = 0
-        self.servers = 0
-        self.worker_endpoints = []
-        self.server_endpoints = []
-        self.role = Role.WORKER
-        self.current_endpoint = None
-        self.current_id = 0
-        self.optimizer = None
-        self.role_maker_ = None
+        self._is_initialized = False
+        self._mode = mode
+        self._optimizer = None
+        self._role_maker = None
+        self._executor = None

     def is_first_worker(self):
         """
@@ -66,25 +63,25 @@ class Fleet(object):
             bool: True if this is the first node of worker,
             False if not.
         """
-        return self.is_worker() and self.current_id == 0
+        return self._role_maker.is_first_worker()

-    def worker_id(self):
+    def worker_index(self):
         """
-        Get current worker id.
+        Get current worker index.

         Returns:
             int: node id
         """
-        return self.current_id
+        return self._role_maker.worker_index()

-    def get_workers(self):
+    def worker_num(self):
         """
         Get current total worker number.

         Returns:
             int: worker number
         """
-        return self.workers
+        return len(self._role_maker.get_trainer_endpoints())

     def is_worker(self):
         """
@@ -94,7 +91,51 @@ class Fleet(object):
             bool: True if this is a node of worker,
             False if not.
         """
-        return self.role == Role.WORKER
+        return self._role_maker.is_worker()
+
+    def worker_endpoints(self, to_string=False):
+        """
+        Get current worker endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].
+
+        Returns:
+            list/string: worker endpoints
+        """
+
+        if to_string:
+            return ",".join(self._role_maker.get_trainer_endpoints())
+        else:
+            return self._role_maker.get_trainer_endpoints()
+
+    def server_num(self):
+        """
+        Get current total server number.
+
+        Returns:
+            int: server number
+        """
+        return len(self._role_maker.get_pserver_endpoints())
+
+    def server_index(self):
+        """
+        Get current server index.
+
+        Returns:
+            int: node id
+        """
+        return self._role_maker.server_index()
+
+    def server_endpoints(self, to_string=False):
+        """
+        Get current server endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].
+
+        Returns:
+            list/string: server endpoints
+        """
+
+        if to_string:
+            return ",".join(self._role_maker.get_pserver_endpoints())
+        else:
+            return self._role_maker.get_pserver_endpoints()

     def is_server(self):
         """
@@ -104,7 +145,7 @@ class Fleet(object):
             bool: True if this is a node of server,
             False if not.
         """
-        return self.role == Role.SERVER
+        return self._role_maker.is_server()

     def split_files(self, files):
         """
@@ -119,8 +160,8 @@ class Fleet(object):
             list: files belongs to this worker.
""" file_num = len(files) - trainer_id = self.worker_id() - trainer_num = self.get_workers() + trainer_id = self.worker_index() + trainer_num = self.worker_num() if trainer_num > file_num: raise ValueError("trainer_num should be <= file_num : " "%s > %s" % (trainer_num, file_num)) @@ -132,66 +173,49 @@ class Fleet(object): end += length return files[start:end] - def init(self, role_maker=None): + def init(self, executor, role_maker=None): """ should be called only once in user's python scripts, init() will initialize RoleMaker which is used for identifying current node's role, e.g. worker, server, etc. Args: + executor(Executor): The executor to run fleet. role_maker(RoleMakerBase): subclass of RoleMakerBase. Returns: None """ + if not isinstance(executor, Executor): + raise ValueError("executor must be an instance of Executor") if role_maker and not isinstance(role_maker, RoleMakerBase): raise ValueError("role_maker must be an instance of RoleMakerBase") - self.role_maker_ = role_maker - if isinstance(role_maker, MPISymetricRoleMaker): - self.role_maker_._generate_role() - self.role = Role.WORKER if role_maker._is_worker() else Role.SERVER - self.workers = role_maker._worker_num() - self.servers = role_maker._server_num() - self.server_endpoints = role_maker._get_pserver_endpoints() - self.worker_endpoints = role_maker._get_trainer_endpoints() - self.current_id = role_maker._worker_index( - ) if role_maker._is_worker() else role_maker._server_index() - self.current_endpoint = self.worker_endpoints[self.current_id] \ - if role_maker._is_worker() else self.server_endpoints[self.current_id] + self._role_maker = role_maker + self._role_maker.generate_role() elif isinstance(role_maker, UserDefinedRoleMaker): - self.current_id = role_maker.current_id - self.current_endpoint = role_maker.current_endpoint - self.workers = role_maker.workers - self.worker_endpoints = role_maker.worker_endpoints - self.servers = role_maker.servers - self.server_endpoints = role_maker.server_endpoints - self.role = role_maker.role + self._role_maker = role_maker else: raise ValueError( "role_maker must be an instance of UserDefinedRoleMaker/MPISymetricRoleMaker" ) - self.is_initialized = True + self._is_initialized = True @abc.abstractmethod - def init_worker(self, executor): + def init_worker(self): pass @abc.abstractmethod - def run_worker(self, executor, main_program=None): + def init_server(self, model_dir=None): pass @abc.abstractmethod - def init_server(self, executor, model_dir=None): - pass - - @abc.abstractmethod - def run_server(self, executor): + def run_server(self, ): pass @abc.abstractmethod @@ -199,7 +223,7 @@ class Fleet(object): pass @abc.abstractmethod - def stop(self, executor): + def stop(self): pass @abc.abstractmethod @@ -208,7 +232,6 @@ class Fleet(object): @abc.abstractmethod def save_inference_model(self, - executor, dirname, feeded_var_names, target_vars, @@ -217,21 +240,9 @@ class Fleet(object): pass @abc.abstractmethod - def save_persistables(self, executor, dirname, main_program=None): + def save_persistables(self, dirname, main_program=None): pass - def to_string(self): - infos = """ - mode = {} - workers = {} - server_endpoints = {} - role = {} - current_endpoint = {} - current_id = {} - """.format(self.mode, self.workers, self.server_endpoints, self.role, - self.current_endpoint, self.current_id) - return infos - class DistributedOptimizer(object): """ @@ -245,7 +256,7 @@ class DistributedOptimizer(object): Args: optimizer(Optimizer): subclass of Optimizer. 
-        strategy(dict): the user define config for Optimizer.
+        strategy(any): the user-defined config for Optimizer.

     Returns:
         None
@@ -257,9 +268,6 @@ class DistributedOptimizer(object):
         if not isinstance(optimizer, SGD.__bases__):
             raise ValueError("optimizer must be an instance of Optimizer")

-        if strategy and not isinstance(strategy, dict):
-            raise ValueError("strategy must be an instance of Dict")
-
         self._optimizer = optimizer
         self._strategy = strategy

@@ -317,8 +325,9 @@ class DistributedOptimizer(object):

     @abc.abstractmethod
     def minimize(self,
-                 loss,
-                 startup_program=None,
+                 losses,
+                 scopes=None,
+                 startup_programs=None,
                  parameter_list=None,
                  no_grad_set=None):
         """
@@ -328,8 +337,9 @@ class DistributedOptimizer(object):
         `apply_gradients()` into one.

         Args:
-            loss (Variable): loss variable to run optimizations.
-            startup_program (Program): startup_program for initializing parameters
+            losses (Variable|Variable List): loss variable or list of loss variables to run optimization.
+            scopes (Scope|Scope List): scope instance or list of scope instances.
+            startup_programs (Program|Program List): startup_program for initializing parameters
                 in `parameter_list`.
             parameter_list (list): list of Variables to update.
             no_grad_set (set|None): set of Variables should be ignored.
diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py
index dfd2273b485adfd5f76c650feef864964ad335a2..5371252213b2624ca44bb54b20a385b306967f8e 100644
--- a/python/paddle/fluid/incubate/fleet/base/role_maker.py
+++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py
@@ -11,10 +11,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import sys
+from __future__ import print_function

 from enum import Enum

+__all__ = [
+    'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker'
+]
+

 class Role(Enum):
     WORKER = 1,
@@ -30,47 +34,62 @@ class RoleMakerBase(object):
     """

     def __init__(self):
-        self._trainer_endpoints = []
-        self._pserver_endpoints = []
+        self._worker_endpoints = []
+        self._server_endpoints = []
         self._role_is_generated = False
+        self._role = None
+        self._current_id = -1

-    def _is_worker(self):
+    def is_worker(self):
         """
         return is_worker() of current process
         """
         raise NotImplementedError("Please implement this method in child class")

-    def _is_server(self):
+    def is_server(self):
         """
         return is_server() of current process
         """
         raise NotImplementedError("Please implement this method in child class")

-    def _get_local_ip(self):
+    def is_first_worker(self):
         """
-        return get local ip
+        Check whether the node is the first instance of worker.
+
+        Returns:
+            bool: True if this is the first node of worker,
+            False if not.
         """
-        import socket
-        self._ip = socket.gethostbyname(socket.gethostname())
-        return self._ip
+        raise NotImplementedError("Please implement this method in child class")

-    def _get_trainer_endpoints(self):
+    def worker_index(self):
         """
-        return trainer endpoints
+        Get current worker id.
+
+        Returns:
+            int: node id
         """
-        return self._trainer_endpoints
+        raise NotImplementedError("Please implement this method in child class")

-    def _get_pserver_endpoints(self):
+    def server_index(self):
         """
-        return pserver endpoints
+        Get current server id.
+
+        Returns:
+            int: node id
         """
-        return self._pserver_endpoints
+        raise NotImplementedError("Please implement this method in child class")

-    def _generate_role(self):
+    def get_trainer_endpoints(self):
         """
-        generate_role() should be called to identify current process's role
+        return trainer endpoints
         """
-        raise NotImplementedError("Please implement this method in child class")
+        return self._worker_endpoints
+
+    def get_pserver_endpoints(self):
+        """
+        return pserver endpoints
+        """
+        return self._server_endpoints


 class MPIRoleMaker(RoleMakerBase):
@@ -82,9 +101,11 @@ class MPIRoleMaker(RoleMakerBase):

     def __init__(self):
         super(MPIRoleMaker, self).__init__()
         from mpi4py import MPI
-        self._comm = MPI.COMM_WORLD
         self.MPI = MPI
+        self._comm = MPI.COMM_WORLD
+        self._node_type_comm = None
         self._ips = None
+        self._ip = None

     def _get_rank(self):
         """
@@ -111,7 +132,7 @@ class MPIRoleMaker(RoleMakerBase):
         """
         worker_gather(obj) will call MPI's allgather function
         """
-        if self._is_worker():
+        if self.is_worker():
             self._node_type_comm.barrier()
             return self._node_type_comm.allgather(obj)
         return None
@@ -122,19 +143,33 @@ class MPIRoleMaker(RoleMakerBase):
         """
         self._comm.barrier()

+    def _finalize(self):
+        """
+        finalize the current MPI instance.
+        """
+        pass
+
     def _get_ips(self):
         """
         collect current distributed job's ip list
         """
-        if self._ips == None:
-            self._ips = self._comm.allgather(self._get_local_ip())
+        if not self._ips:
+            self._ips = self._comm.allgather(self.get_local_ip())
         return self._ips

-    def _finalize(self):
+    def get_local_ip(self):
         """
-        finalize the current MPI instance.
+        return get local ip
         """
-        pass
+        import socket
+        self._ip = socket.gethostbyname(socket.gethostname())
+        return self._ip
+
+    def generate_role(self):
+        """
+        generate_role() should be called to identify current process's role
+        """
+        raise NotImplementedError("Please implement this method in child class")


 class MPISymetricRoleMaker(MPIRoleMaker):
@@ -151,20 +186,18 @@ class MPISymetricRoleMaker(MPIRoleMaker):

     def _check_role_generation(self):
         if not self._role_is_generated:
-            sys.stderr.write("generate_role() should be called first")
-            sys.exit(-1)
-            return False
+            raise NameError("generate_role() should be called first")
         return True

-    def _is_first_worker(self):
+    def is_first_worker(self):
         """
         return whether current process is the first worker assigned by role maker
         """
         if self._check_role_generation():
-            return self._is_worker() and 0 == self._worker_index()
+            return self.is_worker() and 0 == self.worker_index()
         return False

-    def _is_worker(self):
+    def is_worker(self):
         """
         return whether current process is worker assigned by role maker
         """
@@ -172,7 +205,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
             return self._node_type == 1
         return False

-    def _is_server(self):
+    def is_server(self):
         """
         return whether current process is server assigned by role maker
         """
@@ -185,7 +218,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         return the current number of worker
         """
         if self._check_role_generation():
-            if self._is_worker():
+            if self.is_worker():
                 return self._get_size() / 2
         return 0

@@ -194,11 +227,11 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         return the current number of server
         """
         if self._check_role_generation():
-            if self._is_server():
+            if self.is_server():
                 return self._get_size() / 2
         return 0

-    def _worker_index(self):
+    def worker_index(self):
         """
         return the index of worker
         """
@@ -206,7 +239,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
             return self._rank / self._proc_per_node
         return 0

-    def _server_index(self):
+    def server_index(self):
         """
         return the index of server
         """
@@ -219,7 +252,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         barrier all workers in current distributed job
         """
         if self._check_role_generation():
-            if self._is_worker():
+            if self.is_worker():
                 self._node_type_comm.barrier()

     def _barrier_server(self):
@@ -227,17 +260,17 @@ class MPISymetricRoleMaker(MPIRoleMaker):
         barrier all servers in current distributed job
         """
         if self._check_role_generation():
-            if self._is_server():
+            if self.is_server():
                 self._node_type_comm.barrier()

-    def _generate_role(self):
+    def generate_role(self):
         """
         generate currently process's role
         """
         if not self._role_is_generated:
             # TODO(guru4elephant): only allow to be called once
-            self._trainer_endpoints = self._get_ips()
-            self._pserver_endpoints = self._get_ips()
+            self._worker_endpoints = self._get_ips()
+            self._server_endpoints = self._get_ips()

             if 0 == self._get_rank() % self._proc_per_node % 2:
                 self._node_type = 0
@@ -250,12 +283,9 @@ class MPISymetricRoleMaker(MPIRoleMaker):

 class UserDefinedRoleMaker(RoleMakerBase):
     def __init__(self,
                  current_id=0,
-                 current_endpoint=None,
-                 workers=0,
-                 worker_endpoints=None,
-                 servers=0,
-                 server_endpoints=None,
-                 role=Role.WORKER):
+                 role=Role.WORKER,
+                 worker_num=0,
+                 server_endpoints=None):
         """
         UserDefinedRoleMaker is designed for worker and server assignment
         under manual. Typically, a worker and a server node will be appointed
@@ -263,19 +293,22 @@ class UserDefinedRoleMaker(RoleMakerBase):
         """

         super(UserDefinedRoleMaker, self).__init__()
-        self.current_id = current_id
-        self.current_endpoint = current_endpoint
-        self.workers = workers
-        self.worker_endpoints = worker_endpoints
-        self.servers = servers
-        self.server_endpoints = server_endpoints
-        self.role = role
+        self._current_id = current_id
+        self._role = role
+        self._worker_num = worker_num
+        self._server_endpoints = server_endpoints
+
+    def is_worker(self):
+        return self._role == Role.WORKER
+
+    def is_server(self):
+        return self._role == Role.SERVER

-    def _is_worker(self):
-        return self.role == Role.WORKER
+    def is_first_worker(self):
+        return self._role == Role.WORKER and self._current_id == 0

-    def _is_server(self):
-        return self.role == Role.SERVER
+    def worker_index(self):
+        return self._current_id

-    def _generate_role(self):
-        self.role_is_generated_ = True
+    def server_index(self):
+        return self._current_id
diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py
index 49ecaee07a5474bbe92a2dd3947ef555d252fa0e..e381a0d8c7124b8e9dd099ef0d99faa6985a8548 100644
--- a/python/paddle/fluid/incubate/fleet/collective/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and

-import sys
 import logging

 import paddle.fluid as fluid
@@ -26,37 +25,21 @@ from ..base.fleet_base import DistributedOptimizer
 class Collective(Fleet):
     def __init__(self):
         super(Collective, self).__init__(Mode.COLLECTIVE)
-        self.local_ip_ = 0
+        self._local_ip = 0

-    def init(self, role_maker=None):
-        """
-        should be called only once in user's python scripts,
-        init() will initialize RoleMaker which is used for identifying
-        current node's role, e.g. worker, server, etc.
-
-        Args:
-            role_maker(RoleMakerBase): subclass of RoleMakerBase.
-
-        Returns:
-            None
-        """
-
-        super(Collective, self).init(role_maker)
-        self._role_maker._generate_role()
-
-    def init_worker(self, executor):
+    def init_worker(self):
         logging.warn(
             "You should not call 'init_worker' method for collective mode.")

-    def run_worker(self, executor, main_program=None):
+    def run_worker(self, main_programs=None, scopes=None):
         logging.warn(
             "You should not call 'run_worker' method for collective mode.")

-    def init_server(self, executor, model_dir=None):
+    def init_server(self, model_dir=None):
         logging.warn(
             "You should not call 'init_server' method for collective mode.")

-    def run_server(self, executor):
+    def run_server(self):
         logging.warn(
             "You should not call 'run_server' method for collective mode.")

@@ -64,29 +47,28 @@ class Collective(Fleet):
         logging.warn(
             "You should not call 'stop_worker' method for collective mode.")

-    def stop(self, executor):
+    def stop(self):
         """
         stop(): will be called after a user finishes his/her training task.
         """
         logging.warn("You should not call 'stop' method for collective mode.")

     def distributed_optimizer(self, optimizer, strategy=None):
-        self.optimizer = CollectiveOptimizer(optimizer, strategy)
-        return self.optimizer
+        self._optimizer = CollectiveOptimizer(optimizer, strategy)
+        return self._optimizer

     def save_inference_model(self,
-                             executor,
                              dirname,
                              feeded_var_names=None,
                              target_vars=None,
                              main_program=None,
                              export_for_deployment=True):
         io.save_inference_model(dirname, feeded_var_names, target_vars,
-                                executor, main_program, None, None,
+                                self._executor, main_program, None, None,
                                 export_for_deployment)

-    def save_persistables(self, executor, dirname, main_program=None):
-        io.save_persistables(executor, dirname, main_program, None)
+    def save_persistables(self, dirname, main_program=None):
+        io.save_persistables(self._executor, dirname, main_program, None)


 fleet = Collective()
@@ -143,9 +125,9 @@ class CollectiveOptimizer(DistributedOptimizer):
         optimize_ops, param_grads = self._optimizer.minimize(
             loss, startup_program, parameter_list, no_grad_set)

-        worker_endpoints = fleet.worker_endpoints
-        trainer_id = fleet.current_id
-        current_endpoint = fleet.current_endpoint
+        worker_endpoints = fleet.worker_endpoints()
+        trainer_id = fleet.worker_index()
+        current_endpoint = fleet.worker_endpoints()[trainer_id]

         startup_program = startup_program if startup_program else \
             fluid.framework.default_startup_program
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py
index 5eeac2a7318ed2cf0f03822749ffe043ed6096f9..b2ed351da8c5d3071bd5fcf8860d55a636e09526 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py
@@ -12,12 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
-import sys
-
-from paddle.fluid.executor import Executor
-
-from paddle.fluid.framework import Program
-from paddle.fluid.framework import default_main_program
 from paddle.fluid.framework import default_startup_program
 from paddle.fluid.optimizer import Optimizer
@@ -27,7 +22,6 @@ import paddle.fluid.io as io
 from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
 from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler

-from ...base.role_maker import Role
 from ...base.fleet_base import Fleet
 from ...base.fleet_base import Mode
 from ...base.fleet_base import DistributedOptimizer
@@ -44,101 +38,75 @@ class DistributedTranspiler(Fleet):
         self._startup_program = None
         self._main_program = None

-    def init_worker(self, executor):
+    def init_worker(self):
         """
         `init_worker` has many many functions to do before training,
         first, wait for all parameter servers launch completely.
         second, run executor to initialize startup program
         third, wait for all worker initialize completely.

-        Args:
-            executor(Executor): The executor to run for init startup program.
-
         Returns:
             None
         """
-        if not isinstance(executor, Executor):
-            raise ValueError("executor must be an instance of Executor")
-
-        if not self._startup_program:
-            raise ValueError(
-                "startup_program is None, need invoke DistributedOptimizer.minimize first"
-            )
-
-        executor.run(self._startup_program)
+        pass

-    def run_worker(self, executor, main_program=None):
+    def run_worker(self, main_programs=None, scopes=None):
         pass

-    def init_server(self, executor, model_dir=None):
+    def init_server(self, model_dir=None):
         """
         `init_server` has many many functions to do before start pserver,
         first, run executor to initialize startup program,
         second, if the `model_dir` is not empty, it will load parameters from it for increment training.

         Args:
-            executor(Executor): The executor to run for init server.
             model_dir(str): The directory path.

         Returns:
             None
         """
-        if not isinstance(executor, Executor):
-            raise ValueError("executor must be an instance of Executor")
-
         if not self._startup_program:
             raise ValueError(
                 "startup_program is None, need invoke DistributedOptimizer.minimize first"
             )

-        executor.run(self._startup_program)
+        self._executor.run(self._startup_program)

         if model_dir:
             if not os.path.isdir(model_dir):
                 raise ValueError("There is no directory named '%s'", model_dir)

-            io.load_persistables(executor, model_dir, self._startup_program)
+            io.load_persistables(self._executor, model_dir,
+                                 self._startup_program)

-    def run_server(self, executor):
+    def run_server(self):
         """
         `run_server` execute executor to start pserver main program.

-        Args:
-            executor(Executor): The executor to run for init server.
-
         Returns:
             None
         """
-        if not isinstance(executor, Executor):
-            raise ValueError("executor must be an instance of Executor")
-
         if not self._main_program:
             raise ValueError(
                 "main_program is None, need invoke DistributedOptimizer.minimize first"
             )

-        executor.run(self._main_program)
+        self._executor.run(self._main_program)

     def stop_worker(self):
         pass

-    def stop(self, executor):
+    def stop(self):
         """
         Close this executor.

         For the distributed training, this method would free the resource on PServers related to
         the current Trainer.

-        Args:
-            executor(Executor): The executor to run for init server.
-
         Returns:
             None
         """
-
-        if not isinstance(executor, Executor):
-            raise ValueError("executor must be an instance of Executor")
-        executor.close()
+        self._executor.close()

     def distributed_optimizer(self, optimizer, strategy=None):
         """
@@ -157,11 +125,10 @@ class DistributedTranspiler(Fleet):
         if not isinstance(optimizer, Optimizer):
             raise ValueError("optimizer must be an instance of Optimizer")

-        self.optimizer = TranspilerOptimizer(optimizer, strategy)
-        return self.optimizer
+        self._optimizer = TranspilerOptimizer(optimizer, strategy)
+        return self._optimizer

     def save_inference_model(self,
-                             executor,
                              dirname,
                              feeded_var_names,
                              target_vars,
@@ -172,10 +139,10 @@ class DistributedTranspiler(Fleet):
         and then save it and all related parameters to given `dirname` by the `executor`.
         """
         io.save_inference_model(dirname, feeded_var_names, target_vars,
-                                executor, main_program, None, None,
+                                self._executor, main_program, None, None,
                                 export_for_deployment)

-    def save_persistables(self, executor, dirname, main_program=None):
+    def save_persistables(self, dirname, main_program=None):
         """
         This function filters out all variables with `persistable==True` from the
         give `main_program` and then saves these variables to the folder `dirname`
@@ -186,38 +153,56 @@ class DistributedTranspiler(Fleet):
         files, set `filename` None; if you would like to save all variables in a
         single file, use `filename` to specify the file name.
         """
-        io.save_persistables(executor, dirname, main_program, None)
+        io.save_persistables(self._executor, dirname, main_program, None)

     def _transpile(self, config):
-        if not isinstance(config, DistributeTranspilerConfig):
-            raise ValueError(
-                "config must be an instance of DistributeTranspilerConfig")
-
         self._transpiler = OriginTranspiler(config)
         self._transpiler.transpile(
-            trainer_id=fleet.worker_id(),
-            pservers=fleet.server_endpoints,
+            trainer_id=fleet.worker_index(),
+            pservers=fleet.server_endpoints(to_string=True),
             trainers=fleet.worker_num())

-        if self.role == Role.WORKER:
+        if self.is_worker():
             self._main_program = self._transpiler.get_trainer_program()
             self._startup_program = default_startup_program()
         else:
             self._main_program, self._startup_program = \
-                self._transpiler.get_pserver_programs(self.current_endpoint)
+                self._transpiler.get_pserver_programs(self.server_endpoints()[self.server_index()])


 fleet = DistributedTranspiler()


 class TranspilerOptimizer(DistributedOptimizer):
+    """
+    TranspilerOptimizer is a wrapper for paddle.fluid.optimizer.
+    A user should pass a paddle.fluid.optimizer to TranspilerOptimizer,
+    whose minimize() function is implemented.
+    TranspilerOptimizer is the starting point for a user who wants to
+    run distributed training. The optimized information will be stored in
+    a Fleet() instance, which holds the global information about the current
+    distributed training.
+
+    Args:
+        optimizer(Optimizer): subclass of Optimizer.
+        strategy(DistributeTranspilerConfig): instance of DistributeTranspilerConfig.
+
+    Returns:
+        None
+    """
+
     def __init__(self, optimizer, strategy=None):
         super(TranspilerOptimizer, self).__init__(optimizer, strategy)

-        if strategy and not isinstance(strategy, DistributeTranspilerConfig):
-            raise ValueError(
-                "In {} mode, strategy must be an instance of DistributeTranspilerConfig".
-                format(fleet.mode))
+        if strategy:
+            if not isinstance(strategy, DistributeTranspilerConfig):
+                raise ValueError(
+                    "In {} mode, strategy must be an instance of DistributeTranspilerConfig".
+                    format(fleet._mode))
+            else:
+                self._strategy = strategy
+        else:
+            self._strategy = DistributeTranspilerConfig()

     def backward(self,
                  loss,
@@ -225,24 +210,68 @@ class TranspilerOptimizer(DistributedOptimizer):
                  parameter_list=None,
                  no_grad_set=None,
                  callbacks=None):
+        """
+        First part of `minimize`, do auto-diff to append backward ops for
+        the current program.
+
+        Args:
+            loss (Variable): loss variable to run optimizations.
+            startup_program (Program): startup_program for initializing parameters
+                in `parameter_list`.
+            parameter_list (list): list of Variables to update.
+            no_grad_set (set|None): set of Variables should be ignored.
+            callbacks (list|None): list of callables to run when appending backward
+                operator for one parameter.
+
+        Return:
+            list: list of (param, grad) pair, grad is the output of backward.
+
+        Examples:
+            See examples in `apply_gradients`.
+        """
         return self._optimizer.backward(loss, startup_program, parameter_list,
                                         no_grad_set, callbacks)

     def apply_gradients(self, params_grads):
+        """
+        Second part of `minimize`, appending optimization operators for
+        given `params_grads` pairs.
+
+        Args:
+            params_grads (list): list of (param, grad) pair to do optimization.
+
+        Returns:
+            list: A list of operators appended to the current program.
+
+        Examples:
+            .. code-block:: python
+
+                loss = network()
+                optimizer = fluid.optimizer.SGD(learning_rate=0.1)
+                params_grads = optimizer.backward(loss)
+                # you may append operations for params_grads here
+                # ...
+                optimizer.apply_gradients(params_grads)
+        """
         return self._optimizer.apply_gradients(params_grads)

     def minimize(self,
                  loss,
+                 scope=None,
                  startup_program=None,
                  parameter_list=None,
                  no_grad_set=None):
-        optimize_ops, params_grads = self._optimizer.minimize(
-            loss, startup_program, parameter_list, no_grad_set)
-        self.transpile()
-        return optimize_ops, params_grads

-    def transpile(self):
-        if self._strategy is None:
-            self._strategy = DistributeTranspilerConfig()
+        if isinstance(loss, list):
+            raise ValueError(
+                "DistributedTranspiler's minimize cannot accept a list of losses")

+        if isinstance(startup_program, list):
+            raise ValueError(
+                "DistributedTranspiler's minimize cannot accept a list of startup programs"
+            )
+
+        optimize_ops, params_grads = self._optimizer.minimize(
+            loss, startup_program, parameter_list, no_grad_set)
         fleet._transpile(config=self._strategy)
+        return optimize_ops, params_grads
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index b472c20bc132ea343b9a3261a6e218565cbaea25..ec066187c238815a5b262fb752d10ad6a5730cbe 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -28,63 +28,56 @@ class PSLib(Fleet):
     def __init__(self):
         super(PSLib, self).__init__(Mode.PSLIB)
         self._opt_info = None
-        self.local_ip_ = 0
+        self._local_ip = 0
         self._fleet_ptr = None
+        self._main_programs = []
+        self._scopes = []

-    def init(self, role_maker=None):
-        super(PSLib, self).init(MPISymetricRoleMaker())
+    def init(self, executor, role_maker=None):
+        super(PSLib, self).init(executor, MPISymetricRoleMaker())
         self._fleet_ptr = fluid.core.Fleet()

-    def init_worker(self, executor):
-        pass
-
-    def run_worker(self, executor, main_program=None):
+    def init_worker(self):
         """
         init_worker(): will be called by user. When a user knows current process
         is_server(), he/she should call init_worker() to initialize global
         information about worker and connect worker with pserver. You should run
         startup program before init_worker.

         Args:
-            programs(Program|list): a Program or a list of Programs
-            scopes(Scope|list): a Scope or a list of Scopes, default None.
+            None. The main programs and scopes to use are those recorded
+            by DistributedOptimizer.minimize().
         """
-        if not isinstance(main_program, Program):
-            raise ValueError("main_program must be an instance of Program")
-
-        programs = [main_program]
-        scopes = [fluid.global_scope()] * len(programs)
+        if len(self._main_programs) == 0:
+            raise ValueError(
+                "You should run DistributedOptimizer.minimize() first")

-        if len(scopes) != len(programs):
-            print(
-                "You should make sure len(scopes) == len(programs) or set scopes None"
-            )
-            sys.exit(-1)
         if self._opt_info:
             if "fleet_desc" in self._opt_info:
                 self._dist_desc_str = text_format.MessageToString(
                     self._opt_info["fleet_desc"])
                 self._dist_desc = self._opt_info["fleet_desc"]
             else:
-                print("You should run DistributedOptimizer.minimize() first")
-                sys.exit(-1)
+                raise Exception(
+                    "You should run DistributedOptimizer.minimize() first")
             # barrier_all for init_server, wait for server starts
-            self.role_maker_._barrier_all()
-            self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
+            self._role_maker._barrier_all()
+            self.all_ips_ = self._role_maker._all_gather(self._local_ip)
             self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
-                                        self.role_maker_._get_size(),
-                                        self.role_maker_._get_rank())
+                                        self._role_maker._get_size(),
+                                        self._role_maker._get_rank())
             # barrier_all for init_worker
-            self.role_maker_._barrier_all()
+            self._role_maker._barrier_all()
             # prepare for client to client communication
             info = self._fleet_ptr.get_clients_info()
-            all_info = self.role_maker_._worker_gather(info[0])
+            all_info = self._role_maker._worker_gather(info[0])
             self._fleet_ptr.gather_clients(all_info)
             self._fleet_ptr.create_client2client_connection()
             # barrier for init model
-            self.role_maker_._barrier_worker()
-            if self.role_maker_._is_first_worker():
+            self._role_maker._barrier_worker()
+            if self._role_maker.is_first_worker():
                 tables = self._dist_desc.trainer_param.dense_table
-                for prog, scope in zip(programs, scopes):
+                for prog, scope in zip(self._main_programs, self._scopes):
                     prog_id = str(id(prog))
                     prog_conf = self._opt_info['program_configs'][prog_id]
                     prog_tables = {}
@@ -100,24 +93,23 @@ class PSLib(Fleet):
                     for i in range(0, len(table.dense_variable_name)):
                         var_name = table.dense_variable_name[i]
                         if scope.find_var(var_name) is None:
-                            print("var " + var_name +
-                                  " not found in scope, " +
-                                  "you should run startup program first")
-                            sys.exit(-1)
+                            raise ValueError(
+                                "var " + var_name + " not found in scope, " +
+                                "you should run startup program first")
                         var_name_list.append(var_name)
                     self._fleet_ptr.init_model(scope,
                                                int(table.table_id),
                                                var_name_list)
             # barrier for init model done
-            self.role_maker_._barrier_worker()
+            self._role_maker._barrier_worker()
         else:
             raise NameError(
                 "You should run DistributedOptimizer.minimize() first")

-    def init_server(self, executor, model_dir=None):
+    def init_server(self, model_dir=None):
         pass

-    def run_server(self, executor):
+    def run_server(self):
         """
         init_pserver(): will be called by user. When a user knows current process
         is_worker(), he/she should call init_pserver() to initialize global
         information about parameter server
         """
         if self._opt_info:
             if "fleet_desc" in self._opt_info:
                 self._dist_desc_str = text_format.MessageToString(
@@ -128,22 +120,22 @@ class PSLib(Fleet):
                     self._opt_info["fleet_desc"])
                 self._dist_desc = self._opt_info["fleet_desc"]
             else:
-                print("You should run DistributedOptimizer.minimize() first")
-                sys.exit(-1)
+                raise Exception(
+                    "You should run DistributedOptimizer.minimize() first")
             self._fleet_ptr.init_server(self._dist_desc_str,
-                                        self.role_maker_._get_rank())
-            self.local_ip_ = self._fleet_ptr.run_server()
+                                        self._role_maker._get_rank())
+            self._local_ip = self._fleet_ptr.run_server()
             # barrier_all for init_server
-            self.role_maker_._barrier_all()
-            self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
+            self._role_maker._barrier_all()
+            self.all_ips_ = self._role_maker._all_gather(self._local_ip)
             self._fleet_ptr.gather_servers(self.all_ips_,
-                                           self.role_maker_._get_size())
+                                           self._role_maker._get_size())
             # barrier_all for init_worker, wait all workers start
-            self.role_maker_._barrier_all()
+            self._role_maker._barrier_all()
         else:
-            raise NameError(
+            raise Exception(
                 "You should run DistributedOptimizer.minimize() first")

     def stop_worker(self):
@@ -151,31 +143,30 @@ class PSLib(Fleet):
         stop(): will be called after a user finishes his/her training task. Fleet instance will be
         destroyed when stop() is called.
         """
-        self.role_maker_._barrier_worker()
-        if self.role_maker_._is_first_worker():
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
             self._fleet_ptr.stop_server()
-        self.role_maker_._barrier_worker()
-        self.role_maker_._barrier_all()
-        self.role_maker_._finalize()
+        self._role_maker._barrier_worker()
+        self._role_maker._barrier_all()
+        self._role_maker._finalize()

-    def stop(self, executor):
+    def stop(self):
         """
         stop(): will be called after a user finishes his/her training task. Fleet instance will be
         destroyed when stop() is called.
         """
-        self.role_maker_._barrier_worker()
-        if self.role_maker_._is_first_worker():
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
             self._fleet_ptr.stop_server()
-        self.role_maker_._barrier_worker()
-        self.role_maker_._barrier_all()
-        self.role_maker_._finalize()
+        self._role_maker._barrier_worker()
+        self._role_maker._barrier_all()
+        self._role_maker._finalize()

     def distributed_optimizer(self, optimizer, strategy=None):
-        self.optimizer = DownpourOptimizer(optimizer, strategy)
-        return self.optimizer
+        self._optimizer = DownpourOptimizer(optimizer, strategy)
+        return self._optimizer

     def save_inference_model(self,
-                             executor,
                              dirname,
                              feeded_var_names=None,
                              target_vars=None,
@@ -186,7 +177,7 @@ class PSLib(Fleet):
         """
         self._fleet_ptr.save_model(dirname)

-    def save_persistables(self, executor, dirname, main_program=None):
+    def save_persistables(self, dirname, main_program=None):
         self._fleet_ptr.save_model(dirname)

     def _set_opt_info(self, opt_info):
@@ -208,6 +199,13 @@ class DownpourOptimizer(DistributedOptimizer):
     run distributed training. The optimized information will be stored in
     Fleet() instance who holds the global information about current distributed
     training.
+
+    Args:
+        optimizer(Optimizer): subclass of Optimizer.
+        strategy(any): config for DownpourOptimizer.
+
+    Returns:
+        None
     """

     def __init__(self, optimizer, strategy=None):
@@ -242,32 +240,54 @@ class DownpourOptimizer(DistributedOptimizer):
         raise NotImplementedError()

     def minimize(self,
-                 loss,
-                 startup_program=None,
+                 losses,
+                 scopes=None,
+                 startup_programs=None,
                  parameter_list=None,
                  no_grad_set=None):
         """
-        minimize a program through loss, loss can be a list in DistributedOptimizer
+        minimize a program through loss, loss can be a list in DistributedOptimizer.
+        Note that in parameter server mode, a worker will not get anything about optimize_ops,
+        because optimizer algorithms run on the pserver side. We will make this usable in pserver
+        process, but currently the optimization part is written into Fleet(). A user does not
+        need to care about how to startup a pserver node.
+
         Args:
-            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
-            startup_program (Program): startup_program for initializing parameters
+            losses (Variable|Variable List): loss variable or loss variable list to run optimization.
+            scopes (Scope|Scope List): scope instance or list of scope instances.
+            startup_programs (Program|Program List): startup_program for initializing parameters
                 in `parameter_list`.
             parameter_list (list): list of Variables to update.
             no_grad_set (set|None): set of Variables should be ignored.
+
         Returns:
             tuple: (optimize_ops, params_grads) which are, list of operators appended;
             and list of (param, grad) Variables pair for optimization.
-        Note that in parameter server mode, a worker will not get anything about optimize_os
-        Because optmizer algorithms run on pserver side. We will make this usable in pserver
-        process, but currently the optimization part is written into Fleet(). A user does not
-        need to care about how to startup a pserver node.
         """
+
+        if not isinstance(losses, list):
+            losses = [losses]
+
         optimize_ops, param_grads, opt_info = \
             self._distributed_optimizer._minimize(
-                loss,
-                startup_program,
+                losses,
+                startup_programs,
                 parameter_list,
                 no_grad_set)

         fleet._set_opt_info(opt_info)
+
+        programs = [loss.block.program for loss in losses]
+
+        if scopes is None:
+            scopes = [fluid.global_scope()] * len(programs)
+
+        if len(scopes) != len(programs):
+            raise ValueError(
+                "You should make sure len(scopes) == len(programs) or set scopes None"
+            )
+
+        fleet._main_programs = programs
+        fleet._scopes = scopes
+
         return [optimize_ops, param_grads]
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
index 641c294c4a6edeb3d9823b4152b0ea158c8faa80..7a1925a95fd29259c137bc592aff653554381ada 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
@@ -94,7 +94,7 @@ class DownpourServer(Server):
         Returns:
             return None
         """
-        table = self.server_.downpour_server_param.downpour_table_param.add()
+        table = self._server.downpour_server_param.downpour_table_param.add()
         table.table_id = table_id
         table.table_class = "DownpourDenseTable"
         table.type = pslib.PS_DENSE_TABLE
@@ -169,7 +169,7 @@ class DownpourWorker(Worker):
         Returns:
             return None
         """
-        table = self.worker_.sparse_table.add()
+        table = self._worker.sparse_table.add()
         table.table_id = table_id
         table.slot_key.extend([var.name for var in slot_key_vars])
         table.slot_value.extend([var.name for var in slot_value_vars])
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index ba1f2c8f6ba43bcdb8d4240e33210370e5a454f6..31f964a0e341cf0a4f1bc551f3bea1a6a47d108e 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -66,8 +66,6 @@ class DistributedAdam(DistributedOptimizerImplBase):
         Returns:
             [optimize_ops, grads_and_weights]
         """
-        if not isinstance(losses, list):
-            losses = [losses]

         table_name = find_distributed_lookup_table(losses[0].block.program)
         prefetch_slots = find_distributed_lookup_table_inputs(
@@ -77,7 +75,7 @@ class DistributedAdam(DistributedOptimizerImplBase):

         ps_param = pslib.PSParameter()
         server = DownpourServer()
-        worker = DownpourWorker(self.window_)
+        worker = DownpourWorker(self._window)
         sparse_table_index = 0
         server.add_sparse_table(sparse_table_index, self._learning_rate,
                                 prefetch_slots, prefetch_slots_emb)
@@ -88,17 +86,12 @@ class DistributedAdam(DistributedOptimizerImplBase):

         param_grads_list = []
         for loss_index in range(len(losses)):
-            #program_config = ps_param.trainer_param.program_config.add()
-            #program_config.program_id = str(
-            #    id(losses[loss_index].block.program))
             program_id = str(id(losses[loss_index].block.program))
             program_configs[program_id] = {
                 "pull_sparse": [sparse_table_index],
                 "push_sparse": [sparse_table_index]
             }

-            #program_config.pull_sparse_table_id.extend([sparse_table_index])
-            #program_config.push_sparse_table_id.extend([sparse_table_index])
             params_grads = sorted(
                 fluid.backward.append_backward(losses[loss_index],
                                                parameter_list, no_grad_set),
@@ -130,8 +123,6 @@ class DistributedAdam(DistributedOptimizerImplBase):
                                    params, grads)
             program_configs[program_id]["pull_dense"] = [dense_table_index]
             program_configs[program_id]["push_dense"] = [dense_table_index]
-            #program_config.pull_dense_table_id.extend([dense_table_index])
-            #program_config.push_dense_table_id.extend([dense_table_index])
             if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
                 dense_table_index += 1
                 server.add_data_norm_table(dense_table_index,
@@ -139,18 +130,13 @@ class DistributedAdam(DistributedOptimizerImplBase):
                                            data_norm_params, data_norm_grads)
                 worker.add_dense_table(dense_table_index, self._learning_rate,
                                        data_norm_params, data_norm_grads)
-                #program_config.pull_dense_table_id.extend([dense_table_index])
-                #program_config.push_dense_table_id.extend([dense_table_index])
                 program_configs[program_id]["pull_dense"].extend(
                     [dense_table_index])
                 program_configs[program_id]["push_dense"].extend(
                     [dense_table_index])
             dense_table_index += 1
-            #program_configs.append(program_config)

         ps_param.server_param.CopyFrom(server.get_desc())
         ps_param.trainer_param.CopyFrom(worker.get_desc())
-        #for program_config in program_configs:
-        #    ps_param.trainer_param.program_config.extend([program_config])
         # Todo(guru4elephant): figure out how to support more sparse parameters
         # currently only support lookup_table
         worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
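Reviewer note (not part of the patch): a minimal sketch of the reworked trainer/server flow in transpiler mode, to make the signature changes above concrete. The toy network and endpoint values are hypothetical; the fleet and role-maker calls follow the signatures introduced in this diff — `fleet.init(executor, role_maker)`, `UserDefinedRoleMaker(current_id, role, worker_num, server_endpoints)`, and the executor-free `init_worker()`/`init_server()`/`run_server()`/`stop()`.

```python
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base.role_maker import Role, UserDefinedRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.distributed_transpiler import fleet

# Hypothetical toy network; any program with a loss Variable works here.
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=y_predict, label=y))

# Each node describes itself; a server node would pass role=Role.SERVER
# and its index into server_endpoints instead.
role = UserDefinedRoleMaker(
    current_id=0,
    role=Role.WORKER,
    worker_num=2,
    server_endpoints=["127.0.0.1:6170", "127.0.0.1:6171"])

exe = fluid.Executor(fluid.CPUPlace())
fleet.init(exe, role)  # the executor is handed over once, at init time

optimizer = fleet.distributed_optimizer(fluid.optimizer.SGD(learning_rate=0.01))
optimizer.minimize(loss)  # in this mode, minimize() also triggers fleet._transpile()

if fleet.is_server():
    fleet.init_server()  # runs the transpiled startup program via the stored executor
    fleet.run_server()
elif fleet.is_worker():
    fleet.init_worker()
    # ... run the trainer-side startup program and training loop here ...
    fleet.stop()  # closes the executor held by fleet
```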
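The `split_files` hunk only shows the renamed accessors, but the partition contract is visible in the surviving lines (`end += length`, `return files[start:end]`, and the `trainer_num > file_num` guard). A standalone closed-form sketch of that contract, assuming the usual block-plus-remainder scheme in which the lowest-ranked trainers absorb the remainder (all names here are local to the example):

```python
def split_files(files, trainer_id, trainer_num):
    # Contiguous blocks; the first (file_num % trainer_num) trainers
    # each take one extra file.
    file_num = len(files)
    if trainer_num > file_num:
        raise ValueError("trainer_num should be <= file_num : "
                         "%s > %s" % (trainer_num, file_num))
    blocks = file_num // trainer_num
    remainder = file_num % trainer_num
    start = trainer_id * blocks + min(trainer_id, remainder)
    length = blocks + (1 if trainer_id < remainder else 0)
    return files[start:start + length]

# 5 files over 2 trainers: trainer 0 takes 3 files, trainer 1 takes 2.
assert split_files(["a", "b", "c", "d", "e"], 0, 2) == ["a", "b", "c"]
assert split_files(["a", "b", "c", "d", "e"], 1, 2) == ["d", "e"]
```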
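Finally, the PSLIB-side change is worth a sketch too: `DownpourOptimizer.minimize()` now records the main programs and scopes on the `fleet` singleton, which is why `init_worker()` takes no arguments. In the snippet below, `build_network_a`/`build_network_b` are hypothetical helpers that each build a program and return its loss Variable, and the job must be launched under MPI for `MPISymetricRoleMaker` to generate roles:

```python
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

# Hypothetical helpers: each builds its own main/startup program pair
# and returns the loss Variable of that program.
loss_a = build_network_a()
loss_b = build_network_b()

exe = fluid.Executor(fluid.CPUPlace())
fleet.init(exe)  # PSLib always substitutes an MPISymetricRoleMaker

optimizer = fleet.distributed_optimizer(fluid.optimizer.Adam(learning_rate=1e-4))

# scopes defaults to [fluid.global_scope()] * len(losses); one scope per
# loss keeps the two programs' parameters apart. len(scopes) must match
# the number of losses, or minimize() raises ValueError.
scope_a, scope_b = fluid.Scope(), fluid.Scope()
optimizer.minimize([loss_a, loss_b], scopes=[scope_a, scope_b])

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
elif fleet.is_worker():
    # each startup program must already have been run in its matching
    # scope, otherwise init_worker() raises "var ... not found in scope"
    fleet.init_worker()
    # ... train, then:
    fleet.stop()
```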