From 7ea9104b7e363cbdabd0c9b01ca2742350b8b022 Mon Sep 17 00:00:00 2001 From: chengduo <30176695+chengduoZH@users.noreply.github.com> Date: Wed, 25 Sep 2019 18:52:15 +0800 Subject: [PATCH] Polish executor doc (#1216) * Polish executor doc --- doc/fluid/api_cn/executor_cn/Executor_cn.rst | 73 ++---- .../api_cn/fluid_cn/CompiledProgram_cn.rst | 169 ++++++------ doc/fluid/api_cn/fluid_cn/Executor_cn.rst | 74 +++--- .../api_cn/fluid_cn/ParallelExecutor_cn.rst | 243 ++++++++---------- 4 files changed, 257 insertions(+), 302 deletions(-) diff --git a/doc/fluid/api_cn/executor_cn/Executor_cn.rst b/doc/fluid/api_cn/executor_cn/Executor_cn.rst index 1ea7c1ac2..e5b4e0cbe 100644 --- a/doc/fluid/api_cn/executor_cn/Executor_cn.rst +++ b/doc/fluid/api_cn/executor_cn/Executor_cn.rst @@ -3,23 +3,16 @@ Executor ------------------------------- - .. py:class:: paddle.fluid.executor.Executor (place) +Executor支持单GPU、多GPU以及CPU运行。在Executor构造时,需要传入设备。 +参数: + - **place** (fluid.CPUPlace()|fluid.CUDAPlace(N)) – 该参数表示Executor执行所在的设备,这里的N为GPU对应的ID。 + +返回:初始化后的 ``Executor`` 对象 - -执行引擎(Executor)使用python脚本驱动,支持在单/多GPU、单/多CPU环境下运行。 -Python Executor可以接收传入的program,并根据feed map(输入映射表)和fetch_list(结果获取表) -向program中添加feed operators(数据输入算子)和fetch operators(结果获取算子)。 -feed map为该program提供输入数据。fetch_list提供program训练结束后用户预期的变量(或识别类场景中的命名)。 - -应注意,执行器会执行program中的所有算子而不仅仅是依赖于fetch_list的那部分。 - -Executor将全局变量存储到全局作用域中,并为临时变量创建局部作用域。 -当每一mini-batch上的前向/反向运算完成后,局部作用域的内容将被废弃, -但全局作用域中的变量将在Executor的不同执行过程中一直存在。 - +返回类型:Executor **示例代码** @@ -69,18 +62,12 @@ Executor将全局变量存储到全局作用域中,并为临时变量创建局 feed={"X": x}, fetch_list=[loss.name]) - -参数: - - **place** (fluid.CPUPlace|fluid.CUDAPlace(n)) – 指明了 ``Executor`` 的执行场所 - - - .. py:method:: close() -关闭这个执行器(Executor)。 +关闭执行器。该接口主要用于对于分布式训练,调用该接口后不可以再使用该执行器。该接口会释放在PServers上和目前Trainer有关联的资源。 -调用这个方法后不可以再使用这个执行器。 对于分布式训练, 该函数会释放在PServers上和目前Trainer有关联的资源。 +返回:无 **示例代码** @@ -96,13 +83,26 @@ Executor将全局变量存储到全局作用域中,并为临时变量创建局 .. py:method:: run(program=None, feed=None, fetch_list=None, feed_var_name='feed', fetch_var_name='fetch', scope=None, return_numpy=True,use_program_cache=False) +执行指定的Program或者CompiledProgram。需要注意的是,执行器会执行Program或CompiledProgram中的所有算子,而不会根据fetch_list对Program或CompiledProgram中的算子进行裁剪。同时,需要传入运行该模型用到的scope,如果没有指定scope,执行器将使用全局scope,即fluid.global_scope()。 + +参数: + - **program** (Program|CompiledProgram) – 该参数为被执行的Program或CompiledProgram,如果未提供该参数,即该参数为None,在该接口内,main_program将被设置为fluid.default_main_program()。默认为:None。 + - **feed** (list|dict) – 该参数表示模型的输入变量。如果是单卡训练,``feed`` 为 ``dict`` 类型,如果是多卡训练,参数 ``feed`` 可以是 ``dict`` 或者 ``list`` 类型变量,如果该参数类型为 ``dict`` ,feed中的数据将会被分割(split)并分送给多个设备(CPU/GPU),即输入数据被均匀分配到不同设备上;如果该参数类型为 ``list`` ,则列表中的各个元素都会直接分别被拷贝到各设备中。默认为:None。 + - **fetch_list** (list) – 该参数表示模型运行之后需要返回的变量。默认为:None。 + - **feed_var_name** (str) – 该参数表示数据输入算子(feed operator)的输入变量名称。默认为:"feed"。 + - **fetch_var_name** (str) – 该参数表示结果获取算子(fetch operator)的输出变量名称。默认为:"fetch"。 + - **scope** (Scope) – 该参数表示执行当前program所使用的作用域,用户可以为不同的program指定不同的作用域。默认值:fluid.global_scope()。 + - **return_numpy** (bool) – 该参数表示是否将返回返回的计算结果(fetch list中指定的变量)转化为numpy;如果为False,则每个变量返回的类型为LoDTensor,否则返回变量的类型为numpy.ndarray。默认为:True。 + - **use_program_cache** (bool) – 该参数表示是否对输入的Program进行缓存。如果该参数为True,在以下情况时,模型运行速度可能会更快:输入的program为 ``fluid.Program`` ,并且模型运行过程中,调用该接口的参数(program、 feed变量名和fetch_list变量)名始终不变。默认为:False。 + +返回:返回fetch_list中指定的变量值 + +返回类型:List -调用该执行器对象的此方法可以执行program。通过feed map提供待学习数据,以及借助fetch_list得到相应的结果。 -Python执行器(Executor)可以接收传入的program,并根据输入映射表(feed map)和结果获取表(fetch_list) -向program中添加数据输入算子(feed operators)和结果获取算子(fetch operators)。 -feed map为该program提供输入数据。fetch_list提供program训练结束后用户预期的变量(或识别类场景中的命名)。 +.. note:: + 1. 如果是多卡训练,并且feed参数为dict类型,输入数据将被均匀分配到不同的卡上,例如:使用2块GPU训练,输入样本数为3,即[0, 1, 2],经过拆分之后,GPU0上的样本数为1,即[0],GPU1上的样本数为2,即[1, 2]。如果样本数少于设备数,程序会报错,因此运行模型时,应额外注意数据集的最后一个batch的样本数是否少于当前可用的CPU核数或GPU卡数,如果是少于,建议丢弃该batch。 + 2. 如果可用的CPU核数或GPU卡数大于1,则fetch出来的结果为不同设备上的相同变量值(fetch_list中的变量)在第0维拼接在一起。 -应注意,执行器会执行program中的所有算子而不仅仅是依赖于fetch_list的那部分。 **示例代码** @@ -128,21 +128,6 @@ feed map为该program提供输入数据。fetch_list提供program训练结束后 outs = exe.run(feed={'X': x}, fetch_list=[loss.name]) -参数: - - **program** (Program|CompiledProgram) – 需要执行的program,如果没有给定那么默认使用default_main_program (未编译的) - - **feed** (dict) – 前向输入的变量,数据,词典dict类型, 例如 {“image”: ImageData, “label”: LabelData} - - **fetch_list** (list) – 用户想得到的变量或者命名的列表, 该方法会根据这个列表给出结果 - - **feed_var_name** (str) – 前向算子(feed operator)变量的名称 - - **fetch_var_name** (str) – 结果获取算子(fetch operator)的输出变量名称 - - **scope** (Scope) – 执行这个program的域,用户可以指定不同的域。缺省为全局域 - - **return_numpy** (bool) – 如果为True,则将结果张量(fetched tensor)转化为numpy - - **use_program_cache** (bool) – 是否跨批使用缓存程序设置。设置为True时,只有当(1)程序没有用数据并行编译,并且(2)program、 feed变量名和fetch_list变量名与上一步相比没有更改时,运行速度才会更快。 - -返回: 根据fetch_list来获取结果 - -返回类型: list(numpy.array) - - .. py:method:: infer_from_dataset(program=None, dataset=None, scope=None, thread=0, debug=False, fetch_list=None, fetch_info=None, print_period=100) infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在分布式训练中,推进梯度将在infer_from_dataset中禁用。 infer_from_dataset()可以非常容易地用于多线程中的评估。 @@ -157,7 +142,7 @@ infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在 - **fetch_info** (String List) – 每个变量的打印信息,默认为None - **print_period** (int) – 每两次打印之间间隔的mini-batches的数量,默认为100 -返回: None +返回:None **示例代码** @@ -193,7 +178,7 @@ infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在 - **fetch_info** (String List) – 每个变量的打印信息,默认为None - **print_period** (int) – 每两次打印之间间隔的mini-batches的数量,默认为100 -返回: None +返回:None **示例代码** @@ -213,5 +198,3 @@ infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在 exe.run(fluid.default_startup_program()) exe.train_from_dataset(program=fluid.default_main_program(), dataset=dataset) - - diff --git a/doc/fluid/api_cn/fluid_cn/CompiledProgram_cn.rst b/doc/fluid/api_cn/fluid_cn/CompiledProgram_cn.rst index 5da81c412..c240d3e7d 100644 --- a/doc/fluid/api_cn/fluid_cn/CompiledProgram_cn.rst +++ b/doc/fluid/api_cn/fluid_cn/CompiledProgram_cn.rst @@ -3,105 +3,110 @@ CompiledProgram ------------------------------- -.. py:class:: paddle.fluid.CompiledProgram(program_or_graph) +.. py:class:: paddle.fluid.CompiledProgram(program_or_graph, build_strategy=None) -编译成一个用来执行的Graph。 +CompiledProgram根据 `build_strategy` 的配置将输入的Program或Graph进行转换和优化,例如:计算图中算子融合、计算图执行过程中开启内存/显存优化等,关于build_strategy更多信息。请参阅 ``fluid.BuildStrategy`` 。 -1. 首先使用layers(网络层)创建程序。 -2. (可选)可使用CompiledProgram来在运行之前优化程序。 -3. 定义的程序或CompiledProgram由Executor运行。 +参数: + - **program_or_graph** (Graph|Program): 该参数为被执行的Program或Graph。 + - **build_strategy** (BuildStrategy): 通过配置build_strategy,对计算图进行转换和优化,例如:计算图中算子融合、计算图执行过程中开启内存/显存优化等。关于build_strategy更多信息,请参阅 ``fluid.BuildStrategy`` 。 默认为None。 -CompiledProgram用于转换程序以进行各种优化。例如, +返回:初始化后的 ``CompiledProgram`` 对象 -- 预先计算一些逻辑,以便每次运行更快。 -- 转换Program,使其可以在多个设备中运行。 -- 转换Program以进行优化预测或分布式训练。注意:此部分尚未完成。 +返回类型:CompiledProgram **代码示例** .. code-block:: python - import paddle.fluid as fluid - import paddle.fluid.compiler as compiler - import numpy - import os - - place = fluid.CUDAPlace(0) # fluid.CPUPlace() - exe = fluid.Executor(place) - - data = fluid.layers.data(name='X', shape=[1], dtype='float32') - hidden = fluid.layers.fc(input=data, size=10) - loss = fluid.layers.mean(hidden) - fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) - - fluid.default_startup_program().random_seed=1 - exe.run(fluid.default_startup_program()) - compiled_prog = compiler.CompiledProgram( - fluid.default_main_program()) - - x = numpy.random.random(size=(10, 1)).astype('float32') - loss_data, = exe.run(compiled_prog, - feed={"X": x}, - fetch_list=[loss.name]) -参数: - - **program_or_graph** (Graph|Program): 如果它是Program,那么它将首先被降成一个graph,以便进一步优化。如果它是一个graph(以前可能优化过),它将直接用于进一步的优化。注意:只有使用 with_data_parallel 选项编译时才支持graph。 + import paddle.fluid as fluid + import paddle.fluid.compiler as compiler + import numpy + import os + + place = fluid.CUDAPlace(0) # fluid.CPUPlace() + exe = fluid.Executor(place) + + data = fluid.layers.data(name='X', shape=[1], dtype='float32') + hidden = fluid.layers.fc(input=data, size=10) + loss = fluid.layers.mean(hidden) + fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) + + exe.run(fluid.default_startup_program()) + build_strategy = fluid.BuildStrategy() + build_strategy.fuse_all_optimizer_ops = True + compiled_prog = compiler.CompiledProgram( + fluid.default_main_program(), + build_strategy=build_strategy) + + x = numpy.random.random(size=(10, 1)).astype('float32') + loss_data, = exe.run(compiled_prog, + feed={"X": x}, + fetch_list=[loss.name]) -.. py:method:: with_data_parallel(loss_name=None, build_strategy=None, exec_strategy=None, share_vars_from=None, places=None) - -配置Program使其以数据并行方式运行。 -**代码示例** +.. py:method:: with_data_parallel(loss_name=None, build_strategy=None, exec_strategy=None, share_vars_from=None, places=None) -.. code-block:: python - - import paddle.fluid as fluid - import paddle.fluid.compiler as compiler - import numpy - import os - - use_cuda = True - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - #注意:如果你使用CPU运行程序,需要具体设置CPU_NUM, - #否则fluid会把逻辑核的所有数目设为CPU_NUM, - #在这种情况下,输入的batch size应大于CPU_NUM, - #否则程序会异常中断。 - if not use_cuda: - os.environ['CPU_NUM'] = str(2) - - exe = fluid.Executor(place) - - data = fluid.layers.data(name='X', shape=[1], dtype='float32') - hidden = fluid.layers.fc(input=data, size=10) - loss = fluid.layers.mean(hidden) - fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) - - fluid.default_startup_program().random_seed=1 - exe.run(fluid.default_startup_program()) - compiled_prog = compiler.CompiledProgram( - fluid.default_main_program()).with_data_parallel( - loss_name=loss.name) - - x = numpy.random.random(size=(10, 1)).astype('float32') - loss_data, = exe.run(compiled_prog, - feed={"X": x}, - fetch_list=[loss.name]) +该接口用于将输入的Program或Graph进行转换,以便通过数据并行模式运行该模型。用户可以通过 `build_strategy` 和 `exec_strategy` 设置计算图构建和计算图执行过程中可以进行的一些优化,例如:将梯度聚合的AllReduce操作进行融合、指定计算图运行过程中使用的线程池大小等。**注意:如果在构建CompiledProgram和调用with_data_parallel时都指定了build_strategy,在CompiledProgram中的build_strategy会被复写,因此,如果是数据并行训练,建议在调用with_data_parallel接口是设置build_strategy**。 参数: - - **loss_name** (str) - 损失函数名称必须在训练过程中设置。 默认None。 - - **build_strategy** (BuildStrategy) - build_strategy用于构建图,因此它可以在具有优化拓扑的多个设备/核上运行。 有关更多信息,请参阅 ``fluid.BuildStrategy`` 。 默认None。 - - **exec_strategy** (ExecutionStrategy) - exec_strategy用于选择执行图的方式,例如使用多少线程,每次清理临时变量之前进行的迭代次数。 有关更多信息,请参阅 ``fluid.ExecutionStrategy`` 。 默认None。 - - **share_vars_from** (CompiledProgram) - 如果有,此CompiledProgram将共享来自share_vars_from的变量。 share_vars_from指定的Program必须由此CompiledProgram之前的Executor运行,以便vars准备就绪。 - - **places** (list(CUDAPlace)|list(CPUPlace)|None) - 如果提供,则仅在给定位置编译程序。否则,编译时使用的位置由Executor确定,使用的位置由环境变量控制:如果使用GPU,则标记FLAGS_selected_gpus或CUDA_VISIBLE_DEVICES设备;如果使用CPU,则标记CPU_NUM。例如,如果要在GPU 0和GPU 1上运行,请设置places=[fluid.CUDAPlace(0), fluid.CUDAPlace(1)]。如果要在2个CPU核心上运行,请设置places=[fluid.CPUPlace()]*2。 + - **loss_name** (str) - 该参数为模型最后得到的损失变量的名字,**注意:如果是模型训练,必须设置loss_name,否则计算结果可能会有问题。** 默认为:None。 + - **build_strategy** (BuildStrategy): 通过配置build_strategy,对计算图进行转换和优化,例如:计算图中算子融合、计算图执行过程中开启内存/显存优化等。关于build_strategy更多的信息,请参阅 ``fluid.BuildStrategy`` 。 默认为:None。 + - **exec_strategy** (ExecutionStrategy) - 通过exec_strategy指定执行计算图过程可以调整的选项,例如线程池大小等。 关于exec_strategy更多信息,请参阅 ``fluid.ExecutionStrategy`` 。 默认为:None。 + - **share_vars_from** (CompiledProgram) - 如果设置了share_vars_from,当前的CompiledProgram将与share_vars_from指定的CompiledProgram共享参数值。需要设置该参数的情况:模型训练过程中需要进行模型测试,并且训练和测试都是采用数据并行模式,那么测试对应的CompiledProgram在调用with_data_parallel时,需要将share_vars_from设置为训练对应的CompiledProgram。由于CompiledProgram只有在第一次执行时才会将变量分发到其他设备上,因此share_vars_from指定的CompiledProgram必须在当前CompiledProgram之前运行。默认为:None。 + - **places** (list(CUDAPlace)|list(CPUPlace)) - 该参数指定模型运行所在的设备。如果希望在GPU0和GPU1上运行,places为[fluid.CUDAPlace(0), fluid.CUDAPlace(1)];如果希望使用2个CPU运行,places为[fluid.CPUPlace()] * 2。 如果没有设置该参数,即该参数为None,模型执行时,将从环境变量中获取可用的设备:如果使用GPU,模型执行时,从环境变量FLAGS_selected_gpus或CUDA_VISIBLE_DEVICES中获取当前可用的设备ID;如果使用CPU,模型执行时,从环境变量CPU_NUM中获取当前可利用的CPU个数。例如:export CPU_NUM=4,如果没有设置该环境变量,执行器会在环境变量中添加该变量,并将其值设为1。默认为:None。 -返回: self +返回:配置之后的 ``CompiledProgram`` 对象 -.. py:method:: with_inference_optimize(config) +返回类型:CompiledProgram -添加预测优化。 +.. note:: + 1. 如果只是进行多卡测试,不需要设置loss_name以及share_vars_from。 + 2. 如果程序中既有模型训练又有模型测试,则构建模型测试所对应的CompiledProgram时必须设置share_vars_from,否则模型测试和模型训练所使用的参数是不一致。 -参数: - - **config** - 用于创建预测器的NativeConfig或AnalysisConfig的实例 - -返回: self +**代码示例** +.. code-block:: python + + import paddle.fluid as fluid + import paddle.fluid.compiler as compiler + import numpy + import os + + use_cuda = True + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + # 注意:如果你使用CPU运行程序,需要具体设置CPU_NUM, + # 否则fluid会把逻辑核的所有数目设为CPU_NUM, + # 在这种情况下,输入的batch size应大于CPU_NUM, + # 否则程序会异常中断。 + if not use_cuda: + os.environ['CPU_NUM'] = str(2) + + exe = fluid.Executor(place) + + data = fluid.layers.data(name='X', shape=[1], dtype='float32') + hidden = fluid.layers.fc(input=data, size=10) + loss = fluid.layers.mean(hidden) + test_program = fluid.default_main_program().clone(for_test=True) + fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) + + exe.run(fluid.default_startup_program()) + build_strategy = fluid.BuildStrategy() + build_strategy.fuse_all_reduce_ops = True + compiled_train_prog = compiler.CompiledProgram( + fluid.default_main_program()).with_data_parallel( + loss_name=loss.name, build_strategy=build_strategy) + # 注意:如果此处不设置share_vars_from=compiled_train_prog,测试过程中用的参数与训练使用的参数是不一致 + compiled_test_prog = compiler.CompiledProgram( + test_program).with_data_parallel( + share_vars_from=compiled_train_prog) + + train_data = numpy.random.random(size=(10, 1)).astype('float32') + loss_data, = exe.run(compiled_train_prog, + feed={"X": train_data}, + fetch_list=[loss.name]) + test_data = numpy.random.random(size=(10, 1)).astype('float32') + loss_data, = exe.run(compiled_test_prog, + feed={"X": test_data}, + fetch_list=[loss.name]) \ No newline at end of file diff --git a/doc/fluid/api_cn/fluid_cn/Executor_cn.rst b/doc/fluid/api_cn/fluid_cn/Executor_cn.rst index 76e26ad45..628625e0b 100644 --- a/doc/fluid/api_cn/fluid_cn/Executor_cn.rst +++ b/doc/fluid/api_cn/fluid_cn/Executor_cn.rst @@ -6,20 +6,14 @@ Executor .. py:class:: paddle.fluid.Executor (place) +Executor支持单GPU、多GPU以及CPU运行。在Executor构造时,需要传入设备。 +参数: + - **place** (fluid.CPUPlace()|fluid.CUDAPlace(N)) – 该参数表示Executor执行所在的设备,这里的N为GPU对应的ID。 + +返回:初始化后的 ``Executor`` 对象 - -执行引擎(Executor)使用python脚本驱动,支持在单/多GPU、单/多CPU环境下运行。 -Python Executor可以接收传入的program,并根据feed map(输入映射表)和fetch_list(结果获取表) -向program中添加feed operators(数据输入算子)和fetch operators(结果获取算子)。 -feed map为该program提供输入数据。fetch_list提供program训练结束后用户预期的变量(或识别类场景中的命名)。 - -应注意,执行器会执行program中的所有算子而不仅仅是依赖于fetch_list的那部分。 - -Executor将全局变量存储到全局作用域中,并为临时变量创建局部作用域。 -当每一mini-batch上的前向/反向运算完成后,局部作用域的内容将被废弃, -但全局作用域中的变量将在Executor的不同执行过程中一直存在。 - +返回类型:Executor **示例代码** @@ -69,18 +63,12 @@ Executor将全局变量存储到全局作用域中,并为临时变量创建局 feed={"X": x}, fetch_list=[loss.name]) - -参数: - - **place** (fluid.CPUPlace|fluid.CUDAPlace(n)) – 指明了 ``Executor`` 的执行场所 - - - .. py:method:: close() -关闭这个执行器(Executor)。 +关闭执行器。该接口主要用于对于分布式训练,调用该接口后不可以再使用该执行器。该接口会释放在PServers上和目前Trainer有关联的资源。 -调用这个方法后不可以再使用这个执行器。 对于分布式训练, 该函数会释放在PServers上和目前Trainer有关联的资源。 +返回:无 **示例代码** @@ -96,13 +84,26 @@ Executor将全局变量存储到全局作用域中,并为临时变量创建局 .. py:method:: run(program=None, feed=None, fetch_list=None, feed_var_name='feed', fetch_var_name='fetch', scope=None, return_numpy=True,use_program_cache=False) +执行指定的Program或者CompiledProgram。需要注意的是,执行器会执行Program或CompiledProgram中的所有算子,而不会根据fetch_list对Program或CompiledProgram中的算子进行裁剪。同时,需要传入运行该模型用到的scope,如果没有指定scope,执行器将使用全局scope,即fluid.global_scope()。 + +参数: + - **program** (Program|CompiledProgram) – 该参数为被执行的Program或CompiledProgram,如果未提供该参数,即该参数为None,在该接口内,main_program将被设置为fluid.default_main_program()。默认为:None。 + - **feed** (list|dict) – 该参数表示模型的输入变量。如果是单卡训练,``feed`` 为 ``dict`` 类型,如果是多卡训练,参数 ``feed`` 可以是 ``dict`` 或者 ``list`` 类型变量,如果该参数类型为 ``dict`` ,feed中的数据将会被分割(split)并分送给多个设备(CPU/GPU),即输入数据被均匀分配到不同设备上;如果该参数类型为 ``list`` ,则列表中的各个元素都会直接分别被拷贝到各设备中。默认为:None。 + - **fetch_list** (list) – 该参数表示模型运行之后需要返回的变量。默认为:None。 + - **feed_var_name** (str) – 该参数表示数据输入算子(feed operator)的输入变量名称。默认为:"feed"。 + - **fetch_var_name** (str) – 该参数表示结果获取算子(fetch operator)的输出变量名称。默认为:"fetch"。 + - **scope** (Scope) – 该参数表示执行当前program所使用的作用域,用户可以为不同的program指定不同的作用域。默认值:fluid.global_scope()。 + - **return_numpy** (bool) – 该参数表示是否将返回返回的计算结果(fetch list中指定的变量)转化为numpy;如果为False,则每个变量返回的类型为LoDTensor,否则返回变量的类型为numpy.ndarray。默认为:True。 + - **use_program_cache** (bool) – 该参数表示是否对输入的Program进行缓存。如果该参数为True,在以下情况时,模型运行速度可能会更快:输入的program为 ``fluid.Program`` ,并且模型运行过程中,调用该接口的参数(program、 feed变量名和fetch_list变量)名始终不变。默认为:False。 + +返回:返回fetch_list中指定的变量值 + +返回类型:List -调用该执行器对象的此方法可以执行program。通过feed map提供待学习数据,以及借助fetch_list得到相应的结果。 -Python执行器(Executor)可以接收传入的program,并根据输入映射表(feed map)和结果获取表(fetch_list) -向program中添加数据输入算子(feed operators)和结果获取算子(fetch operators)。 -feed map为该program提供输入数据。fetch_list提供program训练结束后用户预期的变量(或识别类场景中的命名)。 +.. note:: + 1. 如果是多卡训练,并且feed参数为dict类型,输入数据将被均匀分配到不同的卡上,例如:使用2块GPU训练,输入样本数为3,即[0, 1, 2],经过拆分之后,GPU0上的样本数为1,即[0],GPU1上的样本数为2,即[1, 2]。如果样本数少于设备数,程序会报错,因此运行模型时,应额外注意数据集的最后一个batch的样本数是否少于当前可用的CPU核数或GPU卡数,如果是少于,建议丢弃该batch。 + 2. 如果可用的CPU核数或GPU卡数大于1,则fetch出来的结果为不同设备上的相同变量值(fetch_list中的变量)在第0维拼接在一起。 -应注意,执行器会执行program中的所有算子而不仅仅是依赖于fetch_list的那部分。 **示例代码** @@ -127,21 +128,6 @@ feed map为该program提供输入数据。fetch_list提供program训练结束后 x = numpy.random.random(size=(10, 1)).astype('float32') outs = exe.run(feed={'X': x}, fetch_list=[loss.name]) - -参数: - - **program** (Program|CompiledProgram) – 需要执行的program,如果没有给定那么默认使用default_main_program (未编译的) - - **feed** (dict) – 前向输入的变量,数据,词典dict类型, 例如 {“image”: ImageData, “label”: LabelData} - - **fetch_list** (list) – 用户想得到的变量或者命名的列表, 该方法会根据这个列表给出结果 - - **feed_var_name** (str) – 前向算子(feed operator)变量的名称 - - **fetch_var_name** (str) – 结果获取算子(fetch operator)的输出变量名称 - - **scope** (Scope) – 执行这个program的域,用户可以指定不同的域。缺省为全局域 - - **return_numpy** (bool) – 如果为True,则将结果张量(fetched tensor)转化为numpy - - **use_program_cache** (bool) – 是否在不同的批次间使用相同的缓存程序设置。设置为True时,只有当(1)程序没有用数据并行编译,并且(2)program、 feed变量名和fetch_list变量名与上一步相比没有更改时,运行速度才会更快。 - -返回: 根据fetch_list来获取结果 - -返回类型: list(numpy.array) - .. py:method:: infer_from_dataset(program=None, dataset=None, scope=None, thread=0, debug=False, fetch_list=None, fetch_info=None, print_period=100) @@ -157,7 +143,7 @@ infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在 - **fetch_info** (String List) – 每个变量的打印信息,默认为None - **print_period** (int) – 每两次打印之间间隔的mini-batches的数量,默认为100 -返回: None +返回:None **示例代码** @@ -193,7 +179,7 @@ infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在 - **fetch_info** (String List) – 每个变量的打印信息,默认为None - **print_period** (int) – 每两次打印之间间隔的mini-batches的数量,默认为100 -返回: None +返回:None **示例代码** @@ -211,7 +197,5 @@ infer_from_dataset的文档与train_from_dataset几乎完全相同,只是在 filelist = [] # 您可以设置您自己的filelist,如filelist = ["dataA.txt"] dataset.set_filelist(filelist) exe.run(fluid.default_startup_program()) - exe.infer_from_dataset(program=fluid.default_main_program(), + exe.train_from_dataset(program=fluid.default_main_program(), dataset=dataset) - - diff --git a/doc/fluid/api_cn/fluid_cn/ParallelExecutor_cn.rst b/doc/fluid/api_cn/fluid_cn/ParallelExecutor_cn.rst index 60ad4cd97..ed30dbe90 100644 --- a/doc/fluid/api_cn/fluid_cn/ParallelExecutor_cn.rst +++ b/doc/fluid/api_cn/fluid_cn/ParallelExecutor_cn.rst @@ -5,80 +5,98 @@ ParallelExecutor .. py:class:: paddle.fluid.ParallelExecutor(use_cuda, loss_name=None, main_program=None, share_vars_from=None, exec_strategy=None, build_strategy=None, num_trainers=1, trainer_id=0, scope=None) +``ParallelExecutor`` 是 ``Executor`` 的一个升级版本,可以支持基于数据并行的多节点模型训练和测试。如果采用数据并行模式, ``ParallelExecutor`` 在构造时会将参数分发到不同的节点上,并将输入的 ``Program`` 拷贝到不同的节点,在执行过程中,各个节点独立运行模型,将模型反向计算得到的参数梯度在多个节点之间进行聚合,之后各个节点独立的进行参数的更新。如果使用GPU运行模型,即 ``use_cuda=True`` ,节点指代GPU, ``ParallelExecutor`` 将自动获取在当前机器上可用的GPU资源,用户也可以通过在环境变量设置可用的GPU资源,例如:希望使用GPU0、GPU1计算,export CUDA_VISIBLEDEVICES=0,1;如果在CPU上进行操作,即 ``use_cuda=False`` ,节点指代CPU,**注意:此时需要用户在环境变量中手动添加 CPU_NUM ,并将该值设置为CPU设备的个数,例如:export CPU_NUM=4,如果没有设置该环境变量,执行器会在环境变量中添加该变量,并将其值设为1**。 +参数: + - **use_cuda** (bool) – 该参数表示是否使用GPU执行。 + - **loss_name** (str) - 该参数为模型最后得到的损失变量的名字。**注意:如果是数据并行模型训练,必须设置loss_name,否则计算结果可能会有问题。** 默认为:None。 + - **main_program** (Program) – 需要被执行的Program 。如果未提供该参数,即该参数为None,在该接口内,main_program将被设置为fluid.default_main_program()。 默认为:None。 + - **share_vars_from** (ParallelExecutor) - 如果设置了share_vars_from,当前的ParallelExecutor将与share_vars_from指定的ParallelExecutor共享参数值。需要设置该参数的情况:模型训练过程中需要进行模型测试,并且训练和测试都是采用数据并行模式,那么测试对应的ParallelExecutor在调用with_data_parallel时,需要将share_vars_from设置为训练所对应的ParallelExecutor。由于ParallelExecutor只有在第一次执行时才会将参数变量分发到其他设备上,因此share_vars_from指定的ParallelExecutor必须在当前ParallelExecutor之前运行。默认为:None。 + - **exec_strategy** (ExecutionStrategy) - 通过exec_strategy指定执行计算图过程可以调整的选项,例如线程池大小等。 关于exec_strategy更多信息,请参阅 ``fluid.ExecutionStrategy`` 。 默认为:None。 + - **build_strategy** (BuildStrategy): 通过配置build_strategy,对计算图进行转换和优化,例如:计算图中算子融合、计算图执行过程中开启内存/显存优化等。关于build_strategy更多的信息,请参阅 ``fluid.BuildStrategy`` 。 默认为:None。 + - **num_trainers** (int) – 进行GPU分布式训练时需要设置该参数。如果该参数值大于1,NCCL将会通过多层级节点的方式来初始化。每个节点应有相同的GPU数目。默认为:1。 + - **trainer_id** (int) – 进行GPU分布式训练时需要设置该参数。该参数必须与num_trainers参数同时使用。trainer_id指明是当前所在节点的 “rank”(层级)。trainer_id从0开始计数。默认为:0。 + - **scope** (Scope) – 指定执行Program所在的作用域。默认为:fluid.global_scope()。 + +返回:初始化后的 ``ParallelExecutor`` 对象 +返回类型:ParallelExecutor -``ParallelExecutor`` 专门设计用来实现数据并行计算,着力于向不同结点(node)分配数据,并行地在不同结点中对数据进行操作。如果在GPU上使用该类运行程序,node则用来指代GPU, ``ParallelExecutor`` 也将自动获取在当前机器上可用的GPU资源。如果在CPU上进行操作,node则指代CPU,同时你也可以通过添加环境变量 ``CPU_NUM`` 来设置CPU设备的个数。例如,``CPU_NUM=4``。但是如果没有设置该环境变量,该类会调用 ``multiprocessing.cpu_count`` 来获取当前系统中CPU的个数。 +抛出异常:``TypeError`` + - 如果提供的参数 ``share_vars_from`` 不是 ``ParallelExecutor`` 类型的,将会抛出此异常。 + +.. note:: + 1. 如果只是进行多卡测试,不需要设置loss_name以及share_vars_from。 + 2. 如果程序中既有模型训练又有模型测试,则构建模型测试所对应的ParallelExecutor时必须设置share_vars_from,否则模型测试和模型训练所使用的参数是不一致。 **示例代码** .. code-block:: python - import paddle.fluid as fluid - import numpy - import os - - use_cuda = True - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - - # 注意:如果你使用CPU运行程序,需要具体设置CPU_NUM, - # 否则fluid会把逻辑核的所有数目设为CPU_NUM, - # 在这种情况下,输入的batch size应大于CPU_NUM, - # 否则程序会异常中断。 - if not use_cuda: - os.environ['CPU_NUM'] = str(2) - - exe = fluid.Executor(place) - - train_program = fluid.Program() - startup_program = fluid.Program() - with fluid.program_guard(train_program, startup_program): - data = fluid.layers.data(name='X', shape=[1], dtype='float32') - hidden = fluid.layers.fc(input=data, size=10) - loss = fluid.layers.mean(hidden) - test_program = fluid.default_main_program().clone(for_test=True) - fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) - + import paddle.fluid as fluid + import numpy + import os + + use_cuda = True + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + + # 注意:如果你使用CPU运行程序,需要具体设置CPU_NUM, + # 否则fluid会把逻辑核的所有数目设为CPU_NUM, + # 在这种情况下,输入的batch size应大于CPU_NUM, + # 否则程序会异常中断。 + if not use_cuda: + os.environ['CPU_NUM'] = str(2) + + exe = fluid.Executor(place) + + train_program = fluid.Program() + startup_program = fluid.Program() + with fluid.program_guard(train_program, startup_program): + data = fluid.layers.data(name='X', shape=[1], dtype='float32') + hidden = fluid.layers.fc(input=data, size=10) + loss = fluid.layers.mean(hidden) + test_program = fluid.default_main_program().clone(for_test=True) + fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) - exe.run(startup_program) - - train_exe = fluid.ParallelExecutor(use_cuda=use_cuda, - main_program=train_program, - loss_name=loss.name) - test_exe = fluid.ParallelExecutor(use_cuda=use_cuda, - main_program=test_program, - share_vars_from=train_exe) - - x = numpy.random.random(size=(10, 1)).astype('float32') - loss_data, = train_exe.run(feed={"X": x}, - fetch_list=[loss.name]) - - loss_data, = test_exe.run(feed={"X": x}, - fetch_list=[loss.name]) + exe.run(startup_program) + + train_exe = fluid.ParallelExecutor(use_cuda=use_cuda, + main_program=train_program, + loss_name=loss.name) + # 注意:如果此处不设置share_vars_from=train_exe,测试过程中用的参数与训练使用的参数是不一致 + test_exe = fluid.ParallelExecutor(use_cuda=use_cuda, + main_program=test_program, + share_vars_from=train_exe) + + train_data = numpy.random.random(size=(10, 1)).astype('float32') + loss_data, = train_exe.run(feed={"X": train_data}, + fetch_list=[loss.name]) -参数: - - **use_cuda** (bool) – 是否使用CUDA - - **loss_name** (str) – 在训练阶段,必须提供loss function名称。默认为None - - **main_program** (Program) – 需要执行的program。如果未提供, 那么将使用 ``default_main_program``。 默认为None - - **share_vars_from** (ParallelExecutor) – 如果提供了该参数, 则该 ``ParallelExecutor`` 与指定的 ``ParallelExecutor`` 共享变量。默 认为空 - - **exec_strategy** (ExecutionStrategy) – ``exec_strategy`` 用于调控program在 ``ParallelExecutor`` 中的执行方式,例如,执行该program需要的线程数, 释放在执行过程中产生的临时变量需要的重复(iterations)次数。 请参考 ``fluid.ExecutionStrategy`` 获取详细介绍。该参数默认为 None - - **build_strategy** (BuildStrategy) – 设置成员 ``build_strategy`` 可以控制在 ``ParallelExecutor`` 中搭建SSA Graph的方式,例如, ``reduce_strategy`` , ``gradient_scale_strategy`` 。 请参考 ``fluid.BuildStrategy`` 获取详细介绍。 该参数默认为None - - **num_trainers** (int) – 如果该值大于1, NCCL将会通过多层级node的方式来初始化。每个node应有相同的GPU数目。 随之会启用分布式训练。该参数默认为1 - - **trainer_id** (int) – 必须与 ``num_trainers`` 参数同时使用。``trainer_id`` 是当前所在node的 “rank”(层级),从0开始计数。该参数默认为0 - - **scope** (Scope) – 指定执行program所在的作用域, 默认使用 ``fluid.global_scope()`` + test_data = numpy.random.random(size=(10, 1)).astype('float32') + loss_data, = test_exe.run(feed={"X": test_data}, + fetch_list=[loss.name]) -返回:初始化后的 ``ParallelExecutor`` 对象 +.. py:method:: run(fetch_list, feed=None, feed_dict=None, return_numpy=True) -返回类型: ParallelExecutor +该接口用于运行当前模型,需要注意的是,执行器会执行Program中的所有算子,而不会根据fetch_list对Program中的算子进行裁剪。 -抛出异常:``TypeError`` - 如果提供的参数 ``share_vars_from`` 不是 ``ParallelExecutor`` 类型的,将会弹出此异常 +参数: + - **fetch_list** (list) – 该变量表示模型运行之后需要返回的变量。 + - **feed** (list|dict) – 该变量表示模型的输入变量。如果该参数类型为 ``dict`` ,feed中的数据将会被分割(split)并分送给多个设备(CPU/GPU);如果该参数类型为 ``list`` ,则列表中的各个元素都会直接分别被拷贝到各设备中。默认为:None。 + - **feed_dict** – 该参数已经停止使用。默认为:None。 + - **return_numpy** (bool) – 该变量表示是否将fetched tensor转换为numpy。默认为:True。 -.. py:method:: run(fetch_list, feed=None, feed_dict=None, return_numpy=True) +返回:返回fetch_list中指定的变量值 + +返回类型:List -使用 ``fetch_list`` 执行一个 ``ParallelExecutor`` 对象。 +抛出异常: + - ``ValueError`` - 如果feed参数是list类型,但是它的长度不等于可用设备(执行场所)的数目,再或者给定的feed不是dict类型,抛出此异常 + - ``TypeError`` - 如果feed参数是list类型,但是它里面的元素不是dict类型时,抛出此异常 -参数 ``feed`` 可以是 ``dict`` 或者 ``list`` 类型变量。如果该参数是 ``dict`` 类型,feed中的数据将会被分割(split)并分送给多个设备(CPU/GPU)。 -反之,如果它是 ``list`` ,则列表中的各个元素都会直接分别被拷贝到各设备中。 +.. note:: + 1. 如果feed参数为dict类型,输入数据将被均匀分配到不同的卡上,例如:使用2块GPU训练,输入样本数为3,即[0, 1, 2],经过拆分之后,GPU0上的样本数为1,即[0],GPU1上的样本数为2,即[1, 2]。如果样本数少于设备数,程序会报错,因此运行模型时,应额外注意数据集的最后一个batch的样本数是否少于当前可用的CPU核数或GPU卡数,如果是少于,建议丢弃该batch。 + 2. 如果可用的CPU核数或GPU卡数大于1,则fetch出来的结果为不同设备上的相同变量值(fetch_list中的变量)在第0维拼接在一起。 **示例代码** @@ -107,7 +125,6 @@ ParallelExecutor loss = fluid.layers.mean(hidden) fluid.optimizer.SGD(learning_rate=0.01).minimize(loss) - startup_program.random_seed=1 exe.run(startup_program) train_exe = fluid.ParallelExecutor(use_cuda=use_cuda, @@ -117,7 +134,6 @@ ParallelExecutor # 图像会被split到设备中。假设有两个设备,那么每个设备将会处理形为 (5, 1)的图像 x = numpy.random.random(size=(10, 1)).astype('float32') loss_data, = train_exe.run(feed={"X": x}, - fetch_list=[loss.name]) # 如果feed参数是list类型: @@ -126,83 +142,50 @@ ParallelExecutor # 第二个设备处理形为 (9, 1) 的图像 # # 使用 exe.device_count 得到设备数目 + x1 = numpy.random.random(size=(10, 1)).astype('float32') x2 = numpy.random.random(size=(9, 1)).astype('float32') - loss_data, = train_exe.run(feed=[{"X": x}, {"X": x2}], + loss_data, = train_exe.run(feed=[{"X": x1}, {"X": x2}], fetch_list=[loss.name]) -参数: - - **fetch_list** (list) – 获取的变量名列表 - - **feed** (list|dict|None) – feed变量。 如果该参数是 ``dict`` 类型,feed中的数据将会被分割(split)并分送给多个设备(CPU/GPU)。反之,如果它是 ``list`` ,则列表中的各个元素都直接分别被拷贝到各设备中。默认为None - - **feed_dict** – 该参数已经停止使用。feed参数的别名, 为向后兼容而立。默认为None - - **return_numpy** (bool) – 是否将fetched tensor转换为numpy。默认为True - -返回: 获取的结果列表 - -返回类型:List - -抛出异常: - - ``ValueError`` - 如果feed参数是list类型,但是它的长度不等于可用设备(执行场所)的数目,再或者给定的feed不是dict类型,抛出此异常 - - ``TypeError`` - 如果feed参数是list类型,但是它里面的元素不是dict类型时,弹出此异常 - -.. note:: - 1. 如果feed参数为dict类型,那么传入 ``ParallelExecutor`` 的数据量 *必须* 大于可用的CPU核数或GPU卡数。否则,C++端将会抛出异常。应额外注意核对数据集的最后一个batch是否比可用的CPU核数或GPU卡数大。 - 2. 如果可用的CPU核数或GPU卡数大于一个,则为每个变量最后获取的结果都是list类型,且这个list中的每个元素都是各CPU核或GPU卡上的变量 - -**代码示例** - -.. code-block:: python - - import paddle.fluid as fluid - pe = fluid.ParallelExecutor(use_cuda=use_cuda, - loss_name=avg_cost.name, - main_program=fluid.default_main_program()) - loss = pe.run(feed=feeder.feed(cur_batch), - fetch_list=[avg_cost.name])) - .. py:method:: drop_local_exe_scopes() -立即删除本地执行作用域。 -  -在程序执行期间,生成中间结果被放置在本地执行作用域内,在某些模型中,这些中间结果的创建和删除较为费时。为了解决这个问题,ParallelExecutor在ExecutionStrategy中提供了可选项,如num_iteration_per_drop_scope,此选项指示在删除本地执行作用域之前要运行的迭代次数。 但在某些情况下,每次迭代都会产生不同的中间结果,这将导致本地执行作用域所需的内存逐渐增加。 如果你想在这个时候运行另一个程序,可能没有足够的存储空间,此时你应该删除其他程序的本地执行作用域。 - +立即清除scope中的临时变量。模型运行过程中,生成的中间临时变量将被放到local execution scope中,为了避免对临时变量频繁的申请与释放,ParallelExecutor中采取的策略是间隔若干次迭代之后清理一次临时变量。ParallelExecutor在ExecutionStrategy中提供了num_iteration_per_drop_scope选项,该选项表示间隔多少次迭代之后清理一次临时变量。如果num_iteration_per_drop_scope值为100,但是希望在迭代50次之后清理一次临时变量,可以通过手动调用该接口。 + +返回:无 **代码示例** .. code-block:: python - import paddle.fluid as fluid - import numpy - import os - - use_cuda = True - # 注意:如果你使用CPU运行程序,需要具体设置CPU_NUM, - # 否则fluid会把逻辑核的所有数目设为CPU_NUM, - # 在这种情况下,输入的batch size应大于CPU_NUM, - # 否则程序会异常中断。 - if not use_cuda: - os.environ['CPU_NUM'] = str(2) - - train_program = fluid.Program() - startup_program = fluid.Program() - with fluid.program_guard(train_program, startup_program): - data = fluid.layers.data(name='X', shape=[1], dtype='float32') - hidden = fluid.layers.fc(input=data, size=10) - loss = fluid.layers.mean(hidden) - - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_program) - - parallel_exe = fluid.ParallelExecutor(use_cuda=use_cuda, - main_program=train_program, - loss_name=loss.name) - - x = numpy.random.random(size=(10, 1)).astype('float32') - loss_data, = parallel_exe.run(feed={"X": x}, - fetch_list=[loss.name]) - - parallel_exe.drop_local_exe_scopes() - - - - + import paddle.fluid as fluid + import numpy + import os + + use_cuda = True + # 注意:如果你使用CPU运行程序,需要具体设置CPU_NUM, + # 否则fluid会把逻辑核的所有数目设为CPU_NUM, + # 在这种情况下,输入的batch size应大于CPU_NUM, + # 否则程序会异常中断。 + if not use_cuda: + os.environ['CPU_NUM'] = str(2) + + train_program = fluid.Program() + startup_program = fluid.Program() + with fluid.program_guard(train_program, startup_program): + data = fluid.layers.data(name='X', shape=[1], dtype='float32') + hidden = fluid.layers.fc(input=data, size=10) + loss = fluid.layers.mean(hidden) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup_program) + + parallel_exe = fluid.ParallelExecutor(use_cuda=use_cuda, + main_program=train_program, + loss_name=loss.name) + + x = numpy.random.random(size=(10, 1)).astype('float32') + loss_data, = parallel_exe.run(feed={"X": x}, + fetch_list=[loss.name]) + + parallel_exe.drop_local_exe_scopes() -- GitLab