diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index 0501522a7a810ced2baca8198d1bada16abff293..3817a11b9afe4e6d6b3e2526db612b2c8893800d 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -20,9 +20,26 @@ #include "paddle/fluid/operators/controlflow/recurrent_op_helper.h" #include "paddle/fluid/operators/controlflow/while_op_helper.h" +PADDLE_DEFINE_EXPORTED_bool( + new_executor_sequential_run, false, + "Enable sequential execution for standalone executor, used for debug"); namespace paddle { namespace framework { namespace interpreter { + +void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type, + std::function fn) { + // NOTE(zhiqiu): use thhe second queue of size of, so only one thread is used. + if (FLAGS_new_executor_sequential_run) { + VLOG(4) << "FLAGS_new_executor_sequential_run:" + << FLAGS_new_executor_sequential_run; + queue_group_->AddTask(static_cast(OpFuncType::kQueueAsync), + std::move(fn)); + } else { + queue_group_->AddTask(static_cast(op_func_type), std::move(fn)); + } +} + using VariableIdMap = std::map>; AtomicVectorSizeT& AsyncWorkQueue::PrepareAtomicDeps( diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h index c92cea6c97c863147133e3944a0ec29b5951e5e2..8f27c7e1811fb8f55826e8f3d030de67827b5703 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.h +++ b/paddle/fluid/framework/new_executor/interpretercore_util.h @@ -77,9 +77,7 @@ class AsyncWorkQueue { // void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); } - void AddTask(const OpFuncType& op_func_type, std::function fn) { - queue_group_->AddTask(static_cast(op_func_type), std::move(fn)); - } + void AddTask(const OpFuncType& op_func_type, std::function fn); void Cancel() { queue_group_->Cancel(); } diff --git a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py index 23dee7338ae18b18d820aee1c71d388441b1d4ef..01b2cccfc48b25957a2607771414f3e0e695ae55 100644 --- a/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py +++ b/python/paddle/fluid/tests/unittests/interpreter/test_standalone_executor.py @@ -130,6 +130,10 @@ class MultiStreamModelTestCase(unittest.TestCase): for gt, out in zip(ground_truths, res): self.assertEqual(gt[0], out[0]) + res_sequential = self.run_new_executor_sequential() + for gt, out in zip(ground_truths, res_sequential): + self.assertEqual(gt[0], out[0]) + def run_raw_executor(self): paddle.seed(2020) main_program, startup_program, fetch_list = build_program() @@ -158,6 +162,12 @@ class MultiStreamModelTestCase(unittest.TestCase): np.array(inter_core.run({}, fetch_list)._move_to_list()[0])) return outs + def run_new_executor_sequential(self): + os.environ['FLAGS_new_executor_sequential_run'] = '1' + res = self.run_new_executor() + del os.environ['FLAGS_new_executor_sequential_run'] + return res + class SwitchExecutorInterfaceTestCase(MultiStreamModelTestCase): def run_new_executor(self):