diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 7d9d3d5fef14a81064ff229b7abfd090988f3e50..083d989cb5267260651de192b4ecd1a944aebe3e 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -189,8 +189,6 @@ void InterpreterCore::Convert() {
     for (auto inst_id : filter_next) {
       dependecy_count_[inst_id]++;
     }
-    vec_instruction_[i].next_instruction_.all_next_ops_ =
-        std::move(filter_next);
   }
 
   for (size_t i = 0; i < vec_instruction_.size(); ++i) {
@@ -356,31 +354,81 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
 
 void InterpreterCore::ExecuteInstructionList(
     const std::vector<Instruction>& vec_instr) {
-  auto atomic_deps = async_work_queue_.PrepareAtomicDeps(dependecy_count_);
-  auto atomic_var_ref = async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
-  std::atomic<size_t> op_run_number{0};
+  async_work_queue_.PrepareAtomicDeps(dependecy_count_);
+  async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
+  op_run_number_ = 0;
 
   for (size_t i = 0; i < dependecy_count_.size(); ++i) {
     if (dependecy_count_[i] == 0) {
-      async_work_queue_.AddTask(vec_instr[i].type_, [&, i]() {
-        RunInstructionAsync(i, &atomic_deps, &atomic_var_ref, &op_run_number);
-      });
+      async_work_queue_.AddTask(vec_instr[i].type_,
+                                [&, i] { RunInstructionAsync(i); });
     }
   }
 
   async_work_queue_.WaitEmpty();
 
   PADDLE_ENFORCE_EQ(
-      op_run_number.load(), vec_instr.size(),
+      op_run_number_.load(), vec_instr.size(),
       platform::errors::Fatal(
           "Required op_run_number == %d, but received op_run_number = %d.",
-          vec_instr.size(), op_run_number.load()));
+          vec_instr.size(), op_run_number_.load()));
 }
 
