未验证 提交 9ded7565 编写于 作者: T tangwei12 提交者: GitHub

【paddle.fleet】FleetAPI 2.0 (#26772)

* add FleetAPI doc
Co-authored-by: Nliuyuhui <liuyuhui@baidu.com>
上级 b0111778
......@@ -22,8 +22,6 @@ from .runtime_factory import RuntimeFactory
from .util_factory import UtilFactory
from paddle.fluid.wrapped_decorator import wrap_decorator
#__all__ = ['Fleet']
def _inited_runtime_handler_(func):
def __impl__(*args, **kwargs):
......@@ -43,65 +41,123 @@ inited_runtime_handler = wrap_decorator(_inited_runtime_handler_)
class Fleet(object):
"""
Unified API for distributed training of PaddlePaddle
Please reference the https://github.com/PaddlePaddle/Fleet for details
Please reference the https://github.com/PaddlePaddle/FleetX for details
Returns:
Fleet: A Fleet instance
Examples:
Example for collective training:
.. code-block:: python
import paddle.distributed.fleet as fleet
role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
fleet.init(is_collective=True)
strategy = fleet.DistributedStrategy()
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
# do distributed training
Example for parameter server training:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
strategy = fleet.DistributedStrategy()
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
if fleet.is_first_worker():
print("this is first worker")
print("current node index: {}".format(fleet.worker_index()))
print("total number of worker num: {}".format(fleet.worker_num()))
if fleet.is_worker():
print("this is worker")
print("worker endpoints: {}".format(fleet.worker_endpoints(to_string=True)))
print("server num: {}".format(fleet.server_num()))
print("server endpoints: {}".format(fleet.server_endpoints(to_string=True)))
if fleet.is_server():
print("this is server")
fleet.stop_worker()
"""
def __init__(self):
self._runtime_handle = None
self._util = None
self._role_maker = None
self.strategy_compiler = None
self._is_collective = False
self._runtime_handle = None
self._util = None
def init(self, role_maker=None, is_collective=False):
"""
Initialize role_maker in Fleet.
This function is responsible for the distributed architecture
what you want to run your code behind,such as Transpiler,
Collective in PaddleCloudRoleMaker or UserDefinedRoleMaker
This function is responsible for the distributed architecture
what you want to run your code behind.
Args:
role_maker (RoleMakerBase, optional): A ``RoleMakerBase`` containing the configuration
of environment variables related to distributed training.If you did not initialize
the rolemaker by yourself, it will be automatically initialized to PaddleRoleMaker.
The default value is None.
is_collective (Boolean, optional): A ``Boolean`` variable determines whether the program
runs on the CPU or GPU. False means set distributed training using CPU, and True means
GPU.The default value is False.The default value is False.
Returns:
None
Examples1:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
Examples2:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init(is_collective=True)
Examples3:
.. code-block:: python
import paddle.distributed.fleet as fleet
role = fleet.PaddleCloudRoleMaker
fleet.init(role)
"""
if isinstance(role_maker, RoleMakerBase):
self._role_maker = role_maker
elif role_maker == None:
if role_maker is None:
if isinstance(is_collective, bool):
self._is_collective = is_collective
self._role_maker = PaddleCloudRoleMaker(
is_collective=self._is_collective)
else:
raise ValueError(
"Something wrong occurred, please check whether is_collective is bool value"
)
"`is_collective` should be instance of `bool`, but got {}".
format(type(is_collective)))
else:
raise ValueError(
"Something wrong occurred, please check whether rolemaker is instance of RoleMakerBase"
)
if isinstance(role_maker, RoleMakerBase):
self._role_maker = role_maker
else:
raise ValueError(
"`role_maker` should be subclass of `RoleMakerBase`, but got {}".
format(type(role_maker)))
self.strategy_compiler = StrategyCompiler()
return None
......@@ -113,6 +169,14 @@ class Fleet(object):
bool: True if this is the first node of worker,
False if not.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.is_first_worker()
"""
return self._role_maker.is_first_worker()
......@@ -122,6 +186,14 @@ class Fleet(object):
Returns:
int: node id
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.worker_index()
"""
return self._role_maker.worker_index()
......@@ -131,6 +203,14 @@ class Fleet(object):
Returns:
int: worker numbers
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.worker_num()
"""
return self._role_maker.worker_num()
......@@ -141,15 +221,31 @@ class Fleet(object):
Returns:
bool: True if this is a node of worker,
False if not.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.is_worker()
"""
return self._role_maker.is_worker()
def worker_endpoints(self, to_string=False):
"""
Get current server endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].
Get current worker endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].
Returns:
list/string: server endpoints
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.worker_endpoints()
"""
'''
if to_string:
......@@ -165,6 +261,12 @@ class Fleet(object):
Returns:
int: server number
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.server_num()
"""
return len(self._role_maker.get_pserver_endpoints())
......@@ -174,6 +276,14 @@ class Fleet(object):
Returns:
int: node id
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.server_index()
"""
return self._role_maker.server_index()
......@@ -183,14 +293,20 @@ class Fleet(object):
Returns:
list/string: server endpoints
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.server_endpoints()
"""
'''
if to_string:
return ",".join(self._role_maker.get_pserver_endpoints())
else:
return self._role_maker.get_pserver_endpoints()
'''
return ["127.0.0.1:1001", "127.0.0.1:1002"]
def is_server(self):
"""
......@@ -199,6 +315,14 @@ class Fleet(object):
Returns:
bool: True if this is a node of server,
False if not.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
fleet.is_server()
"""
return self._role_maker.is_server(
) or self._role_maker._is_heter_worker()
......@@ -208,6 +332,19 @@ class Fleet(object):
"""
Utility functions that can be used under certain runtime
return util
Returns:
UtilBase: instance of UtilBase, can use distributed ops/tools easily.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
util = fleet.util
files = ["1.log", "2.log", "3.log", "4.log"]
files = util.get_file_shard()
"""
return self._util
......@@ -215,41 +352,114 @@ class Fleet(object):
def util(self, util):
"""
Set Utility functions for userd-defined runtime
set util
Returns:
None
"""
self._util = util
def barrier_worker(self):
"""
barrier between workers
barrier all workers
Returns:
None
"""
self._role_maker.barrier_worker()
@inited_runtime_handler
def init_worker(self):
"""
init worker
initialize `Communicator` for parameter server training.
Returns:
None
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
# build net
# fleet.distributed_optimizer(...)
fleet.init_worker()
"""
self._runtime_handle._init_worker()
@inited_runtime_handler
def init_server(self, *args, **kwargs):
"""
init server
init_server executor to initialize startup program,
if the `args` is not empty, it will run load_persistables for increment training.
Returns:
None
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
# build net
# fleet.distributed_optimizer(...)
fleet.init_server()
"""
self._runtime_handle._init_server(*args, **kwargs)
@inited_runtime_handler
def run_server(self):
"""
run server
run server will run pserver main program with executor.
Returns:
None
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
# build net
# fleet.distributed_optimizer(...)
if fleet.is_server():
fleet.init_server()
"""
self._runtime_handle._run_server()
@inited_runtime_handler
def stop_worker(self):
"""
stop worker
stop `Communicator` and give training complete notice to parameter server.
Returns:
None
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
# build net
# fleet.distributed_optimizer(...)
fleet.init_server()
"""
self._runtime_handle._stop_worker()
......@@ -260,27 +470,98 @@ class Fleet(object):
target_vars,
main_program=None,
export_for_deployment=True):
"""
save inference model for inference.
Returns:
None
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
fleet.init()
# build net
# fleet.distributed_optimizer(...)
fleet.init_server()
"""
self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, target_vars, main_program,
export_for_deployment)
def save_persistables(self, executor, dirname, main_program=None):
"""
saves all persistable variables from :code:`main_program` to
the folder :code:`dirname`. You can refer to
The :code:`dirname` is used to specify the folder where persistable variables
are going to be saved. If you would like to save variables in separate
files, set :code:`filename` None.
Args:
executor(Executor): The executor to run for saving persistable variables.
You can refer to :ref:`api_guide_executor_en` for
more details.
dirname(str, optional): The saving directory path.
When you need to save the parameter to the memory, set it to None.
main_program(Program, optional): The program whose persistbale variables will
be saved. Default: None.
Returns:
None
Examples:
.. code-block:: text
import paddle.distributed.fleet as fleet
import paddle.fluid as fluid
fleet.init()
# build net
# fleet.distributed_optimizer(...)
exe = fluid.Executor(fluid.CPUPlace())
fleet.save_persistables(exe, "dirname", fluid.default_main_program())
"""
self._runtime_handle._save_persistables(executor, dirname, main_program)
def distributed_optimizer(self, optimizer, strategy=None):
"""
distirbuted_optimizer
Optimizer for distributed training.
For the distributed training, this method would rebuild a new instance of DistributedOptimizer.
Which has basic Optimizer function and special features for distributed training.
Args:
optimizer(Optimizer): The executor to run for init server.
strategy(DistributedStrategy): Extra properties for distributed optimizer.
Returns:
Fleet instance with minimize interface like optimizers
Fleet: instance of fleet.
Examples:
.. code-block:: python
import paddle.distributed.fleet as fleet
role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = fleet.DistributedStrategy()
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
import paddle.distributed.fleet as fleet
role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = fleet.DistributedStrategy()
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
"""
self.user_defined_optimizer = optimizer
if strategy == None:
......@@ -317,23 +598,25 @@ class Fleet(object):
``fetch_list`` before run, see details in ``Executor``.
Examples:
import paddle
import paddle.distributed.fleet as fleet
.. code-block:: python
fc_1 = paddle.layers.fc(input=input_x, size=hid_dim, act='tanh')
fc_2 = paddlen.layers.fc(input=fc_1, size=hid_dim, act='tanh')
prediction = paddle.layers.fc(input=[fc_2], size=label_dim, act='softmax')
cost = paddle.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = paddle.layers.mean(x=cost)
import paddle
import paddle.distributed.fleet as fleet
role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = fleet.DistributedStrategy()
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
fc_1 = paddle.fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
cost = paddle.fluid.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
role = fleet.role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
strategy = fleet.DistributedStrategy()
optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
# for more examples, please reference https://github.com/PaddlePaddle/Fleet
# for more examples, please reference https://github.com/PaddlePaddle/FleetX
"""
context = {}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册