From a087119409e5fa7ff01b0421eaff9050d5ccbc31 Mon Sep 17 00:00:00 2001
From: Aurelius84
Date: Fri, 17 Sep 2021 14:50:33 +0800
Subject: [PATCH] Integrate MultiThreadedWorkQueue to execute program ops
 (#35356)

* format code
* format interface
* polish interface
* Remove std::memory_order
* modify into SpinLock
* remove fetch_context_pool_
* fix comment
* modify into WorkQueueGroup
* refine code
* fix pointer
* fix paddle_enforce
* split into AsyncWorkQueue
* polish code
* specify std::memory_order_relaxed
* fix atomic fetch_sub
* fix num_thread
---
 .../framework/new_executor/CMakeLists.txt     |   2 +-
 .../framework/new_executor/event_manager.cc   |   1 -
 .../framework/new_executor/event_manager.h    |   1 -
 .../framework/new_executor/interpretercore.cc | 108 ++++++++++--------
 .../framework/new_executor/interpretercore.h  |  13 ++-
 .../interpretercore_garbage_collector.cc      |   4 +-
 .../new_executor/interpretercore_util.cc      |  21 ++++
 .../new_executor/interpretercore_util.h       |  34 ++++++
 .../new_executor/new_executor_defs.h          |  12 +-
 .../fluid/framework/new_executor/workqueue.h  |  11 +-
 .../framework/new_executor/workqueue_test.cc  |  20 ++--
 11 files changed, 151 insertions(+), 76 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt
index 09744bf600..f66d743620 100644
--- a/paddle/fluid/framework/new_executor/CMakeLists.txt
+++ b/paddle/fluid/framework/new_executor/CMakeLists.txt
@@ -4,7 +4,7 @@
 graph_to_program_pass variable_helper timer monitor)
 cc_library(workqueue SRCS workqueue.cc DEPS enforce)
 cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS})
-cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS})
+cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue)
 cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog)
 cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context)
 cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager)
diff --git a/paddle/fluid/framework/new_executor/event_manager.cc b/paddle/fluid/framework/new_executor/event_manager.cc
index a3eb1abaa6..64018cea67 100644
--- a/paddle/fluid/framework/new_executor/event_manager.cc
+++ b/paddle/fluid/framework/new_executor/event_manager.cc
@@ -30,7 +30,6 @@ void EventManager::WaitEvent(const Instruction& instruction,
 }
 
 void EventManager::RecordEvent(const Instruction& instruction,
-                               const OpFuncNode& op_func_node,
                                const platform::Place& place) {
   // If InterpreterCore in on CPUPlace, do nothing.
   if (platform::is_cpu_place(place)) return;
diff --git a/paddle/fluid/framework/new_executor/event_manager.h b/paddle/fluid/framework/new_executor/event_manager.h
index a2f7b52732..2289be346e 100644
--- a/paddle/fluid/framework/new_executor/event_manager.h
+++ b/paddle/fluid/framework/new_executor/event_manager.h
@@ -21,7 +21,6 @@ namespace framework {
 class EventManager {
  public:
   void RecordEvent(const Instruction& instruction,
-                   const OpFuncNode& op_func_node,
                    const platform::Place& place);
 
   void WaitEvent(const Instruction& instruction, const platform::Place& place);
diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 2a7d0d05e9..9687ef2ff9 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -23,6 +23,8 @@ DEFINE_bool(new_executor_use_inplace, true, "Use inplace in new executor");
 
 namespace paddle {
 namespace framework {
+// NOTE(Aurelius84): Need a better strategy to determine it.
+static constexpr size_t kHostNumThreads = 4;
 
 InterpreterCore::InterpreterCore(const platform::Place& place,
                                  const ProgramDesc& main_prog,
@@ -32,7 +34,8 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
     : place_(place),
       main_program_(main_prog),
       global_scope_(global_scope),
-      stream_analyzer_(place) {
+      stream_analyzer_(place),
+      async_work_queue_(kHostNumThreads) {
   is_build_ = false;
 
   feed_names_ = feed_names;
@@ -89,7 +92,7 @@ paddle::framework::FetchList InterpreterCore::Run(
     Convert();
   } else {
     FeedInput();
-    ExecuteInstructionList(vec_instruction_, *global_scope_, place_);
+    ExecuteInstructionList(vec_instruction_);
   }
 
   // return Fetch Tensors
@@ -112,6 +115,7 @@ void InterpreterCore::Convert() {
     temp_inst.kernel_func_.operator_base_ = op_base;
     temp_inst.input_index_ = vec_func_list_[i].input_index;
     temp_inst.output_index_ = vec_func_list_[i].output_index;
+    temp_inst.type_ = vec_func_list_[i].type_;
 
     OpInOutInfo info;
 
@@ -168,8 +172,8 @@ void InterpreterCore::Convert() {
     }
   }
 
-  // In Program, op order is a very import information.
-  // Op can noly add op after it as next as next ops.
+  // In a Program, op order is very important information.
+  // An op can only add the ops that come after it as its next ops.
   std::vector<size_t> filter_next;
   filter_next.reserve(vec_temp.size());
   for (auto item : vec_temp) {
@@ -319,62 +323,73 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
 }
 
 void InterpreterCore::ExecuteInstructionList(
-    const std::vector<Instruction>& vec_instr, const VariableScope& var_scope,
-    const platform::Place& place, bool is_dry_run) {
-  std::queue<size_t> working_queue;
-  auto working_dependecy_count = dependecy_count_;
+    const std::vector<Instruction>& vec_instr, bool is_dry_run) {
+  auto atomic_deps = async_work_queue_.PrepareAtomicDeps(dependecy_count_);
+  auto atomic_var_ref = async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
+  std::atomic<size_t> op_run_number{0};
+
   for (size_t i = 0; i < dependecy_count_.size(); ++i) {
     if (dependecy_count_[i] == 0) {
-      working_queue.push(i);
+      async_work_queue_.AddTask(vec_instr[i].type_, [&, i, is_dry_run]() {
+        RunInstructionAsync(i, &atomic_deps, &atomic_var_ref, &op_run_number,
+                            is_dry_run);
+      });
     }
   }
 
-  auto working_var_ref = vec_meta_info_;
-
-  size_t run_op_number = 0;
-  while (!working_queue.empty()) {
-    auto instr_id = working_queue.front();
-    working_queue.pop();
-    auto& instr_node = vec_instr[instr_id];
-    // step1 : stream_wait (non-block host) or sync (block host)
-    event_manager_.WaitEvent(instr_node, place_);
-    // step2: run instruction
-    RunInstruction(instr_node);
-    ++run_op_number;
-
-    if (is_dry_run) {
-      dry_run_profiler_.ParseMemoryInfo(var_scope.var_list);
-    }
+  async_work_queue_.WaitEmpty();
 
-    // step3: insert event for out_vars if needed
-    event_manager_.RecordEvent(instr_node, vec_func_list_[instr_id], place_);
+  PADDLE_ENFORCE_EQ(
+      op_run_number.load(), vec_instr.size(),
+      platform::errors::Fatal(
+          "Required op_run_number == %d, but received op_run_number = %d.",
+          vec_instr.size(), op_run_number.load()));
+}
 
-    // step4: update working_queue
-    auto& next_instr = instr_node.next_instruction_.all_next_ops_;
+void InterpreterCore::RunInstructionAsync(size_t instr_id,
+                                          AtomicVectorSizeT* atomic_deps,
+                                          AtomicVectorSizeT* atomic_var_ref,
+                                          std::atomic<size_t>* op_run_number,
+                                          bool is_dry_run) {
+  auto& instr_node = vec_instruction_[instr_id];
+  event_manager_.WaitEvent(instr_node, place_);
 
-    for (auto next_i : next_instr) {
-      --working_dependecy_count[next_i];
-      if (working_dependecy_count[next_i] == 0) {
-        working_queue.push(next_i);
-      }
-    }
+  RunInstruction(instr_node);
 
-    // GC infomation
-    CheckGC(instr_id, instr_node.gc_check_var_list, var_scope, place,
-            working_var_ref);
+  event_manager_.RecordEvent(instr_node, place_);
+  op_run_number->fetch_add(1, std::memory_order_relaxed);
+
+  if (is_dry_run) {
+    dry_run_profiler_.ParseMemoryInfo(global_scope_->var_list);
   }
+
+  auto& next_instr = instr_node.next_instruction_.all_next_ops_;
+
+  for (auto next_i : next_instr) {
+    // fetch_sub returns the value held before the subtraction
+    bool is_ready =
+        atomic_deps->at(next_i)->fetch_sub(1, std::memory_order_relaxed) == 1;
+    if (is_ready) {
+      async_work_queue_.AddTask(vec_instruction_[next_i].type_, [=]() {
+        RunInstructionAsync(next_i, atomic_deps, atomic_var_ref, op_run_number,
+                            is_dry_run);
+      });
+    }
+  }
+  // GC information
+  CheckGC(instr_id, instr_node.gc_check_var_list, atomic_var_ref);
 }
 
 void InterpreterCore::CheckGC(size_t instr_id,
                               const std::vector<size_t>& gc_check_list,
-                              const VariableScope& var_scope,
-                              const platform::Place& place,
-                              std::vector<VariableMetaInfo>& working_var_ref) {
+                              AtomicVectorSizeT* atomic_var_ref) {
+  auto& var_scope = *global_scope_;
+
   for (auto var_id : gc_check_list) {
-    --working_var_ref[var_id].var_ref_count_;
-    if (var_scope.vec_meta_info_[var_id].vardesc_ &&
-        !var_scope.vec_meta_info_[var_id].vardesc_->Persistable() &&
-        working_var_ref[var_id].var_ref_count_ == 0) {
+    bool is_ready = atomic_var_ref->at(var_id)->fetch_sub(
+                        1, std::memory_order_relaxed) == 1;
+    if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
+        !var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
       gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
               vec_instruction_[instr_id].dev_ctx_);
     }
@@ -417,8 +432,7 @@ const CostInfo& InterpreterCore::DryRun(
   // DryRun may be called many times.
   dry_run_profiler_.Reset();
   dry_run_profiler_.Start();
-  ExecuteInstructionList(vec_instruction_, *global_scope_, place_,
-                         /*is_dry_run=*/true);
+  ExecuteInstructionList(vec_instruction_, /*is_dry_run=*/true);
   platform::DeviceContextPool::Instance().Get(place_)->Wait();
   dry_run_profiler_.Pause();
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index d1eff9272d..371f5bba2a 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -29,10 +29,12 @@
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/tensor.h"
 #include "paddle/fluid/framework/variable.h"
+#include "paddle/fluid/memory/allocation/spin_lock.h"
 #include "paddle/fluid/platform/device_event.h"
 
 namespace paddle {
 namespace framework {
+using AtomicVectorSizeT = std::vector<std::unique_ptr<std::atomic<size_t>>>;
 
 class InterpreterCore {
  public:
@@ -58,16 +60,17 @@ class InterpreterCore {
   void RunInstruction(const Instruction& instr_node);
 
   void ExecuteInstructionList(const std::vector<Instruction>& vec_instr,
-                              const VariableScope& var_scope,
-                              const platform::Place& place,
                               bool is_dry_run = false);
 
   void DryRunPrepare(const std::vector<framework::Tensor>& feed_tensors);
 
   void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list,
-               const VariableScope& var_scope, const platform::Place& place,
-               std::vector<VariableMetaInfo>& working_var_ref);  // NOLINT
+               AtomicVectorSizeT* working_var_ref);
+  void RunInstructionAsync(size_t instr_id,
+                           AtomicVectorSizeT* working_dependecy_count,
+                           AtomicVectorSizeT* working_var_ref,
+                           std::atomic<size_t>* op_run_number, bool is_dry_run);
 
   void AddFetch(const std::vector<std::string>& fetch_names);
   void BuildSkipShareLoDInfo();
 
@@ -93,9 +96,11 @@
   InterpreterProfiler dry_run_profiler_;
   StreamAnalyzer stream_analyzer_;
   EventManager event_manager_;
+  interpretercore::AsyncWorkQueue async_work_queue_;
 
   InterpreterCoreGarbageCollector gc_;
   std::vector<paddle::platform::DeviceEvent> gc_event_;
+  std::unique_ptr<WorkQueueGroup> group_thread_pool_;
 };
 } // namespace framework
 } // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc
index 0f90e37c7b..2ae84d9dcd 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc
@@ -23,8 +23,8 @@ InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() {
   max_memory_size_ = static_cast<size_t>(GetEagerDeletionThreshold());
   cur_memory_size_ = 0;
 
-  WorkQueueOptions options;
-  options.num_threads = 1;
+  WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
+                           /*track_task*/ false);
   queue_ = CreateSingleThreadedWorkQueue(options);
 }
 
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index 9802d5fa91..3ab02ac2b7 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -18,6 +18,27 @@
 namespace paddle {
 namespace framework {
 namespace interpretercore {
+AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicDeps(
+    const std::vector<size_t>& dependecy_count) {
+  AtomicVectorSizeT working_dependecy_count(dependecy_count.size());
+  for (size_t i = 0; i < dependecy_count.size(); ++i) {
+    working_dependecy_count[i] =
+        std::make_unique<std::atomic<size_t>>(dependecy_count[i]);
+  }
+  return std::move(working_dependecy_count);
+}
+
+AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicVarRef(
+    const std::vector<VariableMetaInfo>& vec_meta_info) {
+  AtomicVectorSizeT working_var_ref(vec_meta_info.size());
+
+  for (size_t i = 0; i < vec_meta_info.size(); ++i) {
+    working_var_ref[i] =
+        std::make_unique<std::atomic<size_t>>(vec_meta_info[i].var_ref_count_);
+  }
+  return std::move(working_var_ref);
+}
+
 bool var_can_be_deleted(const std::string& name, const BlockDesc& block) {
   auto* var_desc = block.FindVar(name);
   if (var_desc == nullptr || var_desc->Persistable()) {
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h
index 95c7bdac90..259f1c6155 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.h
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -32,6 +32,7 @@
 #include "paddle/fluid/framework/executor_gc_helper.h"
 #include "paddle/fluid/framework/garbage_collector.h"
 #include "paddle/fluid/framework/new_executor/new_executor_defs.h"
+#include "paddle/fluid/framework/new_executor/workqueue.h"
 #include "paddle/fluid/framework/op_info.h"
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
@@ -48,6 +49,39 @@
 namespace framework {
 namespace interpretercore {
 
+using AtomicVectorSizeT = std::vector<std::unique_ptr<std::atomic<size_t>>>;
+
+class AsyncWorkQueue {
+ public:
+  explicit AsyncWorkQueue(size_t host_num_threads)
+      : host_num_thread_(host_num_threads) {
+    std::vector<WorkQueueOptions> group_options;
+    // for executing host kernels
+    group_options.emplace_back(/*num_threads*/ host_num_threads,
+                               /*allow_spinning*/ true,
+                               /*track_task*/ true);
+    // for launching device kernels
+    group_options.emplace_back(/*num_threads*/ 1,
+                               /*allow_spinning*/ true, /*track_task*/ true);
+    queue_group_ = CreateWorkQueueGroup(group_options);
+  }
+
+  AtomicVectorSizeT PrepareAtomicDeps(
+      const std::vector<size_t>& dependecy_count);
+  AtomicVectorSizeT PrepareAtomicVarRef(
+      const std::vector<VariableMetaInfo>& vec_meta_info);
+
+  void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }
+
+  void AddTask(const OpFuncType& op_func_type, std::function<void()> fn) {
+    queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
+  }
+
+ private:
+  size_t host_num_thread_;
+  std::unique_ptr<WorkQueueGroup> queue_group_;
+};
+
 std::string get_memcpy_type(const platform::Place& src_place,
                             const platform::Place& dst_place);
 
diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h
index ebbe3ed17b..ca880c76c4 100644
--- a/paddle/fluid/framework/new_executor/new_executor_defs.h
+++ b/paddle/fluid/framework/new_executor/new_executor_defs.h
@@ -507,6 +507,12 @@ struct InstructionInfo {
   std::vector<size_t> dependecy_count_;
 };
 
+enum class OpFuncType {
+  kQueueSync = 0,   // CPU kernel, block host
+  kQueueAsync = 1,  // GPU Kernel or d2h, h2d, send, recv, broadcast
+};
+class RuntimeInferShapeContext;
+
 struct Instruction {
   OpKernelFunc kernel_func_;
   std::shared_ptr<RuntimeContext> runtime_ctx_;
@@ -522,15 +528,11 @@
   std::vector<EventInter> output_events_;
 
   platform::DeviceContext* dev_ctx_;  // not owned
+  OpFuncType type_;
 
   std::vector<std::pair<std::string, std::string>> vec_inplace_in_to_out_;
 };
 
-enum class OpFuncType {
-  kQueueAsync,  // GPU Kernel or d2h, h2d, send, recv, broadcast
-  kQueueSync,   // CPU kernel, block host
-};
-
 struct OpFuncNode {
   // int unsed;
   std::map<std::string, std::vector<int>> input_index;
diff --git a/paddle/fluid/framework/new_executor/workqueue.h b/paddle/fluid/framework/new_executor/workqueue.h
index 32e90641bb..ead9d9949b 100644
--- a/paddle/fluid/framework/new_executor/workqueue.h
+++ b/paddle/fluid/framework/new_executor/workqueue.h
@@ -22,9 +22,14 @@ namespace paddle {
 namespace framework {
 
 struct WorkQueueOptions {
-  size_t num_threads{0};
-  bool allow_spinning{true};
-  bool track_task{false};
+  WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task)
+      : num_threads(num_threads),
+        allow_spinning(allow_spinning),
+        track_task(track_task) {}
+
+  size_t num_threads;
+  bool allow_spinning;
+  bool track_task;
 };
 
 class WorkQueue {
diff --git a/paddle/fluid/framework/new_executor/workqueue_test.cc b/paddle/fluid/framework/new_executor/workqueue_test.cc
index cec1274259..c229a84b14 100644
--- a/paddle/fluid/framework/new_executor/workqueue_test.cc
+++ b/paddle/fluid/framework/new_executor/workqueue_test.cc
@@ -26,9 +26,8 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) {
   std::atomic<unsigned> counter{0};
   constexpr unsigned kLoopNum = 1000000;
   // CreateSingleThreadedWorkQueue
-  WorkQueueOptions options;
-  options.num_threads = 1;
-  options.track_task = true;
+  WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
+                           /*track_task*/ true);
   auto work_queue = CreateSingleThreadedWorkQueue(options);
   // NumThreads
   EXPECT_EQ(work_queue->NumThreads(), 1u);
@@ -58,9 +57,8 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
   constexpr unsigned kExternalLoopNum = 100;
   constexpr unsigned kLoopNum = 1000000;
   // CreateMultiThreadedWorkQueue
-  WorkQueueOptions options;
-  options.num_threads = 10;
-  options.track_task = true;
+  WorkQueueOptions options(/*num_threads*/ 10, /*allow_spinning*/ true,
+                           /*track_task*/ true);
   auto work_queue = CreateMultiThreadedWorkQueue(options);
   // NumThreads
   EXPECT_EQ(work_queue->NumThreads(), 10u);
@@ -91,12 +89,10 @@ TEST(WorkQueue, TestWorkQueueGroup) {
   constexpr unsigned kExternalLoopNum = 100;
   constexpr unsigned kLoopNum = 1000000;
   // CreateMultiThreadedWorkQueue
-  WorkQueueOptions sq_options;
-  sq_options.num_threads = 1;
-  sq_options.track_task = true;
-  WorkQueueOptions mq_options;
-  mq_options.num_threads = 10;
-  mq_options.track_task = true;
+  WorkQueueOptions sq_options(/*num_threads*/ 1, /*allow_spinning*/ true,
+                              /*track_task*/ true);
+  WorkQueueOptions mq_options(/*num_threads*/ 10, /*allow_spinning*/ true,
+                              /*track_task*/ true);
   auto queue_group = CreateWorkQueueGroup({sq_options, mq_options});
   // NumThreads
   EXPECT_EQ(queue_group->QueueNumThreads(0), 1u);
--
GitLab
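A note for readers on the scheduling pattern this patch adopts: ExecuteInstructionList no longer drains a serial ready queue. Instead, PrepareAtomicDeps gives every op an atomic counter of its unmet dependencies; after an op runs, RunInstructionAsync decrements each successor's counter with fetch_sub, and because fetch_sub returns the value held before the subtraction, exactly one worker observes 1 per successor and becomes its unique releaser. CheckGC applies the same countdown to per-variable reference counts. Below is a minimal, self-contained C++ sketch of that countdown pattern, not Paddle's implementation: Node, run_all, and the std::async-based pool that stands in for WorkQueueGroup are names invented for illustration.

#include <atomic>
#include <chrono>
#include <cstdio>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// One op in the dependency graph.
struct Node {
  std::function<void()> work;
  std::vector<size_t> next;  // indices of ops that depend on this op
};

void run_all(const std::vector<Node>& nodes,
             const std::vector<size_t>& dep_counts) {
  // Fresh atomic counters for this run, mirroring PrepareAtomicDeps.
  std::vector<std::unique_ptr<std::atomic<size_t>>> deps;
  for (size_t d : dep_counts) {
    deps.push_back(std::make_unique<std::atomic<size_t>>(d));
  }

  std::atomic<size_t> op_run_number{0};
  std::mutex mu;  // guards `futures`; the counters themselves are lock-free
  std::vector<std::future<void>> futures;

  // Run one op, then release its successors.
  std::function<void(size_t)> run = [&](size_t i) {
    nodes[i].work();
    op_run_number.fetch_add(1, std::memory_order_relaxed);
    for (size_t n : nodes[i].next) {
      // fetch_sub returns the value held *before* the subtraction, so
      // exactly one thread observes 1 and schedules successor n.
      if (deps[n]->fetch_sub(1, std::memory_order_relaxed) == 1) {
        std::lock_guard<std::mutex> guard(mu);
        futures.push_back(std::async(std::launch::async, run, n));
      }
    }
  };

  // Kick off every op that starts with no unmet dependencies.
  for (size_t i = 0; i < nodes.size(); ++i) {
    if (dep_counts[i] == 0) {
      std::lock_guard<std::mutex> guard(mu);
      futures.push_back(std::async(std::launch::async, run, i));
    }
  }

  // Crude stand-in for WaitQueueGroupEmpty(): once op_run_number reaches the
  // op count, every enqueue has already happened, so draining is safe.
  while (op_run_number.load(std::memory_order_relaxed) < nodes.size()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  std::lock_guard<std::mutex> guard(mu);
  for (auto& f : futures) f.wait();
}

int main() {
  // Diamond DAG: 0 -> {1, 2} -> 3; op 3 must run after both 1 and 2.
  std::vector<Node> nodes(4);
  for (size_t i = 0; i < nodes.size(); ++i) {
    nodes[i].work = [i] { std::printf("op %zu done\n", i); };
  }
  nodes[0].next = {1, 2};
  nodes[1].next = {3};
  nodes[2].next = {3};
  run_all(nodes, /*dep_counts=*/{0, 1, 1, 2});
  return 0;
}

The sketch mirrors the patch's use of std::memory_order_relaxed, which is sufficient for the countdown itself since atomicity is all the counting needs. Note that the std::async handoff only orders the releasing thread's writes before the successor; ops that actually pass data between threads would need an additional synchronization point, which in the real executor is the work-queue enqueue.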