Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
FluidDoc
提交
dc0a7938
F
FluidDoc
项目概览
PaddlePaddle
/
FluidDoc
通知
5
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
23
列表
看板
标记
里程碑
合并请求
111
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
FluidDoc
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
23
Issue
23
列表
看板
标记
里程碑
合并请求
111
合并请求
111
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
dc0a7938
编写于
7月 30, 2019
作者:
D
Dong Daxiang
提交者:
GitHub
7月 30, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Update fleet_api_howto_cn.rst
test=document_preview
上级
a41e77aa
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
218 addition
and
139 deletion
+218
-139
doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst
doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst
+218
-139
未找到文件。
doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst
浏览文件 @
dc0a7938
使用FleetAPI进行分布式训练
====================
# 使用FleetAPI进行分布式训练
FleetAPI 设计说明
---------------
Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddlePaddle,象征一个舰队中的多只双桨船协同工作。Fleet的设计在易用性和算法可扩展性方面做出了权衡。用户可以很容易从单机版的训练程序,通过添加几行代码切换到分布式训练程序。此外,分布式训练的算法也可以通过Fleet API接口灵活定义。具体的设计原理可以参考https://github.com/PaddlePaddle/Fleet/blob/develop/README.md
当前FleetAPI还处于paddle.fluid.incubate目录下,未来功能完备后会放到paddle.fluid目录中,欢迎持续关注。
快速上手示例
------------------------------
用户可以使用Fleet API轻易实现GPU多卡训练(单机多卡/多机多卡)。多卡训练在现代AI模型中非常常见,例如Resnet50、Bert等都是非常常见的需要多机多卡训练的模型。下面的代码示例,以一个简单的例子入手展示如何使用Fleet API进行单机多卡训练。代码示例可以参考:https://github.com/PaddlePaddle/Fleet
## FleetAPI 设计说明
神经网络模型的定义如下:
Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddlePaddle,象征一个舰队中的多只双桨船协同工作。Fleet的设计在易用性和算法可扩展性方面做出了权衡。用户可以很容易从单机版的训练程序,通过添加几行代码切换到分布式训练程序。此外,分布式训练的算法也可以通过Fleet API接口灵活定义。具体的设计原理可以参考[Fleet API设计文档](https://github.com/PaddlePaddle/Fleet/blob/develop/README.md)。当前FleetAPI还处于paddle.fluid.incubate目录下,未来功能完备后会放到paddle.fluid目录中,欢迎持续关注。
.. code-block:: python
def mlp(input_x, input_y, hid_dim=128, label_dim=2):
## Fleet API快速上手示例
下面会针对Fleet API最常见的两种使用场景,用一个模型做示例,目的是让用户有快速上手体验的模板。快速上手的示例源代码可以在[Fleet Quick Start](https://github.com/PaddlePaddle/Fleet/tree/develop/examples/quick-start)找到。
假设我们定义MLP网络如下:
```python
import paddle.fluid as fluid
def mlp(input_x, input_y, hid_dim=128, label_dim=2):
fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
```
定义一个在内存生成数据的Reader如下:
```python
import numpy as np
def gen_data():
return {"x": np.random.random(size=(128, 32)).astype('float32'),
"y": np.random.randint(2, size=(128, 1)).astype('int64')}
```
#### 单机Trainer定义
```python
import paddle.fluid as fluid
from nets import mlp
from utils import gen_data
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer.minimize(cost)
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
print("step%d cost=%f" % (i, cost_val[0]))
```
一个简单的训练程序如下:
#### Parameter Server训练方法
.. code-block:: python
参数服务器方法对于大规模数据,简单模型的并行训练非常适用,我们基于单机模型的定义给出其实用Parameter Server进行训练的示例如下:
import paddle.fluid as fluid
from nets import mlp
from utils import gen_data
```python
import paddle.fluid as fluid
from nets import mlp
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
from utils import gen_data
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer.minimize(cost)
place = fluid.CUDAPlace(0)
cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
if fleet.is_server():
fleet.init_server()
fleet.run_server()
elif fleet.is_worker():
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
exe.run(feed=gen_data())
cost_val = exe.run(
program=fluid.default_main_program(),
feed=gen_data(),
fetch_list=[cost.name])
print("worker_index: %d, step%d cost = %f" %
(fleet.worker_index(), i, cost_val[0]))
```
如果用户想使用高性能芯片,例如GPU多卡进行训练,使用Fleet API可以在增加少量代码的情况下实现。
.. code-block:: python
#### Collective训练方法
import paddle.fluid as fluid
from utils import gen_data
from nets import mlp
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker
collective training通常在GPU多机多卡训练中使用,一般在复杂模型的训练中比较常见,我们基于上面的单机模型定义给出使用Collective方法进行分布式训练的示例如下:
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
```python
import paddle.fluid as fluid
from nets import mlp
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker
from utils import gen_data
cost = mlp(input_x, input_y
)
optimizer = fluid.optimizer.SGD(learning_rate=0.01
)
input_x = fluid.layers.data(name="x", shape=[32], dtype='float32'
)
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64'
)
role = role_maker.PaddleCloudRoleMaker(
)
fleet.init(role
)
optimizer = fleet.distributed_optimizer(optimizer
)
optimizer.minimize(cost
)
cost = mlp(input_x, input_y
)
optimizer = fluid.optimizer.SGD(learning_rate=0.01
)
role = role_maker.PaddleCloudRoleMaker(is_collective=True
)
fleet.init(role
)
place = fluid.CUDAPlace(0)
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
exe.run(feed=gen_data())
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
cost_val = exe.run(
program=fluid.default_main_program(),
feed=gen_data(),
fetch_list=[cost.name])
print("worker_index: %d, step%d cost = %f" %
(fleet.worker_index(), i, cost_val[0]))
```
## 更多使用示例
在单机运行多卡程序的执行命令如下:
[点击率预估]()
.. code-block:: python
[语义匹配]()
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
python -m paddle.distributed.launch collective_trainer.py
[向量学习]()
[基于Resnet50的图像分类]()
[基于Transformer的机器翻译]()
FleetAPI 接口说明
------------------------------
.. csv-table::
:header: "接口", "说明"
[基于Bert的语义表示学习]()
"init", "fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置"
"distributed_optimizer", "fleet多机训练策略优化,接收一个标准Optimizer及相应的多机运行策略,fleet会根据优化策略进行优化"
"init_server", "fleet加载model_dir中保存的模型相关参数进行parameter server的初始化"
"run_server", "fleet启动parameter server服务"
"init_worker", "fleet初始化当前worker运行环境"
"is_worker", "判断当前节点是否是Worker节点,是则返回True,否则返回False"
"is_server", "判断当前节点是否是Server节点,是则返回True,否则返回False"
"save_inference_model", "fleet保存预测相关的模型及参数,参数及用法参考 code:`fluid.io.save_inference_model`"
"save_persistables", "fleet保存多机模型参数,参数及用法参考 code:`fluid.io.save_persistables`"
FleetAPI 一般训练步骤
------------------------------
## Fleet API相关的接口说明
通过import引入需要使用的模式
++++++++++++++++++
### Fleet API接口
使用parameter server方式的训练:
- init(role_maker=None)
- fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置
- is_worker()
- Parameter Server训练中使用,判断当前节点是否是Worker节点,是则返回True,否则返回False
- is_server(model_dir=None)
- Parameter Server训练中使用,判断当前节点是否是Server节点,是则返回True,否则返回False
- init_server()
- Parameter Server训练中,fleet加载model_dir中保存的模型相关参数进行parameter server的初始化
- run_server()
- Parameter Server训练中使用,用来启动server端服务
- init_worker()
- Parameter Server训练中使用,用来启动worker端服务
- stop_worker()
- 训练结束后,停止worker
- distributed_optimizer(optimizer, strategy=None)
- 分布式优化算法装饰器,用户可带入单机optimizer,并配置分布式训练策略,返回一个分布式的optimizer
.. code-block:: python
### RoleMaker
- MPISymetricRoleMaker
- 描述:MPISymetricRoleMaker会假设每个节点启动两个进程,1worker+1pserver,这种RoleMaker要求用户的集群上有mpi环境。
- 示例:
```python
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
role = role_maker.MPISymetricRoleMaker()
fleet.init(role)
```
- 启动方法:
初始化
++++++++++++++++++
Fleet使用 code:`fleet.init(role_maker=None)` 进行初始化
```shell
mpirun -np 2 python trainer.py
```
当用户不指定role_maker, 则Fleet默认用户使用MPI环境,会采用MPISymetricRoleMaker.
- PaddleCloudRoleMaker
如果用户使用非MPI环境,则需要通过UserDefinedRoleMaker自行定义执行环境。
例如:
- 描述:PaddleCloudRoleMaker是一个高级封装,支持使用paddle.distributed.launch或者paddle.distributed.launch_ps启动脚本
.. code-block:: python
- Parameter Server训练示例:
```python
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
```
role = UserDefinedRoleMaker(current_id=0,
role=Role.WORKER,
worker_num=3,
server_endpoints=["127.0.0.1:6001","127.0.0.1:6002"])
fleet.init(role_maker=role)
- 启动方法:
```python
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 trainer.py
```
分布式策略及多机配置
++++++++++++++++
- Collective训练示例:
对于Transpiler模式,需要使用 DistributeTranspilerConfig 指定多机配置。
Fleet需要在用户定义的optimizer之上装饰 code:`fleet.distributed_optimizer` 来完成多机分布式策略的配置。
```python
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker
.. csv-table::
:header: "接口", "说明"
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
```
"sync_mode", "Fleet可以支持同步训练或异步训练, 默认会生成同步训练的分布式程序,通过指定 :code:`sync_mode=False` 参数即可生成异步训练的程序"
"split_method", "指定参数在parameter server上的分布方式, 默认使用 `RoundRobin`, 也可选`HashName`"
"slice_var_up", "指定是否将较大(大于8192个元素)的参数切分到多个parameter server以均衡计算负载,默认为开启"
- 启动方法:
```python
python -m paddle.distributed.launch trainer.py
```
例如:
- UserDefinedRoleMaker
.. code-block:: python
- 描述:用户自定义节点的角色信息,IP和端口信息
config = DistributeTranspilerConfig()
config.sync_mode = True
- 示例:
# build network
# ...
avg_cost = model()
```python
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
# 加入 fleet distributed_optimizer 加入分布式策略配置及多机优化
optimizer = fleet.distributed_optimizer(optimizer, config)
optimizer.minimize(avg_cost)
role = role_maker.UserDefinedRoleMaker(
current_id=int(os.getenv("CURRENT_ID")),
role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER")))
else role_maker.Role.SERVER,
worker_num=int(os.getenv("WORKER_NUM")),
server_endpoints=pserver_endpoints)
fleet.init(role)
```
### Strategy
具体训练流程
++++++++++++++++
- Parameter Server Training
- Sync_mode
- Collective Training
- LocalSGD
- ReduceGrad
.. code-block:: python
### Fleet Mode
# 启动server
if fleet.is_server():
fleet.init_server()
fleet.run_server()
- Parameter Server Training
# 启动worker
if fleet.is_worker():
# 初始化worker配置
fleet.init_worker()
```python
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
```
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
train_reader = paddle.batch(fake_reader(), batch_size=24)
exe.run(fleet.startup_program)
PASS_NUM = 10
for pass_id in range(PASS_NUM):
for batch_id, data in enumerate(train_reader()):
avg_loss_value, auc_value, auc_batch_value = \
exe.run(fleet.main_program, feed=feeder.feed(data), fetch_list=[avg_cost, auc, auc_batch])
print("Pass %d, cost = %f, auc = %f, batch_auc = %f" % (pass_id, avg_loss_value, auc_value, auc_batch_value))
# 通知server,当前节点训练结束
fleet.stop_worker()
- Collective Training
```python
from paddle.fluid.incubate.fleet.collective import fleet
```
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录