diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc
index 2afa47c81bead6fb104f49886713bf75dc1b4dc0..d5ca061944f33939cea59a5275e691b1966194fa 100644
--- a/paddle/fluid/framework/details/broadcast_op_handle.cc
+++ b/paddle/fluid/framework/details/broadcast_op_handle.cc
@@ -38,9 +38,7 @@ void BroadcastOpHandle::RunImpl() {
       out_var_handles.size(), places_.size(),
       "The number of output should equal to the number of places.");

-  // Wait input done, this Wait is asynchronous operation platform::Place
-  // &in_place;
-  WaitInputVarGenerated(*in_var_handle);
+  WaitInputVarGenerated();

   std::vector<const Scope *> var_scopes;
   for (auto *s : local_scopes_) {
@@ -50,29 +48,9 @@ void BroadcastOpHandle::RunImpl() {
   auto *in_var =
       var_scopes.at(in_var_handle->scope_idx_)->FindVar(in_var_handle->name_);
   PADDLE_ENFORCE_NOT_NULL(in_var);
-
   Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);

-  // NOTE: The tensors' Place of input and output must be all on GPU or all on
-  // CPU.
-  for (auto *out_var_handle : out_var_handles) {
-    if (out_var_handle->IsTheSameVar(*in_var_handle)) {
-      continue;
-    }
-    auto t_out_p = out_var_handle->place_;
-    auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
-                        ->FindVar(out_var_handle->name_);
-    PADDLE_ENFORCE_NOT_NULL(out_var);
-    if (platform::is_gpu_place(in_tensor.place())) {
-      PADDLE_ENFORCE(platform::is_gpu_place(t_out_p),
-                     "Places of input and output must be all on GPU.");
-    } else {
-      t_out_p = platform::CPUPlace();
-    }
-    VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
-    VariableVisitor::GetMutableTensor(out_var).mutable_data(t_out_p,
-                                                            in_tensor.type());
-  }
+  InitOutputValue(*in_var_handle, out_var_handles);

   if (platform::is_cpu_place(in_tensor.place())) {
     for (auto *out_var_handle : out_var_handles) {
@@ -147,11 +125,37 @@ void BroadcastOpHandle::RunImpl() {
   }
 }

-void BroadcastOpHandle::WaitInputVarGenerated(const VarHandle &in_var) {
-  if (in_var.generated_op_) {
-    for (auto &pair : dev_ctxes_) {
-      in_var.generated_op_->Wait(pair.second);
+void BroadcastOpHandle::InitOutputValue(
+    const VarHandle &in_var_handle,
+    const std::vector<VarHandle *> &out_var_handles) const {
+  std::vector<const Scope *> var_scopes;
+  for (auto *s : local_scopes_) {
+    var_scopes.emplace_back(s->FindVar(kLocalExecScopeName)->Get<Scope *>());
+  }
+  auto *in_var =
+      var_scopes.at(in_var_handle.scope_idx_)->FindVar(in_var_handle.name_);
+
+  Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
+
+  // NOTE: The tensors' Place of input and output must be all on GPU or all on
+  // CPU.
+  for (auto *out_var_handle : out_var_handles) {
+    if (out_var_handle->IsTheSameVar(in_var_handle)) {
+      continue;
     }
+    auto t_out_p = out_var_handle->place_;
+    auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
+                        ->FindVar(out_var_handle->name_);
+    PADDLE_ENFORCE_NOT_NULL(out_var);
+    if (is_gpu_place(in_tensor.place())) {
+      PADDLE_ENFORCE(platform::is_gpu_place(t_out_p),
+                     "Places of input and output must be all on GPU.");
+    } else {
+      t_out_p = platform::CPUPlace();
+    }
+    VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
+    VariableVisitor::GetMutableTensor(out_var).mutable_data(t_out_p,
+                                                            in_tensor.type());
   }
 }

diff --git a/paddle/fluid/framework/details/broadcast_op_handle.h b/paddle/fluid/framework/details/broadcast_op_handle.h
index 984a95008c0393eff01c2d419cc98949aed14980..629aa00cb817c4b1446e7b750ca62a7c6b1db670 100644
--- a/paddle/fluid/framework/details/broadcast_op_handle.h
+++ b/paddle/fluid/framework/details/broadcast_op_handle.h
@@ -57,7 +57,6 @@ struct BroadcastOpHandle : public OpHandleBase {

 protected:
   void RunImpl() override;
-  void WaitInputVarGenerated(const VarHandle &in_var);

 private:
   const std::vector<Scope *> &local_scopes_;
@@ -65,6 +64,9 @@ struct BroadcastOpHandle : public OpHandleBase {
 #ifdef PADDLE_WITH_CUDA
   const platform::NCCLContextMap *nccl_ctxs_;
 #endif
+
+  void InitOutputValue(const VarHandle &in_var_handle,
+                       const std::vector<VarHandle *> &out_var_handles) const;
 };
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/details/computation_op_handle.cc b/paddle/fluid/framework/details/computation_op_handle.cc
index 7ff0efe09387b7e5d7cfe0dfe5e129ca9914d90b..df05bb06333d6b964f2f5434c3d43214e5d2cb7a 100644
--- a/paddle/fluid/framework/details/computation_op_handle.cc
+++ b/paddle/fluid/framework/details/computation_op_handle.cc
@@ -26,20 +26,20 @@ ComputationOpHandle::ComputationOpHandle(const OpDesc &op_desc, Scope *scope,
       place_(place) {}

 void ComputationOpHandle::RunImpl() {
-  auto *cur_ctx = dev_ctxes_[place_];
-  for (auto *in : inputs_) {
-    bool need_wait = in->generated_op_ &&
-                     in->generated_op_->DeviceContext(place_) != cur_ctx;
-    if (need_wait) {
-      in->generated_op_->Wait(cur_ctx);
-    }
-  }
+  WaitInputVarGenerated(place_);

   this->RunAndRecordEvent([this] {
     op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
   });
 }

+bool ComputationOpHandle::NeedWait(VarHandleBase *in_var) {
+  bool need_wait =
+      in_var && in_var->generated_op_ &&
+      in_var->generated_op_->DeviceContext(place_) != dev_ctxes_[place_];
+  return need_wait;
+}
+
 std::string ComputationOpHandle::Name() const { return op_->Type(); }
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/details/computation_op_handle.h b/paddle/fluid/framework/details/computation_op_handle.h
index c363b973d9abbae6bea76c2458fbe82a37a342ca..36e6f1bf59a7646e1dff6c4844f2a36a5caf363a 100644
--- a/paddle/fluid/framework/details/computation_op_handle.h
+++ b/paddle/fluid/framework/details/computation_op_handle.h
@@ -36,6 +36,8 @@ struct ComputationOpHandle : public OpHandleBase {
 protected:
   void RunImpl() override;

+  virtual bool NeedWait(VarHandleBase *in_var);
+
 private:
   std::unique_ptr<OperatorBase> op_;
   Scope *scope_;
diff --git a/paddle/fluid/framework/details/fetch_op_handle.cc b/paddle/fluid/framework/details/fetch_op_handle.cc
index a3cae8c64cdff8594c8971b0458c443f54375f11..b1c9dd0d15223f7d1bf6ea44144589f1de927e3e 100644
--- a/paddle/fluid/framework/details/fetch_op_handle.cc
+++ b/paddle/fluid/framework/details/fetch_op_handle.cc
@@ -31,7 +31,7 @@ FetchOpHandle::~FetchOpHandle() {
   }
 }

-void FetchOpHandle::Wait(platform::DeviceContext *waited_dev) {
+void FetchOpHandle::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) {
   PADDLE_THROW("Nobody should wait FetchOp. Unexpceted Error");
 }

@@ -45,14 +45,8 @@ void FetchOpHandle::WaitAndMergeCPUTensors() const {
 }

 void FetchOpHandle::RunImpl() {
-  auto cpu_ctx =
-      platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
-  for (auto *input : inputs_) {
-    auto *var = static_cast<VarHandle *>(input);
-    if (var->generated_op_) {
-      var->generated_op_->Wait(cpu_ctx);
-    }
-  }
+  WaitInputVarGenerated(platform::CPUPlace());
+
   tensors_.resize(inputs_.size());
   auto *var_handle = static_cast<VarHandle *>(inputs_[0]);
   auto &var_name = var_handle->name_;
@@ -79,6 +73,15 @@ void FetchOpHandle::RunImpl() {
   this->WaitAndMergeCPUTensors();
 }

+void FetchOpHandle::WaitInputVarGenerated(const platform::Place &place) {
+  auto cpu_ctx = platform::DeviceContextPool::Instance().Get(place);
+  for (auto *input : inputs_) {
+    if (input->generated_op_) {
+      input->generated_op_->RecordWaitEventOnCtx(cpu_ctx);
+    }
+  }
+}
+
 std::string FetchOpHandle::Name() const { return "Fetch"; }

 }  // namespace details
diff --git a/paddle/fluid/framework/details/fetch_op_handle.h b/paddle/fluid/framework/details/fetch_op_handle.h
index b49f3df338dc11310a4a0c27c8aaae3602373fcc..e696a7a9ce562e7f1b7fe6633623cb940810fbe1 100644
--- a/paddle/fluid/framework/details/fetch_op_handle.h
+++ b/paddle/fluid/framework/details/fetch_op_handle.h
@@ -33,7 +33,7 @@ struct FetchOpHandle : public OpHandleBase {

   ~FetchOpHandle();

-  void Wait(platform::DeviceContext *waited_dev) override;
+  void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) override;

   void WaitAndMergeCPUTensors() const;

@@ -42,6 +42,8 @@ struct FetchOpHandle : public OpHandleBase {
 protected:
   void RunImpl() override;

+  virtual void WaitInputVarGenerated(const platform::Place &place);
+
 private:
   FeedFetchList *data_;
   size_t offset_;
diff --git a/paddle/fluid/framework/details/gather_op_handle.cc b/paddle/fluid/framework/details/gather_op_handle.cc
index 3dfc972a44c62bd2adfc1331f29ffb1cca537652..2be02304566cf5dbe348fa01fc4171990eafd158 100644
--- a/paddle/fluid/framework/details/gather_op_handle.cc
+++ b/paddle/fluid/framework/details/gather_op_handle.cc
@@ -55,7 +55,7 @@ void GatherOpHandle::RunImpl() {
                  "Currently, gather_op only can gather SelectedRows.");

   // Wait input done, this Wait is asynchronous operation
-  WaitInputVarGenerated(in_var_handles);
+  WaitInputVarGenerated();

   auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
   std::vector<int64_t> out_rows;
@@ -111,17 +111,6 @@ void GatherOpHandle::RunImpl() {
   });
 }

-void GatherOpHandle::WaitInputVarGenerated(
-    const std::vector<VarHandle *> &in_var_handles) {
-  for (auto *in : in_var_handles) {
-    if (in->generated_op_) {
-      for (auto pair : dev_ctxes_) {
-        in->generated_op_->Wait(pair.second);
-      }
-    }
-  }
-}
-
 std::string GatherOpHandle::Name() const { return "gather"; }
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/details/gather_op_handle.h b/paddle/fluid/framework/details/gather_op_handle.h
index c394dd7a14b07cb956aa1aedfc0df4fa25744dd7..d11ef8556aa8840949ca8dc7aa176413f70b9f22 100644
--- a/paddle/fluid/framework/details/gather_op_handle.h
+++ b/paddle/fluid/framework/details/gather_op_handle.h
@@ -39,7 +39,6 @@ struct GatherOpHandle : public OpHandleBase {

 protected:
   void RunImpl() override;
-  void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);

 private:
   const std::vector<Scope *> &local_scopes_;
diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
index 16aa5d067ab7a222af8fbb6ca8ec18222ecd799b..95aa599cd3e403e9cc66b2b5ad35d0d214d1ab5b 100644
--- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
@@ -34,12 +34,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
     return;  // No need to all reduce when GPU count = 1;
   } else {
     // Wait input done
-    for (auto *in : inputs_) {
-      auto &p = static_cast<VarHandle *>(in)->place_;
-      if (in->generated_op_) {
-        in->generated_op_->Wait(dev_ctxes_[p]);
-      }
-    }
+    WaitInputVarGenerated();

     auto &var_name = static_cast<VarHandle *>(this->inputs_[0])->name_;
     int dtype = -1;
diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc
index 534d77860f87be08c8834efd373d90eb199ed6a2..6b064650b4f09737836bda4a43fa421720077929 100644
--- a/paddle/fluid/framework/details/op_handle_base.cc
+++ b/paddle/fluid/framework/details/op_handle_base.cc
@@ -56,15 +56,15 @@ void OpHandleBase::Run(bool use_event) {
   RunImpl();
 }

-void OpHandleBase::Wait(platform::DeviceContext *waited_dev) {
+void OpHandleBase::RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx) {
 #ifdef PADDLE_WITH_CUDA
-  if (platform::is_cpu_place(waited_dev->GetPlace()) || events_.empty()) {
+  if (platform::is_cpu_place(waited_ctx->GetPlace()) || events_.empty()) {
     for (auto &dev_ctx : dev_ctxes_) {
       dev_ctx.second->Wait();
     }
   } else {
     auto stream =
-        static_cast<platform::CUDADeviceContext *>(waited_dev)->stream();
+        static_cast<platform::CUDADeviceContext *>(waited_ctx)->stream();
     for (auto &ev : events_) {
       PADDLE_ENFORCE(cudaStreamWaitEvent(stream, ev.second, 0));
     }
@@ -86,6 +86,28 @@ void OpHandleBase::AddOutput(VarHandleBase *out) {
   out->generated_op_ = this;
 }

+void OpHandleBase::WaitInputVarGenerated() {
+  for (auto in_var : inputs_) {
+    if (NeedWait(in_var)) {
+      for (auto &pair : dev_ctxes_) {
+        in_var->generated_op_->RecordWaitEventOnCtx(pair.second);
+      }
+    }
+  }
+}
+
+void OpHandleBase::WaitInputVarGenerated(const platform::Place &place) {
+  for (auto *in : inputs_) {
+    if (NeedWait(in)) {
+      in->generated_op_->RecordWaitEventOnCtx(dev_ctxes_[place]);
+    }
+  }
+}
+
+bool OpHandleBase::NeedWait(VarHandleBase *in_var) {
+  return in_var && in_var->generated_op_;
+}
+
 void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
 #ifdef PADDLE_WITH_CUDA
   if (!events_.empty()) {  // Use event
diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h
index 00f213f3ed294adcce7c540e3ff346de8e2be7fb..fe1735d05dde5f09d5c72c68e5002d16f0083eb5 100644
--- a/paddle/fluid/framework/details/op_handle_base.h
+++ b/paddle/fluid/framework/details/op_handle_base.h
@@ -38,12 +38,24 @@ class OpHandleBase {

   void Run(bool use_event);

-  virtual void Wait(platform::DeviceContext *waited_dev);
+  virtual void RecordWaitEventOnCtx(platform::DeviceContext *waited_ctx);

   void AddInput(VarHandleBase *in);

   void AddOutput(VarHandleBase *out);

+  // This method adds the wait events of all the input on all the device
+  // context.
+  // NOTE: This Wait is asynchronous operation.
+  virtual void WaitInputVarGenerated();
+
+  // This method adds the wait events of all the input on the specified device
+  // context.
+  // NOTE: This Wait is asynchronous operation.
+  virtual void WaitInputVarGenerated(const platform::Place &place);
+
+  virtual bool NeedWait(VarHandleBase *in_var);
+
   // If the Op involves data transfer of multiple devices that
   // will likely block other computations.
   virtual bool IsMultiDeviceTransfer() { return false; }
diff --git a/paddle/fluid/framework/details/reduce_op_handle.cc b/paddle/fluid/framework/details/reduce_op_handle.cc
index 1bb04c1dfca107f4b7ce4c599e9aa132de3e5985..7160e346dad0615e2fd32b70c096880af0359e1a 100644
--- a/paddle/fluid/framework/details/reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/reduce_op_handle.cc
@@ -51,7 +51,7 @@ void ReduceOpHandle::RunImpl() {
   PADDLE_ENFORCE_NOT_NULL(pre_in_var);

   // Wait input done, this Wait is asynchronous operation
-  WaitInputVarGenerated(in_var_handles);
+  WaitInputVarGenerated();

   // NOTE: The Places of all input tensor must be all on CPU or all on GPU.
   std::vector<platform::Place> in_places;  // used to get dev_ctx
@@ -80,19 +80,21 @@ void ReduceOpHandle::RunImpl() {
   }

   if (pre_in_var->IsType<framework::SelectedRows>()) {
-    std::vector<const SelectedRows *> in_selected_rows =
-        GetInputValues<SelectedRows>(in_var_handles, var_scopes);
-
-    GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p,
-                       out_var->GetMutable<framework::SelectedRows>());
+    this->RunAndRecordEvent([&] {
+      std::vector<const SelectedRows *> in_selected_rows =
+          GetInputValues<SelectedRows>(in_var_handles, var_scopes);
+      GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, t_out_p,
+                         out_var->GetMutable<framework::SelectedRows>());
+    });
   } else {
     std::vector<const LoDTensor *> lod_tensors =
         GetInputValues<LoDTensor>(in_var_handles, var_scopes);
-
     if (paddle::platform::is_cpu_place(lod_tensors[0]->place())) {
-      ReduceLoDTensor func(lod_tensors,
-                           out_var->GetMutable<framework::LoDTensor>());
-      VisitDataType(ToDataType(lod_tensors[0]->type()), func);
+      this->RunAndRecordEvent([&] {
+        ReduceLoDTensor func(lod_tensors,
+                             out_var->GetMutable<framework::LoDTensor>());
+        VisitDataType(ToDataType(lod_tensors[0]->type()), func);
+      });
     } else if (paddle::platform::is_gpu_place(lod_tensors[0]->place())) {
 #ifdef PADDLE_WITH_CUDA
       auto pre_in = pre_in_var->Get<framework::SelectedRows>();
@@ -157,17 +159,6 @@ std::vector<const T *> ReduceOpHandle::GetInputValues(
   return in_selected_rows;
 }

-void ReduceOpHandle::WaitInputVarGenerated(
-    const std::vector<VarHandle *> &in_var_handles) {
-  for (auto *in : in_var_handles) {
-    if (in->generated_op_) {
-      for (auto pair : dev_ctxes_) {
-        in->generated_op_->Wait(pair.second);
-      }
-    }
-  }
-}
-
 std::string ReduceOpHandle::Name() const { return "reduce"; }
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/details/reduce_op_handle.h b/paddle/fluid/framework/details/reduce_op_handle.h
index 59731d348d17755fbd8bf3b6fa29b32bdefaf71e..c652a2f4eb0f9b73cb19ebbd9d0809210b280ad3 100644
--- a/paddle/fluid/framework/details/reduce_op_handle.h
+++ b/paddle/fluid/framework/details/reduce_op_handle.h
@@ -60,8 +60,6 @@ struct ReduceOpHandle : public OpHandleBase {
 protected:
   void RunImpl() override;

-  void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);
-
   template <typename T>
   std::vector<const T *> GetInputValues(
       const std::vector<VarHandle *> &in_var_handles,
diff --git a/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc b/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc
index 1cd3113030086104e7fc5c4ba3364a5ff027632b..d9c387e79dc71288e7330597fed57171d447f31b 100644
--- a/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc
+++ b/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc
@@ -29,6 +29,7 @@ ScaleLossGradOpHandle::ScaleLossGradOpHandle(size_t num_dev, Scope *scope,
 ScaleLossGradOpHandle::~ScaleLossGradOpHandle() {}

 void ScaleLossGradOpHandle::RunImpl() {
+  // Doesn't wait any event
   std::string var_name = static_cast<VarHandle *>(this->outputs_[0])->name_;
   auto &local_scope = *scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();

diff --git a/paddle/fluid/framework/details/send_op_handle.cc b/paddle/fluid/framework/details/send_op_handle.cc
index bd97c5260dbba935e422793e0aa6aac8b6875627..7109659dd7001f91e7674ac7bebbe3a59794cfc0 100644
--- a/paddle/fluid/framework/details/send_op_handle.cc
+++ b/paddle/fluid/framework/details/send_op_handle.cc
@@ -26,6 +26,7 @@ SendOpHandle::SendOpHandle(const framework::OpDesc &op_desc,
       place_(place) {}

 void SendOpHandle::RunImpl() {
+  // TODO(wuyi): need further analysis whether wait VarDummyHandle.
   // Wait input done
   for (auto *in : inputs_) {
     auto &p = static_cast<VarHandle *>(in)->place_;
@@ -33,7 +34,7 @@ void SendOpHandle::RunImpl() {
       continue;
     }
     if (in->generated_op_) {
-      in->generated_op_->Wait(dev_ctxes_[p]);
+      in->generated_op_->RecordWaitEventOnCtx(dev_ctxes_[p]);
     }
   }
   auto &tmp_scope = local_scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 5e6ed5cb7cdc534332d402380458f39aecd841b8..e90523ebe8dc720d10034e3af9b0e51bb7a2fde9 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -14,8 +14,6 @@

 #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"

-#include "paddle/fluid/framework/details/fetch_op_handle.h"
-
 namespace paddle {
 namespace framework {
 namespace details {
@@ -45,73 +43,33 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
   // Should revisit it if overlapping is available.
   std::unordered_set<OpHandleBase *> delayed_ops;

-  auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
-    pending_vars.insert(&var);
-    if (var.generated_op_ == nullptr) {
-      ready_vars.Push(&var);
-    }
-  };
-
-  auto InsertPendingOp = [&pending_ops](OpHandleBase &op_instance) {
-    pending_ops.insert({&op_instance, op_instance.Inputs().size()});
-  };
-
   // Transform SSAGraph to pending_ops & pending_vars
   for (auto &var_map : graph_->vars_) {
     for (auto &name_pair : var_map) {
       for (auto &version_pair : name_pair.second) {
-        InsertPendingVar(*version_pair);
+        InsertPendingVar(&pending_vars, &ready_vars, version_pair.get());
       }
     }
   }
   for (auto &var : graph_->dep_vars_) {
-    InsertPendingVar(*var);
+    InsertPendingVar(&pending_vars, &ready_vars, var.get());
   }

   for (auto &op : graph_->ops_) {
     if (op->Inputs().empty()) {  // Special case, Op has no input.
       ready_ops.insert(op.get());
     } else {
-      InsertPendingOp(*op);
+      InsertPendingOp(&pending_ops, op.get());
     }
   }

   // Step 2. Insert FetchOps
   std::vector<std::unique_ptr<FetchOpHandle>> fetch_ops;
-  FeedFetchList fetch_data(fetch_tensors.size());
-
-  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
-
-  for (auto &fetch_var_name : fetch_tensors) {
-    for (auto &var_map : graph_->vars_) {
-      auto it = var_map.find(fetch_var_name);
-      if (it != var_map.end()) {
-        fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
-      }
-    }
-  }
-
   std::unordered_set<std::unique_ptr<VarHandleBase>> fetch_dependencies;
-  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
-    auto &var_name = fetch_tensors[i];
-    auto &vars = fetched_vars.at(var_name);
-    auto *op = new FetchOpHandle(&fetch_data, i, &local_scopes_);
-    fetch_ops.emplace_back(op);
-
-    for (auto &p : places_) {
-      op->SetDeviceContext(p, fetch_ctxs_.Get(p));
-    }
-
-    for (auto *var : vars) {
-      op->AddInput(var);
-    }
+  FeedFetchList fetch_data(fetch_tensors.size());

-    auto *fetch_dummy = new DummyVarHandle();
-    op->AddOutput(fetch_dummy);
-    fetch_dependencies.emplace(fetch_dummy);
-    InsertPendingVar(*fetch_dummy);
-    InsertPendingOp(*op);
-  }
+  InsertFetchOps(fetch_tensors, &fetch_ops, &fetch_dependencies, &pending_ops,
+                 &pending_vars, &ready_vars, &fetch_data);

   auto run_all_ops = [&](std::unordered_set<OpHandleBase *> &set) {
     for (auto *op : set) {
@@ -174,6 +132,60 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
   return fetch_data;
 }

+void ThreadedSSAGraphExecutor::InsertFetchOps(
+    const std::vector<std::string> &fetch_tensors,
+    std::vector<std::unique_ptr<FetchOpHandle>> *fetch_ops,
+    std::unordered_set<std::unique_ptr<VarHandleBase>> *fetch_dependencies,
+    std::unordered_map<OpHandleBase *, size_t> *pending_ops,
+    std::unordered_set<VarHandleBase *> *pending_vars,
+    BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data) {
+  std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
+
+  for (auto &fetch_var_name : fetch_tensors) {
+    for (auto &var_map : graph_->vars_) {
+      auto it = var_map.find(fetch_var_name);
+      if (it != var_map.end()) {
+        fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
+      }
+    }
+  }
+
+  for (size_t i = 0; i < fetch_tensors.size(); ++i) {
+    auto &var_name = fetch_tensors[i];
+    auto &vars = fetched_vars.at(var_name);
+    auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_);
+    fetch_ops->emplace_back(op);
+
+    for (auto &p : places_) {
+      op->SetDeviceContext(p, fetch_ctxs_.Get(p));
+    }
+
+    for (auto *var : vars) {
+      op->AddInput(var);
+    }
+
+    auto *fetch_dummy = new DummyVarHandle();
+    op->AddOutput(fetch_dummy);
+    fetch_dependencies->emplace(fetch_dummy);
+    this->InsertPendingVar(pending_vars, ready_vars, fetch_dummy);
+    this->InsertPendingOp(pending_ops, op);
+  }
+}
+
+void ThreadedSSAGraphExecutor::InsertPendingOp(
+    std::unordered_map<OpHandleBase *, size_t> *pending_ops,
+    OpHandleBase *op_instance) const {
+  pending_ops->insert({op_instance, op_instance->Inputs().size()});
+}
+
+void ThreadedSSAGraphExecutor::InsertPendingVar(
+    std::unordered_set<VarHandleBase *> *pending_vars,
+    BlockingQueue<VarHandleBase *> *ready_vars, VarHandleBase *var) const {
+  pending_vars->insert(var);
+  if (var->generated_op_ == nullptr) {
+    ready_vars->Push(var);
+  }
+}
 void ThreadedSSAGraphExecutor::RunOp(
     BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
   auto op_run = [ready_var_q, op, this] {
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
index d089b79d91327e38408439a8019ec5189ff6d189..f18a88526b3238220fc56fd07299643d32c8b58b 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
@@ -23,6 +23,7 @@
 #include <vector>
 #include "ThreadPool.h"  // ThreadPool in thrird party
 #include "paddle/fluid/framework/blocking_queue.h"
+#include "paddle/fluid/framework/details/fetch_op_handle.h"
 #include "paddle/fluid/framework/details/ssa_graph_executor.h"

 namespace paddle {
@@ -58,6 +59,21 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::unique_ptr<platform::EnforceNotMet> exception_;
   std::atomic<int> running_ops_;
   bool allow_op_delay_;
+
+  void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops,
+                       OpHandleBase *op_instance) const;
+
+  void InsertPendingVar(std::unordered_set<VarHandleBase *> *pending_vars,
+                        BlockingQueue<VarHandleBase *> *ready_vars,
+                        VarHandleBase *var) const;
+
+  void InsertFetchOps(
+      const std::vector<std::string> &fetch_tensors,
+      std::vector<std::unique_ptr<FetchOpHandle>> *fetch_ops,
+      std::unordered_set<std::unique_ptr<VarHandleBase>> *fetch_dependencies,
+      std::unordered_map<OpHandleBase *, size_t> *pending_ops,
+      std::unordered_set<VarHandleBase *> *pending_vars,
+      BlockingQueue<VarHandleBase *> *ready_vars, FeedFetchList *fetch_data);
 };

 }  // namespace details