/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/parallel_executor.h"
#include <string>
#include <tuple>
#include <vector>
#include "paddle/fluid/framework/ir/graph_helper.h"

#include "paddle/fluid/framework/ir/graph.h"

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
#include "paddle/fluid/platform/nccl_helper.h"
#endif

#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h"

namespace paddle {
namespace framework {

// Private state of ParallelExecutor: the places it runs on, one scope per
// place, the graph executor and (for CUDA builds) the NCCL contexts.
class ParallelExecutorPrivate {
 public:
  explicit ParallelExecutorPrivate(const std::vector<platform::Place> &places)
      : places_(places) {}

  // Deletes the local scopes this object created as children of the global
  // scope. Scopes are only deleted when own_local_scope_ is set.
  ~ParallelExecutorPrivate() {
    if (own_local_scope_) {
      for (size_t i = 1; i < local_scopes_.size(); ++i) {
        // Skip the first scope, since it is the global scope.
        Scope *local_scope = local_scopes_[i];
        if (global_scope_->HasKid(local_scope)) {
          global_scope_->DeleteScope(local_scope);
        }
      }
    }
  }

  std::vector<platform::Place> places_;
  std::vector<Scope *> local_scopes_;
  Scope *global_scope_{nullptr};  // not owned
  std::unique_ptr<details::SSAGraphExecutor> executor_;

#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
  // The flags below are assigned by ParallelExecutor's constructor. They are
  // default-initialized so that the destructor above never reads an
  // indeterminate value when that constructor throws before assigning them.
  bool own_local_scope_{false};
  bool use_cuda_{false};
  bool use_all_reduce_{false};
};

// Returns a mutable reference to the per-place scopes, in place order.
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
  return member_->local_scopes_;
}

// Builds a parallel executor for `main_program` over `places`.
//
// The constructor (1) creates or adopts one scope per place and, when it
// created them itself, broadcasts `bcast_vars` from the first scope,
// (2) applies `build_strategy` to turn the program into one graph (or one
// graph per place for kParallelGraph), and (3) wraps the executor selected by
// `exec_strategy.type_` in a ScopeBufferedSSAGraphExecutor.
//
// `scope` is stored as the global scope and is not owned. When `local_scopes`
// is non-empty it must have one entry per place; a child scope of each entry
// is used as the local scope for that place.
ParallelExecutor::ParallelExecutor(
    const std::vector<platform::Place> &places,
    const std::unordered_set<std::string> &params,
    const std::unordered_set<std::string> &bcast_vars,
    const ProgramDesc &main_program, const std::string &loss_var_name,
    Scope *scope, const std::vector<Scope *> &local_scopes,
    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
    size_t num_trainers, size_t trainer_id)
    : member_(new ParallelExecutorPrivate(places)) {
  member_->global_scope_ = scope;
  member_->use_cuda_ = exec_strategy.use_cuda_;
  member_->use_all_reduce_ =
      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;

  if (!member_->use_all_reduce_) {
    PADDLE_ENFORCE(places.size() > 1,
                   "If you set build_strategy.reduce with 'Reduce',"
                   "the number of places must be greater than 1.");
    PADDLE_ENFORCE(exec_strategy.type_ != ExecutionStrategy::kParallelGraph,
                   "You should set build_strategy.reduce with 'AllReduce' for "
                   "ParallelGraph executor type");
  }

  // Step 1. Bcast the params to devs.
  // Create local scopes
  if (local_scopes.empty()) {
    member_->own_local_scope_ = true;
    // The global scope doubles as the local scope of the first place.
    member_->local_scopes_.emplace_back(member_->global_scope_);
    for (size_t i = 1; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&scope->NewScope());
    }
  } else {
    member_->own_local_scope_ = false;
    PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
    }
  }

  if (member_->use_cuda_) {
// Bcast Parameters to all GPUs
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
    // Non-owning pointer: when set, it refers to the ncclUniqueId stored in
    // the NCCL_ID_VARNAME variable of the global scope. It must not be held
    // in a smart pointer, because the variable keeps ownership of the id.
    ncclUniqueId *nccl_id = nullptr;
    bool need_group_call = true;
    auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
    if (nccl_id_var != nullptr) {
      nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
    } else if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
      // Generate the id first, then publish it in the global scope, so the
      // variable is only created once the id has been obtained.
      ncclUniqueId generated_id;
      PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(&generated_id));
      nccl_id = member_->global_scope_->Var(NCCL_ID_VARNAME)
                    ->GetMutable<ncclUniqueId>();
      *nccl_id = generated_id;
      need_group_call = false;
    } else {
      // init nccl_id in NCCLContextMap
    }

    member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
        member_->places_, nccl_id, num_trainers, trainer_id, need_group_call));
