diff --git a/doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst b/doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst
index 89df472364f24f0300b3f575d390e0cea93669f4..a92d9f1ab5462a915bfb9ef263122524d2359fe9 100644
--- a/doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst
+++ b/doc/fluid/user_guides/howto/training/fleet_api_howto_cn.rst
@@ -1,21 +1,30 @@
+.. _fleet_api_howto_cn:
+
 使用FleetAPI进行分布式训练
-====================
+==========================
 
 FleetAPI 设计说明
----------------
+-----------------
+
+Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddlePaddle,象征一个舰队中的多只双桨船协同工作。Fleet的设计在易用性和算法可扩展性方面做出了权衡。用户可以很容易地从单机版的训练程序,通过添加几行代码切换到分布式训练程序。此外,分布式训练的算法也可以通过Fleet API接口灵活定义。具体的设计原理可以参考\ `Fleet API设计文档 <https://github.com/PaddlePaddle/Fleet/blob/develop/README.md>`__\ 。当前FleetAPI还处于paddle.fluid.incubate目录下,未来功能完备后会放到paddle.fluid目录中,欢迎持续关注。
 
-Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddlePaddle,象征一个舰队中的多只双桨船协同工作。Fleet的设计在易用性和算法可扩展性方面做出了权衡。用户可以很容易从单机版的训练程序,通过添加几行代码切换到分布式训练程序。此外,分布式训练的算法也可以通过Fleet API接口灵活定义。具体的设计原理可以参考https://github.com/PaddlePaddle/Fleet/blob/develop/README.md
-当前FleetAPI还处于paddle.fluid.incubate目录下,未来功能完备后会放到paddle.fluid目录中,欢迎持续关注。
+Fleet API快速上手示例
+---------------------
 
-快速上手示例
-------------------------------
-用户可以使用Fleet API轻易实现GPU多卡训练(单机多卡/多机多卡)。多卡训练在现代AI模型中非常常见,例如Resnet50、Bert等都是非常常见的需要多机多卡训练的模型。下面的代码示例,以一个简单的例子入手展示如何使用Fleet API进行单机多卡训练。代码示例可以参考:https://github.com/PaddlePaddle/Fleet
+下面会针对Fleet API最常见的两种使用场景,用同一个模型做示例,目的是让用户有一个可以快速上手的模板。快速上手的示例源代码可以在\ `Fleet <https://github.com/PaddlePaddle/Fleet>`__\ 仓库中找到。
 
-神经网络模型的定义如下:
+假设我们定义MLP网络如下:
 
-.. code-block:: python
+.. code:: python
 
-    def mlp(input_x, input_y, hid_dim=128, label_dim=2):
+    import paddle.fluid as fluid
+
+    def mlp(input_x, input_y, hid_dim=128, label_dim=2):
         fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
         fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
         prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
@@ -23,175 +32,254 @@ Fleet是PaddlePaddle分布式训练的高级API。Fleet的命名出自于PaddleP
         avg_cost = fluid.layers.mean(x=cost)
         return avg_cost
 
+定义一个在内存生成数据的Reader如下:
+
+.. code:: python
 
-一个简单的训练程序如下:
+    import numpy as np
 
-.. code-block:: python
+    def gen_data():
+        return {"x": np.random.random(size=(128, 32)).astype('float32'),
+                "y": np.random.randint(2, size=(128, 1)).astype('int64')}
 
-    import paddle.fluid as fluid
-    from nets import mlp
-    from utils import gen_data
+单机Trainer定义
+^^^^^^^^^^^^^^^
 
-    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
-    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
+.. code:: python
 
-    cost = mlp(input_x, input_y)
-    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
-    optimizer.minimize(cost)
-    place = fluid.CUDAPlace(0)
+    import paddle.fluid as fluid
+    from nets import mlp
+    from utils import gen_data
 
-    exe = fluid.Executor(place)
-    exe.run(fluid.default_startup_program())
-    step = 1001
-    for i in range(step):
-        exe.run(feed=gen_data())
+    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
+    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
 
+    cost = mlp(input_x, input_y)
+    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
+    optimizer.minimize(cost)
+    place = fluid.CUDAPlace(0)
 
-如果用户想使用高性能芯片,例如GPU多卡进行训练,使用Fleet API可以在增加少量代码的情况下实现。
+    exe = fluid.Executor(place)
+    exe.run(fluid.default_startup_program())
+    step = 1001
+    for i in range(step):
+        cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
+        print("step%d cost=%f" % (i, cost_val[0]))
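+
+上面的单机Trainer默认使用 ``fluid.CUDAPlace(0)``。如果运行环境中没有GPU,可以按下面的写法退回到CPU执行(这里的 ``use_cuda`` 开关是为演示引入的假设变量,其余逻辑与上例相同):
+
+.. code:: python
+
+    use_cuda = False  # 根据实际环境决定是否使用GPU
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place)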
-.. code-block:: python
 
-    import paddle.fluid as fluid
-    from utils import gen_data
-    from nets import mlp
-    from paddle.fluid.incubate.fleet.collective import fleet
-    from paddle.fluid.incubate.fleet.base import role_maker
+Parameter Server训练方法
+^^^^^^^^^^^^^^^^^^^^^^^^
 
-    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
-    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
+参数服务器方法对于大规模数据、简单模型的并行训练非常适用。我们基于单机模型的定义,给出其使用Parameter Server进行训练的示例如下:
 
-    cost = mlp(input_x, input_y)
-    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
+.. code:: python
 
-    role = role_maker.PaddleCloudRoleMaker()
-    fleet.init(role)
-    optimizer = fleet.distributed_optimizer(optimizer)
-    optimizer.minimize(cost)
+    import paddle.fluid as fluid
+    from nets import mlp
+    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+    from paddle.fluid.incubate.fleet.base import role_maker
+    from utils import gen_data
 
-    place = fluid.CUDAPlace(0)
+    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
+    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
 
-    exe = fluid.Executor(place)
-    exe.run(fluid.default_startup_program())
-    step = 1001
-    for i in range(step):
-        exe.run(feed=gen_data())
+    cost = mlp(input_x, input_y)
+    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
 
+    role = role_maker.PaddleCloudRoleMaker()
+    fleet.init(role)
+    optimizer = fleet.distributed_optimizer(optimizer)
+    optimizer.minimize(cost)
+    if fleet.is_server():
+        fleet.init_server()
+        fleet.run_server()
+    elif fleet.is_worker():
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        exe.run(fluid.default_startup_program())
+        step = 1001
+        for i in range(step):
+            cost_val = exe.run(
+                program=fluid.default_main_program(),
+                feed=gen_data(),
+                fetch_list=[cost.name])
+            print("worker_index: %d, step%d cost = %f" %
+                  (fleet.worker_index(), i, cost_val[0]))
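+
+上面的示例给出了最精简的写法。结合下文“Fleet API接口”中的 ``init_worker``/``stop_worker``,以及fleet提供的 ``save_persistables``\ (参数及用法参考 :code:`fluid.io.save_persistables`),实际训练中worker分支通常还会初始化worker运行环境、在训练结束后保存模型并通知server退出。下面是对上例 ``elif fleet.is_worker():`` 分支的一个扩展示意(仅作参考,其中输出目录 ``fleet_model`` 为假设值):
+
+.. code:: python
+
+    elif fleet.is_worker():
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        exe.run(fluid.default_startup_program())
+        fleet.init_worker()                    # 初始化当前worker的运行环境
+        for i in range(1001):
+            cost_val = exe.run(
+                program=fluid.default_main_program(),
+                feed=gen_data(),
+                fetch_list=[cost.name])
+        if fleet.worker_index() == 0:          # 仅0号worker保存,避免重复写出
+            fleet.save_persistables(executor=exe, dirname="fleet_model")
+        fleet.stop_worker()                    # 通知server,当前节点训练结束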
 
-在单机运行多卡程序的执行命令如下:
+Collective训练方法
+^^^^^^^^^^^^^^^^^^
 
-.. code-block:: python
+Collective training通常在GPU多机多卡训练中使用,一般在复杂模型的训练中比较常见。我们基于上面的单机模型定义,给出使用Collective方法进行分布式训练的示例如下:
 
-    export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
-    python -m paddle.distributed.launch collective_trainer.py
+.. code:: python
 
+    import paddle.fluid as fluid
+    from nets import mlp
+    from paddle.fluid.incubate.fleet.collective import fleet
+    from paddle.fluid.incubate.fleet.base import role_maker
+    from utils import gen_data
 
+    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
+    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
 
-FleetAPI 接口说明
-------------------------------
-.. csv-table::
-    :header: "接口", "说明"
+    cost = mlp(input_x, input_y)
+    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
+    role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+    fleet.init(role)
 
-    "init", "fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置"
-    "distributed_optimizer", "fleet多机训练策略优化,接收一个标准Optimizer及相应的多机运行策略,fleet会根据优化策略进行优化"
-    "init_server", "fleet加载model_dir中保存的模型相关参数进行parameter server的初始化"
-    "run_server", "fleet启动parameter server服务"
-    "init_worker", "fleet初始化当前worker运行环境"
-    "is_worker", "判断当前节点是否是Worker节点,是则返回True,否则返回False"
-    "is_server", "判断当前节点是否是Server节点,是则返回True,否则返回False"
-    "save_inference_model", "fleet保存预测相关的模型及参数,参数及用法参考 code:`fluid.io.save_inference_model`"
-    "save_persistables", "fleet保存多机模型参数,参数及用法参考 code:`fluid.io.save_persistables`"
+    optimizer = fleet.distributed_optimizer(optimizer)
+    optimizer.minimize(cost)
+    place = fluid.CUDAPlace(0)
+    exe = fluid.Executor(place)
+    exe.run(fluid.default_startup_program())
+    step = 1001
+    for i in range(step):
+        cost_val = exe.run(
+            program=fluid.default_main_program(),
+            feed=gen_data(),
+            fetch_list=[cost.name])
+        print("worker_index: %d, step%d cost = %f" %
+              (fleet.worker_index(), i, cost_val[0]))
 
-FleetAPI 一般训练步骤
-------------------------------
+更多使用示例
+------------
 
-通过import引入需要使用的模式
-++++++++++++++++++
+`点击率预估 <>`__
 
-使用parameter server方式的训练:
+`语义匹配 <>`__
 
-.. code-block:: python
+`向量学习 <>`__
 
-    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+`基于Resnet50的图像分类 <>`__
 
+`基于Transformer的机器翻译 <>`__
 
-初始化
-++++++++++++++++++
-Fleet使用 code:`fleet.init(role_maker=None)` 进行初始化
+`基于Bert的语义表示学习 <>`__
 
-当用户不指定role_maker, 则Fleet默认用户使用MPI环境,会采用MPISymetricRoleMaker.
+Fleet API相关的接口说明
+-----------------------
 
-如果用户使用非MPI环境,则需要通过UserDefinedRoleMaker自行定义执行环境。
-例如:
+Fleet API接口
+~~~~~~~~~~~~~
 
-.. code-block:: python
+-  init(role\_maker=None):fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置
+-  is\_worker():Parameter Server训练中使用,判断当前节点是否是Worker节点,是则返回True,否则返回False
+-  is\_server():Parameter Server训练中使用,判断当前节点是否是Server节点,是则返回True,否则返回False
+-  init\_server(model\_dir=None):Parameter Server训练中,fleet加载model\_dir中保存的模型相关参数,完成parameter server的初始化
+-  run\_server():Parameter Server训练中使用,用来启动server端服务
+-  init\_worker():Parameter Server训练中使用,用来初始化worker端运行环境
+-  stop\_worker():训练结束后,停止worker
+-  distributed\_optimizer(optimizer, strategy=None):分布式优化算法装饰器,用户可带入单机optimizer,并配置分布式训练策略,返回一个分布式的optimizer
 
-    role = UserDefinedRoleMaker(current_id=0,
-                                role=Role.WORKER,
-                                worker_num=3,
-                                server_endpoints=["127.0.0.1:6001","127.0.0.1:6002"])
-    fleet.init(role_maker=role)
+RoleMaker
+~~~~~~~~~
 
+-  MPISymetricRoleMaker
 
-分布式策略及多机配置
-++++++++++++++++
+-  描述:MPISymetricRoleMaker会假设每个节点启动两个进程,即1个worker进程和1个pserver进程。这种RoleMaker要求用户的集群上有MPI环境。
 
-对于Transpiler模式,需要使用 DistributeTranspilerConfig 指定多机配置。
-Fleet需要在用户定义的optimizer之上装饰 code:`fleet.distributed_optimizer` 来完成多机分布式策略的配置。
+-  示例:
 
-.. csv-table::
-    :header: "接口", "说明"
+   .. code:: python
 
-    "sync_mode", "Fleet可以支持同步训练或异步训练, 默认会生成同步训练的分布式程序,通过指定 :code:`sync_mode=False` 参数即可生成异步训练的程序"
-    "split_method", "指定参数在parameter server上的分布方式, 默认使用 `RoundRobin`, 也可选`HashName`"
-    "slice_var_up", "指定是否将较大(大于8192个元素)的参数切分到多个parameter server以均衡计算负载,默认为开启"
+       from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+       from paddle.fluid.incubate.fleet.base import role_maker
 
-例如:
+       role = role_maker.MPISymetricRoleMaker()
+       fleet.init(role)
 
-.. code-block:: python
+-  启动方法(trainer.py 的结构可参考下面的示意):
 
-    config = DistributeTranspilerConfig()
-    config.sync_mode = True
-
-    # build network
-    # ...
-    avg_cost = model()
-
-    optimizer = fluid.optimizer.Adam(learning_rate=0.001)
-    # 加入 fleet distributed_optimizer 加入分布式策略配置及多机优化
-    optimizer = fleet.distributed_optimizer(optimizer, config)
-    optimizer.minimize(avg_cost)
+   .. code:: shell
+
+       mpirun -np 2 python trainer.py
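+
+-  trainer.py 结构示意:上面 ``mpirun -np 2 python trainer.py`` 启动的脚本,可以按照前文Parameter Server示例的写法组织。以下仅为参考示意,组网函数 ``mlp`` 与数据读取 ``gen_data`` 沿用前文定义:
+
+   .. code:: python
+
+       import paddle.fluid as fluid
+       from nets import mlp
+       from utils import gen_data
+       from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+       from paddle.fluid.incubate.fleet.base import role_maker
+
+       input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
+       input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
+       cost = mlp(input_x, input_y)
+       optimizer = fluid.optimizer.SGD(learning_rate=0.01)
+
+       role = role_maker.MPISymetricRoleMaker()
+       fleet.init(role)
+       optimizer = fleet.distributed_optimizer(optimizer)
+       optimizer.minimize(cost)
+
+       if fleet.is_server():        # 本进程作为pserver,启动服务
+           fleet.init_server()
+           fleet.run_server()
+       elif fleet.is_worker():      # 本进程作为worker,执行训练
+           exe = fluid.Executor(fluid.CPUPlace())
+           exe.run(fluid.default_startup_program())
+           fleet.init_worker()
+           for i in range(1001):
+               exe.run(program=fluid.default_main_program(),
+                       feed=gen_data(),
+                       fetch_list=[cost.name])
+           fleet.stop_worker()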
+
+-  PaddleCloudRoleMaker
 
-具体训练流程
-++++++++++++++++
+-  描述:PaddleCloudRoleMaker是一个高级封装,支持使用paddle.distributed.launch或者paddle.distributed.launch\_ps启动脚本
 
-.. code-block:: python
+-  Parameter Server训练示例:
 
-    # 启动server
-    if fleet.is_server():
-        fleet.init_server()
-        fleet.run_server()
-
-    # 启动worker
-    if fleet.is_worker():
-        # 初始化worker配置
-        fleet.init_worker()
-
-        feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
-        train_reader = paddle.batch(fake_reader(), batch_size=24)
-
-        exe.run(fleet.startup_program)
-
-        PASS_NUM = 10
-        for pass_id in range(PASS_NUM):
-            for batch_id, data in enumerate(train_reader()):
-                avg_loss_value, auc_value, auc_batch_value = \
-                    exe.run(fleet.main_program, feed=feeder.feed(data), fetch_list=[avg_cost, auc, auc_batch])
-                print("Pass %d, cost = %f, auc = %f, batch_auc = %f" % (pass_id, avg_loss_value, auc_value, auc_batch_value))
-        # 通知server,当前节点训练结束
-        fleet.stop_worker()
+   .. code:: python
+
+       from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+       from paddle.fluid.incubate.fleet.base import role_maker
+
+       role = role_maker.PaddleCloudRoleMaker()
+       fleet.init(role)
+
+-  启动方法:
+
+   .. code:: shell
+
+       python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 trainer.py
+
+-  Collective训练示例:
+
+   .. code:: python
+
+       from paddle.fluid.incubate.fleet.collective import fleet
+       from paddle.fluid.incubate.fleet.base import role_maker
+
+       role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+       fleet.init(role)
+
+-  启动方法:
+
+   .. code:: shell
+
+       python -m paddle.distributed.launch trainer.py
+
+-  UserDefinedRoleMaker
+
+-  描述:用户自定义节点的角色信息、IP和端口信息
+
+-  示例:
+
+   .. code:: python
+
+       import os
+
+       from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+       from paddle.fluid.incubate.fleet.base import role_maker
+
+       # server端地址列表,此处取值仅为示例,请按实际环境填写
+       pserver_endpoints = ["127.0.0.1:6001", "127.0.0.1:6002"]
+
+       role = role_maker.UserDefinedRoleMaker(
+           current_id=int(os.getenv("CURRENT_ID")),
+           role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER")))
+           else role_maker.Role.SERVER,
+           worker_num=int(os.getenv("WORKER_NUM")),
+           server_endpoints=pserver_endpoints)
+       fleet.init(role)
+
+Strategy
+~~~~~~~~
+
+-  Parameter Server Training
+-  Sync\_mode
+-  Collective Training
+-  LocalSGD
+-  ReduceGrad
+
+Fleet Mode
+~~~~~~~~~~
+
+-  Parameter Server Training
+
+   .. code:: python
+
+       from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+
+-  Collective Training
+
+   .. code:: python
+
+       from paddle.fluid.incubate.fleet.collective import fleet
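+
+结合上面的Fleet Mode与Strategy,下面给出一个为Parameter Server训练配置同步/异步策略的简要示意。这里假设以 ``fluid.DistributeTranspilerConfig`` 作为strategy传入 ``distributed_optimizer``,组网沿用前文的 ``mlp``,仅作参考:
+
+.. code:: python
+
+    import paddle.fluid as fluid
+    from nets import mlp
+    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+    from paddle.fluid.incubate.fleet.base import role_maker
+
+    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
+    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
+    avg_cost = mlp(input_x, input_y)
+    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
+
+    # 配置Parameter Server训练的分布式策略
+    config = fluid.DistributeTranspilerConfig()
+    config.sync_mode = True   # True为同步训练,False为异步训练
+
+    role = role_maker.PaddleCloudRoleMaker()
+    fleet.init(role)
+    optimizer = fleet.distributed_optimizer(optimizer, strategy=config)
+    optimizer.minimize(avg_cost)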