From 1a4a51db2b8834d893cc011439e9f5367ec4f125 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Thu, 25 Apr 2019 15:46:11 +0800 Subject: [PATCH] Fleet unify distributed training (#16791) * implement distributed transpiler with fleet --- python/paddle/fluid/dataset.py | 2 +- .../fluid/incubate/fleet/base/fleet_base.py | 341 ++++++++++++++++ .../fluid/incubate/fleet/base/role_maker.py | 42 +- .../incubate/fleet/collective/__init__.py | 163 ++++++++ .../fluid/incubate/fleet/p2p/__init__.py | 12 - .../fleet/parameter_server/__init__.py | 363 +----------------- .../distributed_transpiler/__init__.py | 248 ++++++++++++ .../fleet/parameter_server/pslib/__init__.py | 273 +++++++++++++ .../parameter_server/{ => pslib}/node.py | 0 .../{ => pslib}/optimizer_factory.py | 0 .../parameter_server/{ => pslib}/ps_pb2.py | 0 .../fluid/tests/unittests/test_dist_base.py | 2 +- .../unittests/test_listen_and_serv_op.py | 19 +- .../fluid/transpiler/distribute_transpiler.py | 3 +- python/setup.py.in | 4 +- 15 files changed, 1082 insertions(+), 390 deletions(-) create mode 100644 python/paddle/fluid/incubate/fleet/base/fleet_base.py create mode 100644 python/paddle/fluid/incubate/fleet/collective/__init__.py delete mode 100644 python/paddle/fluid/incubate/fleet/p2p/__init__.py create mode 100644 python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py create mode 100644 python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py rename python/paddle/fluid/incubate/fleet/parameter_server/{ => pslib}/node.py (100%) rename python/paddle/fluid/incubate/fleet/parameter_server/{ => pslib}/optimizer_factory.py (100%) rename python/paddle/fluid/incubate/fleet/parameter_server/{ => pslib}/ps_pb2.py (100%) diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py index d539940133..c97e0bc6e8 100644 --- a/python/paddle/fluid/dataset.py +++ b/python/paddle/fluid/dataset.py @@ -233,7 +233,7 @@ class InMemoryDataset(DatasetBase): Examples: >>> import paddle.fluid as fluid - >>> import paddle.fluid.incubate.fleet.parameter_server as fleet + >>> from paddle.fluid.incubate.fleet.pslib import fleet >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset") >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py new file mode 100644 index 0000000000..c8177842ef --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py @@ -0,0 +1,341 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import abc +import sys + +from enum import Enum + +from paddle.fluid.optimizer import SGD + +from role_maker import RoleMakerBase, Role +from role_maker import MPISymetricRoleMaker +from role_maker import UserDefinedRoleMaker + + +class Mode(Enum): + TRANSPILER = 1, + PSLIB = 2, + COLLECTIVE = 3 + + +class Fleet(object): + """ + Fleet is the base class, transpiler and pslib are implementation of Fleet. + + Args: + mode(Mode): the implementation of Fleet's mode. + + Returns: + None + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, mode): + assert isinstance(mode, Mode) + self.is_initialized = False + self.mode = mode + self.workers = 0 + self.servers = 0 + self.worker_endpoints = [] + self.server_endpoints = [] + self.role = Role.WORKER + self.current_endpoint = None + self.current_id = 0 + self.optimizer = None + self.role_maker_ = None + + def is_first_worker(self): + """ + Check whether the node is the first instance of worker. + + Returns: + bool: True if this is the first node of worker, + False if not. + """ + return self.is_worker() and self.current_id == 0 + + def worker_id(self): + """ + Get current worker id. + + Returns: + int: node id + """ + return self.current_id + + def get_workers(self): + """ + Get current total worker number. + + Returns: + int: worker number + """ + return self.workers + + def is_worker(self): + """ + Check whether the node is an instance of worker. + + Returns: + bool: True if this is a node of worker, + False if not. + """ + return self.role == Role.WORKER + + def is_server(self): + """ + Check whether the node is an instance of server. + + Returns: + bool: True if this is a node of server, + False if not. + """ + return self.role == Role.SERVER + + def split_files(self, files): + """ + split files before distributed training, + for example, files is [a, b, c ,d, e] and trainer_num = 2, + then trainer 0 gets [a, b, c] and trainer 1 gets [d, e] + + Args: + files(list): file list need to be read. + + Returns: + list: files belongs to this worker. + """ + file_num = len(files) + trainer_id = self.worker_id() + trainer_num = self.get_workers() + if trainer_num > file_num: + raise ValueError("trainer_num should be <= file_num : " + "%s > %s" % (trainer_num, file_num)) + start = 0 + end = 0 + for i in range(0, trainer_id + 1): + length = file_num / trainer_num + (i < (file_num % trainer_num)) + start = end + end += length + return files[start:end] + + def init(self, role_maker=None): + """ + should be called only once in user's python scripts, + init() will initialize RoleMaker which is used for identifying + current node's role, e.g. worker, server, etc. + + Args: + role_maker(RoleMakerBase): subclass of RoleMakerBase. + + Returns: + None + """ + + if role_maker and not isinstance(role_maker, RoleMakerBase): + raise ValueError("role_maker must be an instance of RoleMakerBase") + + self.role_maker_ = role_maker + + if isinstance(role_maker, MPISymetricRoleMaker): + self.role_maker_._generate_role() + self.role = Role.WORKER if role_maker._is_worker() else Role.SERVER + self.workers = role_maker._worker_num() + self.servers = role_maker._server_num() + self.server_endpoints = role_maker._get_pserver_endpoints() + self.worker_endpoints = role_maker._get_trainer_endpoints() + self.current_id = role_maker._worker_index( + ) if role_maker._is_worker() else role_maker._server_index() + self.current_endpoint = self.worker_endpoints[self.current_id] \ + if role_maker._is_worker() else self.server_endpoints[self.current_id] + + elif isinstance(role_maker, UserDefinedRoleMaker): + self.current_id = role_maker.current_id + self.current_endpoint = role_maker.current_endpoint + self.workers = role_maker.workers + self.worker_endpoints = role_maker.worker_endpoints + self.servers = role_maker.servers + self.server_endpoints = role_maker.server_endpoints + self.role = role_maker.role + + else: + raise ValueError( + "role_maker must be an instance of UserDefinedRoleMaker/MPISymetricRoleMaker" + ) + + self.is_initialized = True + + @abc.abstractmethod + def init_worker(self, executor): + pass + + @abc.abstractmethod + def run_worker(self, executor, main_program=None): + pass + + @abc.abstractmethod + def init_server(self, executor, model_dir=None): + pass + + @abc.abstractmethod + def run_server(self, executor): + pass + + @abc.abstractmethod + def stop_worker(self): + pass + + @abc.abstractmethod + def stop(self, executor): + pass + + @abc.abstractmethod + def distributed_optimizer(self, optimizer, strategy=None): + pass + + @abc.abstractmethod + def save_inference_model(self, + executor, + dirname, + feeded_var_names, + target_vars, + main_program=None, + export_for_deployment=True): + pass + + @abc.abstractmethod + def save_persistables(self, executor, dirname, main_program=None): + pass + + def to_string(self): + infos = """ + mode = {} + workers = {} + server_endpoints = {} + role = {} + current_endpoint = {} + current_id = {} + """.format(self.mode, self.workers, self.server_endpoints, self.role, + self.current_endpoint, self.current_id) + return infos + + +class DistributedOptimizer(object): + """ + DistributedOptimizer is a wrapper for paddle.fluid.optimizer + A user should pass a paddle.fluid.optimizer to DistributedOptimizer + minimize() function is implemented. + DistributedOptimizer is the starting point for a user who wants to + run distributed training. The optimized information will be stored in + Fleet() instance who holds the global information about current distributed + training. + + Args: + optimizer(Optimizer): subclass of Optimizer. + strategy(dict): the user define config for Optimizer. + + Returns: + None + + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, optimizer, strategy=None): + if not isinstance(optimizer, SGD.__bases__): + raise ValueError("optimizer must be an instance of Optimizer") + + if strategy and not isinstance(strategy, dict): + raise ValueError("strategy must be an instance of Dict") + + self._optimizer = optimizer + self._strategy = strategy + + @abc.abstractmethod + def backward(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None, + callbacks=None): + """ + First part of `minimize`, do auto-diff to append backward ops for + the current program. + + Args: + loss (Variable): loss variable to run optimizations. + startup_program (Program): startup_program for initializing parameters + in `parameter_list`. + parameter_list (list): list of Variables to update. + no_grad_set (set|None): set of Variables should be ignored. + callbacks (list|None): list of callables to run when appending backward + operator for one parameter. + + Return: + list: list of (param, grad) pair, grad is the output of backward. + + Examples: + See examples in `apply_gradients`. + """ + pass + + @abc.abstractmethod + def apply_gradients(self, params_grads): + """ + Second part of `minimize`, appending optimization operators for + given `params_grads` pairs. + + Args: + params_grads (list): list of (param, grad) pair to do optimization. + + Returns: + list: A list of operators appended to the current program. + + Examples: + .. code-block:: python + + loss = network() + optimizer = fluid.optimizer.SGD(learning_rate=0.1) + params_grads = optimizer.backward(loss) + # you may append operations for params_grads here + # ... + optimizer.apply_gradients(params_grads) + """ + pass + + @abc.abstractmethod + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + """ + Add operations to minimize `loss` by updating `parameter_list`. + + This method combines interface `backward()` and + `apply_gradients()` into one. + + Args: + loss (Variable): loss variable to run optimizations. + startup_program (Program): startup_program for initializing parameters + in `parameter_list`. + parameter_list (list): list of Variables to update. + no_grad_set (set|None): set of Variables should be ignored. + + Returns: + tuple: (optimize_ops, params_grads) which are, list of operators appended; + and list of (param, grad) Variables pair for optimization. + """ + pass diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index ffc7ae0172..dfd2273b48 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -13,6 +13,13 @@ # limitations under the License. import sys +from enum import Enum + + +class Role(Enum): + WORKER = 1, + SERVER = 2 + class RoleMakerBase(object): """ @@ -23,7 +30,6 @@ class RoleMakerBase(object): """ def __init__(self): - self._role_maker_name = "" self._trainer_endpoints = [] self._pserver_endpoints = [] self._role_is_generated = False @@ -239,3 +245,37 @@ class MPISymetricRoleMaker(MPIRoleMaker): self._node_type = 1 self._node_type_comm = self._comm.Split(self._node_type) self._role_is_generated = True + + +class UserDefinedRoleMaker(RoleMakerBase): + def __init__(self, + current_id=0, + current_endpoint=None, + workers=0, + worker_endpoints=None, + servers=0, + server_endpoints=None, + role=Role.WORKER): + """ + UserDefinedRoleMaker is designed for worker and server assignment + under manual. Typically, a worker and a server node will be appointed + on each physical node, It can be assign by user. + """ + super(UserDefinedRoleMaker, self).__init__() + + self.current_id = current_id + self.current_endpoint = current_endpoint + self.workers = workers + self.worker_endpoints = worker_endpoints + self.servers = servers + self.server_endpoints = server_endpoints + self.role = role + + def _is_worker(self): + return self.role == Role.WORKER + + def _is_server(self): + return self.role == Role.SERVER + + def _generate_role(self): + self.role_is_generated_ = True diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py new file mode 100644 index 0000000000..49ecaee07a --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py @@ -0,0 +1,163 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +import sys +import logging + +import paddle.fluid as fluid +import paddle.fluid.io as io +import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler + +from ..base.fleet_base import Fleet +from ..base.fleet_base import Mode +from ..base.fleet_base import DistributedOptimizer + + +class Collective(Fleet): + def __init__(self): + super(Collective, self).__init__(Mode.COLLECTIVE) + self.local_ip_ = 0 + + def init(self, role_maker=None): + """ + should be called only once in user's python scripts, + init() will initialize RoleMaker which is used for identifying + current node's role, e.g. worker, server, etc. + + Args: + role_maker(RoleMakerBase): subclass of RoleMakerBase. + + Returns: + None + """ + + super(Collective, self).init(role_maker) + self._role_maker._generate_role() + + def init_worker(self, executor): + logging.warn( + "You should not call 'init_worker' method for collective mode.") + + def run_worker(self, executor, main_program=None): + logging.warn( + "You should not call 'run_worker' method for collective mode.") + + def init_server(self, executor, model_dir=None): + logging.warn( + "You should not call 'init_server' method for collective mode.") + + def run_server(self, executor): + logging.warn( + "You should not call 'run_server' method for collective mode.") + + def stop_worker(self): + logging.warn( + "You should not call 'stop_worker' method for collective mode.") + + def stop(self, executor): + """ + stop(): will be called after a user finishes his/her training task. + """ + logging.warn("You should not call 'stop' method for collective mode.") + + def distributed_optimizer(self, optimizer, strategy=None): + self.optimizer = CollectiveOptimizer(optimizer, strategy) + return self.optimizer + + def save_inference_model(self, + executor, + dirname, + feeded_var_names=None, + target_vars=None, + main_program=None, + export_for_deployment=True): + io.save_inference_model(dirname, feeded_var_names, target_vars, + executor, main_program, None, None, + export_for_deployment) + + def save_persistables(self, executor, dirname, main_program=None): + io.save_persistables(executor, dirname, main_program, None) + + +fleet = Collective() + + +class CollectiveOptimizer(DistributedOptimizer): + """ + DistributedOptimizer is a wrapper for paddle.fluid.optimizer + A user should pass a paddle.fluid.optimizer to DistributedOptimizer + minimize() function is implemented. + DistributedOptimizer is the starting point for a user who wants to + run distributed training. The optimized information will be stored in + Fleet() instance who holds the global information about current distributed + training. + """ + + def __init__(self, optimizer, strategy=None): + super(CollectiveOptimizer, self).__init__(optimizer, strategy) + assert strategy is None, "You cannot set 'strategy' for collective." + + def backward(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None, + callbacks=None): + return self._optimizer.backward(loss, startup_program, parameter_list, + no_grad_set, callbacks) + + def apply_gradients(self, params_grads): + return self._optimizer.apply_gradients(params_grads) + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + """ + minimize a program through loss + Args: + loss (Variable|Variable List): loss variable or loss variable list to run optimization. + startup_program (Program): startup_program for initializing parameters + in `parameter_list`. + parameter_list (list): list of Variables to update. + no_grad_set (set|None): set of Variables should be ignored. + Returns: + tuple: (optimize_ops, params_grads) which are, list of operators appended; + and list of (param, grad) Variables pair for optimization. + Note that in parameter server mode, a worker will not get anything about optimize_os + Because optmizer algorithms run on pserver side. We will make this usable in pserver + process, but currently the optimization part is written into Fleet(). A user does not + need to care about how to startup a pserver node. + """ + optimize_ops, param_grads = self._optimizer.minimize( + loss, startup_program, parameter_list, no_grad_set) + + worker_endpoints = fleet.worker_endpoints + trainer_id = fleet.current_id + current_endpoint = fleet.current_endpoint + + startup_program = startup_program if startup_program else \ + fluid.framework.default_startup_program + + # call transpiler + config = dist_transpiler.DistributeTranspilerConfig() + config.mode = "nccl2" + t = dist_transpiler.DistributeTranspiler(config=config) + t.transpile( + trainer_id, + trainers=','.join(worker_endpoints), + startup_program=startup_program, + current_endpoint=current_endpoint) + + return optimize_ops, param_grads diff --git a/python/paddle/fluid/incubate/fleet/p2p/__init__.py b/python/paddle/fluid/incubate/fleet/p2p/__init__.py deleted file mode 100644 index 8647330f32..0000000000 --- a/python/paddle/fluid/incubate/fleet/p2p/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index 4a7665b9bc..33ed0ecf10 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -10,365 +10,4 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and - -import sys -import os -from ..base.role_maker import MPISymetricRoleMaker -from .optimizer_factory import * -from google.protobuf import text_format -import paddle.fluid.optimizer as local_optimizer -import paddle.fluid as fluid - - -class Fleet(object): - """ - Fleet in Python. Fleet is used in distributed training. It is designed as a singlton instance - in c++. A Fleet() object will be initialized automatically when a user import this package as - fleet. The General interface Fleet supports are: - init(): which should be called only once in user's python scripts. init() will initialize - FleetWrapper in CPP, it will also initialize a RoleMaker which is used for identifying - current node's role, e.g. worker, server, etc. - stop(): will be called after a user finishes his/her training task. Fleet instance will be - destroyed when stop() is called. - init_pserver(): will be called by user. When a user knows current process is_worker(), he/she - should call init_pserver() to initialize global information about parameter server - init_worker(): will be called by user. When a user knows current process is_server(), he/she - should call init_worker() to initialize global information about worker and connect - worker with pserver. - get_worker_num(): return the number of current task's worker node - get_server_num(): return the number of current task's pserver node - is_worker(): return whether current process is a worker - is_server(): return thether current process is a server - init_pserver_model(): initialize model parameters in pserver, called from a worker node - save_pserver_model(): save model parameters in pserver, called from a server node - - Example: - - .. code-block:: python - import paddle.fluid.incubate.fleet.parameter_server as fleet - from my_model import bow_net - model = bow_net() - fleet.init() - sgd_optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.0001) - sgd_optimizer = fleet.DistributedOptimizer(sgd_optimizer) - sgd_optimizer.minimize(model.loss) - exe = paddle.fluid.Executor(paddle.fluid.CPUPlace()) - if fleet.is_worker(): - exe.run(paddle.fluid.default_startup_program()) - fleet.init_worker() # init worker should be called before training - # do other things like training - elif fleet.is_server(): - fleet.init_pserver() - fleet.stop() - """ - - def __init__(self): - self._opt_info = None # for fleet only - self._role_maker = None - self._local_ip = 0 - self._is_initialized = False - - def init(self): - # TODO(guru4elephant) - # this is a temporary solution - # we will support more configurable RoleMaker for users in the future - """ - init(): which should be called only once in user's python scripts. init() will initialize - FleetWrapper in CPP, it will also initialize a RoleMaker which is used for identifying - current node's role, e.g. worker, server, etc. - """ - if not self.is_initialized_: - self._role_maker = MPISymetricRoleMaker() - self._role_maker._generate_role() - self._fleet_ptr = fluid.core.Fleet() - self._is_initialized = True - - def stop(self): - """ - stop(): will be called after a user finishes his/her training task. Fleet instance will be - destroyed when stop() is called. - """ - self._role_maker._barrier_worker() - if self._role_maker._is_first_worker(): - self._fleet_ptr.stop_server() - self._role_maker._barrier_worker() - self._role_maker._barrier_all() - self._role_maker._finalize() - - def init_pserver(self): - """ - init_pserver(): will be called by user. When a user knows current process is_worker(), he/she - should call init_pserver() to initialize global information about parameter server - """ - if self._opt_info: - if "fleet_desc" in self._opt_info: - self._dist_desc_str = text_format.MessageToString( - self._opt_info["fleet_desc"]) - self._dist_desc = self._opt_info["fleet_desc"] - else: - print("You should run DistributedOptimizer.minimize() first") - sys.exit(-1) - self._fleet_ptr.init_server(self._dist_desc_str, - self.role_maker_._get_rank()) - self._local_ip = self._fleet_ptr.run_server() - # barrier_all for init_server - self._role_maker._barrier_all() - self._all_ips = self._role_maker._all_gather(self.local_ip_) - - self._fleet_ptr.gather_servers(self._all_ips, - self._role_maker._get_size()) - # barrier_all for init_worker, wait all workers start - self._role_maker._barrier_all() - else: - print("You should run DistributedOptimizer.minimize() first") - sys.exit(-1) - - def init_worker(self, programs, scopes=None): - """ - init_worker(): will be called by user. When a user knows current process is_server(), he/she - should call init_worker() to initialize global information about worker and connect - worker with pserver. You should run startup program before init_worker. - - Args: - programs(Program|list): a Program or a list of Programs - scopes(Scope|list): a Scope or a list of Scopes, default None. - """ - if not isinstance(programs, list): - programs = [programs] - if scopes is None: - scopes = [fluid.global_scope()] * len(programs) - if len(scopes) != len(programs): - print( - "You should make sure len(scopes) == len(programs) or set scopes None" - ) - sys.exit(-1) - if self._opt_info: - if "fleet_desc" in self._opt_info: - self._dist_desc_str = text_format.MessageToString( - self._opt_info["fleet_desc"]) - self._dist_desc = self._opt_info["fleet_desc"] - else: - print("You should run DistributedOptimizer.minimize() first") - sys.exit(-1) - # barrier_all for init_server, wait for server starts - self._role_maker._barrier_all() - self._all_ips = self._role_maker._all_gather(self.local_ip_) - self._fleet_ptr.init_worker(self._dist_desc_str, self._all_ips, - self._role_maker._get_size(), - self._role_maker._get_rank()) - # barrier_all for init_worker - self._role_maker._barrier_all() - # prepare for client to client communication - info = self._fleet_ptr.get_clients_info() - all_info = self._role_maker._worker_gather(info[0]) - self._fleet_ptr.gather_clients(all_info) - self._fleet_ptr.create_client2client_connection() - # barrier for init model - self._role_maker._barrier_worker() - if self._role_maker._is_first_worker(): - tables = self._dist_desc.trainer_param.dense_table - for prog, scope in zip(programs, scopes): - prog_id = str(id(prog)) - prog_conf = self._opt_info['program_configs'][prog_id] - prog_tables = {} - for key in prog_conf: - if "dense" not in key: - continue - for table_id in prog_conf[key]: - prog_tables[int(table_id)] = 0 - for table in tables: - if int(table.table_id) not in prog_tables: - continue - var_name_list = [] - for i in range(0, len(table.dense_variable_name)): - var_name = table.dense_variable_name[i] - if scope.find_var(var_name) is None: - print("var " + var_name + - " not found in scope, " + - "you should run startup program first") - sys.exit(-1) - var_name_list.append(var_name) - self._fleet_ptr.init_model(scope, - int(table.table_id), - var_name_list) - # barrier for init model done - self._role_maker._barrier_worker() - else: - print("You should run DistributedOptimizer.minimize() first") - sys.exit(-1) - - def get_worker_num(self): - """ - return the number of current job's worker num - """ - return self._role_maker._worker_num() - - def get_server_num(self): - """ - return the number of current job's server num - """ - return self._role_maker._server_num() - - def get_worker_index(self): - """ - return the mpi rank of current worker - """ - return self._role_maker._worker_index() - - def is_worker(self): - """ - return whether current node is a worker - """ - return self._role_maker._is_worker() - - def is_server(self): - """ - return whether current node is pserver - """ - return self._role_maker._is_server() - - def init_pserver_model(self): - """ - init pserver model called from pserver - """ - if self._role_maker._is_first_worker(): - self._fleet_ptr.init_model() - self._role_maker._barrier_worker() - - def save_pserver_model(self, save_path): - """ - save pserver model called from a worker - """ - self._fleet_ptr.save_model(save_path) - - def split_filelist(self, filelist): - """ - split filelist before distributed training, - for example, filelist is [a, b, c ,d, e] and trainer_num = 2, - then trainer 0 gets [a, b, c] and trainer 1 gets [d, e] - - Example: - >>> all_filelist = ["a.txt", "b.txt", "c.txt"] - >>> my_filelist = fleet.split_filelist(all_filelist) - >>> dataset = fluid.DatasetFactory().create_dataset() - >>> dataset.set_filelist(my_filelist) - - Args: - filelist(list): list of filename, can be local or hdfs/afs. - - Returns: - list of filename which belongs to this trainer. - """ - file_num = len(filelist) - trainer_id = self.get_worker_index() - trainer_num = self.get_worker_num() - if trainer_num > file_num: - raise ValueError("trainer_num should be <= file_num : " - "%s > %s" % (trainer_num, file_num)) - # get interval of filelist, it's [ ) - start = 0 - end = 0 - for i in range(0, trainer_id + 1): - length = file_num / trainer_num + (i < (file_num % trainer_num)) - start = end - end += length - my_filelist = filelist[start:end] - return my_filelist - - def _set_opt_info(self, opt_info): - """ - this function saves the result from DistributedOptimizer.minimize() - """ - self._opt_info = opt_info - - -class DistributedOptimizer(object): - """ - DistributedOptimizer is a wrapper for paddle.fluid.optimizer - A user should pass a paddle.fluid.optimizer to DistributedOptimizer - minimize() function is implemented. - DistributedOptimizer is the starting point for a user who wants to - run distributed training. The optimized information will be stored in - Fleet() instance who holds the global information about current distributed - training. - """ - - def __init__(self, optimizer, dist_config={}): - super(DistributedOptimizer, self).__init__() - self._optimizer = optimizer - self._optimizer_name = "Distributed%s" % optimizer.type.capitalize() - if optimizer.type != "adam": - print("Currently, distributed optimizer only supports Adam" - "Will config built-in adam for you." - "We will support more functions in DistributedOptimizer", - sys.stderr) - self._optimizer_name = "DistributedAdam" - - self._distributed_optimizer = globals()[self._optimizer_name](optimizer) - - def backward(self, - loss, - startup_program=None, - parameter_list=None, - no_grad_set=None, - callbacks=None): - """ - Currently, backward function can not be called through DistributedOptimizer - """ - raise NotImplementedError() - - def apply_gradients(self, params_grads): - """ - Currently, apply_gradients function can not be called through DistributedOptimizer - """ - raise NotImplementedError() - - def minimize(self, - loss, - startup_program=None, - parameter_list=None, - no_grad_set=None): - """ - minimize a program through loss, loss can be a list in DistributedOptimizer - Args: - loss (Variable|Variable List): loss variable or loss variable list to run optimization. - startup_program (Program): startup_program for initializing parameters - in `parameter_list`. - parameter_list (list): list of Variables to update. - no_grad_set (set|None): set of Variables should be ignored. - Returns: - tuple: (optimize_ops, params_grads) which are, list of operators appended; - and list of (param, grad) Variables pair for optimization. - Note that in parameter server mode, a worker will not get anything about optimize_os - Because optmizer algorithms run on pserver side. We will make this usable in pserver - process, but currently the optimization part is written into Fleet(). A user does not - need to care about how to startup a pserver node. - """ - optimize_ops, param_grads, opt_info = \ - self._distributed_optimizer._minimize( - loss, - startup_program, - parameter_list, - no_grad_set) - - fleet_instance._set_opt_info(opt_info) - return [optimize_ops, param_grads] - - -# this is a temporary solution -# TODO(guru4elephant) -# will make this more flexible for more Parameter Server Archs -fleet_instance = Fleet() - -init = fleet_instance.init -stop = fleet_instance.stop -init_pserver = fleet_instance.init_pserver -init_worker = fleet_instance.init_worker -is_worker = fleet_instance.is_worker -is_server = fleet_instance.is_server -init_pserver_model = fleet_instance.init_pserver_model -save_pserver_model = fleet_instance.save_pserver_model -worker_num = fleet_instance.get_worker_num -server_num = fleet_instance.get_server_num -worker_index = fleet_instance.get_worker_index -split_filelist = fleet_instance.split_filelist +# limitations under the License. diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py new file mode 100644 index 0000000000..5eeac2a731 --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distributed_transpiler/__init__.py @@ -0,0 +1,248 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import sys + +from paddle.fluid.executor import Executor + +from paddle.fluid.framework import Program +from paddle.fluid.framework import default_main_program +from paddle.fluid.framework import default_startup_program + +from paddle.fluid.optimizer import Optimizer + +import paddle.fluid.io as io + +from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig +from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler + +from ...base.role_maker import Role +from ...base.fleet_base import Fleet +from ...base.fleet_base import Mode +from ...base.fleet_base import DistributedOptimizer + + +class DistributedTranspiler(Fleet): + """ + A subclass for compatibility with fluid.transpiler.DistributeTranspiler. + """ + + def __init__(self): + super(DistributedTranspiler, self).__init__(Mode.TRANSPILER) + self._transpiler = OriginTranspiler() + self._startup_program = None + self._main_program = None + + def init_worker(self, executor): + """ + `init_worker` has many many functions to do before training, + first, wait for all parameter servers launch completely. + second, run executor to initialize startup program + third, wait for all worker initialize completely. + + Args: + executor(Executor): The executor to run for init startup program. + + Returns: + None + """ + if not isinstance(executor, Executor): + raise ValueError("executor must be an instance of Executor") + + if not self._startup_program: + raise ValueError( + "startup_program is None, need invoke DistributedOptimizer.minimize first" + ) + + executor.run(self._startup_program) + + def run_worker(self, executor, main_program=None): + pass + + def init_server(self, executor, model_dir=None): + """ + `init_server` has many many functions to do before start pserver, + first, run executor to initialize startup program, + second, if the `model_dir` is not empty, it will load parameters from it for increment training. + + Args: + executor(Executor): The executor to run for init server. + model_dir(str): The directory path. + + Returns: + None + """ + if not isinstance(executor, Executor): + raise ValueError("executor must be an instance of Executor") + + if not self._startup_program: + raise ValueError( + "startup_program is None, need invoke DistributedOptimizer.minimize first" + ) + + executor.run(self._startup_program) + + if model_dir: + if not os.path.isdir(model_dir): + raise ValueError("There is no directory named '%s'", model_dir) + + io.load_persistables(executor, model_dir, self._startup_program) + + def run_server(self, executor): + """ + `run_server` execute executor to start pserver main program. + + Args: + executor(Executor): The executor to run for init server. + + Returns: + None + """ + if not isinstance(executor, Executor): + raise ValueError("executor must be an instance of Executor") + + if not self._main_program: + raise ValueError( + "main_program is None, need invoke DistributedOptimizer.minimize first" + ) + + executor.run(self._main_program) + + def stop_worker(self): + pass + + def stop(self, executor): + """ + Close this executor. + + For the distributed training, this method would free the resource on PServers related to + the current Trainer. + + Args: + executor(Executor): The executor to run for init server. + + Returns: + None + """ + + if not isinstance(executor, Executor): + raise ValueError("executor must be an instance of Executor") + executor.close() + + def distributed_optimizer(self, optimizer, strategy=None): + """ + Optimizer for distributed training. + + For the distributed training, this method would rebuild a new instance of DistributedOptimizer. + Which has basic Optimizer function and special features for distributed training. + + Args: + optimizer(Optimizer): The executor to run for init server. + strategy(dict): Extra properties for distributed optimizer. + + Returns: + TranspilerOptimizer: subclass of DistributedOptimizer. + """ + + if not isinstance(optimizer, Optimizer): + raise ValueError("optimizer must be an instance of Optimizer") + self.optimizer = TranspilerOptimizer(optimizer, strategy) + return self.optimizer + + def save_inference_model(self, + executor, + dirname, + feeded_var_names, + target_vars, + main_program=None, + export_for_deployment=True): + """ + Prune the given `main_program` to build a new program especially for inference, + and then save it and all related parameters to given `dirname` by the `executor`. + """ + io.save_inference_model(dirname, feeded_var_names, target_vars, + executor, main_program, None, None, + export_for_deployment) + + def save_persistables(self, executor, dirname, main_program=None): + """ + This function filters out all variables with `persistable==True` from the + give `main_program` and then saves these variables to the folder `dirname` + or file `filename`. + + The `dirname` is used to specify the folder where persistable variables + are going to be saved. If you would like to save variables in separate + files, set `filename` None; if you would like to save all variables in a + single file, use `filename` to specify the file name. + """ + io.save_persistables(executor, dirname, main_program, None) + + def _transpile(self, config): + if not isinstance(config, DistributeTranspilerConfig): + raise ValueError( + "config must be an instance of DistributeTranspilerConfig") + + self._transpiler = OriginTranspiler(config) + self._transpiler.transpile( + trainer_id=fleet.worker_id(), + pservers=fleet.server_endpoints, + trainers=fleet.worker_num()) + + if self.role == Role.WORKER: + self._main_program = self._transpiler.get_trainer_program() + self._startup_program = default_startup_program() + else: + self._main_program, self._startup_program = \ + self._transpiler.get_pserver_programs(self.current_endpoint) + + +fleet = DistributedTranspiler() + + +class TranspilerOptimizer(DistributedOptimizer): + def __init__(self, optimizer, strategy=None): + super(TranspilerOptimizer, self).__init__(optimizer, strategy) + + if strategy and not isinstance(strategy, DistributeTranspilerConfig): + raise ValueError( + "In {} mode, strategy must be an instance of DistributeTranspilerConfig". + format(fleet.mode)) + + def backward(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None, + callbacks=None): + return self._optimizer.backward(loss, startup_program, parameter_list, + no_grad_set, callbacks) + + def apply_gradients(self, params_grads): + return self._optimizer.apply_gradients(params_grads) + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + optimize_ops, params_grads = self._optimizer.minimize( + loss, startup_program, parameter_list, no_grad_set) + self.transpile() + return optimize_ops, params_grads + + def transpile(self): + if self._strategy is None: + self._strategy = DistributeTranspilerConfig() + + fleet._transpile(config=self._strategy) diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py new file mode 100644 index 0000000000..b472c20bc1 --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -0,0 +1,273 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +import sys +from .optimizer_factory import * +from google.protobuf import text_format + +import paddle.fluid as fluid +from paddle.fluid.framework import Program + +from ...base.fleet_base import Fleet +from ...base.fleet_base import Mode +from ...base.role_maker import MPISymetricRoleMaker +from ...base.fleet_base import DistributedOptimizer + + +class PSLib(Fleet): + def __init__(self): + super(PSLib, self).__init__(Mode.PSLIB) + self._opt_info = None + self.local_ip_ = 0 + self._fleet_ptr = None + + def init(self, role_maker=None): + super(PSLib, self).init(MPISymetricRoleMaker()) + self._fleet_ptr = fluid.core.Fleet() + + def init_worker(self, executor): + pass + + def run_worker(self, executor, main_program=None): + """ + init_worker(): will be called by user. When a user knows current process is_server(), he/she + should call init_worker() to initialize global information about worker and connect + worker with pserver. You should run startup program before init_worker. + + Args: + programs(Program|list): a Program or a list of Programs + scopes(Scope|list): a Scope or a list of Scopes, default None. + """ + if not isinstance(main_program, Program): + raise ValueError("main_program must be an instance of Program") + + programs = [main_program] + scopes = [fluid.global_scope()] * len(programs) + + if len(scopes) != len(programs): + print( + "You should make sure len(scopes) == len(programs) or set scopes None" + ) + sys.exit(-1) + if self._opt_info: + if "fleet_desc" in self._opt_info: + self._dist_desc_str = text_format.MessageToString( + self._opt_info["fleet_desc"]) + self._dist_desc = self._opt_info["fleet_desc"] + else: + print("You should run DistributedOptimizer.minimize() first") + sys.exit(-1) + # barrier_all for init_server, wait for server starts + self.role_maker_._barrier_all() + self.all_ips_ = self.role_maker_._all_gather(self.local_ip_) + self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_, + self.role_maker_._get_size(), + self.role_maker_._get_rank()) + # barrier_all for init_worker + self.role_maker_._barrier_all() + # prepare for client to client communication + info = self._fleet_ptr.get_clients_info() + all_info = self.role_maker_._worker_gather(info[0]) + self._fleet_ptr.gather_clients(all_info) + self._fleet_ptr.create_client2client_connection() + # barrier for init model + self.role_maker_._barrier_worker() + if self.role_maker_._is_first_worker(): + tables = self._dist_desc.trainer_param.dense_table + for prog, scope in zip(programs, scopes): + prog_id = str(id(prog)) + prog_conf = self._opt_info['program_configs'][prog_id] + prog_tables = {} + for key in prog_conf: + if "dense" not in key: + continue + for table_id in prog_conf[key]: + prog_tables[int(table_id)] = 0 + for table in tables: + if int(table.table_id) not in prog_tables: + continue + var_name_list = [] + for i in range(0, len(table.dense_variable_name)): + var_name = table.dense_variable_name[i] + if scope.find_var(var_name) is None: + print("var " + var_name + + " not found in scope, " + + "you should run startup program first") + sys.exit(-1) + var_name_list.append(var_name) + self._fleet_ptr.init_model(scope, + int(table.table_id), + var_name_list) + # barrier for init model done + self.role_maker_._barrier_worker() + else: + raise NameError( + "You should run DistributedOptimizer.minimize() first") + + def init_server(self, executor, model_dir=None): + pass + + def run_server(self, executor): + """ + init_pserver(): will be called by user. When a user knows current process is_worker(), he/she + should call init_pserver() to initialize global information about parameter server + """ + if self._opt_info: + if "fleet_desc" in self._opt_info: + self._dist_desc_str = text_format.MessageToString( + self._opt_info["fleet_desc"]) + self._dist_desc = self._opt_info["fleet_desc"] + else: + print("You should run DistributedOptimizer.minimize() first") + sys.exit(-1) + self._fleet_ptr.init_server(self._dist_desc_str, + self.role_maker_._get_rank()) + self.local_ip_ = self._fleet_ptr.run_server() + + # barrier_all for init_server + self.role_maker_._barrier_all() + self.all_ips_ = self.role_maker_._all_gather(self.local_ip_) + + self._fleet_ptr.gather_servers(self.all_ips_, + self.role_maker_._get_size()) + # barrier_all for init_worker, wait all workers start + self.role_maker_._barrier_all() + else: + raise NameError( + "You should run DistributedOptimizer.minimize() first") + + def stop_worker(self): + """ + stop(): will be called after a user finishes his/her training task. Fleet instance will be + destroyed when stop() is called. + """ + self.role_maker_._barrier_worker() + if self.role_maker_._is_first_worker(): + self._fleet_ptr.stop_server() + self.role_maker_._barrier_worker() + self.role_maker_._barrier_all() + self.role_maker_._finalize() + + def stop(self, executor): + """ + stop(): will be called after a user finishes his/her training task. Fleet instance will be + destroyed when stop() is called. + """ + self.role_maker_._barrier_worker() + if self.role_maker_._is_first_worker(): + self._fleet_ptr.stop_server() + self.role_maker_._barrier_worker() + self.role_maker_._barrier_all() + self.role_maker_._finalize() + + def distributed_optimizer(self, optimizer, strategy=None): + self.optimizer = DownpourOptimizer(optimizer, strategy) + return self.optimizer + + def save_inference_model(self, + executor, + dirname, + feeded_var_names=None, + target_vars=None, + main_program=None, + export_for_deployment=True): + """ + save pserver model called from a worker + """ + self._fleet_ptr.save_model(dirname) + + def save_persistables(self, executor, dirname, main_program=None): + self._fleet_ptr.save_model(dirname) + + def _set_opt_info(self, opt_info): + """ + this function saves the result from DistributedOptimizer.minimize() + """ + self._opt_info = opt_info + + +fleet = PSLib() + + +class DownpourOptimizer(DistributedOptimizer): + """ + DistributedOptimizer is a wrapper for paddle.fluid.optimizer + A user should pass a paddle.fluid.optimizer to DistributedOptimizer + minimize() function is implemented. + DistributedOptimizer is the starting point for a user who wants to + run distributed training. The optimized information will be stored in + Fleet() instance who holds the global information about current distributed + training. + """ + + def __init__(self, optimizer, strategy=None): + super(DownpourOptimizer, self).__init__(optimizer, strategy) + + self._optimizer = optimizer + self._optimizer_name = "Distributed%s" % optimizer.type.capitalize() + if optimizer.type != "adam": + print("Currently, distributed optimizer only support Adam" + "Will config built-in adam for you." + "We will support more functions in DistributedOptimizer", + sys.stderr) + self._optimizer_name = "DistributedAdam" + + self._distributed_optimizer = globals()[self._optimizer_name](optimizer) + + def backward(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None, + callbacks=None): + """ + Currently, backward function can not be called through DistributedOptimizer + """ + raise NotImplementedError() + + def apply_gradients(self, params_grads): + """ + Currently, apply_gradients function can not be called through DistributedOptimizer + """ + raise NotImplementedError() + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + """ + minimize a program through loss, loss can be a list in DistributedOptimizer + Args: + loss (Variable|Variable List): loss variable or loss variable list to run optimization. + startup_program (Program): startup_program for initializing parameters + in `parameter_list`. + parameter_list (list): list of Variables to update. + no_grad_set (set|None): set of Variables should be ignored. + Returns: + tuple: (optimize_ops, params_grads) which are, list of operators appended; + and list of (param, grad) Variables pair for optimization. + Note that in parameter server mode, a worker will not get anything about optimize_os + Because optmizer algorithms run on pserver side. We will make this usable in pserver + process, but currently the optimization part is written into Fleet(). A user does not + need to care about how to startup a pserver node. + """ + optimize_ops, param_grads, opt_info = \ + self._distributed_optimizer._minimize( + loss, + startup_program, + parameter_list, + no_grad_set) + + fleet._set_opt_info(opt_info) + return [optimize_ops, param_grads] diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py similarity index 100% rename from python/paddle/fluid/incubate/fleet/parameter_server/node.py rename to python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py similarity index 100% rename from python/paddle/fluid/incubate/fleet/parameter_server/optimizer_factory.py rename to python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ps_pb2.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/ps_pb2.py similarity index 100% rename from python/paddle/fluid/incubate/fleet/parameter_server/ps_pb2.py rename to python/paddle/fluid/incubate/fleet/parameter_server/pslib/ps_pb2.py diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index a5d8cd4660..a64cfe5cbf 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -52,6 +52,7 @@ class TestDistRunnerBase(object): # NOTE: import fluid until runtime, or else forking processes will cause error. config = fluid.DistributeTranspilerConfig() config.enable_dc_asgd = dc_asgd + config.sync_mode = sync_mode # config.runtime_split_send_recv = True t = fluid.DistributeTranspiler(config=config) t.transpile( @@ -59,7 +60,6 @@ class TestDistRunnerBase(object): program=main_program, pservers=pserver_endpoints, trainers=trainers, - sync_mode=sync_mode, current_endpoint=current_endpoint) return t diff --git a/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py b/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py index a0358f8b40..e940359b36 100644 --- a/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py +++ b/python/paddle/fluid/tests/unittests/test_listen_and_serv_op.py @@ -43,12 +43,11 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): pserver_endpoints = ip + ":" + port current_endpoint = ip + ":" + port - t = fluid.DistributeTranspiler() - t.transpile( - trainer_id, - pservers=pserver_endpoints, - trainers=trainers, - sync_mode=sync_mode) + + config = fluid.DistributeTranspilerConfig() + config.sync_mode = sync_mode + t = fluid.DistributeTranspiler(config=config) + t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers) pserver_prog = t.get_pserver_program(current_endpoint) pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) exe.run(pserver_startup) @@ -77,13 +76,11 @@ def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers, pserver_endpoints = ps1 + "," + ps2 config = fluid.DistributeTranspilerConfig() + config.sync_mode = sync_mode config.slice_var_up = False + t = fluid.DistributeTranspiler(config=config) - t.transpile( - trainer_id, - pservers=pserver_endpoints, - trainers=trainers, - sync_mode=sync_mode) + t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers) pserver_prog = t.get_pserver_program(ps2) # pserver2 have no parameter diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 19a1f8bf74..e6963f5461 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -158,6 +158,7 @@ class DistributeTranspilerConfig(object): wait_port = True # split the send recv var in runtime runtime_split_send_recv = False + sync_mode = None class DistributeTranspiler(object): @@ -329,7 +330,7 @@ class DistributeTranspiler(object): return self.trainer_num = trainers - self.sync_mode = sync_mode + self.sync_mode = self.config.sync_mode if self.config.sync_mode else sync_mode self.trainer_id = trainer_id pserver_endpoints = pservers.split(",") self.pserver_endpoints = pserver_endpoints diff --git a/python/setup.py.in b/python/setup.py.in index eef8afac65..1180c1f69f 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -127,7 +127,9 @@ packages=['paddle', 'paddle.fluid.incubate.fleet', 'paddle.fluid.incubate.fleet.base', 'paddle.fluid.incubate.fleet.parameter_server', - 'paddle.fluid.incubate.fleet.p2p'] + 'paddle.fluid.incubate.fleet.parameter_server.distributed_transpiler', + 'paddle.fluid.incubate.fleet.parameter_server.pslib', + 'paddle.fluid.incubate.fleet.collective'] with open('@PADDLE_SOURCE_DIR@/python/requirements.txt') as f: setup_requires = f.read().splitlines() -- GitLab