diff --git a/paddle/fluid/distributed/auto_parallel/dist_attr.cc b/paddle/fluid/distributed/auto_parallel/dist_attr.cc index 5b97393864d741b1553ab7c6627ac8fd2e759978..c8f5ac6453fc958ea77d98b9b56db2a0ba5c681a 100644 --- a/paddle/fluid/distributed/auto_parallel/dist_attr.cc +++ b/paddle/fluid/distributed/auto_parallel/dist_attr.cc @@ -318,8 +318,11 @@ bool operator==(const TensorDistAttr& lhs, const TensorDistAttr& rhs) { return true; } -std::vector OperatorDistAttr::fields_{ - "process_mesh", "impl_type", "impl_idx", "execution_stream"}; +std::vector OperatorDistAttr::fields_{"process_mesh", + "impl_type", + "impl_idx", + "execution_stream", + "scheduling_priority"}; OperatorDistAttr::OperatorDistAttr(const OpDesc& op) : op_(&op) { VLOG(4) << "[OperatorDistAttr constructor] op type: " << op_->Type(); @@ -379,6 +382,7 @@ void OperatorDistAttr::initialize() { impl_type_ = kDefault; impl_idx_ = 0; execution_stream_ = kDefault; + scheduling_priority_ = 0; } void OperatorDistAttr::copy_from(const OperatorDistAttr& dist_attr) { @@ -388,6 +392,7 @@ void OperatorDistAttr::copy_from(const OperatorDistAttr& dist_attr) { set_impl_type(dist_attr.impl_type()); set_impl_idx(dist_attr.impl_idx()); set_execution_stream(dist_attr.execution_stream()); + set_scheduling_priority(dist_attr.scheduling_priority()); set_annotated(dist_attr.annotated()); } @@ -667,6 +672,7 @@ std::string OperatorDistAttr::to_string() const { str += "impl_type: " + impl_type_ + ", "; str += "impl_idx: " + std::to_string(impl_idx_) + ", "; str += "execution_stream: " + execution_stream_ + ", "; + str += "scheduling_priority: " + std::to_string(scheduling_priority_) + ", "; str += "annotated: [" + str_join(annotated_) + "], "; str += "\nprocess_mesh: " + process_mesh_.to_string() + ", "; str += "\ninput_dist_attrs: [\n"; @@ -751,6 +757,9 @@ bool operator==(const OperatorDistAttr& lhs, const OperatorDistAttr& rhs) { if (lhs.execution_stream() != rhs.execution_stream()) { return false; } + if 
(lhs.scheduling_priority() != rhs.scheduling_priority()) { + return false; + } for (auto const& item : lhs.input_dist_attrs()) { if (rhs.input_dist_attrs().count(item.first) != 1) { return false; diff --git a/paddle/fluid/distributed/auto_parallel/dist_attr.h b/paddle/fluid/distributed/auto_parallel/dist_attr.h index 61e61e2e53dd6cdffce0a011e91985aa415c4236..2a340a295bff9b510bead1a5b0cb770cc5de715b 100644 --- a/paddle/fluid/distributed/auto_parallel/dist_attr.h +++ b/paddle/fluid/distributed/auto_parallel/dist_attr.h @@ -213,6 +213,12 @@ class OperatorDistAttr { execution_stream_ = execution_stream; } + int64_t scheduling_priority() const { return scheduling_priority_; } + + void set_scheduling_priority(int64_t scheduling_priority) { + scheduling_priority_ = scheduling_priority; + } + const std::map& annotated() const { return annotated_; } void set_annotated(const std::map& annotated); @@ -271,6 +277,7 @@ class OperatorDistAttr { std::string impl_type_; int64_t impl_idx_ = -1; std::string execution_stream_; + int64_t scheduling_priority_; // lower value, higher priority, default to 0 std::map annotated_; }; diff --git a/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc b/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc index 4703bbb3939c465badcdb07121acc238269076d8..4e0359144c2838c2deb1a7ce0bc8be2dfa759fc0 100644 --- a/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc +++ b/paddle/fluid/framework/new_executor/interpreter/interpreter_util.cc @@ -33,11 +33,6 @@ #include "paddle/fluid/platform/mkldnn_helper.h" #endif -PADDLE_DEFINE_EXPORTED_bool( - new_executor_serial_run, - false, - "Enable serial execution for standalone executor, used for debug."); - PADDLE_DEFINE_EXPORTED_bool( new_executor_log_memory_stats, false, @@ -118,11 +113,7 @@ void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type, std::function fn) { // queue_idx=0 : kCpuSync or kGpuSync // queue_idx=1 : kGPUAsync - // when serial_run, always 
make queue_idx=1, so only one thread is used - size_t queue_idx = - (op_func_type == OpFuncType::kGpuAsync || FLAGS_new_executor_serial_run); - VLOG(8) << "Add task: " << queue_idx; - queue_group_->AddTask(queue_idx, std::move(fn)); + queue_group_->AddTask(op_func_type == OpFuncType::kGpuAsync, std::move(fn)); } bool IsCommunicationOp(const std::string& op_name) { @@ -585,6 +576,17 @@ void BuildOpFuncList(const platform::Place& place, op_func_node.execution_stream_ = dist_attr->execution_stream(); } + if (dist_attr) { + op_func_node.priority_ = dist_attr->scheduling_priority(); + } else if (interpreter::IsCommunicationOp(op_type)) { + // NOTE(Ruibiao): Dispatching computation before communication improves + // multi-stream overlap when the time cost of communication is less than that + // of the calculation (e.g., ResNet50_bs128_pure_fp16 N4C32 training). + op_func_node.priority_ = 1; + } + VLOG(6) << "scheduling priority of " << op_type << " : " + << op_func_node.priority_; + SingleStreamGuard single_stream_guard(ops[i]); VLOG(4) << "Start run " << place << " " << op->DebugStringEx(local_scope); diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index a0aa82102e315dd25f47705b112bf71124a07290..4642a684663b64296e373b59b0f241659a24cd73 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -33,6 +33,10 @@ #endif #include "paddle/phi/backends/device_manager.h" +PADDLE_DEFINE_EXPORTED_bool( + new_executor_serial_run, + false, + "Enable serial execution for standalone executor, used for debug."); PADDLE_DEFINE_EXPORTED_bool(new_executor_use_inplace, false, "Use inplace in new executor"); @@ -128,6 +132,15 @@ InterpreterCore::InterpreterCore(const platform::Place& place, local_scope_ = local_scope; } var_scope_.SetLocalScope(local_scope_); + + instruction_prority_less = [this](size_t lhs, size_t rhs) { + Priority lhs_prority
= vec_instruction_[lhs].GetPriority(); + Priority rhs_prority = vec_instruction_[rhs].GetPriority(); + if (lhs_prority == rhs_prority) { + return lhs > rhs; + } + return lhs_prority > rhs_prority; + }; } InterpreterCore::~InterpreterCore() { @@ -516,25 +529,31 @@ void InterpreterCore::BuildOperatorDependences() { Instruction& cur_instr = vec_instruction_[instr_id]; const std::set& next_instr_ids = downstream_map[instr_id]; - if (cur_instr.KernelType() == OpFuncType::kGpuAsync) { + if (FLAGS_new_executor_serial_run) { for (size_t next_instr_id : next_instr_ids) { - if (vec_instruction_[next_instr_id].KernelType() == - OpFuncType::kGpuAsync) { - cur_instr.AddNextInstrInSameThread(next_instr_id); - } else { - cur_instr.AddNextInstrInDifferentThread(next_instr_id); - } + cur_instr.AddNextInstrInSameThread(next_instr_id); } } else { - bool has_instr_in_same_thread = false; - for (size_t next_instr_id : next_instr_ids) { - if (!has_instr_in_same_thread && - vec_instruction_[next_instr_id].KernelType() != - OpFuncType::kGpuAsync) { - cur_instr.AddNextInstrInSameThread(next_instr_id); - has_instr_in_same_thread = true; - } else { - cur_instr.AddNextInstrInDifferentThread(next_instr_id); + if (cur_instr.KernelType() == OpFuncType::kGpuAsync) { + for (size_t next_instr_id : next_instr_ids) { + if (vec_instruction_[next_instr_id].KernelType() == + OpFuncType::kGpuAsync) { + cur_instr.AddNextInstrInSameThread(next_instr_id); + } else { + cur_instr.AddNextInstrInDifferentThread(next_instr_id); + } + } + } else { + bool has_instr_in_same_thread = false; + for (size_t next_instr_id : next_instr_ids) { + if (!has_instr_in_same_thread && + vec_instruction_[next_instr_id].KernelType() != + OpFuncType::kGpuAsync) { + cur_instr.AddNextInstrInSameThread(next_instr_id); + has_instr_in_same_thread = true; + } else { + cur_instr.AddNextInstrInDifferentThread(next_instr_id); + } } } } @@ -567,12 +586,7 @@ void InterpreterCore::Convert( for (size_t op_idx = 0; op_idx < op_nums; ++op_idx) { 
auto& op_func_node = nodes[op_idx]; auto* dev_ctx_ = stream_analyzer_.ParseDeviceContext(op_func_node); - Priority priority = - interpreter::IsCommunicationOp(op_func_node.operator_base_->Type()) - ? Priority::kLowest - : Priority::kNormal; - vec_instruction_.emplace_back( - op_idx, std::move(op_func_node), *dev_ctx_, priority); + vec_instruction_.emplace_back(op_idx, std::move(op_func_node), *dev_ctx_); } BuildOperatorDependences(); @@ -938,8 +952,12 @@ void InterpreterCore::ExecuteInstructionList( if (dependecy_count_[i] == 0) { // NOTE(zhiqiu): hot fix for jit input var RecordMemcpyD2H(vec_instr.at(i)); - async_work_queue_->AddTask(vec_instr.at(i).KernelType(), - [this, i] { RunInstructionAsync(i); }); + if (FLAGS_new_executor_serial_run) { + RunInstructionAsync(i); + } else { + async_work_queue_->AddTask(vec_instr.at(i).KernelType(), + [this, i] { RunInstructionAsync(i); }); + } } } @@ -965,8 +983,8 @@ void InterpreterCore::ExecuteInstructionList( } } -void InterpreterCore::RunNextInstructions( - const Instruction& instr, std::deque* reserved_next_ops) { +void InterpreterCore::RunNextInstructions(const Instruction& instr, + SchedulingQueue* reserved_next_ops) { platform::RecordEvent record( "RunNextInstructions", platform::TracerEventType::UserDefined, 10); @@ -986,21 +1004,21 @@ void InterpreterCore::RunNextInstructions( for (size_t next_instr_id : instr.NextInstrsInSameThread()) { if (IsReady(next_instr_id)) { - if (vec_instruction_[next_instr_id].GetPriority() == Priority::kLowest) { - reserved_next_ops->push_back(next_instr_id); - } else { - reserved_next_ops->push_front(next_instr_id); - } + reserved_next_ops->push(next_instr_id); } } } void InterpreterCore::RunInstructionAsync(size_t instr_id) { - std::deque ready_ops; - ready_ops.push_back(instr_id); + // NOTE(Ruibiao): Due to the uncertain order in multi-threading asynchronous + // scheduling, the priority order involving cross-thread scheduling is not + // guaranteed.
Only Ops scheduled by the same AddTask call have the guarantee + // of priority order. + SchedulingQueue ready_ops(instruction_prority_less); + ready_ops.push(instr_id); while (!ready_ops.empty()) { - instr_id = ready_ops.front(); - ready_ops.pop_front(); + instr_id = ready_ops.top(); + ready_ops.pop(); auto& instr_node = vec_instruction_.at(instr_id); RunInstruction(instr_node); @@ -1330,24 +1348,24 @@ void InterpreterCore::AnalyseExecuteOrderForTrace() { }; std::vector trace_order; - std::deque ready_ops; + SchedulingQueue ready_ops(instruction_prority_less); for (size_t instr_id = 0; instr_id < dependecy_count_.size(); ++instr_id) { if (dependecy_count_[instr_id] == 0) { - ready_ops.push_back(instr_id); + ready_ops.push(instr_id); } } while (!ready_ops.empty()) { - auto now_id = ready_ops.front(); - ready_ops.pop_front(); + size_t now_id = ready_ops.top(); + ready_ops.pop(); trace_order.push_back(now_id); auto next_op_set = op_downstream_map[now_id]; for (size_t next_op_id : next_op_set) { if (IsReady(next_op_id)) { - ready_ops.push_back(next_op_id); + ready_ops.push(next_op_id); } } } diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h index 2c25a0dee9ce77886075d087079b976415913393..80db521d60d04acbef55c16ca0bae20c5d31fd6e 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.h +++ b/paddle/fluid/framework/new_executor/interpretercore.h @@ -78,6 +78,10 @@ class InterpreterCore { const platform::Place& GetPlace() const { return place_; } private: + using InstructionPriorityLess = std::function; + using SchedulingQueue = + std::priority_queue, InstructionPriorityLess>; + // build graph void Convert(std::vector* op_func_nodes); void BuildOperatorDependences(); @@ -97,7 +101,7 @@ class InterpreterCore { void RunInstructionAsync(size_t instr_id); void RunInstruction(const Instruction& instr_node); void RunNextInstructions(const Instruction& instr_id, - std::deque* reserved_next_ops); + 
SchedulingQueue* reserved_next_ops); void RunOperator(const Instruction& instr_node); // Trace void TraceInstructionList(const std::vector& vec_instr); @@ -170,6 +174,8 @@ class InterpreterCore { // used for Trace int64_t sync_op_num_{-1}; std::vector trace_execute_order_; + + InstructionPriorityLess instruction_prority_less; }; std::shared_ptr CreateInterpreterCore( diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.cc b/paddle/fluid/framework/new_executor/new_executor_defs.cc index aa9c0da7fdda4be56349c60da4855b38ccd9175f..87799ddd75c143cc58c45da732561494fb7f6103 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.cc +++ b/paddle/fluid/framework/new_executor/new_executor_defs.cc @@ -670,13 +670,11 @@ void VariableScope::CheckExist(const std::string& name) const { Instruction::Instruction(size_t id, OpFuncNode&& op_func_node, - const platform::DeviceContext& dev_ctx, - const Priority priority) + const platform::DeviceContext& dev_ctx) : is_artificial_(op_func_node.operator_base_->Type() == "depend"), id_(id), op_func_node_(op_func_node), - dev_ctx_(dev_ctx), - priority_(priority) { + dev_ctx_(dev_ctx) { PADDLE_ENFORCE_GE(id, 0, platform::errors::PreconditionNotMet( diff --git a/paddle/fluid/framework/new_executor/new_executor_defs.h b/paddle/fluid/framework/new_executor/new_executor_defs.h index 9fb4e0b7eebaf5410fe8ff546934e51285952e10..3b437275b04e046f0656a48aba4c8bfc7fc24c2e 100644 --- a/paddle/fluid/framework/new_executor/new_executor_defs.h +++ b/paddle/fluid/framework/new_executor/new_executor_defs.h @@ -32,6 +32,8 @@ namespace framework { using OpKernelComputeFunc = std::function; +using Priority = int64_t; + constexpr const char* kCoalesceTensor = "coalesce_tensor"; // stream types @@ -42,8 +44,6 @@ constexpr const char* kH2DStream = "H2DStream"; constexpr int kEmptyVarIndex = 0; -enum class Priority { kLowest, kNormal }; - class InterpretercoreInferShapeContext : public InferShapeContext { public: 
InterpretercoreInferShapeContext(const OperatorBase& op, @@ -263,29 +263,30 @@ enum class OpFuncType { class RuntimeInferShapeContext; struct OpFuncNode { - // TODO(zhiqiu): Better make it unique_ptr - std::shared_ptr operator_base_; - std::string execution_stream_{kDefaultStream}; - std::map> input_index; - std::map> output_index; + // fit for phi kernel + phi::Kernel* phi_kernel_{nullptr}; // not owned + platform::DeviceContext* dev_ctx_; // not owned std::map inplace_back_map; - OpKernelComputeFunc kernel_func_; - platform::DeviceContext* dev_ctx_; // not owned + std::map> input_index; + std::map> output_index; - // fit for phi kernel - phi::Kernel* phi_kernel_{nullptr}; // not owned + // TODO(zhiqiu): Better make it unique_ptr + std::shared_ptr operator_base_; + std::string execution_stream_{kDefaultStream}; OpFuncType type_; + OpKernelComputeFunc kernel_func_; + + Priority priority_{0}; // lower value, higher priority }; class Instruction { public: Instruction(size_t id, OpFuncNode&& op_func_node, - const platform::DeviceContext& dev_ctx, - const Priority priority); + const platform::DeviceContext& dev_ctx); bool IsArtificial() const { return is_artificial_; } @@ -368,7 +369,7 @@ class Instruction { void ClearInplace(); - Priority GetPriority() const { return priority_; } + Priority GetPriority() const { return op_func_node_.priority_; } private: bool is_artificial_; // Instruction is artificial means that it is only used @@ -384,7 +385,6 @@ class Instruction { OpFuncNode op_func_node_; const platform::DeviceContext& dev_ctx_; // not owned - const Priority priority_; std::shared_ptr runtime_ctx_; std::shared_ptr infershape_ctx_; diff --git a/paddle/fluid/pybind/auto_parallel_py.cc b/paddle/fluid/pybind/auto_parallel_py.cc index a24001819feddc059e88cb75782f873bf2b07865..0d19b01ae2499f705e5f24d34fb6007e0ec5d346 100644 --- a/paddle/fluid/pybind/auto_parallel_py.cc +++ b/paddle/fluid/pybind/auto_parallel_py.cc @@ -226,6 +226,9 @@ void BindAutoParallel(py::module 
*m) { .def_property("execution_stream", &OperatorDistAttr::execution_stream, &OperatorDistAttr::set_execution_stream) + .def_property("scheduling_priority", + &OperatorDistAttr::scheduling_priority, + &OperatorDistAttr::set_scheduling_priority) .def_property("annotated", &OperatorDistAttr::annotated, &OperatorDistAttr::set_annotated) diff --git a/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_op_priority.py b/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_op_priority.py new file mode 100644 index 0000000000000000000000000000000000000000..9a430c53568ee691f48c0871c366a536e54d44ba --- /dev/null +++ b/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_op_priority.py @@ -0,0 +1,66 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import paddle +from paddle import static + +paddle.enable_static() + + +class TestOpPriority(unittest.TestCase): + def test_op_priority(self): + # In this test case, x and y share the same data, + # which is initialized to 0. The shared data is + # read and written by two concurrent Ops increment(x) + # and increment(y). In case of Op sequential scheduling, + # the result of increment(x) would be 1 while that of + # increment(y) would be 2. However, increment(y) is + # set to a higher priority than increment(x), so the + # result of increment(y) would be 1.
+ program = static.Program() + with static.program_guard(program): + x = paddle.zeros(shape=[1], dtype='int32') + block = program.global_block() + + y = block.create_var(dtype='int32') + block.append_op( + type='share_data', inputs={'X': x.name}, outputs={'Out': y.name} + ) + + paddle.increment(x) + block.ops[-1].dist_attr.scheduling_priority = 1 + paddle.increment(y) + block.ops[-1].dist_attr.scheduling_priority = -1 + + # Note that the priority order involving cross-thread scheduling + # is not guaranteed in the standalone executor. As fetch(y) + # is scheduled in a different thread from increment(x), + # they are not scheduled in priority order. To make sure that + # fetch(y) is scheduled before increment(x) in priority order, + # we enable serial_run here as a workaround. + paddle.framework.set_flags({'FLAGS_new_executor_serial_run': 1}) + + exe = static.Executor() + # Currently, priority scheduling is not supported in the first + # step that builds the Op list by running kernels. Remove the first + # run here when static-build without kernel running is supported. + result = exe.run(program, fetch_list=[y]) + result = exe.run(program, fetch_list=[y]) + self.assertEqual(result[0], 1) + + +if __name__ == "__main__": + unittest.main()