提交 ec6ee0a2 作者: Xin Pan

simplify and hide bcast_params

上级 d9297c12
...@@ -209,30 +209,9 @@ ParallelExecutor::ParallelExecutor( ...@@ -209,30 +209,9 @@ ParallelExecutor::ParallelExecutor(
void ParallelExecutor::BCastParamsToDevices( void ParallelExecutor::BCastParamsToDevices(
const std::unordered_set<std::string> &vars) const { const std::unordered_set<std::string> &vars) const {
// the initializing bcast, all vars would be bcast from device(0), // the initializing bcast, all vars would be bcast from device(0).
// otherwise
// bcast from the specified device.
bool initializing = member_->executor_ ? false : true;
for (auto &var : vars) { for (auto &var : vars) {
int var_dev_id = -1; framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
if (member_->executor_) {
auto &sharded_var_device =
member_->executor_->Graph().Get<details::ShardedVarDevice>(
details::kShardedVarDevice);
if (sharded_var_device.find(var) != sharded_var_device.end()) {
var_dev_id = sharded_var_device.at(var);
}
}
if (!initializing && var_dev_id == -1) continue;
framework::Variable *main_var = nullptr;
if (initializing) {
main_var = member_->local_scopes_[0]->FindVar(var);
} else {
main_var = member_->local_scopes_[var_dev_id]->FindVar(var);
}
if (main_var == nullptr || !main_var->IsType<LoDTensor>()) { if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
continue; continue;
} }
...@@ -248,8 +227,7 @@ void ParallelExecutor::BCastParamsToDevices( ...@@ -248,8 +227,7 @@ void ParallelExecutor::BCastParamsToDevices(
auto place = member_->places_[i]; auto place = member_->places_[i];
void *buffer; void *buffer;
if ((initializing && i == 0) || if (i == 0) {
(!initializing && static_cast<int>(i) == var_dev_id)) {
buffer = const_cast<void *>(main_tensor.data<void>()); buffer = const_cast<void *>(main_tensor.data<void>());
} else { } else {
auto local_scope = member_->local_scopes_[i]; auto local_scope = member_->local_scopes_[i];
...@@ -266,29 +244,18 @@ void ParallelExecutor::BCastParamsToDevices( ...@@ -266,29 +244,18 @@ void ParallelExecutor::BCastParamsToDevices(
platform::NCCLGroupGuard guard; platform::NCCLGroupGuard guard;
for (size_t i = 0; i < member_->places_.size(); ++i) { for (size_t i = 0; i < member_->places_.size(); ++i) {
auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]); auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]);
if (initializing) { platform::dynload::ncclBcast(buffers[i], numel, data_type, 0,
platform::dynload::ncclBcast(buffers[i], numel, data_type, 0, nccl_ctx.comm_, nccl_ctx.stream());
nccl_ctx.comm_, nccl_ctx.stream());
} else {
if (var_dev_id >= 0) {
platform::dynload::ncclBcast(buffers[i], numel, data_type,
var_dev_id, nccl_ctx.comm_,
nccl_ctx.stream());
}
}
} }
member_->nccl_ctxs_->WaitAll(); member_->nccl_ctxs_->WaitAll();
} }
#else #else
PADDLE_THROW("Not compiled with CUDA"); PADDLE_THROW("Not compiled with CUDA");
#endif #endif
} else { } else {
platform::CPUPlace cpu; platform::CPUPlace cpu;
for (size_t i = 0; i < member_->places_.size(); ++i) { for (size_t i = 0; i < member_->places_.size(); ++i) {
if ((initializing && i == 0) || if (i == 0) continue;
(!initializing && static_cast<int>(i) == var_dev_id))
continue;
auto local_scope = member_->local_scopes_[i]; auto local_scope = member_->local_scopes_[i];
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>(); auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
......
...@@ -66,9 +66,9 @@ class ParallelExecutor { ...@@ -66,9 +66,9 @@ class ParallelExecutor {
void Run(const std::vector<std::string> &fetch_tensors, void Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name); const std::string &fetched_var_name);
private:
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const; void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
private:
ParallelExecutorPrivate *member_; ParallelExecutorPrivate *member_;
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册