diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc
index 56d8da3eca4b5a82ff6cdb8f4e3ff8638a02b437..0d5d328fd32cc2e12d4f4e94c94dae51f0c040bc 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.cc
+++ b/paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <map>
+
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
@@ -46,7 +48,8 @@ void Carrier::Init(
     const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
     const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
     const framework::ProgramDesc& program, framework::Scope* scope,
-    int64_t num_micro_batches, const platform::Place& place) {
+    int64_t num_micro_batches, const platform::Place& place,
+    const std::vector<std::string>& inference_root_scope_vars) {
   rank_ = rank;
   interceptor_id_to_rank_ = interceptor_id_to_rank;
   interceptor_id_to_node_ = interceptor_id_to_node;
@@ -60,7 +63,7 @@ void Carrier::Init(
   microbatch_scopes_.resize(num_micro_batches);
   for (int i = 0; i < num_micro_batches; ++i) {
     microbatch_scopes_[i] = &minibatch_scope_->NewScope();
-    CopyParameters(i, program);
+    CopyParameters(i, program, inference_root_scope_vars);
   }
 
   // TODO(fleet_exe dev): thread pool
@@ -80,12 +83,23 @@ void Carrier::Release() {
 
 Carrier::~Carrier() { VLOG(3) << "Carrier's destructor."; }
 
-void Carrier::CopyParameters(int microbatch_id,
-                             const framework::ProgramDesc& program) {
+void Carrier::CopyParameters(
+    int microbatch_id, const framework::ProgramDesc& program,
+    const std::vector<std::string>& inference_root_scope_vars) {
   auto& global_block = program.Block(0);
 
+  std::map<std::string, int> inference_root_scope_var_map;
+  for (auto var_name : inference_root_scope_vars) {
+    inference_root_scope_var_map.insert({var_name, 1});
+  }
   for (auto& var : global_block.AllVars()) {
-    if (var->Persistable() && microbatch_id == 0) {
+    std::string var_name = var->Name();
+    bool force_root = inference_root_scope_var_map.find(var_name) !=
+                      inference_root_scope_var_map.end();
+    if (force_root) {
+      VLOG(4) << var_name << " will be forced to be created in the root scope.";
+    }
+    if ((var->Persistable() || force_root) && microbatch_id == 0) {
       auto* ptr = root_scope_->Var(var->Name());
       InitializeVariable(ptr, var->GetType());
       VLOG(5) << "Create persistable var: " << var->Name()
diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h
index 9a74fa78c0e7638cd9c5201b92b06619c1f5b10c..d35a3260915e2cfd40bea9dc03fe6af7d9d04c54 100644
--- a/paddle/fluid/distributed/fleet_executor/carrier.h
+++ b/paddle/fluid/distributed/fleet_executor/carrier.h
@@ -57,9 +57,12 @@ class Carrier final {
       const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
       const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
       const framework::ProgramDesc& program, framework::Scope* scope,
-      int64_t num_micro_batches, const platform::Place& place);
+      int64_t num_micro_batches, const platform::Place& place,
+      const std::vector<std::string>& inference_root_scope_vars = {});
 
-  void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
+  void CopyParameters(
+      int microbatch_id, const framework::ProgramDesc& program,
+      const std::vector<std::string>& inference_root_scope_vars);
 
   void Release();
   void Wait();
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index 457549a27b4b7ed6305b107cfd319ecae026a53b..e946d78550ff1bb0155843a680fbec33fdca9aa3 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -11,6 +11,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include <algorithm>
 
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
@@ -52,7 +53,8 @@ void FleetExecutor::Init(
     const std::string& carrier_id, const framework::ProgramDesc& program_desc,
     framework::Scope* scope, const platform::Place& place,
    int64_t num_micro_batches, const std::vector<TaskNode*>& task_nodes,
-    const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
+    const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
+    const std::vector<std::string>& inference_root_scope_vars) {
   PADDLE_ENFORCE_GT(task_nodes.size(), 0,
                     platform::errors::InvalidArgument(
                         "Fleet executor is inited with empty task node"));
@@ -64,6 +66,37 @@ void FleetExecutor::Init(
     }
   }
   auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
+  // NOTE: For inference, the vars in inference_root_scope_vars must not be
+  // deleted during the run, since they may hold the inference results. If
+  // they were GCed, zero-copying the results out would fail.
+  std::vector<const framework::OperatorBase*> changed_ops;
+  for (auto pair : unused_vars) {
+    const framework::OperatorBase* op = pair.first;
+    std::vector<std::string> unused = pair.second;
+    for (auto name : inference_root_scope_vars) {
+      auto iter = std::find(unused.begin(), unused.end(), name);
+      if (iter != unused.end()) {
+        VLOG(3) << "Removing var: [" << name
+                << "] from the unused vars list of op: [" << op->Type() << "]";
+        unused.erase(iter);
+        if (std::find(changed_ops.begin(), changed_ops.end(), op) ==
+            changed_ops.end()) {
+          // record the op whose unused vars have been updated
+          changed_ops.emplace_back(op);
+        }
+      }
+    }
+    // update the unused vars list in the map
+    unused_vars[op] = unused;
+  }
+  for (auto op : changed_ops) {
+    auto iter = unused_vars.find(op);
+    if (iter->second.empty()) {
+      // remove those ops in the map that have an empty unused vars list
+      VLOG(3) << "Removing op: [" << op->Type() << "] from unused_vars map.";
+      unused_vars.erase(iter);
+    }
+  }
   runtime_graph_ = std::make_shared<RuntimeGraph>();
   std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
   for (auto task_node : task_nodes) {
@@ -82,17 +115,18 @@ void FleetExecutor::Init(
   carrier_ids_.insert(carrier_id);
   // Set current running carrier
   GlobalVal<std::string>::Set(new std::string(carrier_id));
-  InitCarrier(carrier, scope, place, num_micro_batches, program_desc);
+  InitCarrier(carrier, scope, place, num_micro_batches, program_desc,
+              inference_root_scope_vars);
   GlobalVal<MessageBus>::Get()->Barrier();
 }
 
-void FleetExecutor::InitCarrier(Carrier* carrier, framework::Scope* scope,
-                                const platform::Place& place,
-                                int64_t num_micro_batches,
-                                const framework::ProgramDesc& program_desc) {
+void FleetExecutor::InitCarrier(
+    Carrier* carrier, framework::Scope* scope, const platform::Place& place,
+    int64_t num_micro_batches, const framework::ProgramDesc& program_desc,
+    const std::vector<std::string>& inference_root_scope_vars) {
   carrier->Init(exe_desc_.cur_rank(), runtime_graph_->interceptor_id_to_rank(),
                 runtime_graph_->interceptor_id_to_node(), program_desc, scope,
-                num_micro_batches, place);
+                num_micro_batches, place, inference_root_scope_vars);
 }
 
 void FleetExecutor::InitMessageBus() {
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
index fa65309127bec50869c52d2f3c85477910ccb37b..ccdb3dcc459489db9f342a2302fae3d777170313 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -42,15 +42,17 @@ class FleetExecutor final {
             const framework::ProgramDesc& program_desc, framework::Scope* scope,
             const platform::Place& place, int64_t num_micro_batches,
             const std::vector<TaskNode*>& task_nodes,
-            const std::unordered_map<int64_t, int64_t>& task_id_to_rank);
+            const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
+            const std::vector<std::string>& inference_root_scope_vars = {});
   void Run(const std::string& carrier_id);
 
  private:
   DISABLE_COPY_AND_ASSIGN(FleetExecutor);
   void InitMessageBus();
-  void InitCarrier(Carrier* carrier, framework::Scope* scope,
-                   const platform::Place& place, int64_t num_micro_batches,
-                   const framework::ProgramDesc& program_desc);
+  void InitCarrier(
+      Carrier* carrier, framework::Scope* scope, const platform::Place& place,
+      int64_t num_micro_batches, const framework::ProgramDesc& program_desc,
+      const std::vector<std::string>& inference_root_scope_vars = {});
   FleetExecutorDesc exe_desc_;
   std::shared_ptr<RuntimeGraph> runtime_graph_;
   std::unordered_set<std::string> carrier_ids_;
diff --git a/paddle/fluid/distributed/fleet_executor/task_node.cc b/paddle/fluid/distributed/fleet_executor/task_node.cc
index 6de7038b3231f2fb302dd970273c565c5a718b73..95e4c73305998e4190c1547cb2f92809e360b216 100644
--- a/paddle/fluid/distributed/fleet_executor/task_node.cc
+++ b/paddle/fluid/distributed/fleet_executor/task_node.cc
@@ -52,11 +52,20 @@ void TaskNode::SetProgram(paddle::framework::ProgramDesc* program) {
   program_ = program;
 }
 
-void TaskNode::Init() {
+void TaskNode::Init(bool use_feed_fetch_ops) {
+  if (!use_feed_fetch_ops) {
+    VLOG(3) << "TaskNode will be inited without feed and fetch ops";
+  }
   if (ops_.empty()) {
     // Q (for fleet executor dev): should we need another reset funct?
VLOG(3) << "Task node will be inited by calling Init()."; for (const auto& op_desc : program_->Block(0).AllOps()) { + if (!use_feed_fetch_ops && + (op_desc->Type() == "feed" || op_desc->Type() == "fetch")) { + VLOG(3) << "TaskNode will skip [" << op_desc->Input("X")[0] << "], " + << op_desc->Type() << " -> " << op_desc->Output("Out")[0]; + continue; + } ops_vec_.emplace_back(framework::OpRegistry::CreateOp(*op_desc)); } for (const auto& op : ops_vec_) { diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h index b655d140d37a5bdf547a278eec3355ef4638539f..4764d4fd4af87adf3df31f2dabb614da7d719861 100644 --- a/paddle/fluid/distributed/fleet_executor/task_node.h +++ b/paddle/fluid/distributed/fleet_executor/task_node.h @@ -46,7 +46,7 @@ class TaskNode final { ~TaskNode() = default; void SetProgram(paddle::framework::ProgramDesc* program); - void Init(); + void Init(bool use_feed_fetch_ops = true); int64_t rank() const { return rank_; } int64_t task_id() const { return task_id_; } int32_t role() const { return role_; } diff --git a/paddle/fluid/inference/api/analysis_config.cc b/paddle/fluid/inference/api/analysis_config.cc index fd2ccffae3b4af3280f622722d6080d7c68bfbad..9c33d7003064532db7276d0f6dad90e1b2c55104 100644 --- a/paddle/fluid/inference/api/analysis_config.cc +++ b/paddle/fluid/inference/api/analysis_config.cc @@ -274,6 +274,9 @@ AnalysisConfig::AnalysisConfig(const AnalysisConfig &other) { CP_MEMBER(ipu_available_memory_proportion_); CP_MEMBER(ipu_enable_half_partial_); + // fleet exe related + CP_MEMBER(dist_config_); + if (use_gpu_) { PADDLE_ENFORCE_EQ(use_xpu_, false, platform::errors::InvalidArgument( diff --git a/paddle/fluid/inference/api/analysis_predictor.cc b/paddle/fluid/inference/api/analysis_predictor.cc index cd6e3a3c759c05bda34978dd78d07358aacd53fe..5492c3b0d26453c590e6a0a1350d88b442b789f7 100644 --- a/paddle/fluid/inference/api/analysis_predictor.cc +++ b/paddle/fluid/inference/api/analysis_predictor.cc @@ -30,6 +30,7 @@ #include "paddle/fluid/framework/ir/fuse_pass_base.h" #include "paddle/fluid/framework/ir/pass.h" #include "paddle/fluid/framework/naive_executor.h" +#include "paddle/fluid/framework/op_proto_maker.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/var_type_traits.h" #include "paddle/fluid/framework/version.h" @@ -47,6 +48,14 @@ #include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/profiler.h" #include "paddle/phi/api/ext/op_meta_info.h" +#include "paddle/utils/string/split.h" + +#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ + !defined(PADDLE_WITH_ASCEND_CL) +#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" +#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h" +#include "paddle/fluid/distributed/fleet_executor/task_node.h" +#endif #ifdef PADDLE_WITH_MKLML #include "paddle/fluid/platform/dynload/mklml.h" @@ -186,14 +195,14 @@ bool AnalysisPredictor::Init( return false; } + // Get the feed_target_names and fetch_target_names + PrepareFeedFetch(); + // Prepare executor, create local variables. 
   if (!PrepareExecutor()) {
     return true;
   }
 
-  // Get the feed_target_names and fetch_target_names
-  PrepareFeedFetch();
-
   return true;
 }
 
@@ -359,6 +368,13 @@ static void DisablePrepareDataOpt(
 }
 
 bool AnalysisPredictor::PrepareExecutor() {
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+  if (config_.dist_config().use_dist_model()) {
+    VLOG(3) << "use_dist_model is enabled, will init FleetExecutor.";
+    return PrepareFleetExecutor();
+  }
+#endif
  DisablePrepareDataOpt(inference_program_, 0, false);
 
   executor_->Prepare(sub_scope_, *inference_program_, 0,
@@ -371,6 +387,226 @@ bool AnalysisPredictor::PrepareExecutor() {
   return true;
 }
 
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+bool AnalysisPredictor::PrepareFleetExecutor() {
+  VLOG(3) << "AnalysisPredictor::PrepareFleetExecutor()";
+  if (config_.dist_config().nranks() > 1 && !CommInit()) {
+    return false;
+  }
+  task_node_.reset(new distributed::TaskNode(inference_program_.get(),
+                                             config_.dist_config().rank()));
+  // With auto cut, there is no concept of pp (pipeline parallel); no need to
+  // add dependencies.
+  task_node_->SetType("Compute");
+  task_node_->Init(config_.use_feed_fetch_ops_enabled());
+  executor_desc_ = distributed::FleetExecutorDesc();
+  executor_desc_.set_cur_rank(config_.dist_config().rank());
+  std::unordered_map<int64_t, int64_t> id_to_rank;
+  for (int i = 0; i < config_.dist_config().nranks(); ++i) {
+    distributed::RankInfo *rank_info = executor_desc_.add_cluster_info();
+    rank_info->set_rank(i);
+    rank_info->set_ip_port(config_.dist_config().trainer_endpoints()[i]);
+    id_to_rank.insert({i, i});
+  }
+  fleet_exe_.reset(new distributed::FleetExecutor(executor_desc_));
+  // NOTE: The vars of feed/fetch ops are not persistable, so by default they
+  // would be created in the sub scope (micro scope) by the fleet executor.
+  // GetInputTensor/GetOutputTensor in the analysis predictor can only see the
+  // root scope returned by DistModel, so those vars must be created in the
+  // root scope instead of the micro scope.
+  std::vector<std::string> feed_fetch_vars;
+  for (auto pair : idx2feeds_) {
+    feed_fetch_vars.emplace_back(pair.second);
+  }
+  for (auto pair : idx2fetches_) {
+    feed_fetch_vars.emplace_back(pair.second);
+  }
+  fleet_exe_->Init(config_.dist_config().carrier_id(),
+                   *(inference_program_.get()), scope_.get(), place_, 1,
+                   {task_node_.get()}, id_to_rank, feed_fetch_vars);
+  return true;
+}
+
+bool AnalysisPredictor::CommInit() {
+  std::map<int64_t, std::vector<int64_t>> ring_id_to_ranks{};
+  std::map<int64_t, std::vector<int64_t>> rank_to_ring_ids{};
+  if (!LoadConverterConfig(&ring_id_to_ranks, &rank_to_ring_ids)) {
+    VLOG(3) << "Load converter config failed, DistModel init failed.";
+    return false;
+  }
+  std::unique_ptr<framework::ProgramDesc> comm_init_program(
+      new framework::ProgramDesc());
+  framework::BlockDesc *comm_init_block = comm_init_program->MutableBlock(0);
+  std::vector<int64_t> &ring_ids =
+      rank_to_ring_ids[config_.dist_config().rank()];
+  int64_t order = 0;
+  std::string var_name_base = "comm_init_";
+  for (int64_t ring_id : ring_ids) {
+    VLOG(3) << "Init comm for ring id: " << ring_id;
+    int64_t ranks_in_group = ring_id_to_ranks[ring_id].size();
+    int64_t rank_in_group = 0;
+    std::vector<int64_t> &ranks = ring_id_to_ranks[ring_id];
+    for (int64_t rank : ranks) {
+      if (config_.dist_config().rank() == rank) {
+        break;
+      }
+      rank_in_group += 1;
+    }
+    std::vector<std::string> peer_endpoints;
+    for (int64_t rank : ranks) {
+      if (config_.dist_config().rank() == rank) {
+        continue;
+      }
+      peer_endpoints.emplace_back(
+          config_.dist_config().trainer_endpoints()[rank]);
+    }
+    InsertCommOp(var_name_base + std::to_string(order), ranks_in_group,
+                 rank_in_group, peer_endpoints, comm_init_block, ring_id);
+    order += 1;
+  }
+  framework::NaiveExecutor e(place_);
+  e.CreateVariables(*comm_init_program, 0, true, scope_.get());
+  e.Prepare(scope_.get(), *comm_init_program, 0, false);
+  e.Run();
+  VLOG(3) << "Comm init successful.";
+  return true;
+}
+
+void AnalysisPredictor::InsertCommOp(
+    std::string tmp_var_name, int nranks, int rank,
+    const std::vector<std::string> &peer_endpoints,
+    framework::BlockDesc *block, int ring_id) {
+  /*
+   * tmp_var_name: the var name for var comm_id
+   * nranks: number of total ranks
+   * rank: the rank of local rank in the comm group
+   * peer_endpoints: peer's endpoints
+   * block: the block where to insert the comm ops
+   * ring_id: the ring_id to be inited
+   */
+  const std::string &endpoint = config_.dist_config().current_endpoint();
+  std::stringstream ss;
+  ss << "Init comm with tmp var: " << tmp_var_name
+     << ". The ring id is: " << ring_id << ". The group has: " << nranks
+     << " ranks. Current rank in the group is: " << rank
+     << ". The endpoint is: " << endpoint << ". Peer endpoints are: ";
Peer endpoints are: "; + for (auto ep : peer_endpoints) { + ss << ep << ", "; + } + VLOG(3) << ss.str(); + if (config_.use_gpu()) { + framework::VarDesc *new_var = block->Var(tmp_var_name); + new_var->SetType(framework::proto::VarType::RAW); + new_var->SetPersistable(true); + framework::OpDesc *gen_nccl_id_op = block->AppendOp(); + gen_nccl_id_op->SetType("c_gen_nccl_id"); + gen_nccl_id_op->SetOutput("Out", {tmp_var_name}); + gen_nccl_id_op->SetAttr("rank", rank); + gen_nccl_id_op->SetAttr("endpoint", + config_.dist_config().current_endpoint()); + gen_nccl_id_op->SetAttr("other_endpoints", peer_endpoints); + gen_nccl_id_op->SetAttr("ring_id", ring_id); + gen_nccl_id_op->SetAttr("op_role", + static_cast(framework::OpRole::kForward)); + gen_nccl_id_op->CheckAttrs(); + framework::OpDesc *comm_init_op = block->AppendOp(); + comm_init_op->SetType("c_comm_init"); + comm_init_op->SetInput("X", {tmp_var_name}); + comm_init_op->SetAttr("rank", rank); + comm_init_op->SetAttr("nranks", nranks); + comm_init_op->SetAttr("ring_id", ring_id); + comm_init_op->SetAttr("op_role", + static_cast(framework::OpRole::kForward)); + comm_init_op->CheckAttrs(); + } else { + LOG(WARNING) << "DistModelInf doesn't init comm."; + // TODO(fleet exe dev): comm init for more devices + } +} + +bool AnalysisPredictor::LoadConverterConfig( + std::map> *ring_id_to_ranks, + std::map> *rank_to_ring_ids) { + VLOG(3) << "Going to load converter config from: " + << config_.dist_config().comm_init_config() << "\n"; + std::ifstream fin(config_.dist_config().comm_init_config(), std::ios::in); + PADDLE_ENFORCE_EQ( + static_cast(fin.is_open()), true, + platform::errors::NotFound( + "Cannot open file %s, please confirm whether the file is normal.", + config_.dist_config().comm_init_config())); + std::string line; + bool ring_to_rank{true}; + // Reading config from file, the config file should like these format + // [ring_id -> ranks] + // 0,0,1,2,3 + // 1,0,1 + // 2,2,3 + // 21,0,1 + // 22,1,2 + // 23,2,3 + // [rank -> ring_ids] + // 0,0,1,21 + // 1,0,1,21,22 + // 2,0,2,22,23 + // 3,0,2,23 + while (std::getline(fin, line)) { + std::vector one_line = paddle::string::Split(line, ','); + if (one_line.size() == 1) { + // start a new section of the config + if (line == "[ring_id -> ranks]") { + ring_to_rank = true; + } else if (line == "[rank -> ring_ids]") { + ring_to_rank = false; + } + } else { + // parse key - values pairs in one section + int64_t key = std::stoll(one_line[0]); + for (size_t i = 1; i < one_line.size(); ++i) { + int64_t val = std::stoll(one_line[i]); + if (ring_to_rank) { + if (ring_id_to_ranks->find(key) == ring_id_to_ranks->end()) { + ring_id_to_ranks->insert({key, std::vector()}); + } + ring_id_to_ranks->at(key).emplace_back(val); + } else { + if (rank_to_ring_ids->find(key) == rank_to_ring_ids->end()) { + rank_to_ring_ids->insert({key, std::vector()}); + } + rank_to_ring_ids->at(key).emplace_back(val); + } + // NOTE: add more configuration sections here + } + } + } + std::stringstream ss; + ss << "Loaded the following converter config:\n"; + ss << "ring_id_to_ranks:\n"; + for (auto pair : *ring_id_to_ranks) { + int64_t key = pair.first; + ss << "\t" << key << "\t->\t"; + for (auto value : pair.second) { + ss << value << "\t"; + } + ss << "\n"; + } + ss << "rank_to_ring_ids:\n"; + for (auto pair : *rank_to_ring_ids) { + int64_t key = pair.first; + ss << "\t" << key << "\t->\t"; + for (auto value : pair.second) { + ss << value << "\t"; + } + ss << "\n"; + } + VLOG(3) << ss.str(); + return true; +} +#endif + void 
 void AnalysisPredictor::MkldnnPreSet(const std::vector<PaddleTensor> &inputs) {
 #ifdef PADDLE_WITH_MKLDNN
   std::vector<std::vector<int>> inputs_shape;
@@ -946,13 +1182,24 @@ std::vector<std::string> AnalysisPredictor::GetOutputNames() {
 
 std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
     const std::string &name) {
+  framework::Scope *scope;
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+  if (config_.dist_config().use_dist_model()) {
+    scope = scope_.get();
+  } else {
+    scope = executor_->scope();
+  }
+#else
+  scope = executor_->scope();
+#endif
   PADDLE_ENFORCE_NOT_NULL(
-      executor_->scope()->FindVar(name),
+      scope->FindVar(name),
       platform::errors::PreconditionNotMet(
-          "The variable named %s is not found in the scope of the exector.",
+          "The variable named %s is not found in the scope of the executor.",
           name));
   std::unique_ptr<ZeroCopyTensor> res(
-      new ZeroCopyTensor(static_cast<void *>(executor_->scope())));
+      new ZeroCopyTensor(static_cast<void *>(scope)));
   res->input_or_output_ = true;
   res->SetName(name);
   if (platform::is_cpu_place(place_)) {
@@ -985,13 +1232,24 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
 
 std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
     const std::string &name) {
+  framework::Scope *scope;
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+  if (config_.dist_config().use_dist_model()) {
+    scope = scope_.get();
+  } else {
+    scope = executor_->scope();
+  }
+#else
+  scope = executor_->scope();
+#endif
   PADDLE_ENFORCE_NOT_NULL(
-      executor_->scope()->FindVar(name),
+      scope->FindVar(name),
       platform::errors::PreconditionNotMet(
-          "he variable named %s is not found in the scope of the exector.",
+          "The variable named %s is not found in the scope of the executor.",
           name));
   std::unique_ptr<ZeroCopyTensor> res(
-      new ZeroCopyTensor(static_cast<void *>(executor_->scope())));
+      new ZeroCopyTensor(static_cast<void *>(scope)));
   res->input_or_output_ = false;
   res->SetName(name);
   if (platform::is_cpu_place(place_)) {
@@ -1023,6 +1281,18 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
 }
 
 bool AnalysisPredictor::ZeroCopyRun() {
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+  if (config_.dist_config().use_dist_model()) {
+    VLOG(3) << "ZeroCopyRun will use the fleet executor.";
+    inference::Timer timer;
+    timer.tic();
+    fleet_exe_->Run(config_.dist_config().carrier_id());
+    VLOG(3) << "Fleet executor inf runs once use: "
+            << std::to_string(timer.toc()) << "ms";
+    return true;
+  }
+#endif
   paddle::platform::SetNumThreads(config_.cpu_math_library_num_threads());
 #ifdef PADDLE_WITH_MKLDNN
   if (config_.use_mkldnn_) {
@@ -1035,7 +1305,6 @@ bool AnalysisPredictor::ZeroCopyRun() {
     MkldnnPreSet(shape_vector);
   }
 #endif
-
   executor_->Run();
 
   if (config_.shape_range_info_collected()) {
diff --git a/paddle/fluid/inference/api/analysis_predictor.h b/paddle/fluid/inference/api/analysis_predictor.h
index a8e56101d37dabe8837b8adde9672ce45ffd62a0..8ed183dae0b1b00f8e0014b2d9b470ac177152f0 100644
--- a/paddle/fluid/inference/api/analysis_predictor.h
+++ b/paddle/fluid/inference/api/analysis_predictor.h
@@ -18,6 +18,10 @@
 #include <map>
 #include <string>
 #include <vector>
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
+#endif
 #include "paddle/fluid/framework/naive_executor.h"
 #include "paddle/fluid/framework/op_compatible_info.h"
 #include "paddle/fluid/inference/analysis/analyzer.h"
@@ -391,6 +395,53 @@ class AnalysisPredictor : public PaddlePredictor {
   void StatisticShapeRangeInfo();
   void CollectShapeRangeInfo();
 
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+  // fleet exe related
+
+  ///
+  /// \brief prepare for fleet executor to run
+  ///
+  /// Used in AnalysisPredictor::Init()
+  ///
+  bool PrepareFleetExecutor();
+
+  ///
+  /// \brief init NCCL env for multi gpus inference
+  ///
+  /// Used in AnalysisPredictor::PrepareFleetExecutor()
+  ///
+  bool CommInit();
+
+  ///
+  /// \brief read the config to init NCCL env
+  ///
+  /// Used in AnalysisPredictor::CommInit()
+  ///
+  /// \param[in] ring_id_to_ranks: a ptr to the map from ring id to ranks
+  /// \param[in] rank_to_ring_ids: a ptr to the map from rank to ring ids
+  ///
+  bool LoadConverterConfig(
+      std::map<int64_t, std::vector<int64_t>> *ring_id_to_ranks,
+      std::map<int64_t, std::vector<int64_t>> *rank_to_ring_ids);
+
+  ///
+  /// \brief add ops and run them with NaiveExecutor to init NCCL env
+  ///
+  /// Used in AnalysisPredictor::CommInit()
+  ///
+  /// \param[in] tmp_var_name: var name to hold the NCCL unique id
+  /// \param[in] nranks: number of ranks in one comm group
+  /// \param[in] rank: relative rank of the current rank in the comm group
+  /// \param[in] peer_endpoints: group's peers' endpoints
+  /// \param[in] block: the block to insert comm ops
+  /// \param[in] ring_id: the ring id to be used to init NCCL env
+  ///
+  void InsertCommOp(std::string tmp_var_name, int nranks, int rank,
+                    const std::vector<std::string> &peer_endpoints,
+                    framework::BlockDesc *block, int ring_id);
+#endif
+
  private:
   AnalysisConfig config_;
   Argument argument_;
@@ -436,6 +487,14 @@ class AnalysisPredictor : public PaddlePredictor {
   std::map<std::string, std::vector<std::vector<int32_t>>> shape_info_;
   int clone_num_{1};
+
+#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
+    !defined(PADDLE_WITH_ASCEND_CL)
+  // fleet executor related
+  distributed::FleetExecutorDesc executor_desc_;
+  std::shared_ptr<distributed::FleetExecutor> fleet_exe_;
+  std::shared_ptr<distributed::TaskNode> task_node_;
+#endif
 };
 
 }  // namespace paddle
diff --git a/paddle/fluid/inference/api/paddle_analysis_config.h b/paddle/fluid/inference/api/paddle_analysis_config.h
index 180c028c6a61088edeb8723891d4de1ba2272b80..b4a358394404fa7d28838a00c96290747f146a1f 100644
--- a/paddle/fluid/inference/api/paddle_analysis_config.h
+++ b/paddle/fluid/inference/api/paddle_analysis_config.h
@@ -76,6 +76,54 @@ struct LiteNNAdapterConfig {
   LiteNNAdapterConfig& Disable();
 };
 
+struct DistConfig {
+  bool use_dist_model() const { return use_dist_model_; }
+  void EnableDistModel(bool use_dist_model) {
+    use_dist_model_ = use_dist_model;
+  }
+
+  std::vector<std::string> trainer_endpoints() const {
+    return trainer_endpoints_;
+  }
+
+  std::string current_endpoint() const { return current_endpoint_; }
+
+  void SetEndpoints(const std::vector<std::string>& trainer_endpoints,
+                    const std::string& current_endpoint) {
+    trainer_endpoints_ = trainer_endpoints;
+    current_endpoint_ = current_endpoint;
+  }
+
+  int64_t nranks() const { return nranks_; }
+
+  int64_t rank() const { return rank_; }
+
+  void SetRanks(int64_t nranks, int64_t rank) {
+    nranks_ = nranks;
+    rank_ = rank;
+  }
+
+  std::string comm_init_config() const { return comm_init_config_; }
+
+  void SetCommInitConfig(const std::string& comm_init_config) {
+    comm_init_config_ = comm_init_config;
+  }
+
+  void SetCarrierId(const std::string& carrier_id) { carrier_id_ = carrier_id; }
+
+  std::string carrier_id() const { return carrier_id_; }
+
+ protected:
+  // DistModel Inference related
+  bool use_dist_model_{false};  // whether to use DistModel or not
+  std::vector<std::string> trainer_endpoints_{};  // all trainers' endpoints
+  std::string current_endpoint_{};                // current trainer's endpoint
+  int64_t nranks_{1};               // total ranks (number of trainers)
+  int64_t rank_{0};                 // rank of the current trainer
+  std::string comm_init_config_{};  // converter config path
+  std::string carrier_id_{"inference"};
+};
+
 ///
 /// \brief configuration manager for AnalysisPredictor.
 /// \since 1.7.0
@@ -763,6 +811,12 @@ struct PD_INFER_DECL AnalysisConfig {
 
   LiteNNAdapterConfig& NNAdapter() { return nnadapter_config_; }
 
+  void SetDistConfig(const DistConfig& dist_config) {
+    dist_config_ = dist_config;
+  }
+
+  const DistConfig& dist_config() const { return dist_config_; }
+
  protected:
   // Update the config.
   void Update();
@@ -902,6 +956,9 @@ struct PD_INFER_DECL AnalysisConfig {
   mutable bool is_valid_{true};
   std::string opt_cache_dir_;
   friend class paddle_infer::experimental::InternalUtils;
+
+  // fleet exe related
+  DistConfig dist_config_{};
 };
 
 }  // namespace paddle
diff --git a/paddle/fluid/inference/tests/api/CMakeLists.txt b/paddle/fluid/inference/tests/api/CMakeLists.txt
index 0281fd917658ad0a2f6b22cefe02efec97870721..8c96499a022f7e9f0d1fd8c512070592cf7428ff 100644
--- a/paddle/fluid/inference/tests/api/CMakeLists.txt
+++ b/paddle/fluid/inference/tests/api/CMakeLists.txt
@@ -720,6 +720,12 @@ inference_analysis_test(test_analyzer_zerocopytensor_tensor SRCS analyzer_zerocopy_tensor_tester.cc
         EXTRA_DEPS ${INFERENCE_EXTRA_DEPS}
         ARGS --infer_model=${OCR_INSTALL_DIR}/model)
 
+if(WITH_DISTRIBUTE AND WITH_PSCORE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
+  inference_analysis_test(test_analyzer_dist_model SRCS analyzer_dist_model_tester.cc
+          EXTRA_DEPS ${INFERENCE_EXTRA_DEPS}
+          ARGS --infer_model=${OCR_INSTALL_DIR}/model)
+endif()
+
 inference_analysis_test(test_analyzer_paddletensor_tensor SRCS analyzer_paddle_tensor_tester.cc
         EXTRA_DEPS ${INFERENCE_EXTRA_DEPS}
         ARGS --infer_model=${OCR_INSTALL_DIR}/model --infer_data=${OCR_INSTALL_DIR}/data.txt --refer_result=${OCR_INSTALL_DIR}/result.txt)
diff --git a/paddle/fluid/inference/tests/api/analyzer_dist_model_tester.cc b/paddle/fluid/inference/tests/api/analyzer_dist_model_tester.cc
new file mode 100644
index 0000000000000000000000000000000000000000..7cf6e2adfc688f70e0ed31f7c1f5305206aa1702
--- /dev/null
+++ b/paddle/fluid/inference/tests/api/analyzer_dist_model_tester.cc
@@ -0,0 +1,72 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "gtest/gtest.h"
+#include "paddle/fluid/framework/block_desc.h"
+#include "paddle/fluid/framework/op_desc.h"
+#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/inference/tests/api/tester_helper.h"
+#include "paddle/fluid/inference/utils/singleton.h"
+
+namespace paddle {
+namespace inference {
+
+TEST(test_dist_model, dist_model) {
+  std::cout << "Analysis Predictor DistModel test." << std::endl;
+  AnalysisConfig config;
+  config.SetModel(FLAGS_infer_model + "/__model__",
+                  FLAGS_infer_model + "/__params__");
+  config.SwitchUseFeedFetchOps(false);
+  config.EnableUseGpu(100, 0);
+  DistConfig dist_config;
+  dist_config.SetRanks(1, 0);
+  dist_config.EnableDistModel(true);
+  dist_config.SetEndpoints({""}, "");
+  config.SetDistConfig(dist_config);
+
+  auto predictor = paddle_infer::CreatePredictor(config);
+  int batch_size = 1;
+  int channels = 1;
+  int height = 48;
+  int width = 512;
+  int nums = batch_size * channels * height * width;
+  std::cout << "Created predictor." << std::endl;
+
+  float* input = new float[nums];
+  for (int i = 0; i < nums; ++i) input[i] = 0;
+  auto input_names = predictor->GetInputNames();
+
+  auto input_t = predictor->GetInputHandle(input_names[0]);
+  input_t->Reshape({batch_size, channels, height, width});
+  input_t->CopyFromCpu(input);
+  std::cout << "Input data." << std::endl;
+
+  predictor->Run();
+  std::cout << "Zero Copy Run." << std::endl;
+
+  std::vector<float> out_data;
+  auto output_names = predictor->GetOutputNames();
+  auto output_t = predictor->GetOutputHandle(output_names[0]);
+  std::vector<int> output_shape = output_t->shape();
+  int out_num = std::accumulate(output_shape.begin(), output_shape.end(), 1,
+                                std::multiplies<int>());
+  out_data.resize(out_num);
+  output_t->CopyToCpu(out_data.data());
+  std::cout << "Output data." << std::endl;
+  delete[] input;
+}
+
+}  // namespace inference
+}  // namespace paddle
diff --git a/paddle/fluid/pybind/bind_fleet_executor.cc b/paddle/fluid/pybind/bind_fleet_executor.cc
index b29cc10e8f56f5698874db8b357621aa4a88b238..8491d1e224930939212ed20019f5c78ca1c43f67 100644
--- a/paddle/fluid/pybind/bind_fleet_executor.cc
+++ b/paddle/fluid/pybind/bind_fleet_executor.cc
@@ -168,7 +168,7 @@ void BindFleetExecutor(py::module* m) {
       .def("set_run_at_offset", &TaskNode::SetRunAtOffset)
       .def("set_type", &TaskNode::SetType)
       .def("role", &TaskNode::role)
-      .def("init", &TaskNode::Init)
+      .def("init", [](TaskNode& self) { self.Init(); })
       .def("set_program", &TaskNode::SetProgram);
 
   py::class_<DistModelConfig>(*m, "DistModelConfig")
diff --git a/paddle/fluid/pybind/inference_api.cc b/paddle/fluid/pybind/inference_api.cc
index eafd5baab7d24cc63f0bce4fd479c0054d1d3659..9b5041154c95a8555e93a8995d2c230cd537f71b 100644
--- a/paddle/fluid/pybind/inference_api.cc
+++ b/paddle/fluid/pybind/inference_api.cc
@@ -658,7 +658,24 @@ void BindAnalysisConfig(py::module *m) {
              return dynamic_cast<PaddlePassBuilder *>(self.pass_builder());
            },
            py::return_value_policy::reference)
-      .def("nnadapter", &AnalysisConfig::NNAdapter);
+      .def("nnadapter", &AnalysisConfig::NNAdapter)
+      .def("set_dist_config", &AnalysisConfig::SetDistConfig)
+      .def("dist_config", &AnalysisConfig::dist_config);
+
+  py::class_<DistConfig>(*m, "DistConfig")
+      .def(py::init<>())
+      .def("set_carrier_id", &DistConfig::SetCarrierId)
+      .def("set_comm_init_config", &DistConfig::SetCommInitConfig)
+      .def("set_endpoints", &DistConfig::SetEndpoints)
+      .def("set_ranks", &DistConfig::SetRanks)
+      .def("enable_dist_model", &DistConfig::EnableDistModel)
+      .def("carrier_id", &DistConfig::carrier_id)
+      .def("current_endpoint", &DistConfig::current_endpoint)
+      .def("trainer_endpoints", &DistConfig::trainer_endpoints)
+      .def("nranks", &DistConfig::nranks)
+      .def("rank", &DistConfig::rank)
+      .def("comm_init_config", &DistConfig::comm_init_config)
+      .def("use_dist_model", &DistConfig::use_dist_model);
 }
 
 void BindLiteNNAdapterConfig(py::module *m) {
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index e372727b0f0b6a338cd43ac81001bb32ffd03ecc..a7971763f53e1fa1ea445f8ac843a57ae00bd1c2 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -2034,8 +2034,11 @@ class Executor(object):
             fleet_opt['task_id_to_rank'] = task_id_to_rank
         place = core.Place()
         place.set_place(self.place)
+        # NOTE: the last argument forces some vars to be created in the root
+        # scope; it is not used during training.
         self._fleet_executor.init(carrier_id, program.desc, scope, place,
-                                  num_micro_batches, tasks, task_id_to_rank)
+                                  num_micro_batches, tasks, task_id_to_rank,
+                                  [])
 
     def _run_using_fleet_executor(self,
                                   program=None,
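
Usage sketch (reviewer note): the snippet below drives the new DistConfig
bindings from Python for a single-rank DistModel run, mirroring the C++ test
added above. It is a minimal sketch under assumptions: it assumes DistConfig
is exported from paddle.inference alongside Config (this patch only adds the
pybind registrations), and the model paths and input shape are placeholders.

    # Hypothetical single-rank DistModel driver; paths and shapes are placeholders.
    import numpy as np
    from paddle.inference import Config, DistConfig, create_predictor

    dist_config = DistConfig()
    dist_config.enable_dist_model(True)  # route ZeroCopyRun through the fleet executor
    dist_config.set_ranks(1, 0)          # nranks=1, rank=0: CommInit is skipped
    dist_config.set_endpoints([""], "")  # no peer endpoints on a single rank
    # Multi-rank runs would also need the converter config parsed by
    # LoadConverterConfig, e.g. dist_config.set_comm_init_config("comm_init.csv")

    config = Config("model/__model__", "model/__params__")
    config.enable_use_gpu(100, 0)
    config.switch_use_feed_fetch_ops(False)  # feed/fetch ops are skipped by TaskNode::Init
    config.set_dist_config(dist_config)

    predictor = create_predictor(config)
    input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
    data = np.zeros([1, 1, 48, 512], dtype=np.float32)
    input_handle.reshape([1, 1, 48, 512])
    input_handle.copy_from_cpu(data)

    predictor.run()  # dispatches to FleetExecutor::Run(carrier_id)

    output_handle = predictor.get_output_handle(predictor.get_output_names()[0])
    print(output_handle.copy_to_cpu().shape)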