Unverified · Commit 3a294042 · authored by tensor-tang · committed by GitHub

Merge pull request #11233 from tensor-tang/multithreads

Fix abort issue in cpu multi-threads
@@ -43,6 +43,8 @@ Scope& Scope::NewScope() const {
 }
 
 Variable* Scope::Var(const std::string& name) {
+  // acquire the lock when new var under this scope
+  std::unique_lock<std::mutex> lock(mutex_);
   auto* v = FindVarLocally(name);
   if (v != nullptr) return v;
@@ -62,11 +64,17 @@ Variable* Scope::Var(std::string* name) {
 }
 
 Variable* Scope::FindVar(const std::string& name) const {
+  // acquire the lock when find var
+  std::unique_lock<std::mutex> lock(mutex_);
+  return FindVarInternal(name);
+}
+
+Variable* Scope::FindVarInternal(const std::string& name) const {
   auto var = FindVarLocally(name);
   if (var != nullptr) {
     return var;
   }
-  return (parent_ == nullptr) ? nullptr : parent_->FindVar(name);
+  return (parent_ == nullptr) ? nullptr : parent_->FindVarInternal(name);
 }
 
 const Scope* Scope::FindScope(const Variable* var) const {
@@ -78,6 +86,7 @@ const Scope* Scope::FindScope(const Variable* var) const {
   return (parent_ == nullptr) ? nullptr : parent_->FindScope(var);
 }
 void Scope::DropKids() {
+  std::unique_lock<std::mutex> lock(mutex_);
   for (Scope* s : kids_) delete s;
   kids_.clear();
 }
@@ -105,6 +114,7 @@ void Scope::DeleteScope(Scope* scope) const {
 }
 
 void Scope::EraseVars(const std::vector<std::string>& var_names) {
+  std::unique_lock<std::mutex> lock(mutex_);
   std::set<std::string> var_set(var_names.begin(), var_names.end());
   for (auto it = vars_.begin(); it != vars_.end();) {
     if (var_set.find(it->first) != var_set.end()) {
......
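An editor's note on the locking scheme above: each public entry point that touches vars_ or kids_ (Var, FindVar, DropKids, EraseVars) takes the scope's mutex exactly once, and the recursive lookup moves into the unlocked private helper FindVarInternal, so walking the parent chain never re-enters the locking layer. Below is a minimal, self-contained sketch of that idiom with hypothetical names (Registry, Find, FindInternal are illustrations, not Paddle code):

    #include <mutex>
    #include <string>
    #include <unordered_map>

    class Registry {
     public:
      // Public API: lock exactly once per call, then delegate.
      int* Find(const std::string& name) const {
        std::unique_lock<std::mutex> lock(mutex_);
        return FindInternal(name);
      }

     private:
      // Unlocked worker; recursing into parent_ never touches a mutex,
      // mirroring how FindVarInternal walks the Scope parent chain.
      int* FindInternal(const std::string& name) const {
        auto it = items_.find(name);
        if (it != items_.end()) return &it->second;
        return (parent_ == nullptr) ? nullptr : parent_->FindInternal(name);
      }

      mutable std::unordered_map<std::string, int> items_;
      const Registry* parent_{nullptr};
      mutable std::mutex mutex_;  // mutable: locked inside const methods
    };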
@@ -81,14 +81,20 @@ class Scope {
   // Rename variable to a new name and return the new name
   std::string Rename(const std::string& origin_name) const;
 
-  /// Caller doesn't own the returned Variable.
-  Variable* FindVarLocally(const std::string& name) const;
-
  private:
   // Call Scope::NewScope for a sub-scope.
   explicit Scope(Scope const* parent) : parent_(parent) {}
 
+  // Called by FindVar recursively.
+  // Caller doesn't own the returned Variable.
+  Variable* FindVarInternal(const std::string& name) const;
+
+  // Called by FindVarInternal and Var.
+  // Caller doesn't own the returned Variable.
+  Variable* FindVarLocally(const std::string& name) const;
+
   mutable std::unordered_map<std::string, std::unique_ptr<Variable>> vars_;
 
   // Scope in `kids_` are owned by this class.
   mutable std::list<Scope*> kids_;
   Scope const* parent_{nullptr};
......
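One header detail this excerpt collapses: FindVar and the other locked methods are const, so the mutex they lock has to be a mutable member (a const member function may only mutate members declared mutable, and locking mutates the mutex). The declaration itself is not visible in the hunk above; presumably it is something like

    mutable std::mutex mutex_;  // assumed declaration; lets const methods like FindVar lock

along with an #include <mutex> at the top of the header, consistent with the existing mutable vars_ and kids_ members shown above.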
@@ -101,23 +101,22 @@ void SplitData(
 }
 
 void ThreadRunInfer(
-    const int tid, paddle::framework::Executor* executor,
-    paddle::framework::Scope* scope,
-    const std::unique_ptr<paddle::framework::ProgramDesc>& inference_program,
+    const int tid, paddle::framework::Scope* scope,
     const std::vector<std::vector<const paddle::framework::LoDTensor*>>& jobs) {
-  auto copy_program = std::unique_ptr<paddle::framework::ProgramDesc>(
-      new paddle::framework::ProgramDesc(*inference_program));
+  // maybe framework:ProgramDesc is not thread-safe
   auto& sub_scope = scope->NewScope();
+  auto place = paddle::platform::CPUPlace();
+  auto executor = paddle::framework::Executor(place);
+  auto inference_program =
+      paddle::inference::Load(&executor, scope, FLAGS_model_path);
 
-  std::string feed_holder_name = "feed_" + paddle::string::to_string(tid);
-  std::string fetch_holder_name = "fetch_" + paddle::string::to_string(tid);
-  copy_program->SetFeedHolderName(feed_holder_name);
-  copy_program->SetFetchHolderName(fetch_holder_name);
+  auto ctx = executor.Prepare(*inference_program, /*block_id*/ 0);
+  executor.CreateVariables(*inference_program, &sub_scope, /*block_id*/ 0);
 
   const std::vector<std::string>& feed_target_names =
-      copy_program->GetFeedTargetNames();
+      inference_program->GetFeedTargetNames();
   const std::vector<std::string>& fetch_target_names =
-      copy_program->GetFetchTargetNames();
+      inference_program->GetFetchTargetNames();
 
   PADDLE_ENFORCE_EQ(fetch_target_names.size(), 1UL);
   std::map<std::string, paddle::framework::LoDTensor*> fetch_targets;
@@ -131,9 +130,8 @@ void ThreadRunInfer(
   auto start_ms = GetCurrentMs();
   for (size_t i = 0; i < inputs.size(); ++i) {
     feed_targets[feed_target_names[0]] = inputs[i];
-    executor->Run(*copy_program, &sub_scope, &feed_targets, &fetch_targets,
-                  true /*create_local_scope*/, true /*create_vars*/,
-                  feed_holder_name, fetch_holder_name);
+    executor.RunPreparedContext(ctx.get(), &sub_scope, &feed_targets,
+                                &fetch_targets, false /*create_local_scope*/);
   }
   auto stop_ms = GetCurrentMs();
   scope->DeleteScope(&sub_scope);
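Read together, the two hunks above change the threading model: instead of all workers sharing one Executor and one ProgramDesc (distinguished only by per-thread feed/fetch holder names), every thread now owns its executor, loads its own copy of the program, prepares the execution context once, and reuses it for every input. Condensed from the diff itself (no new API, only the calls shown above):

    // Per-thread flow after this patch, condensed from the hunks above.
    auto& sub_scope = scope->NewScope();                    // thread-private scope
    paddle::framework::Executor executor(paddle::platform::CPUPlace());
    auto program = paddle::inference::Load(&executor, scope, FLAGS_model_path);
    auto ctx = executor.Prepare(*program, /*block_id*/ 0);  // prepare once
    executor.CreateVariables(*program, &sub_scope, /*block_id*/ 0);
    for (size_t i = 0; i < inputs.size(); ++i) {            // run many times
      feed_targets[feed_target_names[0]] = inputs[i];
      executor.RunPreparedContext(ctx.get(), &sub_scope, &feed_targets,
                                  &fetch_targets, false /*create_local_scope*/);
    }
    scope->DeleteScope(&sub_scope);

The trade-off is spelled out in the added comment: ProgramDesc may not be thread-safe, so each thread pays the cost of loading and preparing its own program in exchange for having no shared mutable program state.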
@@ -158,22 +156,10 @@ TEST(inference, nlp) {
   LOG(INFO) << "Number of samples (seq_len<1024): " << datasets.size();
   LOG(INFO) << "Total number of words: " << num_total_words;
 
-  const bool model_combined = false;
   // 0. Call `paddle::framework::InitDevices()` initialize all the devices
-  // 1. Define place, executor, scope
-  auto place = paddle::platform::CPUPlace();
-  auto executor = paddle::framework::Executor(place);
   std::unique_ptr<paddle::framework::Scope> scope(
       new paddle::framework::Scope());
-
-  // 2. Initialize the inference_program and load parameters
-  std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
-  inference_program =
-      InitProgram(&executor, scope.get(), FLAGS_model_path, model_combined);
-  if (FLAGS_use_mkldnn) {
-    EnableMKLDNN(inference_program);
-  }
 
 #ifdef PADDLE_WITH_MKLML
   // only use 1 thread number per std::thread
   omp_set_dynamic(0);
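With every std::thread about to run its own executor, the MKLML branch pins intra-op parallelism so the workers do not oversubscribe the cores: omp_set_dynamic(0) stops the OpenMP runtime from choosing its own thread count. The excerpt cuts off here; the natural companion call (an assumption about the truncated lines, not something visible in this diff) would be:

    #ifdef PADDLE_WITH_MKLML
      // assumed pairing: pin each std::thread worker to one OpenMP thread
      omp_set_dynamic(0);
      omp_set_num_threads(1);
    #endif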
@@ -189,21 +175,30 @@ TEST(inference, nlp) {
     start_ms = GetCurrentMs();
     for (int i = 0; i < FLAGS_num_threads; ++i) {
       threads.emplace_back(
-          new std::thread(ThreadRunInfer, i, &executor, scope.get(),
-                          std::ref(inference_program), std::ref(jobs)));
+          new std::thread(ThreadRunInfer, i, scope.get(), std::ref(jobs)));
     }
     for (int i = 0; i < FLAGS_num_threads; ++i) {
       threads[i]->join();
     }
     stop_ms = GetCurrentMs();
   } else {
-    if (FLAGS_prepare_vars) {
-      executor.CreateVariables(*inference_program, scope.get(), 0);
-    }
+    // 1. Define place, executor, scope
+    auto place = paddle::platform::CPUPlace();
+    auto executor = paddle::framework::Executor(place);
+
+    // 2. Initialize the inference_program and load parameters
+    std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
+    inference_program = InitProgram(&executor, scope.get(), FLAGS_model_path,
+                                    /*model combined*/ false);
+    if (FLAGS_use_mkldnn) {
+      EnableMKLDNN(inference_program);
+    }
     // always prepare context
     std::unique_ptr<paddle::framework::ExecutorPrepareContext> ctx;
     ctx = executor.Prepare(*inference_program, 0);
+    if (FLAGS_prepare_vars) {
+      executor.CreateVariables(*inference_program, scope.get(), 0);
+    }
     // preapre fetch
     const std::vector<std::string>& fetch_target_names =
         inference_program->GetFetchTargetNames();
......
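A final C++ footnote on the thread-launch hunk: jobs is wrapped in std::ref because std::thread decay-copies its arguments into the new thread; without the reference_wrapper each worker would receive its own copy of the whole job list. A self-contained illustration (hypothetical Worker, not Paddle code):

    #include <functional>
    #include <iostream>
    #include <thread>
    #include <vector>

    void Worker(int tid, const std::vector<int>& jobs) {
      std::cout << "thread " << tid << " sees " << jobs.size() << " jobs\n";
    }

    int main() {
      std::vector<int> jobs{1, 2, 3};
      // std::ref makes the worker bind to the original vector; a bare
      // `jobs` would be copied into the thread's internal storage.
      std::thread t(Worker, 0, std::ref(jobs));
      t.join();
      return 0;
    }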