From 364376e5fd2a629a4e9194a433af3cbcd7e6045d Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 6 Dec 2021 11:13:50 +0800 Subject: [PATCH] [new-exec] enable sequential run for debug (#37835) * enable sequential_run for standalone_executor * add ut * fix ut --- .../new_executor/interpretercore_util.cc | 17 +++++++++++++++++ .../new_executor/interpretercore_util.h | 4 +--- .../interpreter/test_standalone_executor.py | 10 ++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index 0501522a7a..3817a11b9a 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -20,9 +20,26 @@ #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" +PADDLE_DEFINE_EXPORTED_bool( + new_executor_sequential_run, false, + "Enable sequential execution for standalone executor, used for debug"); namespace paddle { namespace framework { namespace interpreter { + +void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type, + std::function fn) { + // NOTE(zhiqiu): use thhe second queue of size of, so only one thread is used. + if (FLAGS_new_executor_sequential_run) { + VLOG(4) << "FLAGS_new_executor_sequential_run:" + << FLAGS_new_executor_sequential_run; + queue_group_->AddTask(static_cast(OpFuncType::kQueueAsync), + std::move(fn)); + } else { + queue_group_->AddTask(static_cast(op_func_type), std::move(fn)); + } +} + using VariableIdMap = std::map>; AtomicVectorSizeT& AsyncWorkQueue::PrepareAtomicDeps( diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h index c92cea6c97..8f27c7e181 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.h +++ b/paddle/fluid/framework/new_executor/interpretercore_util.h @@ -77,9 +77,7 @@ class AsyncWorkQueue { // void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); } - void AddTask(const OpFuncType& op_func_type, std::function fn) { - queue_group_->AddTask(static_cast(op_func_type), std::move(fn)); - } + void AddTask(const OpFuncType& op_func_type, std::function fn); void Cancel() { queue_group_->Cancel(); } diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py index 23dee7338a..01b2cccfc4 100644 --- a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py +++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py @@ -130,6 +130,10 @@ class MultiStreamModelTestCase(unittest.TestCase): for gt, out in zip(ground_truths, res): self.assertEqual(gt[0], out[0]) + res_sequential = self.run_new_executor_sequential() + for gt, out in zip(ground_truths, res_sequential): + self.assertEqual(gt[0], out[0]) + def run_raw_executor(self): paddle.seed(2020) main_program, startup_program, fetch_list = build_program() @@ -158,6 +162,12 @@ class MultiStreamModelTestCase(unittest.TestCase): np.array(inter_core.run({}, fetch_list)._move_to_list()[0])) return outs + def run_new_executor_sequential(self): + os.environ['FLAGS_new_executor_sequential_run'] = '1' + res = self.run_new_executor() + del os.environ['FLAGS_new_executor_sequential_run'] + return res + class SwitchExecutorInterfaceTestCase(MultiStreamModelTestCase): def run_new_executor(self): -- GitLab