#else
    PADDLE_THROW("Not compiled with CUDA");
#endif
  }

  // Only broadcast when this executor created the local scopes itself and
  // there is more than one of them.
  if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
    BCastParamsToDevices(bcast_vars);
  }
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  std::vector<std::unique_ptr<ir::Graph>> graphs;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
  if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
    // One graph per place, each built for a single place and a single scope.
    for (size_t i = 0; i < member_->places_.size(); ++i) {
      std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
          main_program, {member_->places_[i]}, loss_var_name, params,
          {member_->local_scopes_[i]}, member_->use_cuda_,
          member_->nccl_ctxs_.get());
      graphs.push_back(std::move(graph));
    }
  } else {
    std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
        main_program, member_->places_, loss_var_name, params,
        member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
    graphs.push_back(std::move(graph));
  }

  auto max_memory_size = GetEagerDeletionThreshold();
  // FIXME(Yancey1989): need to fix on parallel graph mode
  if (max_memory_size >= 0 &&
      exec_strategy.type_ != ExecutionStrategy::kParallelGraph) {
    // Create a garbage collector and reference-count maps for every GPU place
    // that does not have them yet.
    for (auto &place : member_->places_) {
      if (!platform::is_gpu_place(place)) continue;
      auto gpu_place = boost::get<platform::CUDAPlace>(place);
      if (gcs_[gpu_place.device] == nullptr) {
        ref_cnts_[gpu_place.device].reset(new details::ReferenceCountMap());
        cur_ref_cnts_[gpu_place.device].reset(
            new details::AtomicReferenceCountMap());
        gcs_[gpu_place.device].reset(
            new StreamGarbageCollector<Tensor>(gpu_place, max_memory_size));
      }
    }
    if (!gcs_.empty()) {
      for (size_t i = 0; i < graphs.size(); ++i) {
        auto ref_cnt_pass =
            ir::PassRegistry::Instance().Get("reference_count_pass");
        ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_);
        ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_);
        ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
        graphs[i] = ref_cnt_pass->Apply(std::move(graphs[i]));
        graphs[i]->SetNotOwned("garbage_collector", &gcs_);
      }
    }
  }
#else
  std::unique_ptr<ir::Graph> graph =
      build_strategy.Apply(main_program, member_->places_, loss_var_name,
                           params, member_->local_scopes_, member_->use_cuda_);
  graphs.push_back(std::move(graph));
#endif

  // Step 3. Create vars in each scope. Passes may also create new vars.
  //         skip control vars and empty vars
  std::vector<details::VariableInfo> var_infos;
  for (auto &graph : graphs) {
    for (auto &node : graph->Nodes()) {
      if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
        var_infos.emplace_back();
        var_infos.back().name_ = node->Var()->Name();
        var_infos.back().type_ = node->Var()->GetType();
        var_infos.back().persistable_ = node->Var()->Persistable();
      }
    }
  }

  // If the loss_var_name is given, the number of graph should be only one.
  if (loss_var_name.size()) {
    size_t graph_num = ir::GraphNum(*graphs[0]);
    if (graph_num > 1) {
      LOG(WARNING)
          << "The number of graph should be only one, "
             "but the current graph has "
          << graph_num
          << " sub_graphs. If you want to see the nodes of the "
             "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
             "to specify the output dir. NOTES: if you not do training, "
             "please don't pass loss_var_name.";
    }
  }

  // Pick the inner executor by execution strategy type.
  if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
    member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_,
        std::move(graphs[0])));
  } else if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
    member_->executor_.reset(new details::ParallelSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_,
        std::move(graphs)));
  } else {
    member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_,
        std::move(graphs[0])));
  }

  // Wrap the inner executor; ownership of it moves into the wrapper.
  member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
      exec_strategy, member_->local_scopes_, std::move(var_infos),
      member_->places_, std::move(member_->executor_)));
}

