Unverified · Commit 1a4a51db authored by tangwei12, committed by GitHub

Fleet unify distributed training (#16791)

* implement distributed transpiler with fleet
Parent e707119a
...@@ -233,7 +233,7 @@ class InMemoryDataset(DatasetBase):
        Examples:
            >>> import paddle.fluid as fluid
-           >>> import paddle.fluid.incubate.fleet.parameter_server as fleet
+           >>> from paddle.fluid.incubate.fleet.pslib import fleet
            >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
            >>> filelist = ["a.txt", "b.txt"]
            >>> dataset.set_filelist(filelist)
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import abc
import sys
from enum import Enum
from paddle.fluid.optimizer import SGD
from .role_maker import RoleMakerBase, Role
from .role_maker import MPISymetricRoleMaker
from .role_maker import UserDefinedRoleMaker
class Mode(Enum):
TRANSPILER = 1,
PSLIB = 2,
COLLECTIVE = 3
class Fleet(object):
"""
Fleet is the base class; the transpiler, pslib, and collective modes are its concrete implementations.
Args:
mode(Mode): the mode in which this Fleet implementation runs.
Returns:
None
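Examples:
    A minimal usage sketch (a concrete Fleet implementation such as the
    transpiler-based `fleet` instance is assumed; `role_maker` and `exe`
    are prepared by the user):

    .. code-block:: python

        fleet.init(role_maker)
        if fleet.is_server():
            fleet.init_server(exe)
            fleet.run_server(exe)
        elif fleet.is_worker():
            fleet.init_worker(exe)
            # run the training loop with exe here
            fleet.stop(exe)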
"""
__metaclass__ = abc.ABCMeta
def __init__(self, mode):
assert isinstance(mode, Mode)
self.is_initialized = False
self.mode = mode
self.workers = 0
self.servers = 0
self.worker_endpoints = []
self.server_endpoints = []
self.role = Role.WORKER
self.current_endpoint = None
self.current_id = 0
self.optimizer = None
self.role_maker_ = None
def is_first_worker(self):
"""
Check whether the node is the first instance of worker.
Returns:
bool: True if this is the first worker node, False otherwise.
"""
return self.is_worker() and self.current_id == 0
def worker_id(self):
"""
Get current worker id.
Returns:
int: node id
"""
return self.current_id
def get_workers(self):
"""
Get current total worker number.
Returns:
int: worker number
"""
return self.workers
def is_worker(self):
"""
Check whether the node is an instance of worker.
Returns:
bool: True if this node is a worker, False otherwise.
"""
return self.role == Role.WORKER
def is_server(self):
"""
Check whether the node is an instance of server.
Returns:
bool: True if this node is a server, False otherwise.
"""
return self.role == Role.SERVER
def split_files(self, files):
"""
Split the file list before distributed training. For example, if files
is [a, b, c, d, e] and trainer_num is 2, then trainer 0 gets [a, b, c]
and trainer 1 gets [d, e].
Args:
files(list): the list of files to be read.
Returns:
list: the files that belong to this worker.
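Examples:
    A minimal sketch (assumes fleet has been initialized and this process
    is worker 0 of 2 workers):

    .. code-block:: python

        files = ["a.txt", "b.txt", "c.txt", "d.txt", "e.txt"]
        my_files = fleet.split_files(files)
        # worker 0 of 2 gets ["a.txt", "b.txt", "c.txt"]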
"""
file_num = len(files)
trainer_id = self.worker_id()
trainer_num = self.get_workers()
if trainer_num > file_num:
raise ValueError("trainer_num should be <= file_num : "
"%s > %s" % (trainer_num, file_num))
start = 0
end = 0
for i in range(0, trainer_id + 1):
length = file_num // trainer_num + (i < (file_num % trainer_num))
start = end
end += length
return files[start:end]
def init(self, role_maker=None):
"""
init() should be called only once in the user's Python script. It
initializes the RoleMaker that identifies the current node's role,
e.g. worker or server.
Args:
role_maker(RoleMakerBase): subclass of RoleMakerBase.
Returns:
None
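Examples:
    A minimal sketch (assumes an MPI environment for MPISymetricRoleMaker;
    a UserDefinedRoleMaker with explicit endpoints can be passed instead):

    .. code-block:: python

        fleet.init(MPISymetricRoleMaker())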
"""
if role_maker and not isinstance(role_maker, RoleMakerBase):
raise ValueError("role_maker must be an instance of RoleMakerBase")
self.role_maker_ = role_maker
if isinstance(role_maker, MPISymetricRoleMaker):
self.role_maker_._generate_role()
self.role = Role.WORKER if role_maker._is_worker() else Role.SERVER
self.workers = role_maker._worker_num()
self.servers = role_maker._server_num()
self.server_endpoints = role_maker._get_pserver_endpoints()
self.worker_endpoints = role_maker._get_trainer_endpoints()
self.current_id = role_maker._worker_index(
) if role_maker._is_worker() else role_maker._server_index()
self.current_endpoint = self.worker_endpoints[self.current_id] \
if role_maker._is_worker() else self.server_endpoints[self.current_id]
elif isinstance(role_maker, UserDefinedRoleMaker):
self.current_id = role_maker.current_id
self.current_endpoint = role_maker.current_endpoint
self.workers = role_maker.workers
self.worker_endpoints = role_maker.worker_endpoints
self.servers = role_maker.servers
self.server_endpoints = role_maker.server_endpoints
self.role = role_maker.role
else:
raise ValueError(
"role_maker must be an instance of UserDefinedRoleMaker/MPISymetricRoleMaker"
)
self.is_initialized = True
@abc.abstractmethod
def init_worker(self, executor):
pass
@abc.abstractmethod
def run_worker(self, executor, main_program=None):
pass
@abc.abstractmethod
def init_server(self, executor, model_dir=None):
pass
@abc.abstractmethod
def run_server(self, executor):
pass
@abc.abstractmethod
def stop_worker(self):
pass
@abc.abstractmethod
def stop(self, executor):
pass
@abc.abstractmethod
def distributed_optimizer(self, optimizer, strategy=None):
pass
@abc.abstractmethod
def save_inference_model(self,
executor,
dirname,
feeded_var_names,
target_vars,
main_program=None,
export_for_deployment=True):
pass
@abc.abstractmethod
def save_persistables(self, executor, dirname, main_program=None):
pass
def to_string(self):
infos = """
mode = {}
workers = {}
server_endpoints = {}
role = {}
current_endpoint = {}
current_id = {}
""".format(self.mode, self.workers, self.server_endpoints, self.role,
self.current_endpoint, self.current_id)
return infos
class DistributedOptimizer(object):
"""
DistributedOptimizer is a wrapper for a paddle.fluid.optimizer. A user
should pass a paddle.fluid.optimizer to DistributedOptimizer, whose
minimize() method is implemented on top of it.
DistributedOptimizer is the starting point for a user who wants to run
distributed training. The optimization information is stored in the
Fleet() instance, which holds the global information about the current
distributed training job.
Args:
optimizer(Optimizer): subclass of Optimizer.
strategy(dict): user-defined configuration for the optimizer.
Returns:
None
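Examples:
    A minimal sketch (assumes a concrete Fleet implementation that exposes
    distributed_optimizer(), e.g. the transpiler-based fleet, and a built
    loss variable):

    .. code-block:: python

        optimizer = fluid.optimizer.SGD(learning_rate=0.01)
        optimizer = fleet.distributed_optimizer(optimizer)
        optimizer.minimize(loss)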
"""
__metaclass__ = abc.ABCMeta
def __init__(self, optimizer, strategy=None):
if not isinstance(optimizer, SGD.__bases__):
raise ValueError("optimizer must be an instance of Optimizer")
if strategy and not isinstance(strategy, dict):
raise ValueError("strategy must be an instance of Dict")
self._optimizer = optimizer
self._strategy = strategy
@abc.abstractmethod
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
callbacks=None):
"""
First part of `minimize`, do auto-diff to append backward ops for
the current program.
Args:
loss (Variable): loss variable to run optimizations.
startup_program (Program): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables that should be ignored.
callbacks (list|None): list of callables to run when appending the backward
operator for one parameter.
Returns:
list: list of (param, grad) pairs, where grad is the output of backward.
Examples:
See examples in `apply_gradients`.
"""
pass
@abc.abstractmethod
def apply_gradients(self, params_grads):
"""
Second part of `minimize`, appending optimization operators for
given `params_grads` pairs.
Args:
params_grads (list): list of (param, grad) pair to do optimization.
Returns:
list: A list of operators appended to the current program.
Examples:
.. code-block:: python
loss = network()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
params_grads = optimizer.backward(loss)
# you may append operations for params_grads here
# ...
optimizer.apply_gradients(params_grads)
"""
pass
@abc.abstractmethod
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
"""
Add operations to minimize `loss` by updating `parameter_list`.
This method combines interface `backward()` and
`apply_gradients()` into one.
Args:
loss (Variable): loss variable to run optimizations.
startup_program (Program): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables that should be ignored.
Returns:
tuple: (optimize_ops, params_grads), i.e. the list of operators appended
and the list of (param, grad) Variable pairs used for optimization.
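Examples:
    A sketch equivalent to calling minimize() directly (assumes `loss`
    has been built and `optimizer` is a concrete DistributedOptimizer):

    .. code-block:: python

        params_grads = optimizer.backward(loss)
        optimize_ops = optimizer.apply_gradients(params_grads)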
"""
pass
...@@ -13,6 +13,13 @@
# limitations under the License.
import sys
+from enum import Enum
+
+class Role(Enum):
+    WORKER = 1,
+    SERVER = 2
class RoleMakerBase(object):
    """
...@@ -23,7 +30,6 @@ class RoleMakerBase(object):
    """
    def __init__(self):
-       self._role_maker_name = ""
        self._trainer_endpoints = []
        self._pserver_endpoints = []
        self._role_is_generated = False
...@@ -239,3 +245,37 @@ class MPISymetricRoleMaker(MPIRoleMaker):
        self._node_type = 1
        self._node_type_comm = self._comm.Split(self._node_type)
        self._role_is_generated = True
class UserDefinedRoleMaker(RoleMakerBase):
def __init__(self,
current_id=0,
current_endpoint=None,
workers=0,
worker_endpoints=None,
servers=0,
server_endpoints=None,
role=Role.WORKER):
"""
UserDefinedRoleMaker is designed for manually assigning workers and
servers. Typically one worker and one server are appointed on each
physical node, and the assignment is specified by the user.
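Example (a minimal sketch; the endpoints below are placeholders chosen
for illustration, and `fleet` is a concrete Fleet instance):

.. code-block:: python

    role = UserDefinedRoleMaker(
        current_id=0,
        current_endpoint="127.0.0.1:6170",
        workers=2,
        worker_endpoints=["127.0.0.1:6170", "127.0.0.1:6171"],
        servers=1,
        server_endpoints=["127.0.0.1:6172"],
        role=Role.WORKER)
    fleet.init(role)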
"""
super(UserDefinedRoleMaker, self).__init__()
self.current_id = current_id
self.current_endpoint = current_endpoint
self.workers = workers
self.worker_endpoints = worker_endpoints
self.servers = servers
self.server_endpoints = server_endpoints
self.role = role
def _is_worker(self):
return self.role == Role.WORKER
def _is_server(self):
return self.role == Role.SERVER
def _generate_role(self):
self._role_is_generated = True
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import logging
import paddle.fluid as fluid
import paddle.fluid.io as io
import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler
from ..base.fleet_base import Fleet
from ..base.fleet_base import Mode
from ..base.fleet_base import DistributedOptimizer
class Collective(Fleet):
def __init__(self):
super(Collective, self).__init__(Mode.COLLECTIVE)
self.local_ip_ = 0
def init(self, role_maker=None):
"""
init() should be called only once in the user's Python script. It
initializes the RoleMaker that identifies the current node's role,
e.g. worker or server.
Args:
role_maker(RoleMakerBase): subclass of RoleMakerBase.
Returns:
None
"""
super(Collective, self).init(role_maker)
self.role_maker_._generate_role()
def init_worker(self, executor):
logging.warn(
"You should not call 'init_worker' method for collective mode.")
def run_worker(self, executor, main_program=None):
logging.warn(
"You should not call 'run_worker' method for collective mode.")
def init_server(self, executor, model_dir=None):
logging.warn(
"You should not call 'init_server' method for collective mode.")
def run_server(self, executor):
logging.warn(
"You should not call 'run_server' method for collective mode.")
def stop_worker(self):
logging.warn(
"You should not call 'stop_worker' method for collective mode.")
def stop(self, executor):
"""
stop(): will be called after a user finishes his/her training task.
"""
logging.warn("You should not call 'stop' method for collective mode.")
def distributed_optimizer(self, optimizer, strategy=None):
self.optimizer = CollectiveOptimizer(optimizer, strategy)
return self.optimizer
def save_inference_model(self,
executor,
dirname,
feeded_var_names=None,
target_vars=None,
main_program=None,
export_for_deployment=True):
io.save_inference_model(dirname, feeded_var_names, target_vars,
executor, main_program, None, None,
export_for_deployment)
def save_persistables(self, executor, dirname, main_program=None):
io.save_persistables(executor, dirname, main_program, None)
fleet = Collective()
class CollectiveOptimizer(DistributedOptimizer):
"""
CollectiveOptimizer is a wrapper for a paddle.fluid.optimizer in
collective (nccl2) mode. A user should pass a paddle.fluid.optimizer to
CollectiveOptimizer; its minimize() method is implemented on top of it.
CollectiveOptimizer is the starting point for a user who wants to run
distributed training. The optimization information is stored in the
Fleet() instance, which holds the global information about the current
distributed training job.
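Examples:
    A minimal sketch of collective training (assumes `fleet` is the
    Collective instance defined in this module, and `role_maker`, `exe`
    and `loss` are prepared by the user):

    .. code-block:: python

        fleet.init(role_maker)
        optimizer = fluid.optimizer.SGD(learning_rate=0.01)
        optimizer = fleet.distributed_optimizer(optimizer)
        optimizer.minimize(loss)
        exe.run(fluid.default_startup_program())
        # run the training loop with exe here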
"""
def __init__(self, optimizer, strategy=None):
super(CollectiveOptimizer, self).__init__(optimizer, strategy)
assert strategy is None, "You cannot set 'strategy' for collective."
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
callbacks=None):
return self._optimizer.backward(loss, startup_program, parameter_list,
no_grad_set, callbacks)
def apply_gradients(self, params_grads):
return self._optimizer.apply_gradients(params_grads)
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
"""
minimize a program through loss
Args:
loss (Variable|Variable List): loss variable or loss variable list to run optimization.
startup_program (Program): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables that should be ignored.
Returns:
tuple: (optimize_ops, params_grads), i.e. the list of operators appended
and the list of (param, grad) Variable pairs used for optimization.
Note that in parameter server mode, a worker will not get anything about optimize_ops,
because the optimizer algorithms run on the pserver side. We will make this usable in the pserver
process, but currently the optimization part is written into Fleet(). A user does not
need to care about how to start up a pserver node.
"""
optimize_ops, param_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
worker_endpoints = fleet.worker_endpoints
trainer_id = fleet.current_id
current_endpoint = fleet.current_endpoint
startup_program = startup_program if startup_program else \
fluid.framework.default_startup_program()
# call transpiler
config = dist_transpiler.DistributeTranspilerConfig()
config.mode = "nccl2"
t = dist_transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
trainers=','.join(worker_endpoints),
startup_program=startup_program,
current_endpoint=current_endpoint)
return optimize_ops, param_grads
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...@@ -10,365 +10,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
from ..base.role_maker import MPISymetricRoleMaker
from .optimizer_factory import *
from google.protobuf import text_format
import paddle.fluid.optimizer as local_optimizer
import paddle.fluid as fluid
class Fleet(object):
"""
Fleet in Python. Fleet is used in distributed training. It is designed as a singleton instance
in C++. A Fleet() object will be initialized automatically when a user imports this package as
fleet. The general interfaces Fleet supports are:
init(): which should be called only once in user's python scripts. init() will initialize
FleetWrapper in CPP, it will also initialize a RoleMaker which is used for identifying
current node's role, e.g. worker, server, etc.
stop(): will be called after a user finishes his/her training task. Fleet instance will be
destroyed when stop() is called.
init_pserver(): will be called by user. When a user knows current process is_worker(), he/she
should call init_pserver() to initialize global information about parameter server
init_worker(): will be called by user. When a user knows current process is_server(), he/she
should call init_worker() to initialize global information about worker and connect
worker with pserver.
get_worker_num(): return the number of current task's worker node
get_server_num(): return the number of current task's pserver node
is_worker(): return whether current process is a worker
is_server(): return whether current process is a server
init_pserver_model(): initialize model parameters in pserver, called from a worker node
save_pserver_model(): save model parameters in pserver, called from a server node
Example:
.. code-block:: python
import paddle.fluid.incubate.fleet.parameter_server as fleet
from my_model import bow_net
model = bow_net()
fleet.init()
sgd_optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.0001)
sgd_optimizer = fleet.DistributedOptimizer(sgd_optimizer)
sgd_optimizer.minimize(model.loss)
exe = paddle.fluid.Executor(paddle.fluid.CPUPlace())
if fleet.is_worker():
exe.run(paddle.fluid.default_startup_program())
fleet.init_worker() # init worker should be called before training
# do other things like training
elif fleet.is_server():
fleet.init_pserver()
fleet.stop()
"""
def __init__(self):
self._opt_info = None # for fleet only
self._role_maker = None
self._local_ip = 0
self._is_initialized = False
def init(self):
# TODO(guru4elephant)
# this is a temporary solution
# we will support more configurable RoleMaker for users in the future
"""
init(): which should be called only once in user's python scripts. init() will initialize
FleetWrapper in CPP, it will also initialize a RoleMaker which is used for identifying
current node's role, e.g. worker, server, etc.
"""
if not self.is_initialized_:
self._role_maker = MPISymetricRoleMaker()
self._role_maker._generate_role()
self._fleet_ptr = fluid.core.Fleet()
self._is_initialized = True
def stop(self):
"""
stop(): will be called after a user finishes his/her training task. Fleet instance will be
destroyed when stop() is called.
"""
self._role_maker._barrier_worker()
if self._role_maker._is_first_worker():
self._fleet_ptr.stop_server()
self._role_maker._barrier_worker()
self._role_maker._barrier_all()
self._role_maker._finalize()
def init_pserver(self):
"""
init_pserver(): will be called by user. When a user knows current process is_worker(), he/she
should call init_pserver() to initialize global information about parameter server
"""
if self._opt_info:
if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString(
self._opt_info["fleet_desc"])
self._dist_desc = self._opt_info["fleet_desc"]
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
self._fleet_ptr.init_server(self._dist_desc_str,
self.role_maker_._get_rank())
self._local_ip = self._fleet_ptr.run_server()
# barrier_all for init_server
self._role_maker._barrier_all()
self._all_ips = self._role_maker._all_gather(self.local_ip_)
self._fleet_ptr.gather_servers(self._all_ips,
self._role_maker._get_size())
# barrier_all for init_worker, wait all workers start
self._role_maker._barrier_all()
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
def init_worker(self, programs, scopes=None):
"""
init_worker(): will be called by user. When a user knows current process is_server(), he/she
should call init_worker() to initialize global information about worker and connect
worker with pserver. You should run startup program before init_worker.
Args:
programs(Program|list): a Program or a list of Programs
scopes(Scope|list): a Scope or a list of Scopes, default None.
"""
if not isinstance(programs, list):
programs = [programs]
if scopes is None:
scopes = [fluid.global_scope()] * len(programs)
if len(scopes) != len(programs):
print(
"You should make sure len(scopes) == len(programs) or set scopes None"
)
sys.exit(-1)
if self._opt_info:
if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString(
self._opt_info["fleet_desc"])
self._dist_desc = self._opt_info["fleet_desc"]
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
# barrier_all for init_server, wait for server starts
self._role_maker._barrier_all()
self._all_ips = self._role_maker._all_gather(self.local_ip_)
self._fleet_ptr.init_worker(self._dist_desc_str, self._all_ips,
self._role_maker._get_size(),
self._role_maker._get_rank())
# barrier_all for init_worker
self._role_maker._barrier_all()
# prepare for client to client communication
info = self._fleet_ptr.get_clients_info()
all_info = self._role_maker._worker_gather(info[0])
self._fleet_ptr.gather_clients(all_info)
self._fleet_ptr.create_client2client_connection()
# barrier for init model
self._role_maker._barrier_worker()
if self._role_maker._is_first_worker():
tables = self._dist_desc.trainer_param.dense_table
for prog, scope in zip(programs, scopes):
prog_id = str(id(prog))
prog_conf = self._opt_info['program_configs'][prog_id]
prog_tables = {}
for key in prog_conf:
if "dense" not in key:
continue
for table_id in prog_conf[key]:
prog_tables[int(table_id)] = 0
for table in tables:
if int(table.table_id) not in prog_tables:
continue
var_name_list = []
for i in range(0, len(table.dense_variable_name)):
var_name = table.dense_variable_name[i]
if scope.find_var(var_name) is None:
print("var " + var_name +
" not found in scope, " +
"you should run startup program first")
sys.exit(-1)
var_name_list.append(var_name)
self._fleet_ptr.init_model(scope,
int(table.table_id),
var_name_list)
# barrier for init model done
self._role_maker._barrier_worker()
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
def get_worker_num(self):
"""
return the number of current job's worker num
"""
return self._role_maker._worker_num()
def get_server_num(self):
"""
return the number of current job's server num
"""
return self._role_maker._server_num()
def get_worker_index(self):
"""
return the mpi rank of current worker
"""
return self._role_maker._worker_index()
def is_worker(self):
"""
return whether current node is a worker
"""
return self._role_maker._is_worker()
def is_server(self):
"""
return whether current node is pserver
"""
return self._role_maker._is_server()
def init_pserver_model(self):
"""
init pserver model called from pserver
"""
if self._role_maker._is_first_worker():
self._fleet_ptr.init_model()
self._role_maker._barrier_worker()
def save_pserver_model(self, save_path):
"""
save pserver model called from a worker
"""
self._fleet_ptr.save_model(save_path)
def split_filelist(self, filelist):
"""
split filelist before distributed training,
for example, filelist is [a, b, c ,d, e] and trainer_num = 2,
then trainer 0 gets [a, b, c] and trainer 1 gets [d, e]
Example:
>>> all_filelist = ["a.txt", "b.txt", "c.txt"]
>>> my_filelist = fleet.split_filelist(all_filelist)
>>> dataset = fluid.DatasetFactory().create_dataset()
>>> dataset.set_filelist(my_filelist)
Args:
filelist(list): list of filename, can be local or hdfs/afs.
Returns:
list of filename which belongs to this trainer.
"""
file_num = len(filelist)
trainer_id = self.get_worker_index()
trainer_num = self.get_worker_num()
if trainer_num > file_num:
raise ValueError("trainer_num should be <= file_num : "
"%s > %s" % (trainer_num, file_num))
# get interval of filelist, it's [ )
start = 0
end = 0
for i in range(0, trainer_id + 1):
length = file_num / trainer_num + (i < (file_num % trainer_num))
start = end
end += length
my_filelist = filelist[start:end]
return my_filelist
def _set_opt_info(self, opt_info):
"""
this function saves the result from DistributedOptimizer.minimize()
"""
self._opt_info = opt_info
class DistributedOptimizer(object):
"""
DistributedOptimizer is a wrapper for paddle.fluid.optimizer
A user should pass a paddle.fluid.optimizer to DistributedOptimizer
minimize() function is implemented.
DistributedOptimizer is the starting point for a user who wants to
run distributed training. The optimized information will be stored in
Fleet() instance who holds the global information about current distributed
training.
"""
def __init__(self, optimizer, dist_config={}):
super(DistributedOptimizer, self).__init__()
self._optimizer = optimizer
self._optimizer_name = "Distributed%s" % optimizer.type.capitalize()
if optimizer.type != "adam":
print("Currently, distributed optimizer only supports Adam"
"Will config built-in adam for you."
"We will support more functions in DistributedOptimizer",
sys.stderr)
self._optimizer_name = "DistributedAdam"
self._distributed_optimizer = globals()[self._optimizer_name](optimizer)
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
callbacks=None):
"""
Currently, backward function can not be called through DistributedOptimizer
"""
raise NotImplementedError()
def apply_gradients(self, params_grads):
"""
Currently, apply_gradients function can not be called through DistributedOptimizer
"""
raise NotImplementedError()
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
"""
minimize a program through loss, loss can be a list in DistributedOptimizer
Args:
loss (Variable|Variable List): loss variable or loss variable list to run optimization.
startup_program (Program): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables should be ignored.
Returns:
tuple: (optimize_ops, params_grads) which are, list of operators appended;
and list of (param, grad) Variables pair for optimization.
Note that in parameter server mode, a worker will not get anything about optimize_os
Because optmizer algorithms run on pserver side. We will make this usable in pserver
process, but currently the optimization part is written into Fleet(). A user does not
need to care about how to startup a pserver node.
"""
optimize_ops, param_grads, opt_info = \
self._distributed_optimizer._minimize(
loss,
startup_program,
parameter_list,
no_grad_set)
fleet_instance._set_opt_info(opt_info)
return [optimize_ops, param_grads]
# this is a temporary solution
# TODO(guru4elephant)
# will make this more flexible for more Parameter Server Archs
fleet_instance = Fleet()
init = fleet_instance.init
stop = fleet_instance.stop
init_pserver = fleet_instance.init_pserver
init_worker = fleet_instance.init_worker
is_worker = fleet_instance.is_worker
is_server = fleet_instance.is_server
init_pserver_model = fleet_instance.init_pserver_model
save_pserver_model = fleet_instance.save_pserver_model
worker_num = fleet_instance.get_worker_num
server_num = fleet_instance.get_server_num
worker_index = fleet_instance.get_worker_index
split_filelist = fleet_instance.split_filelist
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from paddle.fluid.executor import Executor
from paddle.fluid.framework import Program
from paddle.fluid.framework import default_main_program
from paddle.fluid.framework import default_startup_program
from paddle.fluid.optimizer import Optimizer
import paddle.fluid.io as io
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler
from ...base.role_maker import Role
from ...base.fleet_base import Fleet
from ...base.fleet_base import Mode
from ...base.fleet_base import DistributedOptimizer
class DistributedTranspiler(Fleet):
"""
A subclass for compatibility with fluid.transpiler.DistributeTranspiler.
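A minimal end-to-end sketch (assumes `fleet` is the DistributedTranspiler
instance defined in this module, and `role_maker`, `exe` and `loss` are
prepared by the user):

.. code-block:: python

    fleet.init(role_maker)
    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
    optimizer = fleet.distributed_optimizer(optimizer)
    optimizer.minimize(loss)
    if fleet.is_server():
        fleet.init_server(exe)
        fleet.run_server(exe)
    elif fleet.is_worker():
        fleet.init_worker(exe)
        # run the training loop with exe here
        fleet.stop(exe)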
"""
def __init__(self):
super(DistributedTranspiler, self).__init__(Mode.TRANSPILER)
self._transpiler = OriginTranspiler()
self._startup_program = None
self._main_program = None
def init_worker(self, executor):
"""
`init_worker` performs several steps before training:
first, wait for all parameter servers to launch completely;
second, run the executor to initialize the startup program;
third, wait for all workers to finish initialization.
Args:
executor(Executor): The executor to run for init startup program.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
if not self._startup_program:
raise ValueError(
"startup_program is None, need invoke DistributedOptimizer.minimize first"
)
executor.run(self._startup_program)
def run_worker(self, executor, main_program=None):
pass
def init_server(self, executor, model_dir=None):
"""
`init_server` performs several steps before starting the pserver:
first, run the executor to initialize the startup program;
second, if `model_dir` is not empty, load parameters from it for incremental training.
Args:
executor(Executor): The executor to run for init server.
model_dir(str): The directory path.
Returns:
None
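Examples:
    A minimal sketch ("./saved_model" is a placeholder directory):

    .. code-block:: python

        fleet.init_server(exe, model_dir="./saved_model")
        fleet.run_server(exe)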
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
if not self._startup_program:
raise ValueError(
"startup_program is None, need invoke DistributedOptimizer.minimize first"
)
executor.run(self._startup_program)
if model_dir:
if not os.path.isdir(model_dir):
raise ValueError("There is no directory named '%s'", model_dir)
io.load_persistables(executor, model_dir, self._startup_program)
def run_server(self, executor):
"""
`run_server` executes the pserver main program with the given executor.
Args:
executor(Executor): The executor that runs the pserver main program.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
if not self._main_program:
raise ValueError(
"main_program is None, need invoke DistributedOptimizer.minimize first"
)
executor.run(self._main_program)
def stop_worker(self):
pass
def stop(self, executor):
"""
Close this executor.
For distributed training, this method frees the resources on the PServers
related to the current Trainer.
Args:
executor(Executor): The executor to be closed.
Returns:
None
"""
if not isinstance(executor, Executor):
raise ValueError("executor must be an instance of Executor")
executor.close()
def distributed_optimizer(self, optimizer, strategy=None):
"""
Optimizer for distributed training.
For distributed training, this method wraps the given optimizer in a new
DistributedOptimizer instance, which has the basic Optimizer functionality
plus special features for distributed training.
Args:
optimizer(Optimizer): The optimizer to be wrapped for distributed training.
strategy(DistributeTranspilerConfig): Extra configuration for the distributed transpiler.
Returns:
TranspilerOptimizer: subclass of DistributedOptimizer.
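Examples:
    A minimal sketch (if `strategy` is omitted, a default
    DistributeTranspilerConfig() is used when transpiling):

    .. code-block:: python

        optimizer = fluid.optimizer.SGD(learning_rate=0.01)
        optimizer = fleet.distributed_optimizer(optimizer)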
"""
if not isinstance(optimizer, Optimizer):
raise ValueError("optimizer must be an instance of Optimizer")
self.optimizer = TranspilerOptimizer(optimizer, strategy)
return self.optimizer
def save_inference_model(self,
executor,
dirname,
feeded_var_names,
target_vars,
main_program=None,
export_for_deployment=True):
"""
Prune the given `main_program` to build a new program especially for inference,
and then save it and all related parameters to given `dirname` by the `executor`.
"""
io.save_inference_model(dirname, feeded_var_names, target_vars,
executor, main_program, None, None,
export_for_deployment)
def save_persistables(self, executor, dirname, main_program=None):
"""
This function filters out all variables with `persistable==True` from the
given `main_program` and then saves them to the folder `dirname`.
The `dirname` argument specifies the folder where the persistable
variables are going to be saved.
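Examples:
    A minimal sketch ("./checkpoint" is a placeholder directory):

    .. code-block:: python

        fleet.save_persistables(exe, "./checkpoint")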
"""
io.save_persistables(executor, dirname, main_program, None)
def _transpile(self, config):
if not isinstance(config, DistributeTranspilerConfig):
raise ValueError(
"config must be an instance of DistributeTranspilerConfig")
self._transpiler = OriginTranspiler(config)
self._transpiler.transpile(
trainer_id=fleet.worker_id(),
pservers=fleet.server_endpoints,
trainers=fleet.get_workers())
if self.role == Role.WORKER:
self._main_program = self._transpiler.get_trainer_program()
self._startup_program = default_startup_program()
else:
self._main_program, self._startup_program = \
self._transpiler.get_pserver_programs(self.current_endpoint)
fleet = DistributedTranspiler()
class TranspilerOptimizer(DistributedOptimizer):
def __init__(self, optimizer, strategy=None):
super(TranspilerOptimizer, self).__init__(optimizer, strategy)
if strategy and not isinstance(strategy, DistributeTranspilerConfig):
raise ValueError(
"In {} mode, strategy must be an instance of DistributeTranspilerConfig".
format(fleet.mode))
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
callbacks=None):
return self._optimizer.backward(loss, startup_program, parameter_list,
no_grad_set, callbacks)
def apply_gradients(self, params_grads):
return self._optimizer.apply_gradients(params_grads)
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
optimize_ops, params_grads = self._optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set)
self.transpile()
return optimize_ops, params_grads
def transpile(self):
if self._strategy is None:
self._strategy = DistributeTranspilerConfig()
fleet._transpile(config=self._strategy)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from .optimizer_factory import *
from google.protobuf import text_format
import paddle.fluid as fluid
from paddle.fluid.framework import Program
from ...base.fleet_base import Fleet
from ...base.fleet_base import Mode
from ...base.role_maker import MPISymetricRoleMaker
from ...base.fleet_base import DistributedOptimizer
class PSLib(Fleet):
def __init__(self):
super(PSLib, self).__init__(Mode.PSLIB)
self._opt_info = None
self.local_ip_ = 0
self._fleet_ptr = None
def init(self, role_maker=None):
super(PSLib, self).init(MPISymetricRoleMaker())
self._fleet_ptr = fluid.core.Fleet()
def init_worker(self, executor):
pass
def run_worker(self, executor, main_program=None):
"""
run_worker(): will be called by the user. When a user knows the current
process is a worker, he/she should call run_worker() to initialize the
global information about this worker and connect it to the pservers.
You should run the startup program before run_worker().
Args:
executor(Executor): The executor to run.
main_program(Program): The main program used for training.
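Example (a minimal sketch; assumes fleet.init() and
DistributedOptimizer.minimize() have already been called):

.. code-block:: python

    fleet.run_worker(exe, fluid.default_main_program())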
"""
if not isinstance(main_program, Program):
raise ValueError("main_program must be an instance of Program")
programs = [main_program]
scopes = [fluid.global_scope()] * len(programs)
if len(scopes) != len(programs):
print(
"You should make sure len(scopes) == len(programs) or set scopes None"
)
sys.exit(-1)
if self._opt_info:
if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString(
self._opt_info["fleet_desc"])
self._dist_desc = self._opt_info["fleet_desc"]
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
# barrier_all for init_server, wait for server starts
self.role_maker_._barrier_all()
self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
self._fleet_ptr.init_worker(self._dist_desc_str, self.all_ips_,
self.role_maker_._get_size(),
self.role_maker_._get_rank())
# barrier_all for init_worker
self.role_maker_._barrier_all()
# prepare for client to client communication
info = self._fleet_ptr.get_clients_info()
all_info = self.role_maker_._worker_gather(info[0])
self._fleet_ptr.gather_clients(all_info)
self._fleet_ptr.create_client2client_connection()
# barrier for init model
self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker():
tables = self._dist_desc.trainer_param.dense_table
for prog, scope in zip(programs, scopes):
prog_id = str(id(prog))
prog_conf = self._opt_info['program_configs'][prog_id]
prog_tables = {}
for key in prog_conf:
if "dense" not in key:
continue
for table_id in prog_conf[key]:
prog_tables[int(table_id)] = 0
for table in tables:
if int(table.table_id) not in prog_tables:
continue
var_name_list = []
for i in range(0, len(table.dense_variable_name)):
var_name = table.dense_variable_name[i]
if scope.find_var(var_name) is None:
print("var " + var_name +
" not found in scope, " +
"you should run startup program first")
sys.exit(-1)
var_name_list.append(var_name)
self._fleet_ptr.init_model(scope,
int(table.table_id),
var_name_list)
# barrier for init model done
self.role_maker_._barrier_worker()
else:
raise NameError(
"You should run DistributedOptimizer.minimize() first")
def init_server(self, executor, model_dir=None):
pass
def run_server(self, executor):
"""
run_server(): will be called by the user. When a user knows the current
process is a server, he/she should call run_server() to initialize the
global information about the parameter server and start it.
"""
if self._opt_info:
if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString(
self._opt_info["fleet_desc"])
self._dist_desc = self._opt_info["fleet_desc"]
else:
print("You should run DistributedOptimizer.minimize() first")
sys.exit(-1)
self._fleet_ptr.init_server(self._dist_desc_str,
self.role_maker_._get_rank())
self.local_ip_ = self._fleet_ptr.run_server()
# barrier_all for init_server
self.role_maker_._barrier_all()
self.all_ips_ = self.role_maker_._all_gather(self.local_ip_)
self._fleet_ptr.gather_servers(self.all_ips_,
self.role_maker_._get_size())
# barrier_all for init_worker, wait all workers start
self.role_maker_._barrier_all()
else:
raise NameError(
"You should run DistributedOptimizer.minimize() first")
def stop_worker(self):
"""
stop_worker(): will be called after a user finishes his/her training task;
the first worker stops the server and all workers synchronize.
"""
self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker():
self._fleet_ptr.stop_server()
self.role_maker_._barrier_worker()
self.role_maker_._barrier_all()
self.role_maker_._finalize()
def stop(self, executor):
"""
stop(): will be called after a user finishes his/her training task. Fleet instance will be
destroyed when stop() is called.
"""
self.role_maker_._barrier_worker()
if self.role_maker_._is_first_worker():
self._fleet_ptr.stop_server()
self.role_maker_._barrier_worker()
self.role_maker_._barrier_all()
self.role_maker_._finalize()
def distributed_optimizer(self, optimizer, strategy=None):
self.optimizer = DownpourOptimizer(optimizer, strategy)
return self.optimizer
def save_inference_model(self,
executor,
dirname,
feeded_var_names=None,
target_vars=None,
main_program=None,
export_for_deployment=True):
"""
save pserver model called from a worker
"""
self._fleet_ptr.save_model(dirname)
def save_persistables(self, executor, dirname, main_program=None):
self._fleet_ptr.save_model(dirname)
def _set_opt_info(self, opt_info):
"""
this function saves the result from DistributedOptimizer.minimize()
"""
self._opt_info = opt_info
fleet = PSLib()
class DownpourOptimizer(DistributedOptimizer):
"""
DownpourOptimizer is a wrapper for a paddle.fluid.optimizer. A user
should pass a paddle.fluid.optimizer to DownpourOptimizer; its
minimize() method is implemented on top of it.
DownpourOptimizer is the starting point for a user who wants to run
distributed training. The optimization information is stored in the
Fleet() instance, which holds the global information about the current
distributed training job.
"""
def __init__(self, optimizer, strategy=None):
super(DownpourOptimizer, self).__init__(optimizer, strategy)
self._optimizer = optimizer
self._optimizer_name = "Distributed%s" % optimizer.type.capitalize()
if optimizer.type != "adam":
print("Currently, distributed optimizer only support Adam"
"Will config built-in adam for you."
"We will support more functions in DistributedOptimizer",
sys.stderr)
self._optimizer_name = "DistributedAdam"
self._distributed_optimizer = globals()[self._optimizer_name](optimizer)
def backward(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
callbacks=None):
"""
Currently, backward function can not be called through DistributedOptimizer
"""
raise NotImplementedError()
def apply_gradients(self, params_grads):
"""
Currently, apply_gradients function can not be called through DistributedOptimizer
"""
raise NotImplementedError()
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
"""
Minimize a program through loss; loss can be a list in DistributedOptimizer.
Args:
loss (Variable|Variable List): loss variable or loss variable list to run optimization.
startup_program (Program): startup_program for initializing parameters
in `parameter_list`.
parameter_list (list): list of Variables to update.
no_grad_set (set|None): set of Variables that should be ignored.
Returns:
tuple: (optimize_ops, params_grads), i.e. the list of operators appended
and the list of (param, grad) Variable pairs used for optimization.
Note that in parameter server mode, a worker will not get anything about optimize_ops,
because the optimizer algorithms run on the pserver side. We will make this usable in the pserver
process, but currently the optimization part is written into Fleet(). A user does not
need to care about how to start up a pserver node.
"""
optimize_ops, param_grads, opt_info = \
self._distributed_optimizer._minimize(
loss,
startup_program,
parameter_list,
no_grad_set)
fleet._set_opt_info(opt_info)
return [optimize_ops, param_grads]
...@@ -52,6 +52,7 @@ class TestDistRunnerBase(object):
        # NOTE: import fluid until runtime, or else forking processes will cause error.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
+       config.sync_mode = sync_mode
        # config.runtime_split_send_recv = True
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
...@@ -59,7 +60,6 @@ class TestDistRunnerBase(object):
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
-           sync_mode=sync_mode,
            current_endpoint=current_endpoint)
        return t
......
...@@ -43,12 +43,11 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
    pserver_endpoints = ip + ":" + port
    current_endpoint = ip + ":" + port
-   t = fluid.DistributeTranspiler()
-   t.transpile(
-       trainer_id,
-       pservers=pserver_endpoints,
-       trainers=trainers,
-       sync_mode=sync_mode)
+   config = fluid.DistributeTranspilerConfig()
+   config.sync_mode = sync_mode
+   t = fluid.DistributeTranspiler(config=config)
+   t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
    pserver_prog = t.get_pserver_program(current_endpoint)
    pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
    exe.run(pserver_startup)
...@@ -77,13 +76,11 @@ def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers,
    pserver_endpoints = ps1 + "," + ps2
    config = fluid.DistributeTranspilerConfig()
+   config.sync_mode = sync_mode
    config.slice_var_up = False
    t = fluid.DistributeTranspiler(config=config)
-   t.transpile(
-       trainer_id,
-       pservers=pserver_endpoints,
-       trainers=trainers,
-       sync_mode=sync_mode)
+   t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
    pserver_prog = t.get_pserver_program(ps2)
    # pserver2 have no parameter
......
...@@ -158,6 +158,7 @@ class DistributeTranspilerConfig(object):
    wait_port = True
    # split the send recv var in runtime
    runtime_split_send_recv = False
+   sync_mode = None
class DistributeTranspiler(object):
...@@ -329,7 +330,7 @@ class DistributeTranspiler(object):
            return
        self.trainer_num = trainers
-       self.sync_mode = sync_mode
+       self.sync_mode = self.config.sync_mode if self.config.sync_mode else sync_mode
        self.trainer_id = trainer_id
        pserver_endpoints = pservers.split(",")
        self.pserver_endpoints = pserver_endpoints
......
...@@ -127,7 +127,9 @@ packages=['paddle',
          'paddle.fluid.incubate.fleet',
          'paddle.fluid.incubate.fleet.base',
          'paddle.fluid.incubate.fleet.parameter_server',
-         'paddle.fluid.incubate.fleet.p2p']
+         'paddle.fluid.incubate.fleet.parameter_server.distributed_transpiler',
+         'paddle.fluid.incubate.fleet.parameter_server.pslib',
+         'paddle.fluid.incubate.fleet.collective']
with open('@PADDLE_SOURCE_DIR@/python/requirements.txt') as f:
    setup_requires = f.read().splitlines()
......