Unverified commit a0871194 authored by Aurelius84, committed by GitHub

Integrate MultiThreadedWorkQueue to execute program ops (#35356)

* format code

* format interface

* polish interface

* Remove std::memory_order

* modify into SpinLock

* remove fetch_context_pool_

* fix comment

* modify into WorkQueueGroup

* refine code

* fix pointer

* fix paddle_enforce

* split into AsyncWorkQueue

* polish code

* specify std::memory_relax

* fix atomic fetch_sub

* fix num_thread
Parent c59c8e4f
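Before the per-file hunks, a brief orientation: this commit replaces the single-threaded ready-queue loop in ExecuteInstructionList with dependency-driven scheduling on a WorkQueueGroup. Each instruction gets an atomic dependency counter (PrepareAtomicDeps), ops whose counter starts at zero are submitted as tasks, and RunInstructionAsync decrements the counters of an op's successors with fetch_sub, enqueueing exactly the ones that reach zero. The sketch below shows that pattern in isolation; the names (Node, Execute, submit) are hypothetical and are not Paddle's API.

```cpp
// Minimal sketch of the dependency-count scheduling this commit switches to
// (hypothetical names, not Paddle's actual classes). Each node keeps an atomic
// count of unfinished predecessors; nodes starting at zero are submitted right
// away, and the worker whose decrement drives a successor's count to zero is
// the one that enqueues it, so every node is scheduled exactly once.
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <vector>

struct Node {
  std::function<void()> run;
  std::vector<std::size_t> successors;  // indices of nodes that depend on this one
};

// `submit` stands in for the work queue's AddTask and is assumed to establish
// a happens-before between enqueueing a task and running it.
void Execute(const std::vector<Node>& graph,
             const std::vector<std::size_t>& dep_count,
             const std::function<void(std::function<void()>)>& submit) {
  // std::atomic is neither copyable nor movable, hence the unique_ptr wrapper
  // (mirroring AtomicVectorSizeT in the diff).
  std::vector<std::unique_ptr<std::atomic<std::size_t>>> deps;
  deps.reserve(dep_count.size());
  for (std::size_t d : dep_count) {
    deps.push_back(std::make_unique<std::atomic<std::size_t>>(d));
  }

  std::atomic<std::size_t> remaining{graph.size()};  // cf. op_run_number

  // Recursive std::function so a finished node can schedule its successors.
  std::function<void(std::size_t)> run_node = [&](std::size_t i) {
    graph[i].run();
    for (std::size_t next : graph[i].successors) {
      // fetch_sub returns the value held *before* the subtraction, so seeing 1
      // means this decrement released the successor's last dependency.
      if (deps[next]->fetch_sub(1, std::memory_order_relaxed) == 1) {
        submit([&run_node, next] { run_node(next); });
      }
    }
    remaining.fetch_sub(1, std::memory_order_acq_rel);
  };

  for (std::size_t i = 0; i < graph.size(); ++i) {
    if (dep_count[i] == 0) {
      submit([&run_node, i] { run_node(i); });
    }
  }

  // Block until every node has run, analogous to async_work_queue_.WaitEmpty()
  // plus the op_run_number check in the diff; also keeps the locals above alive.
  while (remaining.load(std::memory_order_acquire) != 0) {
    std::this_thread::yield();
  }
}
```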
......@@ -4,7 +4,7 @@ graph_to_program_pass variable_helper timer monitor)
cc_library(workqueue SRCS workqueue.cc DEPS enforce)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS})
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS})
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS} workqueue)
cc_library(event_manager SRCS event_manager.cc DEPS ${DEVICE_EVENT_LIBS} glog)
cc_library(stream_analyzer SRCS stream_analyzer.cc DEPS ${DEVICE_EVENT_LIBS} glog device_context)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector stream_analyzer event_manager)
......
......@@ -30,7 +30,6 @@ void EventManager::WaitEvent(const Instruction& instruction,
}
void EventManager::RecordEvent(const Instruction& instruction,
const OpFuncNode& op_func_node,
const platform::Place& place) {
// If InterpreterCore in on CPUPlace, do nothing.
if (platform::is_cpu_place(place)) return;
......
......@@ -21,7 +21,6 @@ namespace framework {
class EventManager {
public:
void RecordEvent(const Instruction& instruction,
const OpFuncNode& op_func_node,
const platform::Place& place);
void WaitEvent(const Instruction& instruction, const platform::Place& place);
......
......@@ -23,6 +23,8 @@ DEFINE_bool(new_executor_use_inplace, true, "Use inplace in new executor");
namespace paddle {
namespace framework {
// NOTE(Aurelius84): Need a better strategy to determine it.
static constexpr size_t kHostNumThreads = 4;
InterpreterCore::InterpreterCore(const platform::Place& place,
const ProgramDesc& main_prog,
......@@ -32,7 +34,8 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
: place_(place),
main_program_(main_prog),
global_scope_(global_scope),
stream_analyzer_(place) {
stream_analyzer_(place),
async_work_queue_(kHostNumThreads) {
is_build_ = false;
feed_names_ = feed_names;
......@@ -89,7 +92,7 @@ paddle::framework::FetchList InterpreterCore::Run(
Convert();
} else {
FeedInput();
ExecuteInstructionList(vec_instruction_, *global_scope_, place_);
ExecuteInstructionList(vec_instruction_);
}
// return Fetch Tensors
......@@ -112,6 +115,7 @@ void InterpreterCore::Convert() {
temp_inst.kernel_func_.operator_base_ = op_base;
temp_inst.input_index_ = vec_func_list_[i].input_index;
temp_inst.output_index_ = vec_func_list_[i].output_index;
temp_inst.type_ = vec_func_list_[i].type_;
OpInOutInfo info;
......@@ -168,8 +172,8 @@ void InterpreterCore::Convert() {
}
}
// In Program, op order is a very import information.
// Op can noly add op after it as next as next ops.
// In Program, op order is a very important information.
// Op can only add op after it as next as next ops.
std::vector<size_t> filter_next;
filter_next.reserve(vec_temp.size());
for (auto item : vec_temp) {
......@@ -319,62 +323,73 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
}
void InterpreterCore::ExecuteInstructionList(
const std::vector<Instruction>& vec_instr, const VariableScope& var_scope,
const platform::Place& place, bool is_dry_run) {
std::queue<size_t> working_queue;
auto working_dependecy_count = dependecy_count_;
const std::vector<Instruction>& vec_instr, bool is_dry_run) {
auto atomic_deps = async_work_queue_.PrepareAtomicDeps(dependecy_count_);
auto atomic_var_ref = async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
std::atomic<size_t> op_run_number{0};
for (size_t i = 0; i < dependecy_count_.size(); ++i) {
if (dependecy_count_[i] == 0) {
working_queue.push(i);
async_work_queue_.AddTask(vec_instr[i].type_, [&, i, is_dry_run]() {
RunInstructionAsync(i, &atomic_deps, &atomic_var_ref, &op_run_number,
is_dry_run);
});
}
}
auto working_var_ref = vec_meta_info_;
size_t run_op_number = 0;
while (!working_queue.empty()) {
auto instr_id = working_queue.front();
working_queue.pop();
auto& instr_node = vec_instr[instr_id];
// step1 : stream_wait (non-block host) or sync (block host)
event_manager_.WaitEvent(instr_node, place_);
// step2: run instruction
RunInstruction(instr_node);
++run_op_number;
if (is_dry_run) {
dry_run_profiler_.ParseMemoryInfo(var_scope.var_list);
}
async_work_queue_.WaitEmpty();
// step3: insert event for out_vars if needed
event_manager_.RecordEvent(instr_node, vec_func_list_[instr_id], place_);
PADDLE_ENFORCE_EQ(
op_run_number.load(), vec_instr.size(),
platform::errors::Fatal(
"Required op_run_number == %d, but received op_run_number = %d.",
vec_instr.size(), op_run_number.load()));
}
// step4: update working_queue
auto& next_instr = instr_node.next_instruction_.all_next_ops_;
void InterpreterCore::RunInstructionAsync(size_t instr_id,
AtomicVectorSizeT* atomic_deps,
AtomicVectorSizeT* atomic_var_ref,
std::atomic<size_t>* op_run_number,
bool is_dry_run) {
auto& instr_node = vec_instruction_[instr_id];
event_manager_.WaitEvent(instr_node, place_);
for (auto next_i : next_instr) {
--working_dependecy_count[next_i];
if (working_dependecy_count[next_i] == 0) {
working_queue.push(next_i);
}
}
RunInstruction(instr_node);
// GC infomation
CheckGC(instr_id, instr_node.gc_check_var_list, var_scope, place,
working_var_ref);
event_manager_.RecordEvent(instr_node, place_);
op_run_number->fetch_add(1, std::memory_order_relaxed);
if (is_dry_run) {
dry_run_profiler_.ParseMemoryInfo(global_scope_->var_list);
}
auto& next_instr = instr_node.next_instruction_.all_next_ops_;
for (auto next_i : next_instr) {
// fetch_sub return value before applying sub
bool is_ready =
atomic_deps->at(next_i)->fetch_sub(1, std::memory_order_relaxed) == 1;
if (is_ready) {
async_work_queue_.AddTask(vec_instruction_[next_i].type_, [=]() {
RunInstructionAsync(next_i, atomic_deps, atomic_var_ref, op_run_number,
is_dry_run);
});
}
}
// GC infomation
CheckGC(instr_id, instr_node.gc_check_var_list, atomic_var_ref);
}
void InterpreterCore::CheckGC(size_t instr_id,
const std::vector<size_t>& gc_check_list,
const VariableScope& var_scope,
const platform::Place& place,
std::vector<VariableMetaInfo>& working_var_ref) {
AtomicVectorSizeT* atomic_var_ref) {
auto& var_scope = *global_scope_;
for (auto var_id : gc_check_list) {
--working_var_ref[var_id].var_ref_count_;
if (var_scope.vec_meta_info_[var_id].vardesc_ &&
!var_scope.vec_meta_info_[var_id].vardesc_->Persistable() &&
working_var_ref[var_id].var_ref_count_ == 0) {
bool is_ready = atomic_var_ref->at(var_id)->fetch_sub(
1, std::memory_order_relaxed) == 1;
if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
!var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
vec_instruction_[instr_id].dev_ctx_);
}
......@@ -417,8 +432,7 @@ const CostInfo& InterpreterCore::DryRun(
// DryRun may be called many times.
dry_run_profiler_.Reset();
dry_run_profiler_.Start();
ExecuteInstructionList(vec_instruction_, *global_scope_, place_,
/*is_dry_run=*/true);
ExecuteInstructionList(vec_instruction_, /*is_dry_run=*/true);
platform::DeviceContextPool::Instance().Get(place_)->Wait();
dry_run_profiler_.Pause();
......
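Both RunInstructionAsync and CheckGC above rely on the same detail: std::atomic::fetch_sub returns the value held before the subtraction, so comparing the result against 1 identifies the single caller that released the last dependency (or, for GC, dropped the last reference). A standalone illustration, independent of Paddle:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

int main() {
  std::atomic<std::size_t> count{2};
  // fetch_sub returns the previous value; only the caller that sees 1
  // knows it performed the final decrement.
  assert(count.fetch_sub(1, std::memory_order_relaxed) == 2);  // still pending
  assert(count.fetch_sub(1, std::memory_order_relaxed) == 1);  // last one: ready / collectible
  assert(count.load() == 0);
  return 0;
}
```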
......@@ -29,10 +29,12 @@
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/device_event.h"
namespace paddle {
namespace framework {
using AtomicVectorSizeT = std::vector<std::unique_ptr<std::atomic<size_t>>>;
class InterpreterCore {
public:
......@@ -58,16 +60,17 @@ class InterpreterCore {
void RunInstruction(const Instruction& instr_node);
void ExecuteInstructionList(const std::vector<Instruction>& vec_instr,
const VariableScope& var_scope,
const platform::Place& place,
bool is_dry_run = false);
void DryRunPrepare(const std::vector<framework::Tensor>& feed_tensors);
void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list,
const VariableScope& var_scope, const platform::Place& place,
std::vector<VariableMetaInfo>& working_var_ref); // NOLINT
AtomicVectorSizeT* working_var_ref);
void RunInstructionAsync(size_t instr_id,
AtomicVectorSizeT* working_dependecy_count,
AtomicVectorSizeT* working_var_ref,
std::atomic<size_t>* op_run_number, bool is_dry_run);
void AddFetch(const std::vector<std::string>& fetch_names);
void BuildSkipShareLoDInfo();
......@@ -93,9 +96,11 @@ class InterpreterCore {
InterpreterProfiler dry_run_profiler_;
StreamAnalyzer stream_analyzer_;
EventManager event_manager_;
interpretercore::AsyncWorkQueue async_work_queue_;
InterpreterCoreGarbageCollector gc_;
std::vector<paddle::platform::DeviceEvent> gc_event_;
std::unique_ptr<WorkQueueGroup> group_thread_pool_;
};
} // namespace framework
} // namespace paddle
......@@ -23,8 +23,8 @@ InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() {
max_memory_size_ = static_cast<size_t>(GetEagerDeletionThreshold());
cur_memory_size_ = 0;
WorkQueueOptions options;
options.num_threads = 1;
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ false);
queue_ = CreateSingleThreadedWorkQueue(options);
}
......
......@@ -18,6 +18,27 @@ namespace paddle {
namespace framework {
namespace interpretercore {
AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicDeps(
const std::vector<size_t>& dependecy_count) {
AtomicVectorSizeT working_dependecy_count(dependecy_count.size());
for (size_t i = 0; i < dependecy_count.size(); ++i) {
working_dependecy_count[i] =
std::make_unique<std::atomic<size_t>>(dependecy_count[i]);
}
return std::move(working_dependecy_count);
}
AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicVarRef(
const std::vector<VariableMetaInfo>& vec_meta_info) {
AtomicVectorSizeT working_var_ref(vec_meta_info.size());
for (size_t i = 0; i < vec_meta_info.size(); ++i) {
working_var_ref[i] =
std::make_unique<std::atomic<size_t>>(vec_meta_info[i].var_ref_count_);
}
return std::move(working_var_ref);
}
bool var_can_be_deleted(const std::string& name, const BlockDesc& block) {
auto* var_desc = block.FindVar(name);
if (var_desc == nullptr || var_desc->Persistable()) {
......
......@@ -32,6 +32,7 @@
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
......@@ -48,6 +49,39 @@ namespace framework {
namespace interpretercore {
using AtomicVectorSizeT = std::vector<std::unique_ptr<std::atomic<size_t>>>;
class AsyncWorkQueue {
public:
explicit AsyncWorkQueue(size_t host_num_threads)
: host_num_thread_(host_num_threads) {
std::vector<WorkQueueOptions> group_options;
// for execute host Kernel
group_options.emplace_back(/*num_threads*/ host_num_threads,
/*allow_spinning*/ true,
/*track_task*/ true);
// for launch device Kernel
group_options.emplace_back(/*num_threads*/ 1,
/*allow_spinning*/ true, /*track_task*/ true);
queue_group_ = CreateWorkQueueGroup(group_options);
}
AtomicVectorSizeT PrepareAtomicDeps(
const std::vector<size_t>& dependecy_count);
AtomicVectorSizeT PrepareAtomicVarRef(
const std::vector<VariableMetaInfo>& vec_meta_info);
void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }
void AddTask(const OpFuncType& op_func_type, std::function<void()> fn) {
queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
}
private:
size_t host_num_thread_;
std::unique_ptr<WorkQueueGroup> queue_group_;
};
std::string get_memcpy_type(const platform::Place& src_place,
const platform::Place& dst_place);
......
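AddTask dispatches on static_cast<size_t>(op_func_type), so with the OpFuncType values shown further down (kQueueSync = 0, kQueueAsync = 1), host kernels land in the multi-threaded queue built from the first option and device-side launches in the single-threaded one. A hypothetical usage sketch of the class above, assuming it is reachable from interpreter code via the interpretercore_util header as the diff suggests:

```cpp
// Hypothetical usage, not code from this commit: the lambdas are placeholders
// and the include path is assumed from the file being modified here.
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"

void RunTwoOps() {
  using paddle::framework::OpFuncType;
  // One multi-threaded host queue plus one single-threaded device-launch queue.
  paddle::framework::interpretercore::AsyncWorkQueue queue(/*host_num_threads=*/4);
  queue.AddTask(OpFuncType::kQueueSync, [] { /* run a CPU kernel */ });
  queue.AddTask(OpFuncType::kQueueAsync, [] { /* launch a GPU kernel or H2D copy */ });
  queue.WaitEmpty();  // drains both queues in the group
}
```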
......@@ -507,6 +507,12 @@ struct InstructionInfo {
std::vector<size_t> dependecy_count_;
};
enum class OpFuncType {
kQueueSync = 0, // CPU kernel, block host
kQueueAsync = 1, // GPU Kernel or d2h, h2d, send, recv, broadcast
};
class RuntimeInferShapeContext;
struct Instruction {
OpKernelFunc kernel_func_;
std::shared_ptr<RuntimeContext> runtime_ctx_;
......@@ -522,15 +528,11 @@ struct Instruction {
std::vector<EventInter> output_events_;
platform::DeviceContext* dev_ctx_; // not owned
OpFuncType type_;
std::vector<std::pair<Variable*, Variable*>> vec_inplace_in_to_out_;
};
enum class OpFuncType {
kQueueAsync, // GPU Kernel or d2h, h2d, send, recv, broadcast
kQueueSync, // CPU kernel, block host
};
struct OpFuncNode {
// int unsed;
std::map<std::string, std::vector<int>> input_index;
......
......@@ -22,9 +22,14 @@ namespace paddle {
namespace framework {
struct WorkQueueOptions {
size_t num_threads{0};
bool allow_spinning{true};
bool track_task{false};
WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task)
: num_threads(num_threads),
allow_spinning(allow_spinning),
track_task(track_task) {}
size_t num_threads;
bool allow_spinning;
bool track_task;
};
class WorkQueue {
......
......@@ -26,9 +26,8 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) {
std::atomic<unsigned> counter{0};
constexpr unsigned kLoopNum = 1000000;
// CreateSingleThreadedWorkQueue
WorkQueueOptions options;
options.num_threads = 1;
options.track_task = true;
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true);
auto work_queue = CreateSingleThreadedWorkQueue(options);
// NumThreads
EXPECT_EQ(work_queue->NumThreads(), 1u);
......@@ -58,9 +57,8 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
constexpr unsigned kExternalLoopNum = 100;
constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue
WorkQueueOptions options;
options.num_threads = 10;
options.track_task = true;
WorkQueueOptions options(/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true);
auto work_queue = CreateMultiThreadedWorkQueue(options);
// NumThreads
EXPECT_EQ(work_queue->NumThreads(), 10u);
......@@ -91,12 +89,10 @@ TEST(WorkQueue, TestWorkQueueGroup) {
constexpr unsigned kExternalLoopNum = 100;
constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue
WorkQueueOptions sq_options;
sq_options.num_threads = 1;
sq_options.track_task = true;
WorkQueueOptions mq_options;
mq_options.num_threads = 10;
mq_options.track_task = true;
WorkQueueOptions sq_options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true);
WorkQueueOptions mq_options(/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true);
auto queue_group = CreateWorkQueueGroup({sq_options, mq_options});
// NumThreads
EXPECT_EQ(queue_group->QueueNumThreads(0), 1u);
......