diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc
index 094afff577a9e851640cfe947f72656d8395e556..2b75c3ba066ec6caccade89ab82157b0a4365677 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.cc
+++ b/paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -15,6 +15,7 @@
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"

 #include <algorithm>
+#include <vector>

 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
@@ -24,6 +25,7 @@
 #include "paddle/fluid/framework/garbage_collector.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/framework/variable_helper.h"

 namespace paddle {
@@ -55,23 +57,34 @@ void Carrier::Init(
     framework::Scope* scope,
     int64_t num_micro_batches,
     const platform::Place& place,
-    const std::vector<std::string>& inference_root_scope_vars) {
+    const std::vector<std::string>& inference_root_scope_vars,
+    const std::vector<framework::Scope*>& micro_scope_list) {
   rank_ = rank;
   interceptor_id_to_rank_ = interceptor_id_to_rank;
   interceptor_id_to_node_ = interceptor_id_to_node;
   place_ = place;
   root_scope_ = scope;
   dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
+  bool need_create_scope = micro_scope_list.empty();
   PADDLE_ENFORCE_NOT_NULL(
       root_scope_,
       platform::errors::InvalidArgument("root_scope can not be nullptr"));
-  minibatch_scope_ = &root_scope_->NewScope();
-  microbatch_scopes_.resize(num_micro_batches);
-  for (int i = 0; i < num_micro_batches; ++i) {
-    microbatch_scopes_[i] = &minibatch_scope_->NewScope();
-    CopyParameters(i, program, inference_root_scope_vars);
+
+  if (need_create_scope) {
+    minibatch_scope_ = &root_scope_->NewScope();
+    microbatch_scopes_.resize(num_micro_batches);
+    for (int i = 0; i < num_micro_batches; ++i) {
+      microbatch_scopes_[i] = &minibatch_scope_->NewScope();
+      CopyParameters(i, program, inference_root_scope_vars);
+    }
+  } else {
+    microbatch_scopes_ = micro_scope_list;
+    for (int i = 0; i < num_micro_batches; ++i) {
+      CopyParameters(i, program, inference_root_scope_vars);
+    }
   }
+
   // Add source and sink interceptor id to rank
   interceptor_id_to_rank_.emplace(SOURCE_ID, rank);
   interceptor_id_to_rank_.emplace(SINK_ID, rank);
diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h
index 2523942e06223f6210461a625a1a3bce2dcedb92..8e7fad3e892d87d735bc79069692238e6b7015f4 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.h
+++ b/paddle/fluid/distributed/fleet_executor/carrier.h
@@ -25,6 +25,7 @@
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
 #include "paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h"
+#include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/errors.h"
@@ -60,7 +61,8 @@
       framework::Scope* scope,
       int64_t num_micro_batches,
       const platform::Place& place,
-      const std::vector<std::string>& inference_root_scope_vars = {});
+      const std::vector<std::string>& inference_root_scope_vars = {},
+      const std::vector<framework::Scope*>& micro_scope_list = {});

   void CopyParameters(
       int microbatch_id,
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index 1f397a91746b96035fa420452f06702a43ef2c45..88363696ede257492b6f703c2a8ddaa97d5b5b15 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -14,6 +14,7 @@
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"

 #include <algorithm>
+#include <vector>

 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/message_bus.h"
@@ -24,6 +25,7 @@
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
 #include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/variable.h"

 namespace paddle {
 namespace distributed {
@@ -59,7 +61,8 @@ void FleetExecutor::Init(
     int64_t num_micro_batches,
     const std::vector<TaskNode*>& task_nodes,
     const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
-    const std::vector<std::string>& inference_root_scope_vars) {
+    const std::vector<std::string>& inference_root_scope_vars,
+    const std::vector<framework::Scope*>& micro_scope_list) {
   PADDLE_ENFORCE_GT(task_nodes.size(),
                     0,
                     platform::errors::InvalidArgument(
@@ -144,7 +147,8 @@
               place,
               num_micro_batches,
               program_desc,
-              inference_root_scope_vars);
+              inference_root_scope_vars,
+              micro_scope_list);
   GlobalVal<MessageBus>::Get()->Barrier();
 }

@@ -154,7 +158,8 @@
     const platform::Place& place,
     int64_t num_micro_batches,
     const framework::ProgramDesc& program_desc,
-    const std::vector<std::string>& inference_root_scope_vars) {
+    const std::vector<std::string>& inference_root_scope_vars,
+    const std::vector<framework::Scope*>& micro_scope_list) {
   carrier->Init(exe_desc_.cur_rank(),
                 runtime_graph_->interceptor_id_to_rank(),
                 runtime_graph_->interceptor_id_to_node(),
@@ -162,7 +167,8 @@
                 scope,
                 num_micro_batches,
                 place,
-                inference_root_scope_vars);
+                inference_root_scope_vars,
+                micro_scope_list);
 }

 void FleetExecutor::InitMessageBus() {
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
index f633dbbc3600f6f4ba64aa3d5bf9917f732229f8..e8123bea1e19f7471c117035996bca4151832e45 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -18,6 +18,7 @@

 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
+#include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/platform/macros.h"
 #include "paddle/fluid/platform/place.h"

@@ -45,7 +46,8 @@
       int64_t num_micro_batches,
       const std::vector<TaskNode*>& task_nodes,
       const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
-      const std::vector<std::string>& inference_root_scope_vars = {});
+      const std::vector<std::string>& inference_root_scope_vars = {},
+      const std::vector<framework::Scope*>& micro_scope_list = {});
   void Run(const std::string& carrier_id);

  private:
@@ -57,7 +59,8 @@
       const platform::Place& place,
       int64_t num_micro_batches,
       const framework::ProgramDesc& program_desc,
-      const std::vector<std::string>& inference_root_scope_vars = {});
+      const std::vector<std::string>& inference_root_scope_vars = {},
+      const std::vector<framework::Scope*>& micro_scope_list = {});
   FleetExecutorDesc exe_desc_;
   std::shared_ptr<RuntimeGraph> runtime_graph_;
   std::unordered_set<std::string> carrier_ids_;
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 2822a87a02172e22c2dc7c6899b7db4f5a7eea0f..da9d12802434f39326a7f276f320caecb9a05c86 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -2464,6 +2464,7 @@ class Executor:
         program=None,
         scope=None,
         fleet_opt=None,
+        micro_scope_list=[],
         with_standalone_executor=False,
     ):
         num_micro_batches = (
@@ -2532,6 +2533,7 @@
         fleet_opt['task_id_to_rank'] = task_id_to_rank
         place = core.Place()
         place.set_place(self.place)
+
         # NOTE: the last argument is used to force create some vars in root scope,
         # won't be used during train.
         self._fleet_executor.init(
@@ -2543,6 +2545,7 @@
             tasks,
             task_id_to_rank,
             [],
+            micro_scope_list,
         )

     def _run_using_fleet_executor(
@@ -2624,11 +2627,20 @@
                 )
                 fetch_task.set_program(fetch_program)

+            micro_scope_list = []
+            if (
+                "inference_generation" in fleet_opt
+                and fleet_opt["inference_generation"]
+            ):
+                for i in range(int(fleet_opt["num_micro_batches"])):
+                    micro_scope_list.append(cached_scope.new_scope())
+
             self._prepare_fleet_executor_carrier(
                 cache_key,
                 program=cached_program,
                 scope=cached_scope,
                 fleet_opt=fleet_opt,
+                micro_scope_list=micro_scope_list,
                 with_standalone_executor=with_standalone_executor,
             )

@@ -2653,6 +2665,18 @@

         self._fleet_executor.run(cache_key)

+        if "fetch_var" in fleet_opt:
+            # When generation is sped up during evaluation, multiple queries are
+            # generated at the same time. Each query runs in its own scope so the
+            # results do not get mixed up, which means the final results live in
+            # multiple scopes and each scope has to be fetched separately.
+            result_list = []
+            for scope in micro_scope_list:
+                for var in fleet_opt["fetch_var"]:
+                    tensor = core.get_variable_tensor(scope, var)
+                    result_list.append(as_numpy(tensor))
+            return result_list
+
         if fetch_list:
             arr = cached_scope.find_var(fetch_var_name).get_fetch_list()
             tensors = arr._move_to_list()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_executor_cond_interceptor.py b/python/paddle/fluid/tests/unittests/test_fleet_executor_cond_interceptor.py
index d3a57898a0dce8f4309ce0833262c88085ad5e71..1ca8c869a96bdfbc2847df0aa81101a28a9e3042 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_executor_cond_interceptor.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_executor_cond_interceptor.py
@@ -14,6 +14,8 @@
 import unittest

+import numpy as np
+
 import paddle
 import paddle.fluid.core as core
 from paddle.distributed.fleet.fleet_executor_utils import TaskNode

@@ -21,13 +23,26 @@ from paddle.distributed.fleet.fleet_executor_utils import TaskNode
 paddle.enable_static()


-def cond(i, ten):
+def cond(i, ten, data):
     return i < ten


-def body(i, ten):
+def body(i, ten, data):
     i = i + 1
-    return [i, ten]
+    data = data + 1
+    return [i, ten, data]
+
+
+num_micro_batches = 3
+
+
+def batch_generator_creator():
+    def __reader__():
+        for i in range(num_micro_batches):
+            data = np.full(shape=[1, 1], fill_value=i, dtype=np.float32)
+            yield data
+
+    return __reader__


 class TestFleetExecutor(unittest.TestCase):
@@ -41,7 +56,16 @@
             ten = paddle.full(
                 shape=[1], fill_value=10, dtype='int64'
             )  # loop length
-            i, ten = paddle.static.nn.while_loop(cond, body, [i, ten])
+            data = paddle.static.data(name='x', shape=[1])
+
+            loader = paddle.fluid.io.DataLoader.from_generator(
+                feed_list=[data], capacity=num_micro_batches * 4, iterable=False
+            )
+            loader.set_batch_generator(
+                batch_generator_creator(), paddle.CUDAPlace(0)
+            )
+
+            paddle.static.nn.while_loop(cond, body, [i, ten, data])

         program_a = paddle.static.Program()
         program_b = paddle.static.Program()
@@ -49,18 +73,27 @@
         for var_name in main_program.block(0).vars:
             if var_name != "_generated_var_0":
                 var = main_program.block(0).var(var_name)
-                program_a.block(0).create_var(
-                    name=var_name,
-                    shape=var.shape,
-                    dtype=var.dtype,
-                    stop_gradient=var.stop_gradient,
-                )
-                program_b.block(0).create_var(
-                    name=var_name,
-                    shape=var.shape,
-                    dtype=var.dtype,
-                    stop_gradient=var.stop_gradient,
-                )
+                if (
+                    var_name == "create_py_reader_0"
+                    or var_name == "double_buffer_0"
+                ):
+                    program_a.block(0).create_var(
+                        name=var_name,
+                        persistable=var.persistable,
+                    )
+                else:
+                    program_a.block(0).create_var(
+                        name=var_name,
+                        shape=var.shape,
+                        dtype=var.dtype,
+                        stop_gradient=var.stop_gradient,
+                    )
+                    program_b.block(0).create_var(
+                        name=var_name,
+                        shape=var.shape,
+                        dtype=var.dtype,
+                        stop_gradient=var.stop_gradient,
+                    )

         for op in main_program.block(0).ops:
             if op.type != "while":
@@ -89,7 +122,6 @@
             )

         cond_var_name = "tmp_0"
-        num_micro_batches = 3

         task_a = TaskNode(
             0,
@@ -159,12 +191,19 @@
                     task_e.task_id(): 0,
                 },
                 'num_micro_batches': num_micro_batches,
+                'inference_generation': True,
+                'fetch_var': ['x'],
             },
         }

-        place = paddle.fluid.CUDAPlace(0)
-        exe = paddle.fluid.Executor(place)
-        exe.run(main_program)
+        place = paddle.CUDAPlace(0)
+        exe = paddle.static.Executor(place)
+        loader.start()
+        res = exe.run(main_program)
+        ref_res = np.full([1], 10, dtype="float32")
+        for data in res:
+            np.testing.assert_allclose(data, ref_res, rtol=1e-05)
+            ref_res = ref_res + 1


 if __name__ == "__main__":
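Taken together, these changes let the fleet executor run generation-style inference with one scope per micro batch and hand the per-scope results back to Python. The snippet below is a minimal sketch of the intended call pattern, mirroring the unit test above; it assumes static mode, a CUDA place, and cond/while task nodes already wired up. `build_tasks_and_ranks` is a hypothetical stand-in for the TaskNode construction done in the test, not an API added by this diff.

```python
import paddle

paddle.enable_static()

num_micro_batches = 3
main_program = paddle.static.Program()

# Hypothetical helper: builds the cond/while TaskNodes and the task_id_to_rank
# map exactly as the unit test above does. It is not part of this change.
tasks, task_id_to_rank = build_tasks_and_ranks(main_program, num_micro_batches)

main_program._pipeline_opt = {
    "fleet_opt": {
        "tasks": tasks,
        "task_id_to_rank": task_id_to_rank,
        "num_micro_batches": num_micro_batches,
        # New keys used by this change: create one scope per micro batch and
        # fetch variable 'x' from each of those scopes after the run.
        "inference_generation": True,
        "fetch_var": ["x"],
    },
}

exe = paddle.static.Executor(paddle.CUDAPlace(0))
# With "fetch_var" set, run() returns a list holding one numpy array per
# (micro-batch scope, fetch var) pair instead of the usual fetch_list output.
results = exe.run(main_program)
for batch_idx, value in enumerate(results):
    print(batch_idx, value)
```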