Unverified · Commit 244ae318 · authored by Yuang Liu · committed by GitHub

[fleet_executor] Add entrance of FleetExecutor in AnalysisPredictor for distributed inference (#39992)
Parent 90ab7403
......@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include "paddle/fluid/distributed/fleet_executor/carrier.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......@@ -46,7 +48,8 @@ void Carrier::Init(
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
const framework::ProgramDesc& program, framework::Scope* scope,
int64_t num_micro_batches, const platform::Place& place) {
int64_t num_micro_batches, const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars) {
rank_ = rank;
interceptor_id_to_rank_ = interceptor_id_to_rank;
interceptor_id_to_node_ = interceptor_id_to_node;
......@@ -60,7 +63,7 @@ void Carrier::Init(
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program);
CopyParameters(i, program, inference_root_scope_vars);
}
// TODO(fleet_exe dev): thread pool
......@@ -80,12 +83,23 @@ void Carrier::Release() {
Carrier::~Carrier() { VLOG(3) << "Carrier's destructor."; }
void Carrier::CopyParameters(int microbatch_id,
const framework::ProgramDesc& program) {
void Carrier::CopyParameters(
int microbatch_id, const framework::ProgramDesc& program,
const std::vector<std::string>& inference_root_scope_vars) {
auto& global_block = program.Block(0);
std::map<std::string, int> inference_root_scope_var_map;
for (auto var_name : inference_root_scope_vars) {
inference_root_scope_var_map.insert({var_name, 1});
}
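// (The map above is used only for membership checks below; the mapped
// values are ignored.)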
for (auto& var : global_block.AllVars()) {
if (var->Persistable() && microbatch_id == 0) {
std::string var_name = var->Name();
bool force_root = inference_root_scope_var_map.find(var_name) !=
inference_root_scope_var_map.end();
if (force_root) {
VLOG(4) << var_name << " will be forced to be created in the root scope.";
}
if ((var->Persistable() || force_root) && microbatch_id == 0) {
auto* ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(5) << "Create persistable var: " << var->Name()
......
......@@ -57,9 +57,12 @@ class Carrier final {
const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node,
const framework::ProgramDesc& program, framework::Scope* scope,
int64_t num_micro_batches, const platform::Place& place);
int64_t num_micro_batches, const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars = {});
void CopyParameters(int microbatch_id, const framework::ProgramDesc& program);
void CopyParameters(
int microbatch_id, const framework::ProgramDesc& program,
const std::vector<std::string>& inference_root_scope_vars);
void Release();
void Wait();
......
......@@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/global.h"
......@@ -52,7 +53,8 @@ void FleetExecutor::Init(
const std::string& carrier_id, const framework::ProgramDesc& program_desc,
framework::Scope* scope, const platform::Place& place,
int64_t num_micro_batches, const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
const std::vector<std::string>& inference_root_scope_vars) {
PADDLE_ENFORCE_GT(task_nodes.size(), 0,
platform::errors::InvalidArgument(
"Fleet executor is inited with empty task node"));
......@@ -64,6 +66,37 @@ void FleetExecutor::Init(
}
}
auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
// NOTE: For inference, the vars in inference_root_scope_vars
// shouldn't be deleted during inference, since they may hold its
// results. If they were GC'ed, zero-copying the results would fail.
std::vector<const framework::OperatorBase*> changed_ops;
for (auto pair : unused_vars) {
const framework::OperatorBase* op = pair.first;
std::vector<std::string> unused = pair.second;
for (auto name : inference_root_scope_vars) {
auto iter = std::find(unused.begin(), unused.end(), name);
if (iter != unused.end()) {
VLOG(3) << "Removing var: [" << name
<< "] from the unused vars list of op: [" << op->Type() << "]";
unused.erase(iter);
if (std::find(changed_ops.begin(), changed_ops.end(), op) ==
changed_ops.end()) {
// record the op whose unused vars have been updated
changed_ops.emplace_back(op);
}
}
}
// update the unused vars list in the map
unused_vars[op] = unused;
}
for (auto op : changed_ops) {
auto iter = unused_vars.find(op);
if (iter->second.empty()) {
// remove those ops in the map that have empty unused vars list
VLOG(3) << "Removing op: [" << op->Type() << "] from unused_vars map.";
unused_vars.erase(iter);
}
}
runtime_graph_ = std::make_shared<RuntimeGraph>();
std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
for (auto task_node : task_nodes) {
......@@ -82,17 +115,18 @@ void FleetExecutor::Init(
carrier_ids_.insert(carrier_id);
// Set current running carrier
GlobalVal<std::string>::Set(new std::string(carrier_id));
InitCarrier(carrier, scope, place, num_micro_batches, program_desc);
InitCarrier(carrier, scope, place, num_micro_batches, program_desc,
inference_root_scope_vars);
GlobalVal<MessageBus>::Get()->Barrier();
}
void FleetExecutor::InitCarrier(Carrier* carrier, framework::Scope* scope,
const platform::Place& place,
int64_t num_micro_batches,
const framework::ProgramDesc& program_desc) {
void FleetExecutor::InitCarrier(
Carrier* carrier, framework::Scope* scope, const platform::Place& place,
int64_t num_micro_batches, const framework::ProgramDesc& program_desc,
const std::vector<std::string>& inference_root_scope_vars) {
carrier->Init(exe_desc_.cur_rank(), runtime_graph_->interceptor_id_to_rank(),
runtime_graph_->interceptor_id_to_node(), program_desc, scope,
num_micro_batches, place);
num_micro_batches, place, inference_root_scope_vars);
}
void FleetExecutor::InitMessageBus() {
......
......@@ -42,15 +42,17 @@ class FleetExecutor final {
const framework::ProgramDesc& program_desc, framework::Scope* scope,
const platform::Place& place, int64_t num_micro_batches,
const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank);
const std::unordered_map<int64_t, int64_t>& task_id_to_rank,
const std::vector<std::string>& inference_root_scope_vars = {});
void Run(const std::string& carrier_id);
private:
DISABLE_COPY_AND_ASSIGN(FleetExecutor);
void InitMessageBus();
void InitCarrier(Carrier* carrier, framework::Scope* scope,
const platform::Place& place, int64_t num_micro_batches,
const framework::ProgramDesc& program_desc);
void InitCarrier(
Carrier* carrier, framework::Scope* scope, const platform::Place& place,
int64_t num_micro_batches, const framework::ProgramDesc& program_desc,
const std::vector<std::string>& inference_root_scope_vars = {});
FleetExecutorDesc exe_desc_;
std::shared_ptr<RuntimeGraph> runtime_graph_;
std::unordered_set<std::string> carrier_ids_;
......
......@@ -52,11 +52,20 @@ void TaskNode::SetProgram(paddle::framework::ProgramDesc* program) {
program_ = program;
}
void TaskNode::Init() {
void TaskNode::Init(bool use_feed_fetch_ops) {
if (!use_feed_fetch_ops) {
VLOG(3) << "TaskNode will be inited without feed and fetch ops";
}
if (ops_.empty()) {
// Q (for fleet executor dev): do we need a separate reset function?
VLOG(3) << "Task node will be inited by calling Init().";
for (const auto& op_desc : program_->Block(0).AllOps()) {
if (!use_feed_fetch_ops &&
(op_desc->Type() == "feed" || op_desc->Type() == "fetch")) {
VLOG(3) << "TaskNode will skip [" << op_desc->Input("X")[0] << "], "
<< op_desc->Type() << " -> " << op_desc->Output("Out")[0];
continue;
}
ops_vec_.emplace_back(framework::OpRegistry::CreateOp(*op_desc));
}
for (const auto& op : ops_vec_) {
......
......@@ -46,7 +46,7 @@ class TaskNode final {
~TaskNode() = default;
void SetProgram(paddle::framework::ProgramDesc* program);
void Init();
void Init(bool use_feed_fetch_ops = true);
int64_t rank() const { return rank_; }
int64_t task_id() const { return task_id_; }
int32_t role() const { return role_; }
......
......@@ -274,6 +274,9 @@ AnalysisConfig::AnalysisConfig(const AnalysisConfig &other) {
CP_MEMBER(ipu_available_memory_proportion_);
CP_MEMBER(ipu_enable_half_partial_);
// fleet exe related
CP_MEMBER(dist_config_);
if (use_gpu_) {
PADDLE_ENFORCE_EQ(use_xpu_, false,
platform::errors::InvalidArgument(
......
......@@ -30,6 +30,7 @@
#include "paddle/fluid/framework/ir/fuse_pass_base.h"
#include "paddle/fluid/framework/ir/pass.h"
#include "paddle/fluid/framework/naive_executor.h"
#include "paddle/fluid/framework/op_proto_maker.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/var_type_traits.h"
#include "paddle/fluid/framework/version.h"
......@@ -47,6 +48,14 @@
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/phi/api/ext/op_meta_info.h"
#include "paddle/utils/string/split.h"
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#endif
#ifdef PADDLE_WITH_MKLML
#include "paddle/fluid/platform/dynload/mklml.h"
......@@ -186,14 +195,14 @@ bool AnalysisPredictor::Init(
return false;
}
// Get the feed_target_names and fetch_target_names
PrepareFeedFetch();
// Prepare executor, create local variables.
if (!PrepareExecutor()) {
return true;
}
// Get the feed_target_names and fetch_target_names
PrepareFeedFetch();
return true;
}
......@@ -359,6 +368,13 @@ static void DisablePrepareDataOpt(
}
bool AnalysisPredictor::PrepareExecutor() {
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
if (config_.dist_config().use_dist_model()) {
VLOG(3) << "use_dist_model is enabled, will init FleetExecutor.";
return PrepareFleetExecutor();
}
#endif
DisablePrepareDataOpt(inference_program_, 0, false);
executor_->Prepare(sub_scope_, *inference_program_, 0,
......@@ -371,6 +387,226 @@ bool AnalysisPredictor::PrepareExecutor() {
return true;
}
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
bool AnalysisPredictor::PrepareFleetExecutor() {
VLOG(3) << "AnalysisPredictor::PrepareFleetExecutor()";
if (config_.dist_config().nranks() > 1 && !CommInit()) {
return false;
}
task_node_.reset(new distributed::TaskNode(inference_program_.get(),
config_.dist_config().rank()));
// With auto cut, there is no concept of pipeline parallelism (pp), so no
// dependency needs to be added.
task_node_->SetType("Compute");
task_node_->Init(config_.use_feed_fetch_ops_enabled());
executor_desc_ = distributed::FleetExecutorDesc();
executor_desc_.set_cur_rank(config_.dist_config().rank());
std::unordered_map<int64_t, int64_t> id_to_rank;
for (int i = 0; i < config_.dist_config().nranks(); ++i) {
distributed::RankInfo *rank_info = executor_desc_.add_cluster_info();
rank_info->set_rank(i);
rank_info->set_ip_port(config_.dist_config().trainer_endpoints()[i]);
id_to_rank.insert({i, i});
}
fleet_exe_.reset(new distributed::FleetExecutor(executor_desc_));
// NOTE: The vars of feed/fetch ops are not persistable, so the fleet
// executor would create them in the sub-scopes (micro-batch scopes).
// The GetInputTensor/GetOutputTensor functions in the analysis
// predictor would then fail to find those vars in the scope returned
// by the DistModel, since DistModel only returns the root scope.
// Therefore, those vars must be created in the root scope instead of
// in the micro-batch scopes.
std::vector<std::string> feed_fetch_vars;
for (auto pair : idx2feeds_) {
feed_fetch_vars.emplace_back(pair.second);
}
for (auto pair : idx2fetches_) {
feed_fetch_vars.emplace_back(pair.second);
}
fleet_exe_->Init(config_.dist_config().carrier_id(),
*(inference_program_.get()), scope_.get(), place_, 1,
{task_node_.get()}, id_to_rank, feed_fetch_vars);
return true;
}
bool AnalysisPredictor::CommInit() {
std::map<int64_t, std::vector<int64_t>> ring_id_to_ranks{};
std::map<int64_t, std::vector<int64_t>> rank_to_ring_ids{};
if (!LoadConverterConfig(&ring_id_to_ranks, &rank_to_ring_ids)) {
VLOG(3) << "Load converter config failed, DistModel init failed.";
return false;
}
std::unique_ptr<framework::ProgramDesc> comm_init_program(
new framework::ProgramDesc());
framework::BlockDesc *comm_init_block = comm_init_program->MutableBlock(0);
std::vector<int64_t> &ring_ids =
rank_to_ring_ids[config_.dist_config().rank()];
int64_t order = 0;
std::string var_name_base = "comm_init_";
for (int64_t ring_id : ring_ids) {
VLOG(3) << "Init comm for ring id: " << ring_id;
int64_t ranks_in_group = ring_id_to_ranks[ring_id].size();
int64_t rank_in_group = 0;
std::vector<int64_t> &ranks = ring_id_to_ranks[ring_id];
for (int64_t rank : ranks) {
if (config_.dist_config().rank() == rank) {
break;
}
rank_in_group += 1;
}
std::vector<std::string> peer_endpoints;
for (int64_t rank : ranks) {
if (config_.dist_config().rank() == rank) {
continue;
}
peer_endpoints.emplace_back(
config_.dist_config().trainer_endpoints()[rank]);
}
InsertCommOp(var_name_base + std::to_string(order), ranks_in_group,
rank_in_group, peer_endpoints, comm_init_block, ring_id);
order += 1;
}
framework::NaiveExecutor e(place_);
e.CreateVariables(*comm_init_program, 0, true, scope_.get());
e.Prepare(scope_.get(), *comm_init_program, 0, false);
e.Run();
VLOG(3) << "Comm init successful.";
return true;
}
void AnalysisPredictor::InsertCommOp(
std::string tmp_var_name, int nranks, int rank,
const std::vector<std::string> &peer_endpoints, framework::BlockDesc *block,
int ring_id) {
/*
* tmp_var_name: the var name for the comm_id var
* nranks: total number of ranks
* rank: the rank of the current process in the comm group
* peer_endpoints: the peers' endpoints
* block: the block into which to insert the comm ops
* ring_id: the ring id to be initialized
*/
const std::string &endpoint = config_.dist_config().current_endpoint();
std::stringstream ss;
ss << "Init comm with tmp var: " << tmp_var_name
<< ". The ring id is: " << ring_id << ". The group has: " << nranks
<< " ranks. Current rank in the group is: " << rank
<< ". The endpoint is: " << endpoint << ". Peer endpoints are: ";
for (auto ep : peer_endpoints) {
ss << ep << ", ";
}
VLOG(3) << ss.str();
if (config_.use_gpu()) {
framework::VarDesc *new_var = block->Var(tmp_var_name);
new_var->SetType(framework::proto::VarType::RAW);
new_var->SetPersistable(true);
framework::OpDesc *gen_nccl_id_op = block->AppendOp();
gen_nccl_id_op->SetType("c_gen_nccl_id");
gen_nccl_id_op->SetOutput("Out", {tmp_var_name});
gen_nccl_id_op->SetAttr("rank", rank);
gen_nccl_id_op->SetAttr("endpoint",
config_.dist_config().current_endpoint());
gen_nccl_id_op->SetAttr("other_endpoints", peer_endpoints);
gen_nccl_id_op->SetAttr("ring_id", ring_id);
gen_nccl_id_op->SetAttr("op_role",
static_cast<int>(framework::OpRole::kForward));
gen_nccl_id_op->CheckAttrs();
framework::OpDesc *comm_init_op = block->AppendOp();
comm_init_op->SetType("c_comm_init");
comm_init_op->SetInput("X", {tmp_var_name});
comm_init_op->SetAttr("rank", rank);
comm_init_op->SetAttr("nranks", nranks);
comm_init_op->SetAttr("ring_id", ring_id);
comm_init_op->SetAttr("op_role",
static_cast<int>(framework::OpRole::kForward));
comm_init_op->CheckAttrs();
} else {
LOG(WARNING) << "DistModelInf doesn't init comm.";
// TODO(fleet exe dev): comm init for more devices
}
}
bool AnalysisPredictor::LoadConverterConfig(
std::map<int64_t, std::vector<int64_t>> *ring_id_to_ranks,
std::map<int64_t, std::vector<int64_t>> *rank_to_ring_ids) {
VLOG(3) << "Going to load converter config from: "
<< config_.dist_config().comm_init_config() << "\n";
std::ifstream fin(config_.dist_config().comm_init_config(), std::ios::in);
PADDLE_ENFORCE_EQ(
static_cast<bool>(fin.is_open()), true,
platform::errors::NotFound(
"Cannot open file %s, please confirm whether the file is normal.",
config_.dist_config().comm_init_config()));
std::string line;
bool ring_to_rank{true};
// Read the config from file; the config file should follow this format:
// [ring_id -> ranks]
// 0,0,1,2,3
// 1,0,1
// 2,2,3
// 21,0,1
// 22,1,2
// 23,2,3
// [rank -> ring_ids]
// 0,0,1,21
// 1,0,1,21,22
// 2,0,2,22,23
// 3,0,2,23
while (std::getline(fin, line)) {
std::vector<std::string> one_line = paddle::string::Split(line, ',');
if (one_line.size() == 1) {
// start a new section of the config
if (line == "[ring_id -> ranks]") {
ring_to_rank = true;
} else if (line == "[rank -> ring_ids]") {
ring_to_rank = false;
}
} else {
// parse key-value pairs in one section
int64_t key = std::stoll(one_line[0]);
for (size_t i = 1; i < one_line.size(); ++i) {
int64_t val = std::stoll(one_line[i]);
if (ring_to_rank) {
if (ring_id_to_ranks->find(key) == ring_id_to_ranks->end()) {
ring_id_to_ranks->insert({key, std::vector<int64_t>()});
}
ring_id_to_ranks->at(key).emplace_back(val);
} else {
if (rank_to_ring_ids->find(key) == rank_to_ring_ids->end()) {
rank_to_ring_ids->insert({key, std::vector<int64_t>()});
}
rank_to_ring_ids->at(key).emplace_back(val);
}
// NOTE: add more configuration sections here
}
}
}
std::stringstream ss;
ss << "Loaded the following converter config:\n";
ss << "ring_id_to_ranks:\n";
for (auto pair : *ring_id_to_ranks) {
int64_t key = pair.first;
ss << "\t" << key << "\t->\t";
for (auto value : pair.second) {
ss << value << "\t";
}
ss << "\n";
}
ss << "rank_to_ring_ids:\n";
for (auto pair : *rank_to_ring_ids) {
int64_t key = pair.first;
ss << "\t" << key << "\t->\t";
for (auto value : pair.second) {
ss << value << "\t";
}
ss << "\n";
}
VLOG(3) << ss.str();
return true;
}
#endif
void AnalysisPredictor::MkldnnPreSet(const std::vector<PaddleTensor> &inputs) {
#ifdef PADDLE_WITH_MKLDNN
std::vector<std::vector<int>> inputs_shape;
......@@ -946,13 +1182,24 @@ std::vector<std::string> AnalysisPredictor::GetOutputNames() {
std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
const std::string &name) {
framework::Scope *scope;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
if (config_.dist_config().use_dist_model()) {
scope = scope_.get();
} else {
scope = executor_->scope();
}
#else
scope = executor_->scope();
#endif
PADDLE_ENFORCE_NOT_NULL(
executor_->scope()->FindVar(name),
scope->FindVar(name),
platform::errors::PreconditionNotMet(
"The variable named %s is not found in the scope of the exector.",
"The variable named %s is not found in the scope of the executor.",
name));
std::unique_ptr<ZeroCopyTensor> res(
new ZeroCopyTensor(static_cast<void *>(executor_->scope())));
new ZeroCopyTensor(static_cast<void *>(scope)));
res->input_or_output_ = true;
res->SetName(name);
if (platform::is_cpu_place(place_)) {
......@@ -985,13 +1232,24 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetInputTensor(
std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
const std::string &name) {
framework::Scope *scope;
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
if (config_.dist_config().use_dist_model()) {
scope = scope_.get();
} else {
scope = executor_->scope();
}
#else
scope = executor_->scope();
#endif
PADDLE_ENFORCE_NOT_NULL(
executor_->scope()->FindVar(name),
scope->FindVar(name),
platform::errors::PreconditionNotMet(
"he variable named %s is not found in the scope of the exector.",
"The variable named %s is not found in the scope of the executor.",
name));
std::unique_ptr<ZeroCopyTensor> res(
new ZeroCopyTensor(static_cast<void *>(executor_->scope())));
new ZeroCopyTensor(static_cast<void *>(scope)));
res->input_or_output_ = false;
res->SetName(name);
if (platform::is_cpu_place(place_)) {
......@@ -1023,6 +1281,18 @@ std::unique_ptr<ZeroCopyTensor> AnalysisPredictor::GetOutputTensor(
}
bool AnalysisPredictor::ZeroCopyRun() {
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
if (config_.dist_config().use_dist_model()) {
VLOG(3) << "ZeroCopyRun will use the fleet executor.";
inference::Timer timer;
timer.tic();
fleet_exe_->Run(config_.dist_config().carrier_id());
VLOG(3) << "Fleet executor inf runs once use: "
<< std::to_string(timer.toc()) << "ms";
return true;
}
#endif
paddle::platform::SetNumThreads(config_.cpu_math_library_num_threads());
#ifdef PADDLE_WITH_MKLDNN
if (config_.use_mkldnn_) {
......@@ -1035,7 +1305,6 @@ bool AnalysisPredictor::ZeroCopyRun() {
MkldnnPreSet(shape_vector);
}
#endif
executor_->Run();
if (config_.shape_range_info_collected()) {
......
......@@ -18,6 +18,10 @@
#include <memory>
#include <string>
#include <vector>
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
#endif
#include "paddle/fluid/framework/naive_executor.h"
#include "paddle/fluid/framework/op_compatible_info.h"
#include "paddle/fluid/inference/analysis/analyzer.h"
......@@ -391,6 +395,53 @@ class AnalysisPredictor : public PaddlePredictor {
void StatisticShapeRangeInfo();
void CollectShapeRangeInfo();
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
// fleet exe related
///
/// \brief prepare for fleet executor to run
///
/// Used in AnalysisPredictor::Init().
///
bool PrepareFleetExecutor();
///
/// \brief init NCCL env for multi gpus inference
///
/// Used in AnalysisPredictor::PrepareFleetExecutor()
///
bool CommInit();
///
/// \brief read the config to init NCCL env
///
/// Used in AnalysisPredictor::CommInit()
///
/// \param[out] ring_id_to_ranks: pointer to the ring_id -> ranks map to fill
/// \param[out] rank_to_ring_ids: pointer to the rank -> ring_ids map to fill
///
bool LoadConverterConfig(
std::map<int64_t, std::vector<int64_t>> *ring_id_to_ranks,
std::map<int64_t, std::vector<int64_t>> *rank_to_ring_ids);
///
/// \brief add ops and run them with NaiveExecutor to init NCCL env
///
/// Used in AnalysisPredictor::CommInit()
///
/// \param[in] tmp_var_name: var name to hold NCCL unique id
/// \param[in] nranks: number of ranks in one comm group
/// \param[in] rank: the rank of the current process within the comm group
/// \param[in] peer_endpoints: group's peers' endpoints
/// \param[in] block: the block to insert comm ops
/// \param[in] ring_id: the ring id to be used to init NCCL env
///
void InsertCommOp(std::string tmp_var_name, int nranks, int rank,
const std::vector<std::string> &peer_endpoints,
framework::BlockDesc *block, int ring_id);
#endif
private:
AnalysisConfig config_;
Argument argument_;
......@@ -436,6 +487,14 @@ class AnalysisPredictor : public PaddlePredictor {
std::map<std::string, std::vector<std::vector<int32_t>>> shape_info_;
int clone_num_{1};
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
// fleet executor related
distributed::FleetExecutorDesc executor_desc_;
std::shared_ptr<distributed::FleetExecutor> fleet_exe_;
std::shared_ptr<distributed::TaskNode> task_node_;
#endif
};
} // namespace paddle
......@@ -76,6 +76,54 @@ struct LiteNNAdapterConfig {
LiteNNAdapterConfig& Disable();
};
struct DistConfig {
bool use_dist_model() const { return use_dist_model_; }
void EnableDistModel(bool use_dist_model) {
use_dist_model_ = use_dist_model;
}
std::vector<std::string> trainer_endpoints() const {
return trainer_endpoints_;
}
std::string current_endpoint() const { return current_endpoint_; }
void SetEndpoints(const std::vector<std::string>& trainer_endpoints,
const std::string& current_endpoint) {
trainer_endpoints_ = trainer_endpoints;
current_endpoint_ = current_endpoint;
}
int64_t nranks() const { return nranks_; }
int64_t rank() const { return rank_; }
void SetRanks(int64_t nranks, int64_t rank) {
nranks_ = nranks;
rank_ = rank;
}
std::string comm_init_config() const { return comm_init_config_; }
void SetCommInitConfig(const std::string& comm_init_config) {
comm_init_config_ = comm_init_config;
}
void SetCarrierId(const std::string& carrier_id) { carrier_id_ = carrier_id; }
std::string carrier_id() const { return carrier_id_; }
protected:
// DistModel Inference related
bool use_dist_model_{false}; // whether use DistModel or not
std::vector<std::string> trainer_endpoints_{}; // all trainers' endpoints
std::string current_endpoint_{}; // current trainer's endpoint
int64_t nranks_{1}; // total ranks (number of trainers)
int64_t rank_{0}; // rank
std::string comm_init_config_{}; // converter config path
std::string carrier_id_{"inference"};
};
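// Usage sketch: a minimal single-rank setup, mirroring the
// analyzer_dist_model_tester.cc test added in this PR (model paths and the
// converter config file name are illustrative):
//
//   AnalysisConfig config;
//   config.SetModel("model/__model__", "model/__params__");
//   config.SwitchUseFeedFetchOps(false);
//   config.EnableUseGpu(100, 0);
//   DistConfig dist_config;
//   dist_config.EnableDistModel(true);
//   dist_config.SetRanks(1 /*nranks*/, 0 /*rank*/);
//   dist_config.SetEndpoints({""}, "");
//   // For multi-rank runs, CommInit() also needs the converter config:
//   // dist_config.SetCommInitConfig("comm_init.conf");
//   config.SetDistConfig(dist_config);
//   auto predictor = paddle_infer::CreatePredictor(config);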
///
/// \brief configuration manager for AnalysisPredictor.
/// \since 1.7.0
......@@ -763,6 +811,12 @@ struct PD_INFER_DECL AnalysisConfig {
LiteNNAdapterConfig& NNAdapter() { return nnadapter_config_; }
void SetDistConfig(const DistConfig& dist_config) {
dist_config_ = dist_config;
}
const DistConfig& dist_config() const { return dist_config_; }
protected:
// Update the config.
void Update();
......@@ -902,6 +956,9 @@ struct PD_INFER_DECL AnalysisConfig {
mutable bool is_valid_{true};
std::string opt_cache_dir_;
friend class paddle_infer::experimental::InternalUtils;
// fleet exe related
DistConfig dist_config_{};
};
} // namespace paddle
......@@ -720,6 +720,12 @@ inference_analysis_test(test_analyzer_zerocopytensor_tensor SRCS analyzer_zeroco
EXTRA_DEPS ${INFERENCE_EXTRA_DEPS}
ARGS --infer_model=${OCR_INSTALL_DIR}/model)
if(WITH_DISTRIBUTE AND WITH_PSCORE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))
inference_analysis_test(test_analyzer_dist_model SRCS analyzer_dist_model_tester.cc
EXTRA_DEPS ${INFERENCE_EXTRA_DEPS}
ARGS --infer_model=${OCR_INSTALL_DIR}/model)
endif()
inference_analysis_test(test_analyzer_paddletensor_tensor SRCS analyzer_paddle_tensor_tester.cc
EXTRA_DEPS ${INFERENCE_EXTRA_DEPS}
ARGS --infer_model=${OCR_INSTALL_DIR}/model --infer_data=${OCR_INSTALL_DIR}/data.txt --refer_result=${OCR_INSTALL_DIR}/result.txt)
......
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "gtest/gtest.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/inference/tests/api/tester_helper.h"
#include "paddle/fluid/inference/utils/singleton.h"
namespace paddle {
namespace inference {
TEST(test_dist_model, dist_model) {
std::cout << "Analysis Predictor DistModel test." << std::endl;
AnalysisConfig config;
config.SetModel(FLAGS_infer_model + "/__model__",
FLAGS_infer_model + "/__params__");
config.SwitchUseFeedFetchOps(false);
config.EnableUseGpu(100, 0);
DistConfig dist_config;
dist_config.SetRanks(1, 0);
dist_config.EnableDistModel(true);
dist_config.SetEndpoints({""}, "");
config.SetDistConfig(dist_config);
auto predictor = paddle_infer::CreatePredictor(config);
int batch_size = 1;
int channels = 1;
int height = 48;
int width = 512;
int nums = batch_size * channels * height * width;
std::cout << "Created predictor." << std::endl;
float* input = new float[nums];
for (int i = 0; i < nums; ++i) input[i] = 0;
auto input_names = predictor->GetInputNames();
auto input_t = predictor->GetInputHandle(input_names[0]);
input_t->Reshape({batch_size, channels, height, width});
input_t->CopyFromCpu(input);
std::cout << "Input data." << std::endl;
predictor->Run();
std::cout << "Zero Copy Run." << std::endl;
std::vector<float> out_data;
auto output_names = predictor->GetOutputNames();
auto output_t = predictor->GetOutputHandle(output_names[0]);
std::vector<int> output_shape = output_t->shape();
int out_num = std::accumulate(output_shape.begin(), output_shape.end(), 1,
std::multiplies<int>());
out_data.resize(out_num);
output_t->CopyToCpu(out_data.data());
std::cout << "Output data." << std::endl;
delete[] input;
}
} // namespace inference
} // namespace paddle
......@@ -168,7 +168,7 @@ void BindFleetExecutor(py::module* m) {
.def("set_run_at_offset", &TaskNode::SetRunAtOffset)
.def("set_type", &TaskNode::SetType)
.def("role", &TaskNode::role)
.def("init", &TaskNode::Init)
.def("init", [](TaskNode& self) { self.Init(); })
.def("set_program", &TaskNode::SetProgram);
py::class_<DistModelConfig>(*m, "DistModelConfig")
......
......@@ -658,7 +658,24 @@ void BindAnalysisConfig(py::module *m) {
return dynamic_cast<PaddlePassBuilder *>(self.pass_builder());
},
py::return_value_policy::reference)
.def("nnadapter", &AnalysisConfig::NNAdapter);
.def("nnadapter", &AnalysisConfig::NNAdapter)
.def("set_dist_config", &AnalysisConfig::SetDistConfig)
.def("dist_config", &AnalysisConfig::dist_config);
py::class_<DistConfig>(*m, "DistConfig")
.def(py::init<>())
.def("set_carrier_id", &DistConfig::SetCarrierId)
.def("set_comm_init_config", &DistConfig::SetCommInitConfig)
.def("set_endpoints", &DistConfig::SetEndpoints)
.def("set_ranks", &DistConfig::SetRanks)
.def("enable_dist_model", &DistConfig::EnableDistModel)
.def("carrier_id", &DistConfig::carrier_id)
.def("current_endpoint", &DistConfig::current_endpoint)
.def("trainer_endpoints", &DistConfig::trainer_endpoints)
.def("nranks", &DistConfig::nranks)
.def("rank", &DistConfig::rank)
.def("comm_init_config", &DistConfig::comm_init_config)
.def("use_dist_model", &DistConfig::use_dist_model);
}
void BindLiteNNAdapterConfig(py::module *m) {
......
......@@ -2034,8 +2034,11 @@ class Executor(object):
fleet_opt['task_id_to_rank'] = task_id_to_rank
place = core.Place()
place.set_place(self.place)
# NOTE: the last argument forces certain vars to be created in the root
# scope; it is not used during training.
self._fleet_executor.init(carrier_id, program.desc, scope, place,
num_micro_batches, tasks, task_id_to_rank)
num_micro_batches, tasks, task_id_to_rank,
[])
def _run_using_fleet_executor(self,
program=None,
......