-void InterpreterCore::RunInstructionAsync(size_t instr_id,
-                                          AtomicVectorSizeT* atomic_deps,
-                                          AtomicVectorSizeT* atomic_var_ref,
-                                          std::atomic<size_t>* op_run_number) {
+void InterpreterCore::RunNextInstruction(const Instruction& instr) {
+  auto& next_instr = instr.next_instruction_;
+  auto& atomic_deps = async_work_queue_.AtomicDeps();
+  auto IsReady = [&](size_t next_id) {
+    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
+  };
+
+  if (instr.type_ == OpFuncType::kQueueAsync) {
+    // move all sync_ops into other threads
+    for (auto next_id : next_instr.synchronize_run_) {
+      if (IsReady(next_id)) {
+        async_work_queue_.AddTask(
+            vec_instruction_[next_id].type_,
+            [&, next_id] { RunInstructionAsync(next_id); });
+      }
+    }
+    // keep all async_ops running in current thread
+    for (auto next_id : next_instr.direct_run_) {
+      if (IsReady(next_id)) {
+        RunInstructionAsync(next_id);
+      }
+    }
+    for (auto next_id : next_instr.event_wait_run_) {
+      if (IsReady(next_id)) {
+        RunInstructionAsync(next_id);
+      }
+    }
+  } else {
+    // move async_ops into async_thread
+    for (auto next_id : next_instr.event_wait_run_) {
+      if (IsReady(next_id)) {
+        async_work_queue_.AddTask(
+            vec_instruction_[next_id].type_,
+            [&, next_id] { RunInstructionAsync(next_id); });
+      }
+    }
+
+    for (size_t i = 0; i < next_instr.direct_run_.size(); ++i) {
+      auto next_id = next_instr.direct_run_[i];
+      if (IsReady(next_id)) {
+        // only keep one op running in current thread
+        if (i == 0) {
+          RunInstructionAsync(next_id);
+          continue;
+        }
+        // move rest ops into other threads
+        async_work_queue_.AddTask(
+            vec_instruction_[next_id].type_,
+            [&, next_id] { RunInstructionAsync(next_id); });
+      }
+    }
+  }
+}
+
+void InterpreterCore::RunInstructionAsync(size_t instr_id) {
   auto& instr_node = vec_instruction_[instr_id];
   platform::RecordEvent instruction_event(
       instr_node.kernel_func_.operator_base_->Type());
@@ -389,32 +437,22 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id,
   RunInstruction(instr_node);
 
   event_manager_.RecordEvent(instr_node, place_);
-  op_run_number->fetch_add(1, std::memory_order_relaxed);
+  op_run_number_.fetch_add(1, std::memory_order_relaxed);
 
-  auto& next_instr = instr_node.next_instruction_.all_next_ops_;
-
-  for (auto next_i : next_instr) {
-    // fetch_sub return value before applying sub
-    bool is_ready =
-        atomic_deps->at(next_i)->fetch_sub(1, std::memory_order_relaxed) == 1;
-    if (is_ready) {
-      async_work_queue_.AddTask(vec_instruction_[next_i].type_, [=]() {
-        RunInstructionAsync(next_i, atomic_deps, atomic_var_ref, op_run_number);
-      });
-    }
-  }
   // GC infomation
-  CheckGC(instr_id, instr_node.gc_check_var_list, atomic_var_ref);
+  CheckGC(instr_id, instr_node.gc_check_var_list);
+
+  RunNextInstruction(instr_node);
 }
 
 void InterpreterCore::CheckGC(size_t instr_id,
-                              const std::vector<size_t>& gc_check_list,
-                              AtomicVectorSizeT* atomic_var_ref) {
+                              const std::vector<size_t>& gc_check_list) {
   auto& var_scope = *global_scope_;
+  auto& atomic_var_ref = async_work_queue_.AtomicVarRef();
 
   for (auto var_id : gc_check_list) {
-    bool is_ready = atomic_var_ref->at(var_id)->fetch_sub(
-                        1, std::memory_order_relaxed) == 1;
+    bool is_ready =
+        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
     if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
         !var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
       gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index e594f9ca8b54b5fee6dae88191cc1a58db87ff3e..47f23aff4f00e7b48bc25aa0d999dde1016cc07a 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -65,13 +65,10 @@ class InterpreterCore {
 
   void DryRunPrepare(const std::vector<framework::Tensor>& feed_tensors);
 
-  void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list,
-               AtomicVectorSizeT* working_var_ref);
+  void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list);
 
-  void RunInstructionAsync(size_t instr_id,
-                           AtomicVectorSizeT* working_dependecy_count,
-                           AtomicVectorSizeT* working_var_ref,
-                           std::atomic<size_t>* op_run_number);
+  void RunInstructionAsync(size_t instr_id);
+  void RunNextInstruction(const Instruction& instr_id);
 
   void AddFetch(const std::vector<std::string>& fetch_names);
   void BuildSkipShareLoDInfo();
@@ -101,6 +98,7 @@ class InterpreterCore {
 
   InterpreterCoreGarbageCollector gc_;
   std::vector<paddle::platform::DeviceEvent> gc_event_;
+  std::atomic<size_t> op_run_number_{0};
 };
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index 16df5d794f4d44b958f049f2d33360896a934514..3438fc3bd4dcd168b33ba60c5cd497cd23a226b5 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -12,31 +12,40 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
+#include <algorithm>
+
 #include "paddle/fluid/framework/executor_gc_helper.h"
 
 namespace paddle {
 namespace framework {
 namespace interpretercore {
 
-AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicDeps(
+AtomicVectorSizeT& AsyncWorkQueue::PrepareAtomicDeps(
     const std::vector<size_t>& dependecy_count) {
-  AtomicVectorSizeT working_dependecy_count(dependecy_count.size());
+  if (atomic_deps_.size() != dependecy_count.size()) {
+    atomic_deps_.clear();
+    std::generate_n(std::back_inserter(atomic_deps_), dependecy_count.size(),
+                    [] { return std::make_unique<std::atomic<size_t>>(0); });
+  }
+
   for (size_t i = 0; i < dependecy_count.size(); ++i) {
-    working_dependecy_count[i] =
-        std::make_unique<std::atomic<size_t>>(dependecy_count[i]);
+    atomic_deps_[i]->store(dependecy_count[i]);
   }
-  return working_dependecy_count;
+  return atomic_deps_;
 }
 
-AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicVarRef(
+AtomicVectorSizeT& AsyncWorkQueue::PrepareAtomicVarRef(
     const std::vector<VariableMetaInfo>& vec_meta_info) {
-  AtomicVectorSizeT working_var_ref(vec_meta_info.size());
+  if (atomic_var_ref_.size() != vec_meta_info.size()) {
+    atomic_var_ref_.clear();
+    std::generate_n(std::back_inserter(atomic_var_ref_), vec_meta_info.size(),
+                    [] { return std::make_unique<std::atomic<size_t>>(0); });
+  }
 
   for (size_t i = 0; i < vec_meta_info.size(); ++i) {
-    working_var_ref[i] =
-        std::make_unique<std::atomic<size_t>>(vec_meta_info[i].var_ref_count_);
+    atomic_var_ref_[i]->store(vec_meta_info[i].var_ref_count_);
   }
-  return working_var_ref;
+  return atomic_var_ref_;
 }
 
 bool var_can_be_deleted(const std::string& name, const BlockDesc& block) {
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h
index 259f1c615533d9eef4d34b27e9c0849eb5e12e25..2a5942c7123651b8d4d763112f9c0434c083f100 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.h
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -66,9 +66,9 @@ class AsyncWorkQueue {
     queue_group_ = CreateWorkQueueGroup(group_options);
   }
 
-  AtomicVectorSizeT PrepareAtomicDeps(
+  AtomicVectorSizeT& PrepareAtomicDeps(
       const std::vector<size_t>& dependecy_count);
-  AtomicVectorSizeT PrepareAtomicVarRef(
+  AtomicVectorSizeT& PrepareAtomicVarRef(
       const std::vector<VariableMetaInfo>& vec_meta_info);
 
   void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }
 
@@ -77,9 +77,14 @@
     queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
   }
 
+  AtomicVectorSizeT& AtomicDeps() { return atomic_deps_; }
+  AtomicVectorSizeT& AtomicVarRef() { return atomic_var_ref_; }
+
  private:
   size_t host_num_thread_;
   std::unique_ptr<WorkQueueGroup> queue_group_;
+  AtomicVectorSizeT atomic_deps_;
+  AtomicVectorSizeT atomic_var_ref_;
 };
 
 std::string get_memcpy_type(const platform::Place& src_place,
diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h
index 9c0444b3157cb15ab51b74746b3f4a7cb5f9b5da..19b7b6d5dc299fc0117b12f4a7fa56bc371dadb8 100644
--- a/paddle/fluid/framework/new_executor/new_executor_defs.h
+++ b/paddle/fluid/framework/new_executor/new_executor_defs.h
@@ -477,15 +477,10 @@ struct VariableScope {
   std::vector<VariableMetaInfo> vec_meta_info_;
 };
 
-struct EventRun {
-  explicit EventRun(size_t op_id) : op_id_(op_id) {}
-  size_t op_id_;
-};
 struct NextInstruction {
   std::vector<size_t> direct_run_;
-  std::vector<EventRun> event_wait_run_;
-  std::vector<EventRun> synchronize_run_;
-  std::vector<size_t> all_next_ops_;
+  std::vector<size_t> event_wait_run_;
+  std::vector<size_t> synchronize_run_;
 };
 
 struct EventInter {