Unverified commit 0e5d81c7, authored by Aurelius84, committed via GitHub

Polish multi-thread schedule strategy and Keep one task in current thread (#35928)

* Polish multi-thread schedule strategy

* fix atomic_deps

* modify into lambda function

* add and run
Parent commit: 6c4a741a
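
The diff below hinges on one scheduling rule: when an instruction finishes, it decrements an atomic dependency counter for each successor, the first successor that becomes ready keeps executing on the current thread, and the remaining ready successors are pushed onto the work queue for other threads (the real code additionally distinguishes sync, async, and event-wait successors). Here is a minimal, self-contained sketch of that rule, not Paddle's implementation; `Node`, `release_dep`, `run_async`, and the `std::async`-based stand-in for the work queue are all hypothetical names.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <future>
#include <mutex>
#include <vector>

struct Node {
  std::atomic<std::size_t> deps{0};     // unfinished predecessors
  std::vector<std::size_t> successors;  // ops that depend on this one
};

std::vector<Node> graph(4);
std::vector<std::future<void>> pending;  // stands in for a real work queue
std::mutex pending_mu;
std::atomic<std::size_t> op_run_number{0};

// fetch_sub returns the value *before* subtracting, so "== 1" means this
// caller just released the successor's last outstanding dependency.
bool release_dep(std::size_t id) {
  return graph[id].deps.fetch_sub(1, std::memory_order_relaxed) == 1;
}

void run_async(std::size_t id) {
  std::printf("run op %zu\n", id);
  op_run_number.fetch_add(1, std::memory_order_relaxed);

  bool kept_one = false;
  for (std::size_t next : graph[id].successors) {
    if (!release_dep(next)) continue;
    if (!kept_one) {
      kept_one = true;
      run_async(next);  // keep exactly one ready successor on this thread
    } else {
      // hand the rest over to other threads
      std::lock_guard<std::mutex> lk(pending_mu);
      pending.push_back(std::async(std::launch::async, run_async, next));
    }
  }
}

int main() {
  // Diamond graph: 0 -> {1, 2} -> 3
  graph[0].successors = {1, 2};
  graph[1].successors = {3};
  graph[2].successors = {3};
  graph[1].deps = 1;
  graph[2].deps = 1;
  graph[3].deps = 2;

  run_async(0);  // op 0 has no unmet dependencies, so it starts the chain

  // Drain the "queue": running tasks may append more futures, so walk by
  // index under the lock instead of using a range-for (WaitEmpty analogue).
  for (std::size_t i = 0;;) {
    std::future<void> f;
    {
      std::lock_guard<std::mutex> lk(pending_mu);
      if (i == pending.size()) break;
      f = std::move(pending[i++]);
    }
    f.wait();
  }
  std::printf("ops executed: %zu\n", op_run_number.load());
}
```

With the diamond graph above, ops 0 and 1 stay on the main thread while 2 and 3 migrate to a worker, and the final count is 4, mirroring the `op_run_number_` check in the patch.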
@@ -189,8 +189,6 @@ void InterpreterCore::Convert() {
     for (auto inst_id : filter_next) {
       dependecy_count_[inst_id]++;
     }
-    vec_instruction_[i].next_instruction_.all_next_ops_ =
-        std::move(filter_next);
   }
 
   for (size_t i = 0; i < vec_instruction_.size(); ++i) {
@@ -356,31 +354,81 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
 void InterpreterCore::ExecuteInstructionList(
     const std::vector<Instruction>& vec_instr) {
-  auto atomic_deps = async_work_queue_.PrepareAtomicDeps(dependecy_count_);
-  auto atomic_var_ref = async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
-  std::atomic<size_t> op_run_number{0};
+  async_work_queue_.PrepareAtomicDeps(dependecy_count_);
+  async_work_queue_.PrepareAtomicVarRef(vec_meta_info_);
+  op_run_number_ = 0;
 
   for (size_t i = 0; i < dependecy_count_.size(); ++i) {
     if (dependecy_count_[i] == 0) {
-      async_work_queue_.AddTask(vec_instr[i].type_, [&, i]() {
-        RunInstructionAsync(i, &atomic_deps, &atomic_var_ref, &op_run_number);
-      });
+      async_work_queue_.AddTask(vec_instr[i].type_,
+                                [&, i] { RunInstructionAsync(i); });
     }
   }
 
   async_work_queue_.WaitEmpty();
 
   PADDLE_ENFORCE_EQ(
-      op_run_number.load(), vec_instr.size(),
+      op_run_number_.load(), vec_instr.size(),
       platform::errors::Fatal(
           "Required op_run_number == %d, but received op_run_number = %d.",
-          vec_instr.size(), op_run_number.load()));
+          vec_instr.size(), op_run_number_.load()));
 }
 
-void InterpreterCore::RunInstructionAsync(size_t instr_id,
-                                          AtomicVectorSizeT* atomic_deps,
-                                          AtomicVectorSizeT* atomic_var_ref,
-                                          std::atomic<size_t>* op_run_number) {
+void InterpreterCore::RunNextInstruction(const Instruction& instr) {
+  auto& next_instr = instr.next_instruction_;
+  auto& atomic_deps = async_work_queue_.AtomicDeps();
+  auto IsReady = [&](size_t next_id) {
+    return atomic_deps[next_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
+  };
+
+  if (instr.type_ == OpFuncType::kQueueAsync) {
+    // move all sync_ops into other threads
+    for (auto next_id : next_instr.synchronize_run_) {
+      if (IsReady(next_id)) {
+        async_work_queue_.AddTask(
+            vec_instruction_[next_id].type_,
+            [&, next_id] { RunInstructionAsync(next_id); });
+      }
+    }
+    // keep all async_ops running in current thread
+    for (auto next_id : next_instr.direct_run_) {
+      if (IsReady(next_id)) {
+        RunInstructionAsync(next_id);
+      }
+    }
+    for (auto next_id : next_instr.event_wait_run_) {
+      if (IsReady(next_id)) {
+        RunInstructionAsync(next_id);
+      }
+    }
+  } else {
+    // move async_ops into async_thread
+    for (auto next_id : next_instr.event_wait_run_) {
+      if (IsReady(next_id)) {
+        async_work_queue_.AddTask(
+            vec_instruction_[next_id].type_,
+            [&, next_id] { RunInstructionAsync(next_id); });
+      }
+    }
+    for (size_t i = 0; i < next_instr.direct_run_.size(); ++i) {
+      auto next_id = next_instr.direct_run_[i];
+      if (IsReady(next_id)) {
+        // only keep one op running in current thread
+        if (i == 0) {
+          RunInstructionAsync(next_id);
+          continue;
+        }
+        // move rest ops into other threads
+        async_work_queue_.AddTask(
+            vec_instruction_[next_id].type_,
+            [&, next_id] { RunInstructionAsync(next_id); });
+      }
+    }
+  }
+}
+
+void InterpreterCore::RunInstructionAsync(size_t instr_id) {
   auto& instr_node = vec_instruction_[instr_id];
   platform::RecordEvent instruction_event(
       instr_node.kernel_func_.operator_base_->Type());
@@ -389,32 +437,22 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id,
   RunInstruction(instr_node);
 
   event_manager_.RecordEvent(instr_node, place_);
-  op_run_number->fetch_add(1, std::memory_order_relaxed);
+  op_run_number_.fetch_add(1, std::memory_order_relaxed);
 
-  auto& next_instr = instr_node.next_instruction_.all_next_ops_;
-
-  for (auto next_i : next_instr) {
-    // fetch_sub return value before applying sub
-    bool is_ready =
-        atomic_deps->at(next_i)->fetch_sub(1, std::memory_order_relaxed) == 1;
-    if (is_ready) {
-      async_work_queue_.AddTask(vec_instruction_[next_i].type_, [=]() {
-        RunInstructionAsync(next_i, atomic_deps, atomic_var_ref, op_run_number);
-      });
-    }
-  }
-
   // GC infomation
-  CheckGC(instr_id, instr_node.gc_check_var_list, atomic_var_ref);
+  CheckGC(instr_id, instr_node.gc_check_var_list);
+
+  RunNextInstruction(instr_node);
 }
 
 void InterpreterCore::CheckGC(size_t instr_id,
-                              const std::vector<size_t>& gc_check_list,
-                              AtomicVectorSizeT* atomic_var_ref) {
+                              const std::vector<size_t>& gc_check_list) {
   auto& var_scope = *global_scope_;
+  auto& atomic_var_ref = async_work_queue_.AtomicVarRef();
 
   for (auto var_id : gc_check_list) {
-    bool is_ready = atomic_var_ref->at(var_id)->fetch_sub(
-                        1, std::memory_order_relaxed) == 1;
+    bool is_ready =
+        atomic_var_ref[var_id]->fetch_sub(1, std::memory_order_relaxed) == 1;
     if (is_ready && var_scope.vec_meta_info_[var_id].vardesc_ &&
         !var_scope.vec_meta_info_[var_id].vardesc_->Persistable()) {
       gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
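`CheckGC` above relies on the same `fetch_sub` idiom, but for variable reference counts: every pending reader op holds one reference, and only the op whose `fetch_sub` observes the old value 1 released the last reference, so only that op may hand the variable to the garbage collector. Below is a small hedged sketch of that idiom; `Var`, `check_gc`, and `gc_queue` are hypothetical names, not Paddle API.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <vector>

struct Var {
  const char* name = "";
  std::atomic<std::size_t> ref{0};  // readers that have not finished yet
};

std::vector<const char*> gc_queue;

// The caller that drops the count from 1 to 0 is the unique "last reader",
// so it alone enqueues the variable for collection.
void check_gc(Var& v) {
  if (v.ref.fetch_sub(1, std::memory_order_relaxed) == 1) {
    gc_queue.push_back(v.name);
  }
}

int main() {
  Var x;
  x.name = "x";
  x.ref = 2;    // two pending ops read x
  check_gc(x);  // first reader finishes: 2 -> 1, keep the variable
  check_gc(x);  // second reader finishes: 1 -> 0, collect it
  for (const char* n : gc_queue) std::printf("collect %s\n", n);
}
```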
@@ -65,13 +65,10 @@ class InterpreterCore {
   void DryRunPrepare(const std::vector<framework::Tensor>& feed_tensors);
 
-  void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list,
-               AtomicVectorSizeT* working_var_ref);
+  void CheckGC(size_t instr_id, const std::vector<size_t>& gc_check_list);
 
-  void RunInstructionAsync(size_t instr_id,
-                           AtomicVectorSizeT* working_dependecy_count,
-                           AtomicVectorSizeT* working_var_ref,
-                           std::atomic<size_t>* op_run_number);
+  void RunInstructionAsync(size_t instr_id);
+  void RunNextInstruction(const Instruction& instr_id);
 
   void AddFetch(const std::vector<std::string>& fetch_names);
   void BuildSkipShareLoDInfo();
@@ -101,6 +98,7 @@ class InterpreterCore {
   InterpreterCoreGarbageCollector gc_;
   std::vector<paddle::platform::DeviceEvent> gc_event_;
+  std::atomic<size_t> op_run_number_{0};
 };
 
 }  // namespace framework
 }  // namespace paddle
@@ -12,31 +12,40 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
+#include <algorithm>
 #include "paddle/fluid/framework/executor_gc_helper.h"
 
 namespace paddle {
 namespace framework {
 namespace interpretercore {
 
-AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicDeps(
+AtomicVectorSizeT& AsyncWorkQueue::PrepareAtomicDeps(
     const std::vector<size_t>& dependecy_count) {
-  AtomicVectorSizeT working_dependecy_count(dependecy_count.size());
+  if (atomic_deps_.size() != dependecy_count.size()) {
+    atomic_deps_.clear();
+    std::generate_n(std::back_inserter(atomic_deps_), dependecy_count.size(),
+                    [] { return std::make_unique<std::atomic<size_t>>(0); });
+  }
+
   for (size_t i = 0; i < dependecy_count.size(); ++i) {
-    working_dependecy_count[i] =
-        std::make_unique<std::atomic<size_t>>(dependecy_count[i]);
+    atomic_deps_[i]->store(dependecy_count[i]);
   }
-  return working_dependecy_count;
+  return atomic_deps_;
 }
 
-AtomicVectorSizeT AsyncWorkQueue::PrepareAtomicVarRef(
+AtomicVectorSizeT& AsyncWorkQueue::PrepareAtomicVarRef(
    const std::vector<VariableMetaInfo>& vec_meta_info) {
-  AtomicVectorSizeT working_var_ref(vec_meta_info.size());
+  if (atomic_var_ref_.size() != vec_meta_info.size()) {
+    atomic_var_ref_.clear();
+    std::generate_n(std::back_inserter(atomic_var_ref_), vec_meta_info.size(),
+                    [] { return std::make_unique<std::atomic<size_t>>(0); });
+  }
   for (size_t i = 0; i < vec_meta_info.size(); ++i) {
-    working_var_ref[i] =
-        std::make_unique<std::atomic<size_t>>(vec_meta_info[i].var_ref_count_);
+    atomic_var_ref_[i]->store(vec_meta_info[i].var_ref_count_);
   }
-  return working_var_ref;
+  return atomic_var_ref_;
 }
 
 bool var_can_be_deleted(const std::string& name, const BlockDesc& block) {
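The `PrepareAtomicDeps`/`PrepareAtomicVarRef` rewrite above keeps the counter storage inside `AsyncWorkQueue` and only resets values between runs instead of reallocating a fresh vector per `ExecuteInstructionList` call. The following standalone sketch reproduces that allocation pattern under assumed names (`AtomicVector`, `prepare_atomic` are not Paddle API): because `std::atomic` is neither copyable nor movable, the counters sit behind `std::unique_ptr`, the vector is filled once with `std::generate_n`, and later runs merely `store()` new values.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <iterator>
#include <memory>
#include <vector>

using AtomicVector = std::vector<std::unique_ptr<std::atomic<std::size_t>>>;

AtomicVector& prepare_atomic(AtomicVector& slots,
                             const std::vector<std::size_t>& init) {
  if (slots.size() != init.size()) {  // (re)allocate only when the size changes
    slots.clear();
    std::generate_n(std::back_inserter(slots), init.size(), [] {
      return std::make_unique<std::atomic<std::size_t>>(0);
    });
  }
  for (std::size_t i = 0; i < init.size(); ++i) {
    slots[i]->store(init[i]);  // reset counters for the next run
  }
  return slots;
}

int main() {
  AtomicVector deps;
  prepare_atomic(deps, {0, 1, 2});  // first run: allocates three counters
  prepare_atomic(deps, {0, 2, 2});  // later runs: reuses the same storage
  return deps[1]->load() == 2 ? 0 : 1;
}
```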
@@ -66,9 +66,9 @@ class AsyncWorkQueue {
     queue_group_ = CreateWorkQueueGroup(group_options);
   }
 
-  AtomicVectorSizeT PrepareAtomicDeps(
+  AtomicVectorSizeT& PrepareAtomicDeps(
       const std::vector<size_t>& dependecy_count);
-  AtomicVectorSizeT PrepareAtomicVarRef(
+  AtomicVectorSizeT& PrepareAtomicVarRef(
       const std::vector<VariableMetaInfo>& vec_meta_info);
 
   void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }
@@ -77,9 +77,14 @@ class AsyncWorkQueue {
     queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
   }
 
+  AtomicVectorSizeT& AtomicDeps() { return atomic_deps_; }
+  AtomicVectorSizeT& AtomicVarRef() { return atomic_var_ref_; }
+
  private:
   size_t host_num_thread_;
   std::unique_ptr<WorkQueueGroup> queue_group_;
+  AtomicVectorSizeT atomic_deps_;
+  AtomicVectorSizeT atomic_var_ref_;
 };
 
 std::string get_memcpy_type(const platform::Place& src_place,
@@ -477,15 +477,10 @@ struct VariableScope {
   std::vector<VariableMetaInfo> vec_meta_info_;
 };
 
-struct EventRun {
-  explicit EventRun(size_t op_id) : op_id_(op_id) {}
-  size_t op_id_;
-};
 struct NextInstruction {
   std::vector<size_t> direct_run_;
-  std::vector<EventRun> event_wait_run_;
-  std::vector<EventRun> synchronize_run_;
-  std::vector<size_t> all_next_ops_;
+  std::vector<size_t> event_wait_run_;
+  std::vector<size_t> synchronize_run_;
 };
 
 struct EventInter {
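With `EventRun` gone, every successor list in `NextInstruction` holds plain instruction indices, so `RunNextInstruction` can walk all three lists with the same kind of loop and index `vec_instruction_` directly, without unwrapping a wrapper struct first. A tiny illustrative sketch, using the hypothetical names `NextInstr` and `visit`:

```cpp
#include <cstddef>
#include <cstdio>
#include <vector>

struct NextInstr {
  std::vector<std::size_t> direct_run_;
  std::vector<std::size_t> event_wait_run_;
  std::vector<std::size_t> synchronize_run_;
};

// One loop shape serves all three successor categories.
void visit(const std::vector<std::size_t>& ids, const char* tag) {
  for (std::size_t id : ids) {
    std::printf("%s successor -> instruction %zu\n", tag, id);
  }
}

int main() {
  NextInstr next{{1}, {2, 3}, {4}};
  visit(next.direct_run_, "direct");
  visit(next.event_wait_run_, "event-wait");
  visit(next.synchronize_run_, "sync");
}
```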