未验证 提交 da7f022b 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge branch 'develop' into develop

.. _fleet_api_howto_cn:
使用FleetAPI进行分布式训练
====================
==========================
FleetAPI 设计说明
---------------
-----------------
Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddlePaddle,象征一个舰队中的多只双桨船协同工作。Fleet的设计在易用性和算法可扩展性方面做出了权衡。用户可以很容易从单机版的训练程序,通过添加几行代码切换到分布式训练程序。此外,分布式训练的算法也可以通过Fleet
API接口灵活定义。具体的设计原理可以参考\ `Fleet
API设计文档 <https://github.com/PaddlePaddle/Fleet/blob/develop/README.md>`__\ 。当前FleetAPI还处于paddle.fluid.incubate目录下,未来功能完备后会放到paddle.fluid目录中,欢迎持续关注。
Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddlePaddle,象征一个舰队中的多只双桨船协同工作。Fleet的设计在易用性和算法可扩展性方面做出了权衡。用户可以很容易从单机版的训练程序,通过添加几行代码切换到分布式训练程序。此外,分布式训练的算法也可以通过Fleet API接口灵活定义。具体的设计原理可以参考https://github.com/PaddlePaddle/Fleet/blob/develop/README.md
当前FleetAPI还处于paddle.fluid.incubate目录下,未来功能完备后会放到paddle.fluid目录中,欢迎持续关注。
Fleet API快速上手示例
---------------------
快速上手示例
------------------------------
用户可以使用Fleet API轻易实现GPU多卡训练(单机多卡/多机多卡)。多卡训练在现代AI模型中非常常见,例如Resnet50、Bert等都是非常常见的需要多机多卡训练的模型。下面的代码示例,以一个简单的例子入手展示如何使用Fleet API进行单机多卡训练。代码示例可以参考:https://github.com/PaddlePaddle/Fleet
下面会针对Fleet
API最常见的两种使用场景,用一个模型做示例,目的是让用户有快速上手体验的模板。快速上手的示例源代码可以在\ `Fleet
Quick
Start <https://github.com/PaddlePaddle/Fleet/tree/develop/examples/quick-start>`__\ 找到。
神经网络模型的定义如下:
假设我们定义MLP网络如下:
.. code-block:: python
.. code:: python
import paddle.fluid as fluid
def mlp(input_x, input_y, hid_dim=128, label_dim=2):
fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
......@@ -23,10 +32,20 @@ Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddleP
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
定义一个在内存生成数据的Reader如下:
一个简单的训练程序如下:
.. code:: python
.. code-block:: python
import numpy as np
def gen_data():
return {"x": np.random.random(size=(128, 32)).astype('float32'),
"y": np.random.randint(2, size=(128, 1)).astype('int64')}
单机Trainer定义
^^^^^^^^^^^^^^^
.. code:: python
import paddle.fluid as fluid
from nets import mlp
......@@ -44,18 +63,22 @@ Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddleP
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
exe.run(feed=gen_data())
cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
print("step%d cost=%f" % (i, cost_val[0]))
Parameter Server训练方法
^^^^^^^^^^^^^^^^^^^^^^^^
如果用户想使用高性能芯片,例如GPU多卡进行训练,使用Fleet API可以在增加少量代码的情况下实现。
参数服务器方法对于大规模数据,简单模型的并行训练非常适用,我们基于单机模型的定义给出其实用Parameter
Server进行训练的示例如下:
.. code-block:: python
.. code:: python
import paddle.fluid as fluid
from utils import gen_data
from nets import mlp
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
from utils import gen_data
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
......@@ -68,130 +91,195 @@ Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddleP
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
if fleet.is_server():
fleet.init_server()
fleet.run_server()
elif fleet.is_worker():
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
cost_val = exe.run(
program=fluid.default_main_program(),
feed=gen_data(),
fetch_list=[cost.name])
print("worker_index: %d, step%d cost = %f" %
(fleet.worker_index(), i, cost_val[0]))
Collective训练方法
^^^^^^^^^^^^^^^^^^
collective
training通常在GPU多机多卡训练中使用,一般在复杂模型的训练中比较常见,我们基于上面的单机模型定义给出使用Collective方法进行分布式训练的示例如下:
.. code:: python
import paddle.fluid as fluid
from nets import mlp
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker
from utils import gen_data
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
exe.run(feed=gen_data())
cost_val = exe.run(
program=fluid.default_main_program(),
feed=gen_data(),
fetch_list=[cost.name])
print("worker_index: %d, step%d cost = %f" %
(fleet.worker_index(), i, cost_val[0]))
更多使用示例
------------
`点击率预估 <>`__
在单机运行多卡程序的执行命令如下:
`语义匹配 <>`__
.. code-block:: python
`向量学习 <>`__
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
python -m paddle.distributed.launch collective_trainer.py
`基于Resnet50的图像分类 <>`__
`基于Transformer的机器翻译 <>`__
`基于Bert的语义表示学习 <>`__
FleetAPI 接口说明
------------------------------
.. csv-table::
:header: "接口", "说明"
Fleet API相关的接口说明
-----------------------
"init", "fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置"
"distributed_optimizer", "fleet多机训练策略优化,接收一个标准Optimizer及相应的多机运行策略,fleet会根据优化策略进行优化"
"init_server", "fleet加载model_dir中保存的模型相关参数进行parameter server的初始化"
"run_server", "fleet启动parameter server服务"
"init_worker", "fleet初始化当前worker运行环境"
"is_worker", "判断当前节点是否是Worker节点,是则返回True,否则返回False"
"is_server", "判断当前节点是否是Server节点,是则返回True,否则返回False"
"save_inference_model", "fleet保存预测相关的模型及参数,参数及用法参考 code:`fluid.io.save_inference_model`"
"save_persistables", "fleet保存多机模型参数,参数及用法参考 code:`fluid.io.save_persistables`"
Fleet API接口
~~~~~~~~~~~~~
- init(role\_maker=None)
- fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置
- is\_worker()
- Parameter
Server训练中使用,判断当前节点是否是Worker节点,是则返回True,否则返回False
- is\_server(model\_dir=None)
- Parameter
Server训练中使用,判断当前节点是否是Server节点,是则返回True,否则返回False
- init\_server()
- Parameter
Server训练中,fleet加载model\_dir中保存的模型相关参数进行parameter
server的初始化
- run\_server()
- Parameter Server训练中使用,用来启动server端服务
- init\_worker()
- Parameter Server训练中使用,用来启动worker端服务
- stop\_worker()
- 训练结束后,停止worker
- distributed\_optimizer(optimizer, strategy=None)
- 分布式优化算法装饰器,用户可带入单机optimizer,并配置分布式训练策略,返回一个分布式的optimizer
FleetAPI 一般训练步骤
------------------------------
RoleMaker
~~~~~~~~~
通过import引入需要使用的模式
++++++++++++++++++
- MPISymetricRoleMaker
使用parameter server方式的训练:
- 描述:MPISymetricRoleMaker会假设每个节点启动两个进程,1worker+1pserver,这种RoleMaker要求用户的集群上有mpi环境。
.. code-block:: python
- 示例:
.. code:: python
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
role = role_maker.MPISymetricRoleMaker()
fleet.init(role)
初始化
++++++++++++++++++
Fleet使用 code:`fleet.init(role_maker=None)` 进行初始化
- 启动方法:
当用户不指定role_maker, 则Fleet默认用户使用MPI环境,会采用MPISymetricRoleMaker.
.. code:: shell
如果用户使用非MPI环境,则需要通过UserDefinedRoleMaker自行定义执行环境。
例如:
mpirun -np 2 python trainer.py
.. code-block:: python
- PaddleCloudRoleMaker
role = UserDefinedRoleMaker(current_id=0,
role=Role.WORKER,
worker_num=3,
server_endpoints=["127.0.0.1:6001","127.0.0.1:6002"])
fleet.init(role_maker=role)
- 描述:PaddleCloudRoleMaker是一个高级封装,支持使用paddle.distributed.launch或者paddle.distributed.launch\_ps启动脚本
- Parameter Server训练示例:
分布式策略及多机配置
++++++++++++++++
.. code:: python
对于Transpiler模式,需要使用 DistributeTranspilerConfig 指定多机配置。
Fleet需要在用户定义的optimizer之上装饰 code:`fleet.distributed_optimizer` 来完成多机分布式策略的配置。
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
.. csv-table::
:header: "接口", "说明"
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
"sync_mode", "Fleet可以支持同步训练或异步训练, 默认会生成同步训练的分布式程序,通过指定 :code:`sync_mode=False` 参数即可生成异步训练的程序"
"split_method", "指定参数在parameter server上的分布方式, 默认使用 `RoundRobin`, 也可选`HashName`"
"slice_var_up", "指定是否将较大(大于8192个元素)的参数切分到多个parameter server以均衡计算负载,默认为开启"
- 启动方法:
.. code:: python
例如:
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 trainer.py
.. code-block:: python
- Collective训练示例:
config = DistributeTranspilerConfig()
config.sync_mode = True
.. code:: python
# build network
# ...
avg_cost = model()
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
# 加入 fleet distributed_optimizer 加入分布式策略配置及多机优化
optimizer = fleet.distributed_optimizer(optimizer, config)
optimizer.minimize(avg_cost)
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
- 启动方法:
具体训练流程
++++++++++++++++
.. code:: python
.. code-block:: python
python -m paddle.distributed.launch trainer.py
# 启动server
if fleet.is_server():
fleet.init_server()
fleet.run_server()
- UserDefinedRoleMaker
- 描述:用户自定义节点的角色信息,IP和端口信息
- 示例:
.. code:: python
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
role = role_maker.UserDefinedRoleMaker(
current_id=int(os.getenv("CURRENT_ID")),
role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER")))
else role_maker.Role.SERVER,
worker_num=int(os.getenv("WORKER_NUM")),
server_endpoints=pserver_endpoints)
fleet.init(role)
# 启动worker
if fleet.is_worker():
# 初始化worker配置
fleet.init_worker()
Strategy
~~~~~~~~
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
train_reader = paddle.batch(fake_reader(), batch_size=24)
- Parameter Server Training
- Sync\_mode
- Collective Training
- LocalSGD
- ReduceGrad
exe.run(fleet.startup_program)
Fleet Mode
~~~~~~~~~~
PASS_NUM = 10
for pass_id in range(PASS_NUM):
for batch_id, data in enumerate(train_reader()):
avg_loss_value, auc_value, auc_batch_value = \
exe.run(fleet.main_program, feed=feeder.feed(data), fetch_list=[avg_cost, auc, auc_batch])
print("Pass %d, cost = %f, auc = %f, batch_auc = %f" % (pass_id, avg_loss_value, auc_value, auc_batch_value))
# 通知server,当前节点训练结束
fleet.stop_worker()
- Parameter Server Training
``python from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet``
- Collective Training
``python from paddle.fluid.incubate.fleet.collective import fleet``
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册