Unverified · Commit 8cec1271 authored by Ruibiao Chen, committed by GitHub

Support sequential run GPU OPs for standalone executor (#43243)

* Support sequential run for standalone executor

* Add UTs

* Fix test_standalone_multiply_write

* Remove unnecessary UTs
Parent 077f3788
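Both flags are driven through environment variables in the tests touched by this patch (see the py_test_modules ENVS entries and the removed run_new_executor_sequential helper further down). A minimal sketch of that pattern follows; run_with_flag and my_test are illustrative names, not part of the patch:

import os

def run_with_flag(flag_name, run_fn):
    # Set one executor flag for a single run, then restore the environment.
    # This mirrors the removed run_new_executor_sequential helper; the CMake
    # change below instead passes the flag via ENVS so it is already set when
    # the test process starts.
    os.environ[flag_name] = '1'
    try:
        return run_fn()
    finally:
        del os.environ[flag_name]

# run_with_flag('FLAGS_new_executor_sequential_run', my_test)  # GPU OPs in Program order
# run_with_flag('FLAGS_new_executor_serial_run', my_test)      # all OPs on one thread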
......@@ -27,9 +27,18 @@
#include "paddle/fluid/platform/mkldnn_helper.h"
#endif
// The difference between "sequential_run" and "serial_run":
// "sequential_run" dispatches OPs one by one according to the sequence in the
// Program, while "serial_run" ensures that all Ops are scheduled in a single
// thread. In standalone executor, "sequential_run" is also "serial_run", while
// "serial_run" is not necessarily "sequential_run".
PADDLE_DEFINE_EXPORTED_bool(new_executor_sequential_run, false,
"Enable sequential execution for standalone "
"executor, only applied to GPU OPs.");
PADDLE_DEFINE_EXPORTED_bool(
new_executor_sequential_run, false,
"Enable sequential execution for standalone executor, used for debug");
new_executor_serial_run, false,
"Enable serial execution for standalone executor, used for debug.");
DECLARE_bool(use_mkldnn);
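To make the distinction drawn in the comment above concrete, here is a toy Python sketch (purely illustrative, not Paddle code): deps maps each op index to the set of op indices it must wait for, and pick stands in for whichever order a single worker happens to choose among ready ops.

def serial_run(num_ops, deps, pick):
    # One worker thread, but it may execute ANY op whose dependencies are done,
    # so the resulting order can differ from Program order.
    done, order = set(), []
    while len(done) < num_ops:
        ready = [i for i in range(num_ops) if i not in done and deps[i] <= done]
        idx = pick(ready)
        order.append(idx)
        done.add(idx)
    return order

def sequential_run(num_ops, deps):
    # Ops are dispatched one by one in Program order; the C++ change below
    # enforces this for GPU ops by chaining each GPU op to the next one.
    return list(range(num_ops))

# With deps = {0: set(), 1: set(), 2: {0}}, serial_run(3, deps, max) yields [1, 0, 2],
# while sequential_run(3, deps) always yields [0, 1, 2]; both respect the dependency.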
......@@ -42,10 +51,8 @@ constexpr size_t kPrepareWorkQueueIdx = 2;
void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
std::function<void()> fn) {
VLOG(4) << "Add task: " << static_cast<size_t>(op_func_type) << " ";
// NOTE(zhiqiu): use thhe second queue of size of, so only one thread is used.
if (FLAGS_new_executor_sequential_run) {
VLOG(4) << "FLAGS_new_executor_sequential_run:"
<< FLAGS_new_executor_sequential_run;
// NOTE(zhiqiu): use the second queue of size of, so only one thread is used.
if (FLAGS_new_executor_serial_run) {
queue_group_->AddTask(static_cast<size_t>(OpFuncType::kQueueAsync),
std::move(fn));
} else {
......@@ -789,12 +796,14 @@ std::map<int, std::list<int>> build_op_downstream_map(
std::set<int>
remove_duplicate; // remove the duplicate between inputs and outputs
size_t op_num = vec_instruction.size();
// reserve
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
op2dependences[op_idx] = std::set<int>();
}
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
remove_duplicate.clear();
// step1: update the op2dependences structure
for (auto& item :
......@@ -859,8 +868,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
std::map<int, std::list<int>> op_downstream_map =
GetDownstreamMap(op2dependences);
ShrinkDownstreamMap(&op_downstream_map, op_happens_before,
vec_instruction.size());
ShrinkDownstreamMap(&op_downstream_map, op_happens_before, op_num);
// add dependences for random op, make sure that the random op is scheduled
// sequentially
......@@ -880,7 +888,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
};
int dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (random_op_set.count(vec_instruction[op_idx].OpBase()->Type())) {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx, &op_downstream_map,
......@@ -907,7 +915,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
};
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (is_comm_op(vec_instruction[op_idx].OpBase()->Type())) {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx, &op_downstream_map,
......@@ -931,7 +939,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
// c_sync_comm_stream(a)
const std::string kSyncComm = "c_sync_comm_stream";
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == kSyncComm) {
dependence_op_idx = op_idx;
} else {
......@@ -947,7 +955,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
// add dependency for coalesce_tensor
const std::string kCoalesceTensor = "coalesce_tensor";
for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) {
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (vec_instruction[op_idx].OpBase()->Type() == kCoalesceTensor) {
VLOG(4) << "Add depend for " << kCoalesceTensor << " " << op_idx;
auto fused_out = vec_instruction[op_idx].Outputs().at("FusedOutput")[0];
......@@ -977,7 +985,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
// find first op that reads fused_out
auto first_read_fused_out_op = -1;
for (auto j = op_idx + 1; j < vec_instruction.size(); ++j) {
for (auto j = op_idx + 1; j < op_num; ++j) {
if (is_read(vec_instruction[j], fused_out)) {
first_read_fused_out_op = j;
break;
......@@ -1017,8 +1025,7 @@ std::map<int, std::list<int>> build_op_downstream_map(
// we should take the last one to add the dependency instead of
// 'first_read_fused_out_op'
size_t target = first_read_fused_out_op;
for (size_t j = first_read_fused_out_op + 1; j < vec_instruction.size();
++j) {
for (size_t j = first_read_fused_out_op + 1; j < op_num; ++j) {
if (j == target + 1 &&
is_comm_op(vec_instruction[target].OpBase()->Type()) &&
is_comm_op(vec_instruction[j].OpBase()->Type())) {
......@@ -1032,7 +1039,6 @@ std::map<int, std::list<int>> build_op_downstream_map(
for (auto var_id : outputs) {
if (is_read(vec_instruction[j], var_id)) {
AddDownstreamOp(target, j, &op_downstream_map, *op_happens_before);
op2dependences[j].insert(target);
VLOG(4) << target << " -> " << j;
VLOG(4) << "Add depend from "
<< vec_instruction[target].OpBase()->Type() << " to "
......@@ -1043,6 +1049,24 @@ std::map<int, std::list<int>> build_op_downstream_map(
}
}
if (FLAGS_new_executor_sequential_run) {
dependence_op_idx = -1;
for (size_t op_idx = 0; op_idx < op_num; ++op_idx) {
if (!IsCpuOp(vec_instruction[op_idx])) {
if (dependence_op_idx != -1) {
AddDownstreamOp(dependence_op_idx, op_idx, &op_downstream_map,
*op_happens_before);
VLOG(4) << "Add depend from "
<< vec_instruction[dependence_op_idx].OpBase()->Type() << "("
<< dependence_op_idx << ") to "
<< vec_instruction[op_idx].OpBase()->Type() << "(" << op_idx
<< ")";
}
dependence_op_idx = op_idx;
}
}
}
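// Example: given instructions [gpu_op_0, cpu_op_1, gpu_op_2, gpu_op_3] (generic
// placeholder names), the loop above adds the edges gpu_op_0 -> gpu_op_2 and
// gpu_op_2 -> gpu_op_3, so GPU ops are launched strictly in Program order while
// the CPU op is left unconstrained.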
VLOG(8) << "downstream count: " << CountDownstreamMap(op_downstream_map);
VLOG(8) << "downstream_map: " << std::endl
<< StringizeDownstreamMap(op_downstream_map);
......
......@@ -16,6 +16,7 @@ foreach(target ${TEST_INTERP_CASES})
FLAGS_use_stream_safe_cuda_allocator=true
FLAGS_fast_eager_deletion_mode=false
FLAGS_eager_delete_tensor_gb=0)
py_test_modules(
${target}_non_eager_deletion
MODULES
......@@ -25,6 +26,7 @@ foreach(target ${TEST_INTERP_CASES})
FLAGS_use_stream_safe_cuda_allocator=true
FLAGS_fast_eager_deletion_mode=false
FLAGS_eager_delete_tensor_gb=0.000001)
py_test_modules(
${target}_fast_gc
MODULES
......@@ -34,6 +36,7 @@ foreach(target ${TEST_INTERP_CASES})
FLAGS_use_stream_safe_cuda_allocator=true
FLAGS_fast_eager_deletion_mode=true
FLAGS_eager_delete_tensor_gb=0)
py_test_modules(
${target}_fast_gc_non_eager_deletion
MODULES
......@@ -44,3 +47,11 @@ foreach(target ${TEST_INTERP_CASES})
FLAGS_fast_eager_deletion_mode=true
FLAGS_eager_delete_tensor_gb=0.000001)
endforeach()
py_test_modules(
test_standalone_executor_sequential_run MODULES test_standalone_executor ENVS
FLAGS_new_executor_sequential_run=true)
py_test_modules(
test_standalone_executor_serial_run MODULES test_standalone_executor ENVS
FLAGS_new_executor_serial_run=true)
......@@ -81,7 +81,9 @@ class TestCompatibility(unittest.TestCase):
        return ret

    def run_raw_executor(self, feed):
        os.environ['FLAGS_USE_STANDALONE_EXECUTOR'] = '0'
        out = self._run(feed)
        del os.environ['FLAGS_USE_STANDALONE_EXECUTOR']
        print("GT:", out)
        return out
......
......@@ -231,10 +231,6 @@ class MultiStreamModelTestCase(unittest.TestCase):
        for gt, out in zip(ground_truths, res):
            self.assertEqual(gt[0], out[0])

        res_sequential = self.run_new_executor_sequential()
        for gt, out in zip(ground_truths, res_sequential):
            self.assertEqual(gt[0], out[0])

    def run_raw_executor(self):
        paddle.seed(2020)
        main_program, startup_program, fetch_list = build_program()
......@@ -264,12 +260,6 @@ class MultiStreamModelTestCase(unittest.TestCase):
                np.array(inter_core.run({}, fetch_list)._move_to_list()[0]))
        return outs

    def run_new_executor_sequential(self):
        os.environ['FLAGS_new_executor_sequential_run'] = '1'
        res = self.run_new_executor()
        del os.environ['FLAGS_new_executor_sequential_run']
        return res
class SwitchExecutorInterfaceTestCase(MultiStreamModelTestCase):
......
......@@ -36,8 +36,8 @@ class TestMultiplyWrite(TestCompatibility):
        return None

    def build_program(self):
        main_program = paddle.static.default_main_program()
        startup_program = paddle.static.default_startup_program()
        main_program = Program()
        startup_program = Program()
        with paddle.static.program_guard(main_program, startup_program):
            out = paddle.full((1, ), 1)
            inp1 = paddle.full((1, ), 2)
......