diff --git a/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc b/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc index 816331e3fa54900c0e2e261e28a70169c8139d4c..6c002d06b5b193ed0bcf89539313b78ba475e095 100644 --- a/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc +++ b/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc @@ -129,14 +129,13 @@ void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type, } } -bool IsCommunicationOp(const Instruction& instr) { +bool IsCommunicationOp(const std::string& op_name) { const std::set special_comm_op_set = { "send", "recv", "send_v2", "recv_v2", }; - const std::string& op_name = instr.OpBase()->Type(); const std::string communication_op_prefix = "c_"; if (op_name.find(communication_op_prefix) != std::string::npos || special_comm_op_set.count(op_name)) { @@ -145,6 +144,10 @@ bool IsCommunicationOp(const Instruction& instr) { return false; } +bool IsCommunicationOp(const Instruction& instr) { + return IsCommunicationOp(instr.OpBase()->Type()); +} + bool IsCpuOp(const Instruction& instr) { return platform::is_cpu_place(instr.DeviceContext().GetPlace()); } diff --git a/paddle/fluid/framework/new_executor/interpreter/interpreter_util.h b/paddle/fluid/framework/new_executor/interpreter/interpreter_util.h index b842d3acfde6dc768c3473ebfe2468988c05aaf3..d6652d26541605cd65bd749ec41fc5440dad44b6 100644 --- a/paddle/fluid/framework/new_executor/interpreter/interpreter_util.h +++ b/paddle/fluid/framework/new_executor/interpreter/interpreter_util.h @@ -65,6 +65,8 @@ class AsyncWorkQueue { std::unique_ptr queue_group_; }; +bool IsCommunicationOp(const std::string& op_name); + bool IsCommunicationOp(const Instruction& instr); bool IsCpuOp(const Instruction& instr); diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 230e333458dd43a5f1904b984b8263c077d4035a..825c4e14c4489464808e41f158a3372e6d8186b7 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -528,7 +528,12 @@ void InterpreterCore::Convert( for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) { auto& op_func_node = nodes[op_idx]; auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node); - vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_); + Priority priority = + interpreter::IsCommunicationOp(op_func_node.operator_base_->Type()) + ? Priority::kLowest + : Priority::kNormal; + vec_instruction_.emplace_back( + op_idx, std::move(op_func_node), *dev_ctx_, priority); } BuildOperatorDependences(); @@ -835,7 +840,7 @@ void InterpreterCore::ExecuteInstructionList( } void InterpreterCore::RunNextInstructions( - const Instruction& instr, std::queue* reserved_next_ops) { + const Instruction& instr, std::deque* reserved_next_ops) { platform::RecordEvent record( "RunNextInstructions", platform::TracerEventType::UserDefined, 10); auto& next_instr = instr.NextInstructions(); @@ -848,7 +853,7 @@ void InterpreterCore::RunNextInstructions( if (instr.KernelType() == OpFuncType::kQueueAsync) { // move all sync_ops into other threads - for (auto next_id : next_instr.SyncRunIds()) { + for (size_t next_id : next_instr.SyncRunIds()) { if (IsReady(next_id)) { async_work_queue_->AddTask( vec_instruction_[next_id].KernelType(), @@ -856,14 +861,22 @@ void InterpreterCore::RunNextInstructions( } } // keep all async_ops running in current thread - for (auto next_id : next_instr.DirectRunIds()) { + for (size_t next_id : next_instr.DirectRunIds()) { if (IsReady(next_id)) { - reserved_next_ops->push(next_id); + if (vec_instruction_[next_id].GetPriority() == Priority::kLowest) { + reserved_next_ops->push_back(next_id); + } else { + reserved_next_ops->push_front(next_id); + } } } - for (auto next_id : next_instr.EventRunIds()) { + for (size_t next_id : next_instr.EventRunIds()) { if (IsReady(next_id)) { - reserved_next_ops->push(next_id); + if (vec_instruction_[next_id].GetPriority() == Priority::kLowest) { + reserved_next_ops->push_back(next_id); + } else { + reserved_next_ops->push_front(next_id); + } } } } else { @@ -895,16 +908,18 @@ void InterpreterCore::RunNextInstructions( [this, next_id] { RunInstructionAsync(next_id); }); } } - if (first_op != -1) reserved_next_ops->push(first_op); + if (first_op != -1) { + reserved_next_ops->push_front(first_op); + } } } void InterpreterCore::RunInstructionAsync(size_t instr_id) { - std::queue ready_ops; - ready_ops.push(instr_id); + std::deque ready_ops; + ready_ops.push_back(instr_id); while (!ready_ops.empty()) { instr_id = ready_ops.front(); - ready_ops.pop(); + ready_ops.pop_front(); auto& instr_node = vec_instruction_.at(instr_id); VLOG(5) << __func__ << " OP id:" << instr_node.Id() << " name:" << instr_node.OpBase()->Type() << " type:" diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h index ff89f5ed731dec698799ef0ee531d405d76610a5..4cf5053448703b3663e605647731898c174edba4 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.h +++ b/paddle/fluid/framework/new_executor/interpretercore.h @@ -92,7 +92,7 @@ class InterpreterCore { void RunInstructionAsync(size_t instr_id); void RunInstruction(const Instruction& instr_node); void RunNextInstructions(const Instruction& instr_id, - std::queue* reserved_next_ops); + std::deque* reserved_next_ops); // only used when program contains no feed op void Prepare(const std::vector& feed_names, const std::vector& feed_tensors, diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.cc b/paddle/fluid/framework/new_executor/new_executor_defs.cc index 02be9f47ecf3efbfa7ea0c008b508bd5a9bb0a1e..08a4a486173f7bf0ff3cb0628171ecaae9700b90 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.cc +++ b/paddle/fluid/framework/new_executor/new_executor_defs.cc @@ -673,8 +673,12 @@ void VariableScope::CheckExist(const std::string& name) const { Instruction::Instruction(size_t id, OpFuncNode&& op_func_node, - const platform::DeviceContext& dev_ctx) - : id_(id), op_func_node_(op_func_node), dev_ctx_(dev_ctx) { + const platform::DeviceContext& dev_ctx, + const Priority priority) + : id_(id), + op_func_node_(op_func_node), + dev_ctx_(dev_ctx), + priority_(priority) { PADDLE_ENFORCE_GE(id, 0, platform::errors::PreconditionNotMet( diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h index 6735e891230d7aef7afa0ee88d853268312a166c..f1ede0974bb0c1b03d45caea12feaef8228f0563 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.h +++ b/paddle/fluid/framework/new_executor/new_executor_defs.h @@ -40,6 +40,8 @@ constexpr const char* kDefaultStream = "DefaultStream"; constexpr const char* kD2HStream = "D2HStream"; constexpr const char* kH2DStream = "H2DStream"; +enum class Priority { kLowest, kNormal }; + class InterpretercoreInferShapeContext : public InferShapeContext { public: InterpretercoreInferShapeContext(const OperatorBase& op, @@ -300,7 +302,8 @@ class Instruction { public: Instruction(size_t id, OpFuncNode&& op_func_node, - const platform::DeviceContext& dev_ctx); + const platform::DeviceContext& dev_ctx, + const Priority priority); size_t Id() const; @@ -362,10 +365,13 @@ class Instruction { std::shared_ptr event, platform::DeviceType waiter_type); + Priority GetPriority() const { return priority_; } + private: size_t id_; OpFuncNode op_func_node_; const platform::DeviceContext& dev_ctx_; // not owned + const Priority priority_; std::shared_ptr runtime_ctx_; std::shared_ptr infershape_ctx_;