未验证 提交 550e7e41 编写于 作者: C chengduo 提交者: GitHub

Code Clean parallel_executor.py (#14849)

* refine parallel_executor

* remove uncessary code
test=develop
上级 3d750f9c
...@@ -131,9 +131,7 @@ std::shared_ptr<ir::PassBuilder> BuildStrategy::CreatePassesFromStrategy( ...@@ -131,9 +131,7 @@ std::shared_ptr<ir::PassBuilder> BuildStrategy::CreatePassesFromStrategy(
std::unique_ptr<ir::Graph> BuildStrategy::Apply( std::unique_ptr<ir::Graph> BuildStrategy::Apply(
const ProgramDesc &main_program, const std::vector<platform::Place> &places, const ProgramDesc &main_program, const std::vector<platform::Place> &places,
const std::string &loss_var_name, const std::string &loss_var_name, const std::vector<Scope *> &local_scopes,
const std::unordered_set<std::string> &param_names,
const std::vector<Scope *> &local_scopes,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const { const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const {
#else #else
...@@ -149,9 +147,6 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply( ...@@ -149,9 +147,6 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
pass->SetNotOwned<const std::vector<platform::Place>>("places", &places); pass->SetNotOwned<const std::vector<platform::Place>>("places", &places);
pass->Erase("loss_var_name"); pass->Erase("loss_var_name");
pass->SetNotOwned<const std::string>("loss_var_name", &loss_var_name); pass->SetNotOwned<const std::string>("loss_var_name", &loss_var_name);
pass->Erase("params");
pass->SetNotOwned<const std::unordered_set<std::string>>("params",
&param_names);
pass->Erase("local_scopes"); pass->Erase("local_scopes");
pass->SetNotOwned<const std::vector<Scope *>>("local_scopes", pass->SetNotOwned<const std::vector<Scope *>>("local_scopes",
&local_scopes); &local_scopes);
......
...@@ -106,14 +106,13 @@ struct BuildStrategy { ...@@ -106,14 +106,13 @@ struct BuildStrategy {
// Apply the passes built by the pass_builder_. The passes will be // Apply the passes built by the pass_builder_. The passes will be
// applied to the Program and output an ir::Graph. // applied to the Program and output an ir::Graph.
std::unique_ptr<ir::Graph> Apply( std::unique_ptr<ir::Graph> Apply(const ProgramDesc &main_program,
const ProgramDesc &main_program,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const std::string &loss_var_name, const std::string &loss_var_name,
const std::unordered_set<std::string> &param_names,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const; const bool use_cuda,
platform::NCCLContextMap *nccl_ctxs) const;
#else #else
const bool use_cuda) const; const bool use_cuda) const;
#endif #endif
......
...@@ -130,7 +130,6 @@ void AddOutputToLeafOps(ir::Graph *graph) { ...@@ -130,7 +130,6 @@ void AddOutputToLeafOps(ir::Graph *graph) {
static const char kLossVarName[] = "loss_var_name"; static const char kLossVarName[] = "loss_var_name";
static const char kPlaces[] = "places"; static const char kPlaces[] = "places";
static const char kParams[] = "params";
static const char kLocalScopes[] = "local_scopes"; static const char kLocalScopes[] = "local_scopes";
static const char kStrategy[] = "strategy"; static const char kStrategy[] = "strategy";
static const char kNumTrainers[] = "num_trainers"; static const char kNumTrainers[] = "num_trainers";
...@@ -147,9 +146,6 @@ void MultiDevSSAGraphBuilder::Init() const { ...@@ -147,9 +146,6 @@ void MultiDevSSAGraphBuilder::Init() const {
nccl_ctxs_ = &Get<platform::NCCLContextMap>("nccl_ctxs"); nccl_ctxs_ = &Get<platform::NCCLContextMap>("nccl_ctxs");
#endif #endif
for (auto &p : Get<const std::unordered_set<std::string>>(kParams)) {
grad_names_.insert(GradVarName(p));
}
balance_vars_.resize(places_.size(), 0); balance_vars_.resize(places_.size(), 0);
if (strategy_.enable_data_balance_ && places_.size() == 1) { if (strategy_.enable_data_balance_ && places_.size() == 1) {
LOG(WARNING) << "It is no need to enable data balance when there is only " LOG(WARNING) << "It is no need to enable data balance when there is only "
...@@ -898,7 +894,6 @@ REGISTER_PASS(multi_devices_pass, ...@@ -898,7 +894,6 @@ REGISTER_PASS(multi_devices_pass,
paddle::framework::details::MultiDevSSAGraphBuilder) paddle::framework::details::MultiDevSSAGraphBuilder)
.RequirePassAttr(paddle::framework::details::kLossVarName) .RequirePassAttr(paddle::framework::details::kLossVarName)
.RequirePassAttr(paddle::framework::details::kPlaces) .RequirePassAttr(paddle::framework::details::kPlaces)
.RequirePassAttr(paddle::framework::details::kParams)
.RequirePassAttr(paddle::framework::details::kLocalScopes) .RequirePassAttr(paddle::framework::details::kLocalScopes)
.RequirePassAttr(paddle::framework::details::kStrategy) .RequirePassAttr(paddle::framework::details::kStrategy)
.RequirePassAttr(paddle::framework::details::kNumTrainers); .RequirePassAttr(paddle::framework::details::kNumTrainers);
...@@ -103,7 +103,6 @@ class MultiDevSSAGraphBuilder : public ir::Pass { ...@@ -103,7 +103,6 @@ class MultiDevSSAGraphBuilder : public ir::Pass {
mutable std::string loss_var_name_; mutable std::string loss_var_name_;
mutable std::vector<platform::Place> places_; mutable std::vector<platform::Place> places_;
mutable std::vector<Scope *> local_scopes_; mutable std::vector<Scope *> local_scopes_;
mutable std::unordered_set<std::string> grad_names_;
mutable BuildStrategy strategy_; mutable BuildStrategy strategy_;
mutable std::unordered_map<std::string, VarDesc *> all_vars_; mutable std::unordered_map<std::string, VarDesc *> all_vars_;
......
...@@ -190,7 +190,6 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() { ...@@ -190,7 +190,6 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
ParallelExecutor::ParallelExecutor( ParallelExecutor::ParallelExecutor(
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &params,
const std::unordered_set<std::string> &bcast_vars, const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name, const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, const std::vector<Scope *> &local_scopes, Scope *scope, const std::vector<Scope *> &local_scopes,
...@@ -209,7 +208,7 @@ ParallelExecutor::ParallelExecutor( ...@@ -209,7 +208,7 @@ ParallelExecutor::ParallelExecutor(
"the number of places must be greater than 1."); "the number of places must be greater than 1.");
} }
// Step 1. Bcast the params to devs. // Step 1. Bcast the bcast_vars to devs.
// Create local scopes // Create local scopes
if (local_scopes.empty()) { if (local_scopes.empty()) {
member_->own_local_scope_ = true; member_->own_local_scope_ = true;
...@@ -249,12 +248,12 @@ ParallelExecutor::ParallelExecutor( ...@@ -249,12 +248,12 @@ ParallelExecutor::ParallelExecutor(
// ncclOp // ncclOp
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
std::unique_ptr<ir::Graph> graph = build_strategy.Apply( std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, params, main_program, member_->places_, loss_var_name, member_->local_scopes_,
member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get()); member_->use_cuda_, member_->nccl_ctxs_.get());
#else #else
std::unique_ptr<ir::Graph> graph = std::unique_ptr<ir::Graph> graph =
build_strategy.Apply(main_program, member_->places_, loss_var_name, build_strategy.Apply(main_program, member_->places_, loss_var_name,
params, member_->local_scopes_, member_->use_cuda_); member_->local_scopes_, member_->use_cuda_);
#endif #endif
auto max_memory_size = GetEagerDeletionThreshold(); auto max_memory_size = GetEagerDeletionThreshold();
if (max_memory_size >= 0) { if (max_memory_size >= 0) {
......
...@@ -41,7 +41,6 @@ class ParallelExecutor { ...@@ -41,7 +41,6 @@ class ParallelExecutor {
public: public:
explicit ParallelExecutor(const std::vector<platform::Place> &places, explicit ParallelExecutor(const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &params,
const std::unordered_set<std::string> &bcast_vars, const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const ProgramDesc &main_program,
const std::string &loss_var_name, Scope *scope, const std::string &loss_var_name, Scope *scope,
......
...@@ -977,7 +977,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -977,7 +977,6 @@ All parameter, weight, gradient are variables in Paddle.
cannot be updated after being finalized.)DOC"); cannot be updated after being finalized.)DOC");
pe.def(py::init<const std::vector<platform::Place> &, pe.def(py::init<const std::vector<platform::Place> &,
const std::unordered_set<std::string> &,
const std::unordered_set<std::string> &, const ProgramDesc &, const std::unordered_set<std::string> &, const ProgramDesc &,
const std::string &, Scope *, std::vector<Scope *> &, const std::string &, Scope *, std::vector<Scope *> &,
const ExecutionStrategy &, const BuildStrategy &, size_t, const ExecutionStrategy &, const BuildStrategy &, size_t,
......
...@@ -92,35 +92,27 @@ class ParallelExecutor(object): ...@@ -92,35 +92,27 @@ class ParallelExecutor(object):
num_trainers=1, num_trainers=1,
trainer_id=0, trainer_id=0,
scope=None): scope=None):
# step1: get places, the places are used in run too.
self._places = [] self._places = []
self._act_places = []
if use_cuda: if use_cuda:
gpus = []
gpus_env = os.getenv("FLAGS_selected_gpus") gpus_env = os.getenv("FLAGS_selected_gpus")
if gpus_env: if gpus_env:
gpus = [int(s) for s in gpus_env.split(",")] gpus = [int(s) for s in gpus_env.split(",")]
else: else:
for i in six.moves.range(core.get_cuda_device_count()): gpus = [
gpus.append(i) i for i in six.moves.range(core.get_cuda_device_count())
for i in gpus: ]
p = core.Place() self._places = [core.CUDAPlace(i) for i in gpus]
self._act_places.append(core.CUDAPlace(i))
p.set_place(self._act_places[-1])
self._places.append(p)
else: else:
cpu_num = int( cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count())) os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
for i in six.moves.range(cpu_num): self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
p = core.Place()
self._act_places.append(core.CPUPlace())
p.set_place(self._act_places[-1])
self._places.append(p)
assert self._places, "no place for execution" assert self._places, "no place for execution"
# step2: init exec_strategy
if exec_strategy is None: if exec_strategy is None:
exec_strategy = ExecutionStrategy() exec_strategy = ExecutionStrategy()
exec_strategy.use_cuda = use_cuda exec_strategy.use_cuda = use_cuda
if exec_strategy.num_threads == 0: if exec_strategy.num_threads == 0:
if use_cuda: if use_cuda:
# Experiments on se-resnext shows that too many threads hurt # Experiments on se-resnext shows that too many threads hurt
...@@ -131,49 +123,54 @@ class ParallelExecutor(object): ...@@ -131,49 +123,54 @@ class ParallelExecutor(object):
os.environ.get('CPU_NUM', multiprocessing.cpu_count())) os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
exec_strategy.num_threads = cpu_num * 2 exec_strategy.num_threads = cpu_num * 2
# step3: init build_strategy
if build_strategy is None: if build_strategy is None:
build_strategy = BuildStrategy() build_strategy = BuildStrategy()
build_strategy.num_trainers = num_trainers build_strategy.num_trainers = num_trainers
build_strategy.trainer_id = trainer_id build_strategy.trainer_id = trainer_id
main = main_program # step4: get main_program, scope, local_scopes
main = main if main else framework.default_main_program() main = main_program if main_program \
else framework.default_main_program()
scope = scope if scope is not None else executor.global_scope()
if share_vars_from and not isinstance(share_vars_from,
ParallelExecutor):
raise TypeError("share_vars_from must be ParallelExecutor.")
local_scopes = share_vars_from.executor.local_scopes()\
if share_vars_from else []
# step5: check trainers_endpoints, it is used for distribution.
trainers_endpoints = main._trainers_endpoints trainers_endpoints = main._trainers_endpoints
if num_trainers > 1 and trainers_endpoints: if num_trainers > 1 and trainers_endpoints:
assert num_trainers == len( assert num_trainers == len(
trainers_endpoints), "num_trainers == len(end_points)" trainers_endpoints), "num_trainers == len(end_points)"
build_strategy.trainers_endpoints = trainers_endpoints build_strategy.trainers_endpoints = trainers_endpoints
if scope == None: # step5: get persistable_vars, parameter_vars, places. persistable_vars
scope = executor.global_scope() # need be broadcast to other local_scope.
persistable_vars = set([
if share_vars_from and not isinstance(share_vars_from, cpt.to_text(v.name) for v in [
ParallelExecutor):
raise TypeError("share_vars_from must be ParallelExecutor.")
local_scopes = share_vars_from.executor.local_scopes(
) if share_vars_from else []
self.persistable_vars = [
v.name for v in [
var for var in main.list_vars() var for var in main.list_vars()
if var.persistable and var.type != core.VarDesc.VarType.RAW if var.persistable and var.type != core.VarDesc.VarType.RAW
] ]
] ])
def place_obj(place):
p = core.Place()
p.set_place(place)
return p
places = list(map(place_obj, self._places))
# step6: init ParallelExecutor
self.executor = core.ParallelExecutor( self.executor = core.ParallelExecutor(
self._places, places, persistable_vars, main.desc,
set([
cpt.to_text(p.name)
for p in main.global_block().iter_parameters()
if not p.stop_gradient
]),
set(cpt.to_text(var) for var in self.persistable_vars), main.desc,
cpt.to_text(loss_name) cpt.to_text(loss_name)
if loss_name else six.u(''), scope, local_scopes, exec_strategy, if loss_name else six.u(''), scope, local_scopes, exec_strategy,
build_strategy, num_trainers, trainer_id) build_strategy, num_trainers, trainer_id)
self.scope = scope self.scope = scope
def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True): def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
...@@ -261,7 +258,7 @@ class ParallelExecutor(object): ...@@ -261,7 +258,7 @@ class ParallelExecutor(object):
self.executor.feed_and_split_tensor_into_local_scopes( self.executor.feed_and_split_tensor_into_local_scopes(
feed_tensor_dict) feed_tensor_dict)
elif isinstance(feed, list) or isinstance(feed, tuple): elif isinstance(feed, list) or isinstance(feed, tuple):
if len(feed) != len(self._act_places): if len(feed) != len(self._places):
raise ValueError( raise ValueError(
"Feed a list of tensor, the list should be the same size as places" "Feed a list of tensor, the list should be the same size as places"
) )
...@@ -277,7 +274,7 @@ class ParallelExecutor(object): ...@@ -277,7 +274,7 @@ class ParallelExecutor(object):
tensor = each[feed_name] tensor = each[feed_name]
if not isinstance(tensor, core.LoDTensor): if not isinstance(tensor, core.LoDTensor):
tmp = core.LoDTensor() tmp = core.LoDTensor()
tmp.set(tensor, self._act_places[i]) tmp.set(tensor, self._places[i])
tensor = tmp tensor = tmp
res_dict[feed_name] = tensor res_dict[feed_name] = tensor
res.append(res_dict) res.append(res_dict)
...@@ -294,4 +291,4 @@ class ParallelExecutor(object): ...@@ -294,4 +291,4 @@ class ParallelExecutor(object):
@property @property
def device_count(self): def device_count(self):
return len(self._act_places) return len(self._places)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册