diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 57dc663c41c59336a6072c74043547ea927f45c1..450df244b72cababbc0b4c1d2d866d8e401fe81f 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -97,6 +97,10 @@ struct ComputationOpHandle : public OpHandle { void Run() override { // Wait other op if necessary + if (platform::is_gpu_place(place_)) { + int dev_id = boost::get(place_).device; + cudaSetDevice(dev_id); + } auto *cur_ctx = dev_ctx_[place_]; for (auto *in : inputs_) { if (in->generated_op_ && in->generated_op_->dev_ctx_[place_] != cur_ctx) { @@ -637,22 +641,20 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, auto fetched_data = std::make_shared(fetch_tensors.size()); // Version --> VarHandle member_->exception_.reset(); - std::unordered_map> pending_vars; + std::unordered_map pending_vars; std::unordered_map pending_ops; for (auto &place_pair : member_->vars_) { for (auto &name_pair : place_pair.second) { for (auto &version_pair : name_pair.second) { - pending_vars[&version_pair.second].store( - version_pair.second.generated_op_ == nullptr, - std::memory_order_relaxed); + pending_vars[&version_pair.second] = + version_pair.second.generated_op_ == nullptr; } } } for (auto &var : member_->dep_vars_) { - pending_vars[var.get()].store(var->generated_op_ == nullptr, - std::memory_order_relaxed); + pending_vars[var.get()] = var->generated_op_ == nullptr; } std::vector to_run; @@ -704,7 +706,7 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, while (!pending_ops.empty()) { VarHandleBase *ready_var = nullptr; for (auto &pair : pending_vars) { - if (pair.second.load(std::memory_order_acquire)) { + if (pair.second) { ready_var = pair.first; } } @@ -737,10 +739,10 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, } void ParallelExecutor::RunOp( - std::unordered_map> &pending_vars, + std::unordered_map &pending_vars, OpHandle *op) const { - std::vector *> *ready_buffer = - new std::vector *>(); + std::vector *ready_buffer = + new std::vector(); for (auto *var : op->outputs_) { ready_buffer->emplace_back(&pending_vars[var]); } @@ -751,7 +753,7 @@ void ParallelExecutor::RunOp( op->Run(); VLOG(10) << "Done " << this; for (auto *ready : *ready_buffer) { - ready->store(true, std::memory_order_release); + *ready = true; } delete ready_buffer; } catch (platform::EnforceNotMet ex) { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index c3cebcfc57360617a8254a64aa00ffb847cafd0e..150b429f94a4479ba1351b806f85509655e2cb04 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -60,9 +60,8 @@ class ParallelExecutor { void BuildNCCLCommunicator() const; - void RunOp( - std::unordered_map>& pending_vars, - OpHandle* op) const; + void RunOp(std::unordered_map& pending_vars, + OpHandle* op) const; void PolishGraphToSupportDataHarzaeds() const; };