From 8cec12712e013c08893a1a6819ffe7037e5028d7 Mon Sep 17 00:00:00 2001
From: Ruibiao Chen
Date: Tue, 14 Jun 2022 21:07:41 +0800
Subject: [PATCH] Support sequential run GPU OPs for standalone executor (#43243)

* Support sequential run for standalone executor
* Add UTs
* Fix test_standalone_multiply_write
* Remove unnecessary UTs
---
 .../new_executor/interpretercore_util.cc      | 60 +++++++++++++------
 .../unittests/interpreter/CMakeLists.txt      | 11 ++++
 .../test_standalone_controlflow.py            |  2 +
 .../interpreter/test_standalone_executor.py   | 10 ----
 .../test_standalone_multiply_write.py         |  4 +-
 5 files changed, 57 insertions(+), 30 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index 72b7477f2b8..dbea438b140 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -27,9 +27,18 @@
 #include "paddle/fluid/platform/mkldnn_helper.h"
 #endif

+// The difference between "sequential_run" and "serial_run":
+// "sequential_run" dispatches OPs one by one according to the sequence in the
+// Program, while "serial_run" ensures that all Ops are scheduled in a single
+// thread. In standalone executor, "sequential_run" is also "serial_run", while
+// "serial_run" is not necessarily "sequential_run".
+PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run, false,
+                            "Enable sequential execution for standalone "
+                            "executor, only applied to GPU OPs.");
+
 PADDLE_DEFINE_EXPORTED_bool(
-    new_executor_sequential_run, false,
-    "Enable sequential execution for standalone executor, used for debug");
+    new_executor_serial_run, false,
+    "Enable serial execution for standalone executor, used for debug.");

 DECLARE_bool(use_mkldnn);
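The two flags above are meant to be toggled from outside the executor. A minimal usage sketch (not part of the patch) follows, assuming the flags are picked up from environment variables set before Paddle is imported, the same mechanism the test CMakeLists below uses via ENVS, and assuming the standalone executor itself is enabled (e.g. through FLAGS_USE_STANDALONE_EXECUTOR, which the tests in this patch also toggle). The toy program and shapes are illustrative only.

    # Sketch only: enable sequential dispatch of GPU OPs for the standalone
    # executor by exporting the flag before Paddle is imported. The program
    # below is an arbitrary example, not taken from this patch.
    import os

    os.environ['FLAGS_new_executor_sequential_run'] = 'true'
    # For debugging, single-threaded scheduling alone can be requested instead:
    # os.environ['FLAGS_new_executor_serial_run'] = 'true'

    import numpy as np
    import paddle

    paddle.enable_static()
    x = paddle.static.data(name='x', shape=[2, 3], dtype='float32')
    y = paddle.nn.functional.relu(x)

    exe = paddle.static.Executor()
    exe.run(paddle.static.default_startup_program())
    out = exe.run(feed={'x': np.ones([2, 3], dtype='float32')}, fetch_list=[y])
    print(out[0])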
@@ -42,10 +51,8 @@ constexpr size_t kPrepareWorkQueueIdx = 2;
 void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
                              std::function<void()> fn) {
   VLOG(4) << "Add task: " << static_cast<int>(op_func_type) << " ";
-  // NOTE(zhiqiu): use thhe second queue of size of, so only one thread is used.
-  if (FLAGS_new_executor_sequential_run) {
-    VLOG(4) << "FLAGS_new_executor_sequential_run:"
-            << FLAGS_new_executor_sequential_run;
+  // NOTE(zhiqiu): use the second queue of size of, so only one thread is used.
+  if (FLAGS_new_executor_serial_run) {
     queue_group_->AddTask(static_cast<size_t>(OpFuncType::kQueueAsync),
                           std::move(fn));
   } else {
@@ -789,12 +796,14 @@ std::map<int, std::list<int>> build_op_downstream_map(
   std::set remove_duplicate;  // remove the duplicate between inputs and outputs

+  size_t op_num = vec_instruction.size();
+
   // reserve
-  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
     op2dependences[op_idx] = std::set();
   }

-  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
     remove_duplicate.clear();
     // step1: update the op2dependences structure
     for (auto& item :
@@ -859,8 +868,7 @@ std::map<int, std::list<int>> build_op_downstream_map(

   std::map<int, std::list<int>> op_downstream_map =
       GetDownstreamMap(op2dependences);
-  ShrinkDownstreamMap(&op_downstream_map, op_happens_before,
-                      vec_instruction.size());
+  ShrinkDownstreamMap(&op_downstream_map, op_happens_before, op_num);

   // add dependences for random op, make sure that the random op is scheduled
   // sequentially
@@ -880,7 +888,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
   };

   int dependence_op_idx = -1;
-  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
     if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) {
       if (dependence_op_idx != -1) {
         AddDownstreamOp(dependence_op_idx, op_idx, &op_downstream_map,
@@ -907,7 +915,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
   };

   dependence_op_idx = -1;
-  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
     if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) {
       if (dependence_op_idx != -1) {
         AddDownstreamOp(dependence_op_idx, op_idx, &op_downstream_map,
@@ -931,7 +939,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
   //  c_sync_comm_stream(a)
   const std::string kSyncComm = "c_sync_comm_stream";
   dependence_op_idx = -1;
-  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
     if (vec_instruction[op_idx].OpBase()->Type() == kSyncComm) {
       dependence_op_idx = op_idx;
     } else {
@@ -947,7 +955,7 @@ std::map<int, std::list<int>> build_op_downstream_map(

   // add dependency for coalesce_tensor
   const std::string kCoalesceTensor = "coalesce_tensor";
-  for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
+  for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
     if (vec_instruction[op_idx].OpBase()->Type() == kCoalesceTensor) {
       VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
       auto fused_out = vec_instruction[op_idx].Outputs().at("FusedOutput")[0];
@@ -977,7 +985,7 @@ std::map<int, std::list<int>> build_op_downstream_map(

     // find first op that reads fused_out
     auto first_read_fused_out_op = -1;
-    for (auto j = op_idx + 1; j < vec_instruction.size(); ++j) {
+    for (auto j = op_idx + 1; j < op_num; ++j) {
       if (is_read(vec_instruction[j], fused_out)) {
         first_read_fused_out_op = j;
         break;
@@ -1017,8 +1025,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
     //    we should take the last one to add depned instead of
     //    'first_read_fused_out_op'
     size_t target = first_read_fused_out_op;
-    for (size_t j = first_read_fused_out_op + 1; j < vec_instruction.size();
-         ++j) {
+    for (size_t j = first_read_fused_out_op + 1; j < op_num; ++j) {
       if (j == target + 1 &&
           is_comm_op(vec_instruction[target].OpBase()->Type()) &&
           is_comm_op(vec_instruction[j].OpBase()->Type())) {
@@ -1032,7 +1039,6 @@ std::map<int, std::list<int>> build_op_downstream_map(
       for (auto var_id : outputs) {
         if (is_read(vec_instruction[j], var_id)) {
           AddDownstreamOp(target, j, &op_downstream_map, *op_happens_before);
-          op2dependences[j].insert(target);
           VLOG(4) << target << " -> " << j;
           VLOG(4) << "Add depend from "
                   << vec_instruction[target].OpBase()->Type() << " to "
@@ -1043,6 +1049,24 @@ std::map<int, std::list<int>> build_op_downstream_map(
     }
   }

+  if (FLAGS_new_executor_sequential_run) {
+    dependence_op_idx = -1;
+    for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
+      if (!IsCpuOp(vec_instruction[op_idx])) {
+        if (dependence_op_idx != -1) {
+          AddDownstreamOp(dependence_op_idx, op_idx, &op_downstream_map,
+                          *op_happens_before);
+          VLOG(4) << "Add depend from "
+                  << vec_instruction[dependence_op_idx].OpBase()->Type() << "("
+                  << dependence_op_idx << ") to "
+                  << vec_instruction[op_idx].OpBase()->Type() << "(" << op_idx
+                  << ")";
+        }
+        dependence_op_idx = op_idx;
+      }
+    }
+  }
+
   VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map);
   VLOG(8) << "downstream_map: " << std::endl
           << StringizeDownstreamMap(op_downstream_map);
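The FLAGS_new_executor_sequential_run branch added above chains each non-CPU OP to the most recent non-CPU OP, so GPU OPs are dispatched in Program order while CPU OPs stay unconstrained. The toy sketch below (plain Python; the op list and the is_cpu predicate are invented stand-ins for vec_instruction and IsCpuOp) only illustrates which edges that loop adds.

    # Toy model of the chaining loop guarded by FLAGS_new_executor_sequential_run.
    # downstream[i] collects the ops that must wait for op i, mirroring
    # AddDownstreamOp; the op names and CPU set below are made up for the example.
    from collections import defaultdict

    def add_sequential_deps(ops, is_cpu):
        downstream = defaultdict(set)
        dependence_op_idx = -1
        for op_idx, op in enumerate(ops):
            if not is_cpu(op):
                if dependence_op_idx != -1:
                    # the previous GPU op must finish before this one is dispatched
                    downstream[dependence_op_idx].add(op_idx)
                dependence_op_idx = op_idx
        return downstream

    ops = ['matmul', 'shape', 'relu', 'fill_constant', 'softmax']
    cpu_only = {'shape', 'fill_constant'}
    print(dict(add_sequential_deps(ops, lambda op: op in cpu_only)))
    # -> {0: {2}, 2: {4}}: matmul, relu and softmax are forced into Program order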
diff --git a/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt b/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt
index 976a36b7615..c60a7511022 100644
--- a/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/interpreter/CMakeLists.txt
@@ -16,6 +16,7 @@ foreach(target ${TEST_INTERP_CASES})
     FLAGS_use_stream_safe_cuda_allocator=true
     FLAGS_fast_eager_deletion_mode=false
     FLAGS_eager_delete_tensor_gb=0)
+
   py_test_modules(
     ${target}_non_eager_deletion
     MODULES
@@ -25,6 +26,7 @@ foreach(target ${TEST_INTERP_CASES})
     FLAGS_use_stream_safe_cuda_allocator=true
     FLAGS_fast_eager_deletion_mode=false
     FLAGS_eager_delete_tensor_gb=0.000001)
+
   py_test_modules(
     ${target}_fast_gc
     MODULES
@@ -34,6 +36,7 @@ foreach(target ${TEST_INTERP_CASES})
     FLAGS_use_stream_safe_cuda_allocator=true
     FLAGS_fast_eager_deletion_mode=true
     FLAGS_eager_delete_tensor_gb=0)
+
   py_test_modules(
     ${target}_fast_gc_non_eager_deletion
     MODULES
@@ -44,3 +47,11 @@ foreach(target ${TEST_INTERP_CASES})
     FLAGS_fast_eager_deletion_mode=true
     FLAGS_eager_delete_tensor_gb=0.000001)
 endforeach()
+
+py_test_modules(
+  test_standalone_executor_sequential_run MODULES test_standalone_executor ENVS
+  FLAGS_new_executor_sequential_run=true)
+
+py_test_modules(
+  test_standalone_executor_serial_run MODULES test_standalone_executor ENVS
+  FLAGS_new_executor_serial_run=true)
diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py
index eeddcaa5bb5..5ce035097d0 100644
--- a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py
+++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_controlflow.py
@@ -81,7 +81,9 @@ class TestCompatibility(unittest.TestCase):
         return ret

     def run_raw_executor(self, feed):
+        os.environ['FLAGS_USE_STANDALONE_EXECUTOR'] = '0'
         out = self._run(feed)
+        del os.environ['FLAGS_USE_STANDALONE_EXECUTOR']
         print("GT:", out)
         return out
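The change above pins run_raw_executor to the legacy executor (FLAGS_USE_STANDALONE_EXECUTOR='0') so it can serve as the ground truth, then removes the flag again. A hypothetical helper, not part of this patch, could wrap that set/del pattern so the baseline run cannot leak the flag into later test cases:

    # Hypothetical helper mirroring the pattern used in run_raw_executor above.
    import os
    from contextlib import contextmanager

    @contextmanager
    def legacy_executor():
        saved = os.environ.get('FLAGS_USE_STANDALONE_EXECUTOR')
        os.environ['FLAGS_USE_STANDALONE_EXECUTOR'] = '0'
        try:
            yield
        finally:
            if saved is None:
                del os.environ['FLAGS_USE_STANDALONE_EXECUTOR']
            else:
                os.environ['FLAGS_USE_STANDALONE_EXECUTOR'] = saved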
diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py
index 7faff7ec181..9e375126550 100644
--- a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py
+++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py
@@ -231,10 +231,6 @@ class MultiStreamModelTestCase(unittest.TestCase):
         for gt, out in zip(ground_truths, res):
             self.assertEqual(gt[0], out[0])

-        res_sequential = self.run_new_executor_sequential()
-        for gt, out in zip(ground_truths, res_sequential):
-            self.assertEqual(gt[0], out[0])
-
     def run_raw_executor(self):
         paddle.seed(2020)
         main_program, startup_program, fetch_list = build_program()
@@ -264,12 +260,6 @@ class MultiStreamModelTestCase(unittest.TestCase):
                 np.array(inter_core.run({}, fetch_list)._move_to_list()[0]))
         return outs

-    def run_new_executor_sequential(self):
-        os.environ['FLAGS_new_executor_sequential_run'] = '1'
-        res = self.run_new_executor()
-        del os.environ['FLAGS_new_executor_sequential_run']
-        return res
-

 class SwitchExecutorInterfaceTestCase(MultiStreamModelTestCase):
diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py
index 8006c59d2ba..a4d18d29be4 100644
--- a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py
+++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_multiply_write.py
@@ -36,8 +36,8 @@ class TestMultiplyWrite(TestCompatibility):
         return None

     def build_program(self):
-        main_program = paddle.static.default_main_program()
-        startup_program = paddle.static.default_startup_program()
+        main_program = Program()
+        startup_program = Program()
         with paddle.static.program_guard(main_program, startup_program):
             out = paddle.full((1, ), 1)
             inp1 = paddle.full((1, ), 2)
--
GitLab
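The in-process run_new_executor_sequential helper removed above is now covered by the new CMake targets that rerun test_standalone_executor with the flags exported via ENVS. A rough local equivalent of the sequential_run variant might look like the sketch below; the working directory and the way the module is discovered are assumptions, not part of the patch.

    # Rough local stand-in for the test_standalone_executor_sequential_run target:
    # rerun the existing module with the flag exported, as the CMake ENVS do.
    import os
    import subprocess
    import sys

    env = dict(os.environ, FLAGS_new_executor_sequential_run='true')
    subprocess.check_call(
        [sys.executable, '-m', 'unittest', 'test_standalone_executor'],
        cwd='python/paddle/fluid/tests/unittests/interpreter',
        env=env)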