未验证 提交 24fde264 编写于 作者: C Chen Weihang 提交者: GitHub

Add paddle.distributed dir and docs (#2482)

* add distributed dir and doc

* remove start_processes & move DataParallel

* add get rank and world size

* add cn docs

* add index to distributed_cn

* add white list api

* add api white list

* add cn to api name

* polish details

* add rst suffix

* add py function
上级 b9be73f0
==================
paddle.distributed
==================
.. toctree::
:maxdepth: 1
distributed/get_rank.rst
distributed/get_world_size.rst
distributed/init_parallel_env.rst
distributed/ParallelEnv.rst
distributed/prepare_context.rst
distributed/spawn.rst
.. _api_distributed_ParallelEnv:
ParallelEnv
-------------------------------
:doc_source: paddle.fluid.dygraph.parallel.ParallelEnv
\ No newline at end of file
.. THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
!DO NOT EDIT THIS FILE MANUALLY!
.. _api_distributed_get_rank:
get_rank
--------
.. autofunction:: paddle.distributed.get_rank
:noindex:
\ No newline at end of file
.. THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
!DO NOT EDIT THIS FILE MANUALLY!
.. _api_distributed_get_world_size:
get_world_size
--------------
.. autofunction:: paddle.distributed.get_world_size
:noindex:
\ No newline at end of file
.. THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
!DO NOT EDIT THIS FILE MANUALLY!
.. _api_distributed_init_parallel_env:
init_parallel_env
-----------------
.. autofunction:: paddle.distributed.init_parallel_env
:noindex:
\ No newline at end of file
.. _api_distributed_prepare_context:
prepare_context
-------------------------------
:doc_source: paddle.fluid.dygraph.parallel.prepare_context
.. THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
!DO NOT EDIT THIS FILE MANUALLY!
.. _api_distributed_spawn:
spawn
-----
.. autofunction:: paddle.distributed.spawn
:noindex:
\ No newline at end of file
...@@ -8,6 +8,7 @@ API Reference ...@@ -8,6 +8,7 @@ API Reference
../api_guides/index_en.rst ../api_guides/index_en.rst
dataset.rst dataset.rst
declarative.rst declarative.rst
distributed.rst
framework.rst framework.rst
imperative.rst imperative.rst
io.rst io.rst
......
...@@ -37,6 +37,7 @@ paddle ...@@ -37,6 +37,7 @@ paddle
paddle/CUDAPinnedPlace.rst paddle/CUDAPinnedPlace.rst
paddle/CUDAPlace.rst paddle/CUDAPlace.rst
paddle/cumsum.rst paddle/cumsum.rst
paddle/DataParallel.rst
paddle/default_main_program.rst paddle/default_main_program.rst
paddle/default_startup_program.rst paddle/default_startup_program.rst
paddle/diag.rst paddle/diag.rst
......
.. _api_paddle_DataParallel:
DataParallel
-------------------------------
:doc_source: paddle.fluid.dygraph.parallel.DataParallel
...@@ -12,5 +12,11 @@ paddle.distributed ...@@ -12,5 +12,11 @@ paddle.distributed
distributed_cn/all_reduce_cn.rst distributed_cn/all_reduce_cn.rst
distributed_cn/barrier_cn.rst distributed_cn/barrier_cn.rst
distributed_cn/broadcast_cn.rst distributed_cn/broadcast_cn.rst
distributed_cn/get_rank_cn.rst
distributed_cn/get_world_size_cn.rst
distributed_cn/init_parallel_env_cn.rst
distributed_cn/ParallelEnv_cn.rst
distributed_cn/prepare_context_cn.rst
distributed_cn/reduce_cn.rst distributed_cn/reduce_cn.rst
distributed_cn/scatter_cn.rst distributed_cn/scatter_cn.rst
distributed_cn/spawn_cn.rst
.. _cn_api_distributed_ParallelEnv:
ParallelEnv
-------------------------------
:doc_source: paddle.fluid.dygraph.parallel.ParallelEnv
\ No newline at end of file
.. _cn_api_distributed_get_rank:
get_rank
----------
.. py:function:: paddle.distributed.get_rank()
返回当前进程的rank。
当前进程rank的值等于环境变量 ``PADDLE_TRAINER_ID`` 的值,默认值为0。
返回
:::::::::
(int) 当前进程的rank。
代码示例
:::::::::
.. code-block:: python
import paddle
import paddle.distributed as dist
# execute this command in terminal: export PADDLE_TRAINER_ID=0
print("The rank is %d" % dist.get_rank())
# The rank is 0
.. _cn_api_distributed_get_world_size:
get_world_size
----------------
.. py:function:: paddle.distributed.get_world_size()
返回参与当前任务的进程数。
当前进程数等于环境变量 ``PADDLE_TRAINERS_NUM`` 的值,默认值为1。
返回
:::::::::
(int) 参与任务的进程数。
代码示例
:::::::::
.. code-block:: python
import paddle
import paddle.distributed as dist
# execute this command in terminal: export PADDLE_TRAINERS_NUM=4
print("The world_size is %d" % dist.get_world_size())
# The world_size is 4
.. _cn_api_distributed_init_parallel_env:
init_parallel_env
-----------------
.. py:function:: paddle.distributed.init_parallel_env()
初始化动态图模式下的并行训练环境。
.. note::
目前仅支持初始化GPU训练环境,使用NCCL进行通信。
返回
:::::::::
代码示例
:::::::::
.. code-block:: python
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train():
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize parallel environment
dist.init_parallel_env()
# 3. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
loss_fn = nn.MSELoss()
adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())
# 4. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
dist.spawn(train)
.. _cn_api_distributed_prepare_context:
prepare_context
-------------------------------
:doc_source: paddle.fluid.dygraph.parallel.prepare_context
.. _cn_api_distributed_spawn:
spawn
-----
.. py:function:: paddle.distributed.spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options)
使用 ``spawn`` 方法启动多进程任务。
参数
:::::::::
- func (function) - 由 ``spawn`` 方法启动的进程所调用的目标函数。该目标函数需要能够被 ``pickled`` (序列化),所以目标函数必须定义为模块的一级函数,不能是内部子函数或者类方法。
- args (tuple, 可选) - 传入目标函数 ``func`` 的参数。
- nprocs (int, 可选) - 启动进程的数目。默认值为-1。当 ``nproc`` 为-1时,模型执行时将会从环境变量中获取当前可用的所有设备进行使用:如果使用GPU执行任务,将会从环境变量 ``CUDA_VISIBLE_DEVICES`` 中获取当前所有可用的设备ID;如果使用CPU执行任务,将会从环境变量 ``CPU_NUM`` 中获取当前可用的CPU设备数,例如,可以通过指令 ``export CPU_NUM=4`` 配置默认可用CPU设备数,如果此环境变量没有设置,将会默认设置该环境变量的值为1。
- join (bool, 可选) - 对所有启动的进程执行阻塞的 ``join`` ,等待进程执行结束。默认为True。
- daemon (bool, 可选) - 配置启动进程的 ``daemon`` 属性。默认为False。
- **options (dict, 可选) - 其他初始化并行执行环境的配置选项。目前支持以下选项: (1) start_method (string) - 启动子进程的方法。进程的启动方法可以是 ``spawn`` , ``fork`` , ``forkserver`` 。 因为CUDA运行时环境不支持 ``fork`` 方法,当在子进程中使用CUDA时,需要使用 ``spawn`` 或者 ``forkserver`` 方法启动进程。默认方法为 ``spawn`` ; (2) cluster_node_ips (string) - 运行集群的节点(机器)IP,例如 "192.168.0.16,192.168.0.17" ,默认值为 "127.0.0.1" ; (3) node_ip (string) - 当前节点(机器)的IP。例如 "192.168.0.16" , 默认值为 "127.0.0.1" ; (4) started_port (int) - 一个训练节点(机器)上各训练进程的起始端口。例如 6170. 默认值为None ; (5) selected_gpus (string) - 指定训练使用的GPU ID, 例如 "0,1,2,3" , 默认值为None ; (6) print_config (bool) - 打印当前并行训练的配置, 默认值为False ; (7) use_paddlecloud (bool) - 配置是否使用PaddleCloud启动多进程任务,默认值为False。
返回
:::::::::
``MultiprocessContext`` 对象,持有创建的多个进程。
代码示例
:::::::::
.. code-block:: python
from __future__ import print_function
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train(print_result=False):
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize parallel environment
dist.init_parallel_env()
# 3. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
loss_fn = nn.MSELoss()
adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())
# 4. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
if print_result is True:
print("loss:", loss.numpy())
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
# Usage 1: only pass function.
# If your training method no need any argument, and
# use all visible devices for parallel training.
if __name__ == '__main__':
dist.spawn(train)
# Usage 2: pass function and arguments.
# If your training method need some arguments, and
# use all visible devices for parallel training.
if __name__ == '__main__':
dist.spawn(train, args=(True,))
# Usage 3: pass function, arguments and nprocs.
# If your training method need some arguments, and
# only use part of visible devices for parallel training.
# If your machine hold 8 cards {0,1,2,3,4,5,6,7},
# this case will use cards {0,1}; If you set
# CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use
# cards {4,5}
if __name__ == '__main__':
dist.spawn(train, args=(True,), nprocs=2)
# Usage 4: pass function, arguments, nprocs and selected_gpus.
# If your training method need some arguments, and
# only use part of visible devices for parallel training,
# but you can't set your machine's environment varibale
# CUDA_VISIBLE_DEVICES, such as it is None or all cards
# {0,1,2,3,4,5,6,7}, you can pass `selelcted_gpus` to
# select the GPU cards you want to use. For example,
# this case will use cards {4,5} if your machine hold 8 cards.
if __name__ == '__main__':
dist.spawn(train, args=(True,), nprocs=2, selelcted_gpus='4,5')
\ No newline at end of file
...@@ -9,15 +9,23 @@ DataParallel ...@@ -9,15 +9,23 @@ DataParallel
通过数据并行模式执行动态图模型。 通过数据并行模式执行动态图模型。
目前,``DataParallel`` 仅支持以多进程的方式执行动态图模型。使用方式如下: 目前,``DataParallel`` 仅支持以多进程的方式执行动态图模型。
``python -m paddle.distributed.launch –selected_gpus=0,1 dynamic_graph_test.py`` 支持两种使用方式:
其中 ``dynamic_graph_test.py`` 脚本的代码可以是下面的示例代码。 1. 使用 ``paddle.distributed.spawn`` 方法启动,例如:
``python demo.py`` (spawn need to be called in ``__main__`` method)
2. 使用 ``paddle.distributed.launch`` 方法启动,例如:
``python -m paddle.distributed.launch –selected_gpus=0,1 demo.py``
其中 ``demo.py`` 脚本的代码可以是下面的示例代码。
参数: 参数:
- **Layer** (Layer) - 需要通过数据并行方式执行的模型。 - **Layer** (Layer) - 需要通过数据并行方式执行的模型。
- **strategy** (ParallelStrategy) - 数据并行的策略,包括并行执行的环境配置 - **strategy** (ParallelStrategy,可选) - (deprecated) 数据并行的策略,包括并行执行的环境配置。默认为None
返回:支持数据并行的 ``Layer`` 返回:支持数据并行的 ``Layer``
...@@ -27,38 +35,53 @@ DataParallel ...@@ -27,38 +35,53 @@ DataParallel
.. code-block:: python .. code-block:: python
import numpy as np import paddle
import paddle.fluid as fluid import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
place = fluid.CUDAPlace(fluid.dygraph.ParallelEnv().dev_id) class LinearNet(nn.Layer):
with fluid.dygraph.guard(place): def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
# prepare the data parallel context def forward(self, x):
strategy = fluid.dygraph.prepare_context() return self._linear2(self._linear1(x))
linear = fluid.dygraph.Linear(1, 10, act="softmax") def train():
adam = fluid.optimizer.AdamOptimizer( # 1. enable dynamic mode
learning_rate=0.001, parameter_list=linear.parameters()) paddle.disable_static()
# make the module become the data parallelism module # 2. initialize parallel environment
linear = fluid.dygraph.DataParallel(linear, strategy) dist.init_parallel_env()
x_data = np.random.random(size=[10, 1]).astype(np.float32) # 3. create data parallel layer & optimizer
data = fluid.dygraph.to_variable(x_data) layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
hidden = linear(data) loss_fn = nn.MSELoss()
avg_loss = fluid.layers.mean(hidden) adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())
# scale the loss according to the number of trainers. # 4. run layer
avg_loss = linear.scale_loss(avg_loss) inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
avg_loss.backward() loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
# collect the gradients of trainers. adam.step()
linear.apply_collective_grads() adam.clear_grad()
adam.minimize(avg_loss) if __name__ == '__main__':
linear.clear_gradients() # 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
.. py:method:: scale_loss(loss) .. py:method:: scale_loss(loss)
...@@ -77,38 +100,53 @@ DataParallel ...@@ -77,38 +100,53 @@ DataParallel
.. code-block:: python .. code-block:: python
import numpy as np import paddle
import paddle.fluid as fluid import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
place = fluid.CUDAPlace(fluid.dygraph.ParallelEnv().dev_id) class LinearNet(nn.Layer):
with fluid.dygraph.guard(place): def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
# prepare the data parallel context def forward(self, x):
strategy = fluid.dygraph.prepare_context() return self._linear2(self._linear1(x))
linear = fluid.dygraph.Linear(1, 10, act="softmax") def train():
adam = fluid.optimizer.AdamOptimizer( # 1. enable dynamic mode
learning_rate=0.001, parameter_list=linear.parameters()) paddle.disable_static()
# make the module become the data parallelism module # 2. initialize parallel environment
linear = fluid.dygraph.DataParallel(linear, strategy) dist.init_parallel_env()
x_data = np.random.random(size=[10, 1]).astype(np.float32) # 3. create data parallel layer & optimizer
data = fluid.dygraph.to_variable(x_data) layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
hidden = linear(data) loss_fn = nn.MSELoss()
avg_loss = fluid.layers.mean(hidden) adam = opt.Adam(
learning_rate=0.001, parameters=dp_layer.parameters())
# scale the loss according to the number of trainers. # 4. run layer
avg_loss = linear.scale_loss(avg_loss) inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
avg_loss.backward() loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
# collect the gradients of trainers. adam.step()
linear.apply_collective_grads() adam.clear_grad()
adam.minimize(avg_loss) if __name__ == '__main__':
linear.clear_gradients() # 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
.. py:method:: apply_collective_grads() .. py:method:: apply_collective_grads()
...@@ -121,35 +159,50 @@ AllReduce(规约)参数的梯度值。 ...@@ -121,35 +159,50 @@ AllReduce(规约)参数的梯度值。
.. code-block:: python .. code-block:: python
import numpy as np import paddle
import paddle.fluid as fluid import paddle.nn as nn
import paddle.optimizer as opt
place = fluid.CUDAPlace(fluid.dygraph.ParallelEnv().dev_id) import paddle.distributed as dist
with fluid.dygraph.guard(place):
class LinearNet(nn.Layer):
# prepare the data parallel context def __init__(self):
strategy = fluid.dygraph.prepare_context() super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
linear = fluid.dygraph.Linear(1, 10, act="softmax") self._linear2 = nn.Linear(10, 1)
adam = fluid.optimizer.AdamOptimizer(
learning_rate=0.001, parameter_list=linear.parameters()) def forward(self, x):
return self._linear2(self._linear1(x))
# make the module become the data parallelism module
linear = fluid.dygraph.DataParallel(linear, strategy) def train():
# 1. enable dynamic mode
x_data = np.random.random(size=[10, 1]).astype(np.float32) paddle.disable_static()
data = fluid.dygraph.to_variable(x_data)
# 2. initialize parallel environment
hidden = linear(data) dist.init_parallel_env()
avg_loss = fluid.layers.mean(hidden)
# 3. create data parallel layer & optimizer
# scale the loss according to the number of trainers. layer = LinearNet()
avg_loss = linear.scale_loss(avg_loss) dp_layer = paddle.DataParallel(layer)
avg_loss.backward() loss_fn = nn.MSELoss()
adam = opt.Adam(
# collect the gradients of trainers. learning_rate=0.001, parameters=dp_layer.parameters())
linear.apply_collective_grads()
# 4. run layer
adam.minimize(avg_loss) inputs = paddle.randn([10, 10], 'float32')
linear.clear_gradients() outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
# 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
...@@ -14,3 +14,5 @@ distributed_cn/barrier_cn.rst ...@@ -14,3 +14,5 @@ distributed_cn/barrier_cn.rst
distributed_cn/broadcast_cn.rst distributed_cn/broadcast_cn.rst
distributed_cn/reduce_cn.rst distributed_cn/reduce_cn.rst
distributed_cn/scatter_cn.rst distributed_cn/scatter_cn.rst
distributed_cn/init_parallel_env_cn.rst
distributed_cn/spawn_cn.rst
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册