未验证 提交 565d3095 编写于 作者: T tangwei12 提交者: GitHub

Reformat fleet API (#17135)

* fix some logic in distributed transpiler, test=develop
* reformat fleet API, test=develop
上级 a88a1faa
......@@ -15,18 +15,21 @@
from __future__ import print_function
import abc
import sys
from enum import Enum
from paddle.fluid.optimizer import SGD
from paddle.fluid.executor import Executor
from role_maker import RoleMakerBase, Role
from role_maker import RoleMakerBase
from role_maker import MPISymetricRoleMaker
from role_maker import UserDefinedRoleMaker
class Mode(Enum):
    """
    Execution modes supported by fleet; each of them is designed for a
    different kind of model / distributed training setup.

    A Fleet instance is constructed with exactly one of these members.
    """
    # No trailing commas here: `TRANSPILER = 1,` would make the member's
    # value the tuple (1,) instead of the integer 1.
    TRANSPILER = 1
    PSLIB = 2
    COLLECTIVE = 3
......@@ -46,17 +49,11 @@ class Fleet(object):
def __init__(self, mode):
assert isinstance(mode, Mode)
self.is_initialized = False
self.mode = mode
self.workers = 0
self.servers = 0
self.worker_endpoints = []
self.server_endpoints = []
self.role = Role.WORKER
self.current_endpoint = None
self.current_id = 0
self.optimizer = None
self.role_maker_ = None
self._is_initialized = False
self._mode = mode
self._optimizer = None
self._role_maker = None
self._executor = None
def is_first_worker(self):
"""
......@@ -66,25 +63,25 @@ class Fleet(object):
bool: True if this is the first node of worker,
False if not.
"""
return self.is_worker() and self.current_id == 0
return self._role_maker.is_first_worker()
def worker_id(self):
def worker_index(self):
"""
Get current worker id.
Get current worker index.
Returns:
int: node id
"""
return self.current_id
return self._role_maker.worker_index()
def get_workers(self):
def worker_num(self):
"""
Get current total worker number.
Returns:
int: worker number
"""
return self.workers
return len(self._role_maker.get_trainer_endpoints())
def is_worker(self):
"""
......@@ -94,7 +91,51 @@ class Fleet(object):
bool: True if this is a node of worker,
False if not.
"""
return self.role == Role.WORKER
return self._role_maker.is_worker()
def worker_endpoints(self, to_string=False):
"""
Get current worker (trainer) endpoints as reported by the role maker,
such as ["127.0.0.1:1001", "127.0.0.1:1002"].
Args:
to_string(bool): if True, return the endpoints joined by "," as one string.
Returns:
list/string: worker endpoints
"""
if to_string:
return ",".join(self._role_maker.get_trainer_endpoints())
else:
return self._role_maker.get_trainer_endpoints()
def server_num(self):
"""
Get current total server number, i.e. the number of pserver endpoints
reported by the role maker.
Returns:
int: server number
"""
return len(self._role_maker.get_pserver_endpoints())
def server_index(self):
"""
Get current server index; the value is delegated to the role maker.
Returns:
int: node id
"""
return self._role_maker.server_index()
def server_endpoints(self, to_string=False):
"""
Get current server (pserver) endpoints as reported by the role maker,
such as ["127.0.0.1:1001", "127.0.0.1:1002"].
Args:
to_string(bool): if True, return the endpoints joined by "," as one string.
Note this is a flag, not an index into the endpoint list.
Returns:
list/string: server endpoints
"""
if to_string:
return ",".join(self._role_maker.get_pserver_endpoints())
else:
return self._role_maker.get_pserver_endpoints()
def is_server(self):
"""
......@@ -104,7 +145,7 @@ class Fleet(object):
bool: True if this is a node of server,
False if not.
"""
return self.role == Role.SERVER
return self._role_maker.is_server()
def split_files(self, files):
"""
......@@ -119,8 +160,8 @@ class Fleet(object):
list: files belongs to this worker.
"""
file_num = len(files)
trainer_id = self.worker_id()
trainer_num = self.get_workers()
trainer_id = self.worker_index()
trainer_num = self.worker_num()
if trainer_num > file_num:
raise ValueError("trainer_num should be <= file_num : "
"%s > %s" % (trainer_num, file_num))
......@@ -132,66 +173,49 @@ class Fleet(object):
end += length
return files[start:end]
def init(self, executor, role_maker=None):
    """
    should be called only once in user's python scripts,
    init() will initialize RoleMaker which is used for identifying
    current node's role, e.g. worker, server, etc.

    Args:
        executor(Executor): The executor to run fleet.
        role_maker(RoleMakerBase): subclass of RoleMakerBase. It has to be
            an MPISymetricRoleMaker or a UserDefinedRoleMaker; any other
            value, including the default None, is rejected.

    Returns:
        None

    Raises:
        ValueError: if `executor` is not an Executor, or `role_maker` is not
            one of the supported role maker types.
    """
    if not isinstance(executor, Executor):
        raise ValueError("executor must be an instance of Executor")

    if role_maker and not isinstance(role_maker, RoleMakerBase):
        raise ValueError("role_maker must be an instance of RoleMakerBase")

    if isinstance(role_maker, MPISymetricRoleMaker):
        self._role_maker = role_maker
        # The MPI role maker refuses role queries until generate_role()
        # has been called, so do it right away.
        self._role_maker.generate_role()
    elif isinstance(role_maker, UserDefinedRoleMaker):
        self._role_maker = role_maker
    else:
        raise ValueError(
            "role_maker must be an instance of UserDefinedRoleMaker/MPISymetricRoleMaker"
        )

    # Remember the validated executor; the fleet methods that run programs
    # or save models read it from self._executor. It is stored only after
    # all checks passed so a failed init() leaves no partial state behind.
    self._executor = executor
    self._is_initialized = True
@abc.abstractmethod
def init_worker(self, executor):
def init_worker(self):
pass
@abc.abstractmethod
def run_worker(self, executor, main_program=None):
def init_server(self, model_dir=None):
pass
@abc.abstractmethod
def init_server(self, executor, model_dir=None):
pass
@abc.abstractmethod
def run_server(self, executor):
def run_server(self, ):
pass
@abc.abstractmethod
......@@ -199,7 +223,7 @@ class Fleet(object):
pass
@abc.abstractmethod
def stop(self, executor):
def stop(self):
pass
@abc.abstractmethod
......@@ -208,7 +232,6 @@ class Fleet(object):
@abc.abstractmethod
def save_inference_model(self,
executor,
dirname,
feeded_var_names,
target_vars,
......@@ -217,21 +240,9 @@ class Fleet(object):
pass
@abc.abstractmethod
def save_persistables(self, executor, dirname, main_program=None):
def save_persistables(self, dirname, main_program=None):
pass
def to_string(self):
infos = """
mode = {}
workers = {}
server_endpoints = {}
role = {}
current_endpoint = {}
current_id = {}
""".format(self.mode, self.workers, self.server_endpoints, self.role,
self.current_endpoint, self.current_id)
return infos
class DistributedOptimizer(object):
"""
......@@ -245,7 +256,7 @@ class DistributedOptimizer(object):
Args:
optimizer(Optimizer): subclass of Optimizer.
strategy(dict): the user define config for Optimizer.
strategy(any): the user define config for Optimizer.
Returns:
None
......@@ -257,9 +268,6 @@ class DistributedOptimizer(object):
if not isinstance(optimizer, SGD.__bases__):
raise ValueError("optimizer must be an instance of Optimizer")
if strategy and not isinstance(strategy, dict):
raise ValueError("strategy must be an instance of Dict")
self._optimizer = optimizer
self._strategy = strategy
......@@ -317,8 +325,9 @@ class DistributedOptimizer(object):
@abc.abstractmethod
def minimize(self,
loss,
startup_program=None,
losses,
scopes=None,
startup_programs=None,
parameter_list=None,
no_grad_set=None):
"""
......@@ -328,8 +337,9 @@ class DistributedOptimizer(object):
`apply_gradients()` into one.
Args:
loss (Variable): loss variable to run optimizations.
startup_program (Program): startup_program for initializing parameters
losses (Variable|Variable List): loss variable to run optimizations.
scopes (Scope| Scope List): scope instance.
startup_programs (Program|Program List): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables should be ignored.
......
......@@ -11,10 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from __future__ import print_function
from enum import Enum
__all__ = [
'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker'
]
class Role(Enum):
WORKER = 1,
......@@ -30,47 +34,62 @@ class RoleMakerBase(object):
"""
def __init__(self):
self._trainer_endpoints = []
self._pserver_endpoints = []
self._worker_endpoints = []
self._server_endpoints = []
self._role_is_generated = False
self._role = None
self._current_id = -1
def _is_worker(self):
def is_worker(self):
"""
return is_worker() of current process
"""
raise NotImplementedError("Please implement this method in child class")
def _is_server(self):
def is_server(self):
"""
return is_server() of current process
"""
raise NotImplementedError("Please implement this method in child class")
def _get_local_ip(self):
def is_first_worker(self):
"""
return get local ip
Check whether the node is the first instance of worker.
Returns:
bool: True if this is the first node of worker,
False if not.
"""
import socket
self._ip = socket.gethostbyname(socket.gethostname())
return self._ip
raise NotImplementedError("Please implement this method in child class")
def _get_trainer_endpoints(self):
def worker_index(self):
"""
return trainer endpoints
Get current worker id.
Returns:
int: node id
"""
return self._trainer_endpoints
raise NotImplementedError("Please implement this method in child class")
def _get_pserver_endpoints(self):
def server_index(self):
"""
return pserver endpoints
Get current server id.
Returns:
int: node id
"""
return self._pserver_endpoints
raise NotImplementedError("Please implement this method in child class")
def _generate_role(self):
def get_trainer_endpoints(self):
"""
generate_role() should be called to identify current process's role
return trainer endpoints
"""
raise NotImplementedError("Please implement this method in child class")
return self._worker_endpoints
def get_pserver_endpoints(self):
"""
return pserver endpoints
"""
return self._server_endpoints
class MPIRoleMaker(RoleMakerBase):
......@@ -82,9 +101,11 @@ class MPIRoleMaker(RoleMakerBase):
def __init__(self):
super(MPIRoleMaker, self).__init__()
from mpi4py import MPI
self._comm = MPI.COMM_WORLD
self.MPI = MPI
self._comm = MPI.COMM_WORLD
self._node_type_comm = None
self._ips = None
self._ip = None
def _get_rank(self):
"""
......@@ -111,7 +132,7 @@ class MPIRoleMaker(RoleMakerBase):
"""
worker_gather(obj) will call MPI's allgather function
"""
if self._is_worker():
if self.is_worker():
self._node_type_comm.barrier()
return self._node_type_comm.allgather(obj)
return None
......@@ -122,19 +143,33 @@ class MPIRoleMaker(RoleMakerBase):
"""
self._comm.barrier()
def _finalize(self):
"""
finalize the current MPI instance.
"""
pass
def _get_ips(self):
"""
collect current distributed job's ip list
"""
if self._ips == None:
self._ips = self._comm.allgather(self._get_local_ip())
if not self._ips:
self._ips = self._comm.allgather(self.get_local_ip())
return self._ips
def _finalize(self):
def get_local_ip(self):
"""
finalize the current MPI instance.
return get local ip
"""
pass
import socket
self._ip = socket.gethostbyname(socket.gethostname())
return self._ip
def generate_role(self):
"""
generate_role() should be called to identify current process's role
"""
raise NotImplementedError("Please implement this method in child class")
class MPISymetricRoleMaker(MPIRoleMaker):
......@@ -151,20 +186,18 @@ class MPISymetricRoleMaker(MPIRoleMaker):
def _check_role_generation(self):
if not self._role_is_generated:
sys.stderr.write("generate_role() should be called first")
sys.exit(-1)
return False
raise NameError("generate_role() should be called first")
return True
def _is_first_worker(self):
def is_first_worker(self):
"""
return whether current process is the first worker assigned by role maker
"""
if self._check_role_generation():
return self._is_worker() and 0 == self._worker_index()
return self.is_worker() and 0 == self.worker_index()
return False
def _is_worker(self):
def is_worker(self):
"""
return whether current process is worker assigned by role maker
"""
......@@ -172,7 +205,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
return self._node_type == 1
return False
def _is_server(self):
def is_server(self):
"""
return whether current process is server assigned by role maker
"""
......@@ -185,7 +218,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
return the current number of worker
"""
if self._check_role_generation():
if self._is_worker():
if self.is_worker():
return self._get_size() / 2
return 0
......@@ -194,11 +227,11 @@ class MPISymetricRoleMaker(MPIRoleMaker):
return the current number of server
"""
if self._check_role_generation():
if self._is_server():
if self.is_server():
return self._get_size() / 2
return 0
def _worker_index(self):
def worker_index(self):
"""
return the index of worker
"""
......@@ -206,7 +239,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
return self._rank / self._proc_per_node
return 0
def _server_index(self):
def server_index(self):
"""
return the index of server
"""
......@@ -219,7 +252,7 @@ class MPISymetricRoleMaker(MPIRoleMaker):
barrier all workers in current distributed job
"""
if self._check_role_generation():
if self._is_worker():
if self.is_worker():
self._node_type_comm.barrier()
def _barrier_server(self):
......@@ -227,17 +260,17 @@ class MPISymetricRoleMaker(MPIRoleMaker):
barrier all servers in current distributed job
"""
if self._check_role_generation():
if self._is_server():
if self.is_server():
self._node_type_comm.barrier()
def _generate_role(self):
def generate_role(self):
"""
generate currently process's role
"""
if not self._role_is_generated:
# TODO(guru4elephant): only allow to be called once
self._trainer_endpoints = self._get_ips()
self._pserver_endpoints = self._get_ips()
self._worker_endpoints = self._get_ips()
self._server_endpoints = self._get_ips()
if 0 == self._get_rank() % self._proc_per_node % 2:
self._node_type = 0
......@@ -250,12 +283,9 @@ class MPISymetricRoleMaker(MPIRoleMaker):
class UserDefinedRoleMaker(RoleMakerBase):
def __init__(self,
current_id=0,
current_endpoint=None,
workers=0,
worker_endpoints=None,
servers=0,
server_endpoints=None,
role=Role.WORKER):
role=Role.WORKER,
worker_num=0,
server_endpoints=None):
"""
UserDefinedRoleMaker is designed for worker and server assignment
under manual. Typically, a worker and a server node will be appointed
......@@ -263,19 +293,22 @@ class UserDefinedRoleMaker(RoleMakerBase):
"""
super(UserDefinedRoleMaker, self).__init__()
self.current_id = current_id
self.current_endpoint = current_endpoint
self.workers = workers
self.worker_endpoints = worker_endpoints
self.servers = servers
self.server_endpoints = server_endpoints
self.role = role
self._current_id = current_id
self._role = role
self._worker_num = worker_num
self._server_endpoints = server_endpoints
def is_worker(self):
return self._role == Role.WORKER
def is_server(self):
return self._role == Role.SERVER
def _is_worker(self):
return self.role == Role.WORKER
def is_first_worker(self):
return self._role == Role.WORKER and self._current_id == 0
def _is_server(self):
return self.role == Role.SERVER
def worker_index(self):
return self._current_id
def _generate_role(self):
self.role_is_generated_ = True
def server_index(self):
return self._current_id
......@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import sys
import logging
import paddle.fluid as fluid
......@@ -26,37 +25,21 @@ from ..base.fleet_base import DistributedOptimizer
class Collective(Fleet):
def __init__(self):
super(Collective, self).__init__(Mode.COLLECTIVE)
self.local_ip_ = 0
self._local_ip = 0
def init(self, role_maker=None):
"""
should be called only once in user's python scripts,
init() will initialize RoleMaker which is used for identifying
current node's role, e.g. worker, server, etc.
Args:
role_maker(RoleMakerBase): subclass of RoleMakerBase.
Returns:
None
"""
super(Collective, self).init(role_maker)
self._role_maker._generate_role()
def init_worker(self, executor):
def init_worker(self):
logging.warn(
"You should not call 'init_worker' method for collective mode.")
def run_worker(self, executor, main_program=None):
def run_worker(self, main_programs=None, scopes=None):
logging.warn(
"You should not call 'run_worker' method for collective mode.")
def init_server(self, executor, model_dir=None):
def init_server(self, model_dir=None):
logging.warn(
"You should not call 'init_server' method for collective mode.")
def run_server(self, executor):
def run_server(self):
logging.warn(
"You should not call 'run_server' method for collective mode.")
......@@ -64,29 +47,28 @@ class Collective(Fleet):
logging.warn(
"You should not call 'stop_worker' method for collective mode.")
def stop(self, executor):
def stop(self):
"""
stop(): will be called after a user finishes his/her training task.
"""
logging.warn("You should not call 'stop' method for collective mode.")
def distributed_optimizer(self, optimizer, strategy=None):
self.optimizer = CollectiveOptimizer(optimizer, strategy)
return self.optimizer
self._optimizer = CollectiveOptimizer(optimizer, strategy)
return self._optimizer
def save_inference_model(self,
executor,
dirname,
feeded_var_names=None,
target_vars=None,
main_program=None,
export_for_deployment=True):
io.save_inference_model(dirname, feeded_var_names, target_vars,
executor, main_program, None, None,
self._executor, main_program, None, None,
export_for_deployment)
def save_persistables(self, executor, dirname, main_program=None):
io.save_persistables(executor, dirname, main_program, None)
def save_persistables(self, dirname, main_program=None):
io.save_persistables(self._executor, dirname, main_program, None)
fleet = Collective()
......@@ -143,9 +125,9 @@ class CollectiveOptimizer(DistributedOptimizer):
optimize_ops, param_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
worker_endpoints = fleet.worker_endpoints
trainer_id = fleet.current_id
current_endpoint = fleet.current_endpoint
worker_endpoints = fleet.worker_endpoints()
trainer_id = fleet.worker_index()
current_endpoint = fleet.worker_endpoints()[trainer_id]
startup_program = startup_program if startup_program else \
fluid.framework.default_startup_program
......
......@@ -12,12 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from paddle.fluid.executor import Executor
from paddle.fluid.framework import Program
from paddle.fluid.framework import default_main_program
from paddle.fluid.framework import default_startup_program
from paddle.fluid.optimizer import Optimizer
......@@ -27,7 +22,6 @@ import paddle.fluid.io as io
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler
from ...base.role_maker import Role
from ...base.fleet_base import Fleet
from ...base.fleet_base import Mode
from ...base.fleet_base import DistributedOptimizer
......@@ -44,101 +38,75 @@ class DistributedTranspiler(Fleet):
self._startup_program = None
self._main_program = None
def init_worker(self, executor):
def init_worker(self):
"""
`init_worker` has many many functions to do before training,
first, wait for all parameter servers launch completely.
second, run executor to initialize startup program
third, wait for all worker initialize completely.
Args:
executor(Executor): The executor to run for init startup program.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
if not self._startup_program:
raise ValueError(
"startup_program is None, need invoke DistributedOptimizer.minimize first"
)
executor.run(self._startup_program)
pass
def run_worker(self, executor, main_program=None):
def run_worker(self, main_programs=None, scopes=None):
pass
def init_server(self, executor, model_dir=None):
def init_server(self, model_dir=None):
"""
`init_server` has many many functions to do before start pserver,
first, run executor to initialize startup program,
second, if the `model_dir` is not empty, it will load parameters from it for increment training.
Args:
executor(Executor): The executor to run for init server.
model_dir(str): The directory path.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
if not self._startup_program:
raise ValueError(
"startup_program is None, need invoke DistributedOptimizer.minimize first"
)
executor.run(self._startup_program)
self._executor.run(self._startup_program)
if model_dir:
if not os.path.isdir(model_dir):
raise ValueError("There is no directory named '%s'", model_dir)
io.load_persistables(executor, model_dir, self._startup_program)
io.load_persistables(self._executor, model_dir,
self._startup_program)
def run_server(self, executor):
def run_server(self):
"""
`run_server` execute executor to start pserver main program.
Args:
executor(Executor): The executor to run for init server.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
if not self._main_program:
raise ValueError(
"main_program is None, need invoke DistributedOptimizer.minimize first"
)
executor.run(self._main_program)
self._executor.run(self._main_program)
def stop_worker(self):
pass
def stop(self, executor):
def stop(self):
"""
Close this executor.
For the distributed training, this method would free the resource on PServers related to
the current Trainer.
Args:
executor(Executor): The executor to run for init server.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
executor.close()
self._executor.close()
def distributed_optimizer(self, optimizer, strategy=None):
"""
......@@ -157,11 +125,10 @@ class DistributedTranspiler(Fleet):
if not isinstance(optimizer, Optimizer):
raise ValueError("optimizer must be an instance of Optimizer")
self.optimizer = TranspilerOptimizer(optimizer, strategy)
return self.optimizer
self._optimizer = TranspilerOptimizer(optimizer, strategy)
return self._optimizer
def save_inference_model(self,
executor,
dirname,
feeded_var_names,
target_vars,
......@@ -172,10 +139,10 @@ class DistributedTranspiler(Fleet):
and then save it and all related parameters to given `dirname` by the `executor`.
"""
io.save_inference_model(dirname, feeded_var_names, target_vars,
executor, main_program, None, None,
self._executor, main_program, None, None,
export_for_deployment)
def save_persistables(self, executor, dirname, main_program=None):
def save_persistables(self, dirname, main_program=None):
"""
This function filters out all variables with `persistable==True` from the
give `main_program` and then saves these variables to the folder `dirname`
......@@ -186,38 +153,56 @@ class DistributedTranspiler(Fleet):
files, set `filename` None; if you would like to save all variables in a
single file, use `filename` to specify the file name.
"""
io.save_persistables(executor, dirname, main_program, None)
io.save_persistables(self._executor, dirname, main_program, None)
def _transpile(self, config):
    """
    Run the distribute transpiler with `config` and keep the resulting
    main/startup programs for the current node.

    A worker gets the trainer program plus the default startup program;
    a server gets the pserver program pair built for its own endpoint.

    Args:
        config(DistributeTranspilerConfig): the transpiler configuration.

    Returns:
        None

    Raises:
        ValueError: if `config` is not a DistributeTranspilerConfig.
    """
    if not isinstance(config, DistributeTranspilerConfig):
        raise ValueError(
            "config must be an instance of DistributeTranspilerConfig")

    self._transpiler = OriginTranspiler(config)
    self._transpiler.transpile(
        trainer_id=fleet.worker_index(),
        pservers=fleet.server_endpoints(to_string=True),
        trainers=fleet.worker_num())

    if self.is_worker():
        self._main_program = self._transpiler.get_trainer_program()
        self._startup_program = default_startup_program()
    else:
        # server_endpoints() takes a `to_string` flag, not an index, so the
        # endpoint of this server has to be picked out of the returned list.
        current_endpoint = self.server_endpoints()[self.server_index()]
        self._main_program, self._startup_program = \
            self._transpiler.get_pserver_programs(current_endpoint)
fleet = DistributedTranspiler()
class TranspilerOptimizer(DistributedOptimizer):
"""
DistributedOptimizer is a wrapper for paddle.fluid.optimizer
A user should pass a paddle.fluid.optimizer to DistributedOptimizer
minimize() function is implemented.
DistributedOptimizer is the starting point for a user who wants to
run distributed training. The optimized information will be stored in
Fleet() instance who holds the global information about current distributed
training.
Args:
optimizer(Optimizer): subclass of Optimizer.
strategy(DistributeTranspilerConfig): instance of DistributeTranspilerConfig.
Returns:
None
"""
def __init__(self, optimizer, strategy=None):
super(TranspilerOptimizer, self).__init__(optimizer, strategy)
if strategy and not isinstance(strategy, DistributeTranspilerConfig):
raise ValueError(
"In {} mode, strategy must be an instance of DistributeTranspilerConfig".
format(fleet.mode))
if strategy:
if not isinstance(strategy, DistributeTranspilerConfig):
raise ValueError(
"In {} mode, strategy must be an instance of DistributeTranspilerConfig".
format(fleet._mode))
else:
self._strategy = strategy
else:
self._strategy = DistributeTranspilerConfig()
def backward(self,
loss,
......@@ -225,24 +210,68 @@ class TranspilerOptimizer(DistributedOptimizer):
parameter_list=None,
no_grad_set=None,
callbacks=None):
"""
First part of `minimize`, do auto-diff to append backward ops for
the current program.
Args:
loss (Variable): loss variable to run optimizations.
startup_program (Program): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables should be ignored.
callbacks (list|None): list of callables to run when appending backward
operator for one parameter.
Return:
list: list of (param, grad) pair, grad is the output of backward.
Examples:
See examples in `apply_gradients`.
"""
return self._optimizer.backward(loss, startup_program, parameter_list,
no_grad_set, callbacks)
def apply_gradients(self, params_grads):
"""
Second part of `minimize`, appending optimization operators for
given `params_grads` pairs.
Args:
params_grads (list): list of (param, grad) pair to do optimization.
Returns:
list: A list of operators appended to the current program.
Examples:
.. code-block:: python
loss = network()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
params_grads = optimizer.backward(loss)
# you may append operations for params_grads here
# ...
optimizer.apply_gradients(params_grads)
"""
return self._optimizer.apply_gradients(params_grads)
def minimize(self,
loss,
scope=None,
startup_program=None,
parameter_list=None,
no_grad_set=None):
optimize_ops, params_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
self.transpile()
return optimize_ops, params_grads
def transpile(self):
if self._strategy is None:
self._strategy = DistributeTranspilerConfig()
if isinstance(loss, list):
raise ValueError(
"DistributedTranspiler's minimize can not accept loss with list")
if isinstance(startup_program, list):
raise ValueError(
"DistributedTranspiler's minimize can not accept program with list"
)
optimize_ops, params_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
fleet._transpile(config=self._strategy)
return optimize_ops, params_grads
......@@ -28,63 +28,56 @@ class PSLib(Fleet):
def __init__(self):
super(PSLib, self).__init__(Mode.PSLIB)
self._opt_info = None
self.local_ip_ = 0
self._local_ip = 0
self._fleet_ptr = None
self._main_programs = []
self._scopes = []
def init(self, role_maker=None):
super(PSLib, self).init(MPISymetricRoleMaker())
def init(self, executor, role_maker=None):
super(PSLib, self).init(executor, MPISymetricRoleMaker())
self._fleet_ptr = fluid.core.Fleet()
def init_worker(self, executor):
pass
def run_worker(self, executor, main_program=None):
def init_worker(self):
"""
init_worker(): will be called by user. When a user knows current process is_server(), he/she
should call init_worker() to initialize global information about worker and connect
worker with pserver. You should run startup program before init_worker.
Args:
programs(Program|list): a Program or a list of Programs
scopes(Scope|list): a Scope or a list of Scopes, default None.
executor(Executor): The executor to run for init server.
programs(Program|None): The program that need to run.
"""
if not isinstance(main_program, Program):
raise ValueError("main_program must be an instance of Program")
programs = [main_program]
scopes = [fluid.global_scope()] * len(programs)
if len(self._main_programs) == 0:
raise ValueError(
"You should run DistributedOptimizer.minimize() first")
if len(scopes) != len(programs):
print(
"You should make sure len(scopes) == len(programs) or set scopes None"
)
sys.exit(-1)
if self._opt_info:
if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString(
self._opt_info["fleet_desc"])
self._dist_desc = self._opt_info["fleet_desc"]
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
raise Exception(
"You should run DistributedOptimizer.minimize() first")
# barrier_all for init_server, wait for server starts
self.role_maker_._barrier_all()
self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
self._role_maker._barrier_all()
self.all_ips_ = self._role_maker._all_gather(self._local_ip)
self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
self.role_maker_._get_size(),
self.role_maker_._get_rank())
self._role_maker._get_size(),
self._role_maker._get_rank())
# barrier_all for init_worker
self.role_maker_._barrier_all()
self._role_maker._barrier_all()
# prepare for client to client communication
info = self._fleet_ptr.get_clients_info()
all_info = self.role_maker_._worker_gather(info[0])
all_info = self._role_maker._worker_gather(info[0])
self._fleet_ptr.gather_clients(all_info)
self._fleet_ptr.create_client2client_connection()
# barrier for init model
self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker():
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
tables = self._dist_desc.trainer_param.dense_table
for prog, scope in zip(programs, scopes):
for prog, scope in zip(self._main_programs, self._scopes):
prog_id = str(id(prog))
prog_conf = self._opt_info['program_configs'][prog_id]
prog_tables = {}
......@@ -100,24 +93,23 @@ class PSLib(Fleet):
for i in range(0, len(table.dense_variable_name)):
var_name = table.dense_variable_name[i]
if scope.find_var(var_name) is None:
print("var " + var_name +
" not found in scope, " +
"you should run startup program first")
sys.exit(-1)
raise ValueError(
"var " + var_name + " not found in scope, "
+ "you should run startup program first")
var_name_list.append(var_name)
self._fleet_ptr.init_model(scope,
int(table.table_id),
var_name_list)
# barrier for init model done
self.role_maker_._barrier_worker()
self._role_maker._barrier_worker()
else:
raise NameError(
"You should run DistributedOptimizer.minimize() first")
def init_server(self, executor, model_dir=None):
def init_server(self, model_dir=None):
pass
def run_server(self, executor):
def run_server(self):
"""
init_pserver(): will be called by user. When a user knows current process is_worker(), he/she
should call init_pserver() to initialize global information about parameter server
......@@ -128,22 +120,22 @@ class PSLib(Fleet):
self._opt_info["fleet_desc"])
self._dist_desc = self._opt_info["fleet_desc"]
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
raise Exception(
"You should run DistributedOptimizer.minimize() first")
self._fleet_ptr.init_server(self._dist_desc_str,
self.role_maker_._get_rank())
self.local_ip_ = self._fleet_ptr.run_server()
self._role_maker._get_rank())
self._local_ip = self._fleet_ptr.run_server()
# barrier_all for init_server
self.role_maker_._barrier_all()
self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
self._role_maker._barrier_all()
self.all_ips_ = self._role_maker._all_gather(self._local_ip)
self._fleet_ptr.gather_servers(self.all_ips_,
self.role_maker_._get_size())
self._role_maker._get_size())
# barrier_all for init_worker, wait all workers start
self.role_maker_._barrier_all()
self._role_maker._barrier_all()
else:
raise NameError(
raise Exception(
"You should run DistributedOptimizer.minimize() first")
def stop_worker(self):
......@@ -151,31 +143,30 @@ class PSLib(Fleet):
stop(): will be called after a user finishes his/her training task. Fleet instance will be
destroyed when stop() is called.
"""
self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker():
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.stop_server()
self.role_maker_._barrier_worker()
self.role_maker_._barrier_all()
self.role_maker_._finalize()
self._role_maker._barrier_worker()
self._role_maker._barrier_all()
self._role_maker._finalize()
def stop(self, executor):
def stop(self):
"""
stop(): will be called after a user finishes his/her training task. Fleet instance will be
destroyed when stop() is called.
"""
self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker():
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.stop_server()
self.role_maker_._barrier_worker()
self.role_maker_._barrier_all()
self.role_maker_._finalize()
self._role_maker._barrier_worker()
self._role_maker._barrier_all()
self._role_maker._finalize()
def distributed_optimizer(self, optimizer, strategy=None):
self.optimizer = DownpourOptimizer(optimizer, strategy)
return self.optimizer
self._optimizer = DownpourOptimizer(optimizer, strategy)
return self._optimizer
def save_inference_model(self,
executor,
dirname,
feeded_var_names=None,
target_vars=None,
......@@ -186,7 +177,7 @@ class PSLib(Fleet):
"""
self._fleet_ptr.save_model(dirname)
def save_persistables(self, executor, dirname, main_program=None):
def save_persistables(self, dirname, main_program=None):
self._fleet_ptr.save_model(dirname)
def _set_opt_info(self, opt_info):
......@@ -208,6 +199,13 @@ class DownpourOptimizer(DistributedOptimizer):
run distributed training. The optimized information will be stored in
Fleet() instance who holds the global information about current distributed
training.
Args:
optimizer(Optimizer): subclass of Optimizer.
strategy(any): config for DownpourOptimizer.
Returns:
None
"""
def __init__(self, optimizer, strategy=None):
......@@ -242,32 +240,54 @@ class DownpourOptimizer(DistributedOptimizer):
raise NotImplementedError()
def minimize(self,
loss,
startup_program=None,
losses,
scopes=None,
startup_programs=None,
parameter_list=None,
no_grad_set=None):
"""
minimize a program through loss, loss can be a list in DistributedOptimizer
minimize a program through loss, loss can be a list in DistributedOptimizer.
Note that in parameter server mode, a worker will not get anything about optimize_ops,
because optimizer algorithms run on the pserver side. We will make this usable in the pserver
process, but currently the optimization part is written into Fleet(). A user does not
need to care about how to start up a pserver node.
Args:
loss (Variable|Variable List): loss variable or loss variable list to run optimization.
startup_program (Program): startup_program for initializing parameters
losses (Variable|Variable List): loss variable or loss variable list to run optimization.
scopes (Scope| Scope List): scope instance.
startup_programs (Program|Program List): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables should be ignored.
Returns:
tuple: (optimize_ops, params_grads) which are, list of operators appended;
and list of (param, grad) Variables pair for optimization.
Note that in parameter server mode, a worker will not get anything about optimize_ops,
because optimizer algorithms run on the pserver side. We will make this usable in the pserver
process, but currently the optimization part is written into Fleet(). A user does not
need to care about how to start up a pserver node.
"""
if not isinstance(losses, list):
losses = [losses]
optimize_ops, param_grads, opt_info = \
self._distributed_optimizer._minimize(
loss,
startup_program,
losses,
startup_programs,
parameter_list,
no_grad_set)
fleet._set_opt_info(opt_info)
programs = [loss.block.program for loss in losses]
if scopes is None:
scopes = [fluid.global_scope()] * len(programs)
if len(scopes) != len(programs):
raise ValueError(
"You should make sure len(scopes) == len(programs) or set scopes None"
)
fleet._main_programs = programs
fleet._scopes = scopes
return [optimize_ops, param_grads]
......@@ -94,7 +94,7 @@ class DownpourServer(Server):
Returns:
return None
"""
table = self.server_.downpour_server_param.downpour_table_param.add()
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourDenseTable"
table.type = pslib.PS_DENSE_TABLE
......@@ -169,7 +169,7 @@ class DownpourWorker(Worker):
Returns:
return None
"""
table = self.worker_.sparse_table.add()
table = self._worker.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in slot_key_vars])
table.slot_value.extend([var.name for var in slot_value_vars])
......
......@@ -66,8 +66,6 @@ class DistributedAdam(DistributedOptimizerImplBase):
Returns:
[optimize_ops, grads_and_weights]
"""
if not isinstance(losses, list):
losses = [losses]
table_name = find_distributed_lookup_table(losses[0].block.program)
prefetch_slots = find_distributed_lookup_table_inputs(
......@@ -77,7 +75,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
ps_param = pslib.PSParameter()
server = DownpourServer()
worker = DownpourWorker(self.window_)
worker = DownpourWorker(self._window)
sparse_table_index = 0
server.add_sparse_table(sparse_table_index, self._learning_rate,
prefetch_slots, prefetch_slots_emb)
......@@ -88,17 +86,12 @@ class DistributedAdam(DistributedOptimizerImplBase):
param_grads_list = []
for loss_index in range(len(losses)):
#program_config = ps_param.trainer_param.program_config.add()
#program_config.program_id = str(
# id(losses[loss_index].block.program))
program_id = str(id(losses[loss_index].block.program))
program_configs[program_id] = {
"pull_sparse": [sparse_table_index],
"push_sparse": [sparse_table_index]
}
#program_config.pull_sparse_table_id.extend([sparse_table_index])
#program_config.push_sparse_table_id.extend([sparse_table_index])
params_grads = sorted(
fluid.backward.append_backward(losses[loss_index],
parameter_list, no_grad_set),
......@@ -130,8 +123,6 @@ class DistributedAdam(DistributedOptimizerImplBase):
params, grads)
program_configs[program_id]["pull_dense"] = [dense_table_index]
program_configs[program_id]["push_dense"] = [dense_table_index]
#program_config.pull_dense_table_id.extend([dense_table_index])
#program_config.push_dense_table_id.extend([dense_table_index])
if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
dense_table_index += 1
server.add_data_norm_table(dense_table_index,
......@@ -139,18 +130,13 @@ class DistributedAdam(DistributedOptimizerImplBase):
data_norm_params, data_norm_grads)
worker.add_dense_table(dense_table_index, self._learning_rate,
data_norm_params, data_norm_grads)
#program_config.pull_dense_table_id.extend([dense_table_index])
#program_config.push_dense_table_id.extend([dense_table_index])
program_configs[program_id]["pull_dense"].extend(
[dense_table_index])
program_configs[program_id]["push_dense"].extend(
[dense_table_index])
dense_table_index += 1
#program_configs.append(program_config)
ps_param.server_param.CopyFrom(server.get_desc())
ps_param.trainer_param.CopyFrom(worker.get_desc())
#for program_config in program_configs:
# ps_param.trainer_param.program_config.extend([program_config])
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册