Unverified · Commit 30568473, authored by Yan Xu, committed by GitHub

fix broadcast on mp mode (#15951)

* fix broadcast with mp mode

* polish code test=develop

* fix bcast strategy test=develop

* fix cpplint test=develop

* fix py3 failed test=develop

* fix comment test=develop

* update comment test=develop
Parent e3c37bd5
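
In multi-process (mp) mode each trainer process drives a single device, so the old guard `local_scopes_.size() != 1` never fired and multi-trainer NCCL jobs silently skipped the initial parameter broadcast, leaving every trainer with its own random initialization. The sketch below (not part of this commit) shows how such a job would be configured so the new `num_trainers_ > 1` path triggers; the environment-variable names follow Paddle's usual launcher convention and are assumptions here.

    import os
    import paddle.fluid as fluid

    x = fluid.layers.data(name="x", shape=[4], dtype="float32")
    loss = fluid.layers.mean(fluid.layers.fc(input=x, size=1))
    fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())

    build_strategy = fluid.BuildStrategy()
    # one process per GPU; the ids are injected by the launcher (assumed names)
    build_strategy.num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
    build_strategy.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

    # with num_trainers > 1, ParallelExecutor now broadcasts trainer 0's
    # parameters even though this process holds only one local scope
    compiled = fluid.CompiledProgram(fluid.default_main_program()) \
        .with_data_parallel(loss_name=loss.name, build_strategy=build_strategy)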
@@ -181,12 +181,13 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
   return member_->local_scopes_;
 }
 
-ParallelExecutor::ParallelExecutor(
-    const std::vector<platform::Place> &places,
-    const std::unordered_set<std::string> &bcast_vars,
-    const std::string &loss_var_name, Scope *scope,
+ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
+                                   const std::vector<std::string> &bcast_vars,
+                                   const std::string &loss_var_name,
+                                   Scope *scope,
                                    const std::vector<Scope *> &local_scopes,
-    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
+                                   const ExecutionStrategy &exec_strategy,
+                                   const BuildStrategy &build_strategy,
                                    ir::Graph *graph)
     : member_(new ParallelExecutorPrivate(places)) {
   member_->global_scope_ = scope;
@@ -254,9 +255,23 @@ ParallelExecutor::ParallelExecutor(
     PADDLE_THROW("Not compiled with CUDA");
 #endif
   }
-  if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
-    BCastParamsToDevices(bcast_vars);
+
+  // broadcast parameters from the 0th device to others:
+  auto need_broadcast = [&]() -> bool {
+    if (build_strategy.num_trainers_ > 1) {
+      // 1. num_trainers would be greater than 1 for nccl distributed training.
+      return true;
+    } else if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
+      // 2. Only one trainer process, but ParallelExecutor holds multiple
+      // devices.
+      return true;
+    }
+    return false;
+  };
+
+  if (need_broadcast()) {
+    BCastParamsToDevices(bcast_vars, build_strategy.trainer_id_);
   }
   // Startup Program has been run. All local scopes have correct parameters.
 
   // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
@@ -338,7 +353,7 @@ ParallelExecutor::ParallelExecutor(
 }
 
 void ParallelExecutor::BCastParamsToDevices(
-    const std::unordered_set<std::string> &vars) const {
+    const std::vector<std::string> &vars, int trainer_id) const {
   // the initializing bcast, all vars would be bcast from device(0).
   for (auto &var : vars) {
     framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
@@ -362,7 +377,7 @@ void ParallelExecutor::BCastParamsToDevices(
         auto place = member_->places_[i];
         void *buffer;
 
-        if (i == 0) {
+        if (i == 0 && trainer_id == 0) {
           buffer = const_cast<void *>(main_tensor.data<void>());
         } else {
           auto local_scope = member_->local_scopes_[i];
......
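This one-line guard carries the cross-process half of the fix: with multiple trainers, only trainer 0's device 0 owns initialized parameters, so only that rank may feed its existing tensor into the collective as the broadcast source; every device on every other trainer must receive into its own buffer. A toy model of the decision (hypothetical helper names, for illustration only):

    import numpy as np

    def allocate_like(t):
        # stand-in for allocating an uninitialized device buffer
        return np.empty_like(t)

    def pick_bcast_buffer(trainer_id, device_idx, main_tensor):
        # trainer 0, device 0 is the broadcast root: reuse its initialized
        # parameter tensor directly as the source buffer
        if trainer_id == 0 and device_idx == 0:
            return main_tensor
        # every other (trainer, device) pair receives into its own buffer
        return allocate_like(main_tensor)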
@@ -14,9 +14,11 @@ limitations under the License. */
 #pragma once
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include "paddle/fluid/framework/details/build_strategy.h"
@@ -45,7 +47,7 @@ class ParallelExecutor {
 
  public:
   explicit ParallelExecutor(const std::vector<platform::Place> &places,
-                            const std::unordered_set<std::string> &bcast_vars,
+                            const std::vector<std::string> &bcast_vars,
                             const std::string &loss_var_name, Scope *scope,
                             const std::vector<Scope *> &local_scopes,
                             const ExecutionStrategy &exec_strategy,
@@ -70,7 +72,10 @@ class ParallelExecutor {
                              const std::string &fetched_var_name);
 
  private:
-  void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
+  // Broadcast the parameters from the 0th device to the others;
+  // trainer_id is the trainer index in nccl distributed training.
+  void BCastParamsToDevices(const std::vector<std::string> &vars,
+                            int trainer_id = 0) const;
 
   bool EnableParallelGraphExecution(const ir::Graph &graph,
                                     const ExecutionStrategy &exec_strategy,
                                     const BuildStrategy &build_strategy) const;
......
@@ -1251,7 +1251,7 @@ All parameter, weight, gradient are variables in Paddle.
           cannot be updated after being finalized.)DOC");
 
     pe.def(py::init<const std::vector<platform::Place> &,
-                    const std::unordered_set<std::string> &, const std::string &,
+                    const std::vector<std::string> &, const std::string &,
                     Scope *, std::vector<Scope *> &, const ExecutionStrategy &,
                     const BuildStrategy &, ir::Graph *>())
       // NOTE: even though we return a vec<Scope*>* to Python using reference policy.
......
@@ -230,13 +230,17 @@ class CompiledProgram(object):
                 self._persistable_vars.append(cpt.to_text(node.name()))
 
         places = list(map(_place_obj, self._places))
-        return core.ParallelExecutor(places,
-                                     set(self._persistable_vars),
-                                     cpt.to_text(self._loss_name)
-                                     if self._loss_name else six.u(''), scope,
-                                     self._local_scopes, self._exec_strategy,
-                                     self._build_strategy, self._graph)
+
+        # ParallelExecutor broadcasts all the parameters during initialization.
+        # The parameters of every process must be in the same order for
+        # data-parallel distributed training to keep the broadcast correct.
+        self._persistable_vars = list(set(self._persistable_vars))
+        self._persistable_vars.sort()
+
+        return core.ParallelExecutor(
+            places, self._persistable_vars,
+            cpt.to_text(self._loss_name)
+            if self._loss_name else six.u(''), self._scope, self._local_scopes,
+            self._exec_strategy, self._build_strategy, self._graph)
 
     def _compile_inference(self):
         return core.create_paddle_predictor(self._infer_config)
......
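The sort is what fixes the "py3 failed" part of this change: since Python 3.3, string hashing is randomized per process, so iterating a `set` of parameter names can yield a different order in each trainer, and the rank-0 broadcast would then write tensor i into the wrong variable on other ranks. A quick illustration of the failure mode and the remedy:

    # each trainer process may see a different set iteration order, e.g.
    #   trainer 0: ['fc_0.b_0', 'fc_0.w_0']
    #   trainer 1: ['fc_0.w_0', 'fc_0.b_0']
    # so trainer 1 would receive fc_0.w_0's data into fc_0.b_0
    names = {'fc_0.w_0', 'fc_0.b_0'}

    # pinning one global order makes every rank broadcast/receive the same
    # variable at the same step
    ordered = sorted(names)   # ['fc_0.b_0', 'fc_0.w_0'] on every process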