// Copies (or shares) each variable named in `vars` from the first local scope
// into every other local scope.
//
// Names that are missing from the first scope, are not LoDTensors, or hold an
// uninitialized tensor are skipped. GPU tensors are broadcast with NCCL; CPU
// tensors are either copied or shared, see the branch below.
void ParallelExecutor::BCastParamsToDevices(
    const std::unordered_set<std::string> &vars) const {
  // the initializing bcast, all vars would be bcast from device(0).
  for (auto &var : vars) {
    framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
    if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
      continue;
    }

    auto &main_tensor = main_var->Get<LoDTensor>();
    if (!main_tensor.IsInitialized()) {
      VLOG(3) << "one in var not inited, return!";
      continue;
    }
    auto &dims = main_tensor.dims();
    if (paddle::platform::is_gpu_place(main_tensor.place())) {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
      // One buffer per place: the source tensor for place 0, a freshly
      // resized destination tensor for every other place.
      std::vector<void *> buffers;
      size_t numel = main_tensor.numel();
      ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        auto place = member_->places_[i];
        void *buffer;

        if (i == 0) {
          buffer = const_cast<void *>(main_tensor.data<void>());
        } else {
          auto local_scope = member_->local_scopes_[i];
          auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
          t->Resize(dims);
          buffer = t->mutable_data(place, main_tensor.type());
        }
        buffers.push_back(buffer);
      }

      PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
                        "variables' buffer size to bcast NOT equal to places");
      {
        platform::NCCLGroupGuard guard;
        for (size_t i = 0; i < member_->places_.size(); ++i) {
          auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]);
          // NOTE(review): the result of ncclBcast is not checked here.
          platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
                                       nccl_ctx.comm_, nccl_ctx.stream());
        }
        member_->nccl_ctxs_->WaitAll();
      }
#else
      PADDLE_THROW("Not compiled with CUDA");
#endif
    } else {
      platform::CPUPlace cpu;
      for (size_t i = 0; i < member_->places_.size(); ++i) {
        if (i == 0) continue;

        auto local_scope = member_->local_scopes_[i];
        auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();

        // FIXME(zcd): LR_DECAY_COUNTER should not be shared. This is a hot fix.
        if (member_->use_all_reduce_ || member_->use_cuda_ ||
            var == "@LR_DECAY_COUNTER@") {
          // Give this scope its own copy of the tensor.
          t->Resize(dims);
          t->mutable_data(cpu, main_tensor.type());
          paddle::framework::TensorCopy(main_tensor, cpu, t);
        } else {
          t->ShareDataWith(main_tensor);
        }
      }
    }
  }
}

// Runs the executor, fetching the variables named in `fetch_tensors`, and
// stores the fetched result in the global-scope variable `fetched_var_name`.
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
                           const std::string &fetched_var_name) {
  platform::RecordBlock b(0);
#ifdef PADDLE_WITH_CUDA
  if (!gcs_.empty()) {
    ResetReferenceCount();
    // Remove the fetched names from the current reference-count maps.
    for (auto &pair : cur_ref_cnts_) {
      auto &name_map = *(pair.second);
      for (auto &fetch_name : fetch_tensors) {
        name_map.erase(fetch_name);
      }
      name_map.erase(fetched_var_name);
    }
  }
#endif
  auto fetch_data = member_->executor_->Run(fetch_tensors);
  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
      fetch_data;
}

// Feeds one name->tensor map per local scope. `tensors` must have exactly one
// entry per local scope; each target variable shares data and LoD with the
// tensor it is fed from.
void ParallelExecutor::FeedTensorsIntoLocalScopes(
    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());

  for (size_t i = 0; i < tensors.size(); ++i) {
    auto &map = tensors[i];
    auto *scope = member_->local_scopes_[i];
    for (auto &pair : map) {
      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
      trg->ShareDataWith(pair.second);
      trg->set_lod(pair.second.lod());
    }
  }
}

// Splits every tensor in `tensors` across the places and feeds piece j into
// local scope j under the same name. Fails if a tensor does not split into
// exactly one piece per place.
void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
    const std::unordered_map<std::string, LoDTensor> &tensors) {
  // Iterate by const reference to avoid copying each name/tensor entry.
  for (const auto &pair : tensors) {
    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
    PADDLE_ENFORCE_EQ(
        member_->places_.size(), lod_tensors.size(),
        "The number of samples of current batch is less than the count of "
        "devices, currently, it is not allowed. (%d vs %d)",
        member_->places_.size(), lod_tensors.size());
    for (size_t j = 0; j < member_->places_.size(); ++j) {
      // TODO(panxy0718): Do I need to delete this var?
      auto t =
          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
      t->ShareDataWith(lod_tensors[j]);
      t->set_lod(lod_tensors[j].lod());
    }
  }
}

// Waits on the device context of every place, then releases the private
// state.
ParallelExecutor::~ParallelExecutor() {
  for (auto &p : member_->places_) {
    platform::DeviceContextPool::Instance().Get(p)->Wait();
  }
  // member_ must be destructed before gcs_ since the destructor of
  // ReferenceCountOpHandle use raw pointers of gcs_ inside.
  member_.reset();
}

}  // namespace framework
}  // namespace paddle
#ifdef PADDLE_WITH_CUDA
USE_PASS(reference_count_pass);
#endif