From 1f53193a630bc3b6289154dd5f5334a45ddb9285 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Mon, 19 Mar 2018 19:22:03 +0800 Subject: [PATCH] Use atomic code --- paddle/fluid/framework/parallel_executor.cc | 13 ++++++----- paddle/fluid/framework/parallel_executor.h | 25 +++------------------ 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 2fb274d3a5..fa6763b5b5 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -645,7 +645,7 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, auto fetched_data = std::make_shared(fetch_tensors.size()); // Version --> VarHandle member_->exception_.reset(); - std::unordered_map pending_vars; + std::unordered_map> pending_vars; std::unordered_map pending_ops; std::vector dummy_vars; @@ -694,7 +694,7 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, op->offset_ = i; op->local_scopes_ = &member_->local_scopes_; for (auto &p : member_->places_) { - op->dev_ctx_[p] = this->member_->GetNCCLCtx(p).ctx_.get(); + op->dev_ctx_[p] = member_->GetNCCLCtx(p).ctx_.get(); } for (auto *var : vars) { @@ -718,7 +718,7 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, while (!pending_vars.empty()) { VarHandleBase *ready_var = nullptr; for (auto &pair : pending_vars) { - if (pair.second) { + if (pair.second.load(std::memory_order_consume)) { ready_var = pair.first; } } @@ -750,9 +750,10 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, } void ParallelExecutor::RunOp( - std::unordered_map &pending_vars, + std::unordered_map> &pending_vars, OpHandle *op) const { - std::vector *ready_buffer = new std::vector(); + std::vector *> *ready_buffer = + new std::vector *>(); for (auto *var : op->outputs_) { ready_buffer->emplace_back(&pending_vars[var]); } @@ -761,7 +762,7 @@ void ParallelExecutor::RunOp( try { op->Run(); for (auto *ready : *ready_buffer) { - *ready = true; + ready->store(true, std::memory_order_release); } delete ready_buffer; } catch (platform::EnforceNotMet ex) { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 8fe93fb62e..03bf60b8bc 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -33,26 +33,6 @@ class VarHandle; class OpHandle; class VarHandleBase; -struct GuardedBool { - public: - GuardedBool() {} - - operator bool() const { - std::lock_guard g(mtx_); - return value_; - } - - GuardedBool& operator=(bool o) { - std::lock_guard g(mtx_); - value_ = o; - return *this; - } - - private: - mutable std::mutex mtx_; - bool value_; -}; - class ParallelExecutor { public: explicit ParallelExecutor(const std::vector& places, @@ -81,8 +61,9 @@ class ParallelExecutor { void BuildNCCLCommunicator() const; - void RunOp(std::unordered_map& pending_vars, - OpHandle* op) const; + void RunOp( + std::unordered_map>& pending_vars, + OpHandle* op) const; void PolishGraphToSupportDataHarzaeds() const; }; -- GitLab