未验证 提交 619e0a45 编写于 作者: 1 123malin 提交者: GitHub

【paddle.distributed】【Fleet.UtilBase】中文文档 (#2659)

* test=document, bug fix

* test=develop, add utilbase doc

* test=develop, bug fix

* test=develop, ci

* test=develop, ci

* test=develop, rm fs
上级 df223e87
......@@ -8,4 +8,12 @@ paddle/reader/ComposeNotAligned_cn.rst
paddle/fluid/layers/scatter_cn.rst
paddle/tensor/manipulation/scatter_cn.rst
paddle/distributed/fleet/Fleet_cn.rst
paddle/distributed/fleet/utils/fs/ExecuteError_cn.rst
paddle/distributed/fleet/utils/fs/FSFileExistsError_cn.rst
paddle/distributed/fleet/utils/fs/FSFileNotExistsError_cn.rst
paddle/distributed/fleet/utils/fs/FSShellCmdAborted_cn.rst
paddle/distributed/fleet/utils/fs/FSTimeOut_cn.rst
paddle/distributed/fleet/utils/fs/FS_cn.rst
paddle/distributed/fleet/utils/fs/HDFSClient_cn.rst
paddle/distributed/fleet/utils/fs/LocalFS_cn.rst
upgrade_guide_cn.md
......@@ -4,7 +4,189 @@ UtilBase
-------------------------------
.. py:class:: paddle.distributed.fleet.UtilBase
分布式训练工具类,主要提供集合通信、文件系统操作等接口。
.. py:method:: all_reduce(input, mode="sum", comm_world="worker")
在指定的通信集合间进行归约操作,并将归约结果返回给集合中每个实例。
参数:
- **input** (list|numpy.array) – 归约操作的输入。
- **mode** (str) - 归约操作的模式,包含求和,取最大值和取最小值,默认为求和归约。
- **comm_world** (str) - 归约操作的通信集合,包含: server集合(“server"),worker集合("worker")及所有节点集合("all"),默认为worker集合。
返回:
- Numpy.array|None: 一个和 `input` 形状一致的numpy数组或None.
**代码示例**:
.. code-block:: python
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
import numpy as np
def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
fleet_util._set_role_maker(role)
if fleet.is_server():
input = [1, 2]
output = fleet_util.all_reduce(input, "sum", "server")
print(output)
# [2, 4]
elif fleet.is_worker():
input = np.array([3, 4])
output = fleet_util.all_reduce(input, "sum", "worker")
print(output)
# [6, 8]
output = fleet_util.all_reduce(input, "sum", "all")
print(output)
# [8, 12]
if __name__ == "__main__":
train()
.. py:method:: barrier(comm_world="worker")
在指定的通信集合间进行阻塞操作,以实现集合间进度同步。
参数:
- **comm_world** (str) - 阻塞操作的通信集合,包含: server集合(“server"),worker集合("worker")及所有节点集合("all"),默认为worker集合。
**代码示例**:
.. code-block:: python
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
fleet_util._set_role_maker(role)
if fleet.is_server():
fleet_util.barrier("server")
print("all server arrive here")
elif fleet.is_worker():
fleet_util.barrier("worker")
print("all server arrive here")
fleet_util.barrier("all")
print("all servers and workers arrive here")
if __name__ == "__main__":
train()
.. py:method:: all_gather(input, comm_world="worker")
在指定的通信集合间进行聚合操作,并将聚合的结果返回给集合中每个实例。
参数:
- **input** (int|float) - 聚合操作的输入。
- **comm_world** (str) - 聚合操作的通信集合,包含: server集合(“server"),worker集合("worker")及所有节点集合("all"),默认为worker集合。
返回:
- **output** (List): List格式的聚合结果。
**代码示例**:
.. code-block:: python
# Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet as fleet
from paddle.distributed.fleet import PaddleCloudRoleMaker
import sys
def train():
role = PaddleCloudRoleMaker(
is_collective=False,
init_gloo=True,
path="./tmp_gloo")
fleet.init(role)
fleet_util._set_role_maker(role)
if fleet.is_server():
input = fleet.server_index()
output = fleet_util.all_gather(input, "server")
print(output)
# output = [0, 1]
elif fleet.is_worker():
input = fleet.worker_index()
output = fleet_util.all_gather(input, "worker")
# output = [0, 1]
print(output)
output = fleet_util.all_gather(input, "all")
print(output)
# output = [0, 1, 0, 1]
if __name__ == "__main__":
train()
.. py:method:: get_file_shard(files)
在数据并行的分布式训练中,获取属于当前训练节点的文件列表。
.. code-block:: text
示例 1: 原始所有文件列表 `files` = [a, b, c ,d, e],训练节点个数 `trainer_num` = 2,那么属于零号节点的训练文件为[a, b, c],属于1号节点的训练文件为[d, e]。
示例 2: 原始所有文件列表 `files` = [a, b],训练节点个数 `trainer_num` = 3,那么属于零号节点的训练文件为[a],属于1号节点的训练文件为[b],属于2号节点的训练文件为[]。
参数:
- **files** (List):原始所有文件列表。
返回:
- List: 属于当前训练节点的文件列表。
**代码示例**:
.. code-block:: python
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
fleet_util._set_role_maker(role)
files = fleet_util.get_file_shard(["file1", "file2", "file3"])
# files = ["file1", "file2"]
.. py:method:: print_on_rank(message, rank_id)
在编号为 `rank_id` 的节点上打印指定信息。
参数:
- **message** (str) – 打印内容。
- **rank_id** (int) - 节点编号。
**代码示例**:
.. code-block:: python
from paddle.distributed.fleet.base.util_factory import fleet_util
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
fleet_util._set_role_maker(role)
fleet_util.print_on_rank("I'm worker 0", 0)
\ No newline at end of file
.. _cn_api_distributed_fleet_utils_fs_ExecuteError:
ExecuteError
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.ExecuteError
.. _cn_api_distributed_fleet_utils_fs_FSFileExistsError:
FSFileExistsError
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.FSFileExistsError
.. _cn_api_distributed_fleet_utils_fs_FSFileNotExistsError:
FSFileNotExistsError
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.FSFileNotExistsError
.. _cn_api_distributed_fleet_utils_fs_FSShellCmdAborted:
FSShellCmdAborted
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.FSShellCmdAborted
.. _cn_api_distributed_fleet_utils_fs_FSTimeOut:
FSTimeOut
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.FSTimeOut
.. _cn_api_distributed_fleet_utils_fs_FS:
FS
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.FS
......@@ -3,8 +3,279 @@
HDFSClient
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.HDFSClient
.. py:class:: paddle.distributed.fleet.utils.HDFSClient
一个HADOOP文件系统工具类。
参数:
- **hadoop_home** (str):HADOOP HOME地址。
- **configs** (dict): HADOOP文件系统配置。需包含 `fs.default.name` 和 `hadoop.job.ugi` 这两个字段。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls_dir("hdfs:/test_hdfs_client")
.. py:method:: ls_dir(fs_path)
列出 `fs_path` 路径下所有的文件和子目录。
参数:
- **fs_path** (str): HADOOP文件路径。
返回:
- Tuple, 一个包含所有子目录和文件名的2-Tuple,格式形如: ([subdirname1, subdirname1, ...], [filename1, filename2, ...])。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")
.. py:method:: mkdirs(fs_path)
创建一个目录。
参数:
- **fs_path** (str): HADOOP文件路径。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.mkdirs("hdfs:/test_hdfs_client")
.. py:method:: delete(fs_path)
删除HADOOP文件(或目录)。
参数:
- **fs_path** (str): HADOOP文件路径。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.delete("hdfs:/test_hdfs_client")
.. py:method:: is_file(fs_path)
判断当前路径是否是一个文件。
参数:
- **fs_path** (str): HADOOP文件路径。
返回:
- Bool:若当前路径存在且是一个文件,返回 `True` ,反之则返回 `False` 。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
ret = client.is_file("hdfs:/test_hdfs_client")
.. py:method:: is_dir(fs_path)
判断当前路径是否是一个目录。
参数:
- **fs_path** (str): HADOOP文件路径。
返回:
- Bool:若当前路径存在且是一个目录,返回 `True` ,反之则返回 `False` 。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
ret = client.is_file("hdfs:/test_hdfs_client")
.. py:method:: is_exist(fs_path)
判断当前路径是否存在。
参数:
- **fs_path** (str): HADOOP文件路径。
返回:
- Bool:若当前路径存在返回 `True` ,反之则返回 `False` 。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
ret = client.is_exist("hdfs:/test_hdfs_client")
.. py:method:: upload(local_path, fs_path)
上传本地文件至HADOOP文件系统。
参数:
- **local_path** (str): 本地文件路径。
- **fs_path** (str): HADOOP文件路径。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")
.. py:method:: download(fs_path, local_path)
下载HADOOP文件至本地文件系统。
参数:
- **local_path** (str): 本地文件路径。
- **fs_path** (str): HADOOP文件路径。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.download("hdfs:/test_hdfs_client", "./")
.. py:method:: touch(fs_path, exist_ok=True)
创建一个HADOOP文件。
参数:
- **fs_path** (str): HADOOP文件路径。
- **exist_ok** (bool): 路径已存在时程序是否报错。若 `exist_ok = True`,则直接返回,反之则抛出文件存在的异常,默认不抛出异常。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.touch("hdfs:/test_hdfs_client")
.. py:method:: mv(fs_src_path, fs_dst_path, overwrite=False)
HADOOP系统文件移动。
参数:
- **fs_src_path** (str): 移动前源文件路径名。
- **fs_dst_path** (str): 移动后目标文件路径名。
- **overwrite** (bool): 若目标文件已存在,是否删除进行重写,默认不重写并抛出异常。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")
.. py:method:: list_dirs(fs_path)
列出HADOOP文件路径下所有的子目录。
参数:
- **fs_path** (str): HADOOP文件路径。
返回:
- List: 该路径下所有的子目录名。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import HDFSClient
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
subdirs = client.list_dirs("hdfs:/test_hdfs_client")
......@@ -3,8 +3,196 @@
LocalFS
-------------------------------
.. py:class:: paddle.distributed.fleet.utils.fs.LocalFS
.. py:class:: paddle.distributed.fleet.utils.LocalFS
一个本地文件系统工具类。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
subdirs, files = client.ls_dir("./")
.. py:method:: ls_dir(fs_path)
列出 `fs_path` 路径下所有的文件和子目录。
参数:
- **fs_path** (str): 本地文件路径。
返回:
- Tuple, 一个包含所有子目录和文件名的2-Tuple,格式形如: ([subdirname1, subdirname1, ...], [filename1, filename2, ...])。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
subdirs, files = client.ls_dir("./")
.. py:method:: mkdirs(fs_path)
创建一个本地目录。
参数:
- **fs_path** (str): 本地文件路径。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.mkdirs("test_mkdirs")
client.delete("test_mkdirs")
.. py:method:: rename(fs_src_path, fs_dst_path)
重命名本地文件名。
参数:
- **fs_src_path** (str):重命名前原始文件名。
- **fs_dst_path** (str):新文件名。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.touch("test_rename_src")
print(client.is_exists("test_rename_src")) # True
client.rename("test_rename_src", "test_rename_dst")
print(client.is_exists("test_rename_src")) # False
print(client.is_exists("test_rename_dst")) # True
client.delete("test_rename_dst")
.. py:method:: delete(fs_path)
删除本地文件(或目录)。
参数:
- **fs_path** (str): 本地文件路径。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.mkdirs("test_localFS_mkdirs")
client.delete("test_localFS_mkdirs")
.. py:method:: is_file(fs_path)
判断当前路径是否是一个文件。
参数:
- **fs_path** (str): 本地文件路径。
返回:
- Bool:若当前路径存在且是一个文件,返回 `True` ,反之则返回 `False` 。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.touch("test_is_file")
print(client.is_file("test_is_file")) # True
client.delete("test_is_file")
.. py:method:: is_dir(fs_path)
判断当前路径是否是一个目录。
参数:
- **fs_path** (str): 本地文件路径。
返回:
- Bool:若当前路径存在且是一个目录,返回 `True` ,反之则返回 `False` 。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.mkdirs("test_is_dir")
print(client.is_dir("test_is_file")) # True
client.delete("test_is_dir")
.. py:method:: is_exist(fs_path)
判断当前路径是否存在。
参数:
- **fs_path** (str): 本地文件路径。
返回:
- Bool:若当前路径存在返回 `True` ,反之则返回 `False` 。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
ret = local_fs.is_exist("test_is_exist")
.. py:method:: touch(fs_path, exist_ok=True)
创建一个本地文件。
参数:
- **fs_path** (str): 本地文件路径。
- **exist_ok** (bool): 文件路径已存在时程序是否报错。若 `exist_ok = True`,则直接返回,反之则抛出文件存在的异常,默认不抛出异常。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.touch("test_touch")
client.delete("test_touch")
.. py:method:: mv(src_path, dst_path, overwrite=False)
本地文件移动。
参数:
- **src_path** (str): 移动前源文件路径名。
- **dst_path** (str): 移动后目标文件路径名。
- **overwrite** (bool): 若目标文件已存在,是否删除进行重写,默认不重写并抛出异常。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
client.touch("test_mv_src")
client.mv("test_mv_src", "test_mv_dst")
client.delete("test_mv_dst")
.. py:method:: list_dirs(fs_path)
列出本地路径下所有的子目录。
参数:
- **fs_path** (str): 本地文件路径。
返回:
- List: 该路径下所有的子目录名。
**示例代码**:
.. code-block:: python
from paddle.distributed.fleet.utils import LocalFS
client = LocalFS()
subdirs = client.list_dirs("./")
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册