diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 344587897fcf5d36e60e73dc7189dff8bd9aef22..121302880cbe4decb4bbbc09c26a4091eba50293 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -86,8 +86,8 @@ struct OpHandle {
 
   virtual ~OpHandle() {}
 
-  void Run() {
-    if (events_.empty()) {
+  void Run(bool use_event) {
+    if (events_.empty() && use_event) {
       for (auto &p : dev_ctx_) {
         int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
         cudaSetDevice(dev_id);
@@ -97,16 +97,18 @@ struct OpHandle {
 
     RunImpl();
 
-    for (auto &p : dev_ctx_) {
-      int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
-      auto stream =
-          static_cast<platform::CUDADeviceContext *>(p.second)->stream();
-      cudaEventRecord(events_.at(dev_id), stream);
+    if (use_event) {
+      for (auto &p : dev_ctx_) {
+        int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
+        auto stream =
+            static_cast<platform::CUDADeviceContext *>(p.second)->stream();
+        cudaEventRecord(events_.at(dev_id), stream);
+      }
     }
   }
 
   virtual void Wait(platform::DeviceContext *waited_dev) {
-    if (platform::is_cpu_place(waited_dev->GetPlace())) {
+    if (platform::is_cpu_place(waited_dev->GetPlace()) && events_.empty()) {
       for (auto &dev_ctx : dev_ctx_) {
         dev_ctx.second->Wait();
       }
@@ -677,7 +679,7 @@ void ParallelExecutor::BuildNCCLCommunicator() const {
 
 void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                            const std::string &fetched_var_name) {
-  VLOG(3) << "Run iter";
+  bool use_event = false;
   auto fetched_data = std::make_shared<FetchedData>(fetch_tensors.size());
   // Version --> VarHandle
   member_->exception_.reset();
@@ -748,7 +750,7 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
   }
 
   for (auto *op : to_run) {
-    RunOp(pending_vars, op);
+    RunOp(use_event, pending_vars, op);
   }
 
   while (!pending_vars.empty()) {
@@ -776,7 +778,7 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
     }
     for (auto *op : to_run) {
       pending_ops.erase(op);
-      RunOp(pending_vars, op);
+      RunOp(use_event, pending_vars, op);
     }
   }
 
@@ -790,6 +792,7 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
 }
 
 void ParallelExecutor::RunOp(
+    bool use_event,
     std::unordered_map<VarHandleBase *, std::atomic<bool>> &pending_vars,
     OpHandle *op) const {
   std::vector<std::atomic<bool> *> *ready_buffer =
@@ -798,10 +801,10 @@ void ParallelExecutor::RunOp(
     ready_buffer->emplace_back(&pending_vars[var]);
   }
 
-  auto op_run = [ready_buffer, op, this] {
+  auto op_run = [ready_buffer, op, this, use_event] {
     try {
       VLOG(10) << op->DebugString();
-      op->Run();
+      op->Run(use_event);
       for (auto *ready : *ready_buffer) {
         ready->store(true, std::memory_order_release);
       }
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index cb93c0cd4103813463f39ec7d3d51debbd6e15f6..2345bffcc765d41e974d3a2be7fb8346544f2ae8 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -62,6 +62,7 @@ class ParallelExecutor {
   void BuildNCCLCommunicator() const;
 
   void RunOp(
+      bool use_event,
       std::unordered_map<VarHandleBase*, std::atomic<bool>>& pending_vars,
       OpHandle* op) const;
 
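
Note on the change: the diff threads a use_event flag from ParallelExecutor::RunOp (captured into the op_run lambda) down into OpHandle::Run, so per-device CUDA event creation and cudaEventRecord calls can be switched off; with events absent, Wait falls back to blocking DeviceContext::Wait() calls (the new events_.empty() check). In this patch use_event is hard-coded to false in ParallelExecutor::Run, so event recording is disabled for now. The following is a minimal, self-contained sketch of the same gating pattern, not the Paddle implementation; the names GatedOp, streams_, and WaitOn are hypothetical, and streams stand in for Paddle's per-place device contexts.

// Sketch of the event-gating pattern introduced by this diff.
#include <cuda_runtime.h>

#include <unordered_map>

struct GatedOp {
  std::unordered_map<int, cudaStream_t> streams_;  // dev_id -> launch stream
  std::unordered_map<int, cudaEvent_t> events_;    // dev_id -> completion event

  void Run(bool use_event) {
    // Lazily create one event per device on the first event-enabled run,
    // mirroring the `events_.empty() && use_event` guard in the diff.
    if (events_.empty() && use_event) {
      for (auto &p : streams_) {
        cudaSetDevice(p.first);
        cudaEventCreateWithFlags(&events_[p.first], cudaEventDisableTiming);
      }
    }

    RunImpl();  // launch this op's kernels on streams_

    // Record completion per stream only when event-based waiting is on.
    if (use_event) {
      for (auto &p : streams_) {
        cudaEventRecord(events_.at(p.first), p.second);
      }
    }
  }

  // A waiter either blocks on the whole device (no events recorded,
  // matching the events_.empty() fallback added to Wait()) or queues a
  // lightweight cross-stream dependency on the recorded event.
  void WaitOn(cudaStream_t waiting_stream, int dev_id) {
    if (events_.empty()) {
      cudaSetDevice(dev_id);
      cudaDeviceSynchronize();  // blocking fallback
    } else {
      cudaStreamWaitEvent(waiting_stream, events_.at(dev_id), 0);
    }
  }

  void RunImpl() { /* kernel launches elided */ }
};

Gating cudaEventRecord this way avoids per-op event overhead while the executor still relies on blocking waits; the event path only pays off once waiters can use cudaStreamWaitEvent to stay on-device instead of synchronizing the host.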