未验证 提交 752670e2 编写于 作者: R Ruibiao Chen 提交者: GitHub

Run plan in standalone executor (#54394)

* Run plan in standalone executor

* Update codes

* Update atol and rtol for py3-CI

* Add scope to cache key

* Fix CI errors

* Fix code style

* Update codes

* Remove fetch_name in standalone executor

* Fix UT

* Update codes

* Fix new IR bug
上级 4c6f77d8
......@@ -2,8 +2,8 @@ add_subdirectory(garbage_collector)
add_subdirectory(interpreter)
add_subdirectory(workqueue)
set(STANDALONE_EXECUTOR_SRCS interpretercore.cc new_executor_defs.cc
standalone_executor.cc)
set(STANDALONE_EXECUTOR_SRCS feed_fetch_utils.cc interpretercore.cc
new_executor_defs.cc standalone_executor.cc)
set(STANDALONE_EXECUTOR_DEPS
interpreter
......
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/feed_fetch_utils.h"
#include <map>
#include <vector>
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/program_desc.h"
namespace paddle {
namespace framework {
void SetColAttrForFetchOps(const interpreter::Job& job,
std::shared_ptr<ProgramDesc> program_desc) {
const std::set<std::string>& valid_feed_fetch_op_types = {"fetch",
"fetch_v2"};
const std::vector<int> all_op_ids = job.AllFetchOpIds();
for (int op_id : all_op_ids) {
int col_attr = job.ColAttrForFetchOp(op_id);
OpDesc* op_desc = program_desc->MutableBlock(0)->Op(op_id);
PADDLE_ENFORCE(valid_feed_fetch_op_types.find(op_desc->Type()) !=
valid_feed_fetch_op_types.end(),
phi::errors::InvalidArgument(
"Op (%s) corressponding to feed_fetch_op_id (%d) is not "
"in valid_feed_fetch_op_types.",
op_desc->Type(),
op_id));
op_desc->SetAttr("col", col_attr);
}
}
} // namespace framework
} // namespace paddle
// Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <map>
#include <vector>
#include "paddle/fluid/framework/new_executor/interpreter/plan.h"
#include "paddle/fluid/framework/program_desc.h"
namespace paddle {
namespace framework {
void SetColAttrForFetchOps(const interpreter::Job& job,
std::shared_ptr<ProgramDesc> program_desc);
} // namespace framework
} // namespace paddle
......@@ -13,29 +13,57 @@
// limitations under the License.
#pragma once
#include <glog/logging.h>
#include "paddle/phi/core/enforce.h"
#include "paddle/phi/core/errors.h"
#include "paddle/phi/core/macros.h"
namespace paddle {
namespace framework {
namespace interpreter {
class Job final {
public:
explicit Job(const std::string& type) : type_(type), micro_batch_id_(-1) {}
explicit Job(const std::string& type) : type_(type), micro_batch_id_(0) {}
~Job() = default;
const std::string& GetJobType() const { return type_; }
int64_t GetMicroBatchId() const { return micro_batch_id_; }
const std::string& Type() const { return type_; }
int ColAttrForFetchOp(int fetch_op_id) const {
return fetch_op_id_to_col_attr_.at(fetch_op_id);
}
int64_t MicroBatchId() const { return micro_batch_id_; }
std::vector<int> AllFetchOpIds() const {
std::vector<int> fetch_op_ids;
fetch_op_ids.reserve(fetch_op_id_to_col_attr_.size());
for (auto& item : fetch_op_id_to_col_attr_) {
fetch_op_ids.push_back(item.first);
}
return fetch_op_ids;
}
void SetColAttrForFetchOp(int fetch_op_id, int col_attr) {
fetch_op_id_to_col_attr_[fetch_op_id] = col_attr;
}
void SetMicroBatchId(int64_t micro_batch_id) {
PADDLE_ENFORCE_GE(
micro_batch_id,
0,
phi::errors::InvalidArgument(
"The micro_batch_id should be greater or equal to 0."));
micro_batch_id_ = micro_batch_id;
}
private:
DISABLE_COPY_AND_ASSIGN(Job);
std::string type_;
const std::string type_;
int64_t micro_batch_id_;
std::unordered_map<int, int> fetch_op_id_to_col_attr_;
};
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -13,20 +13,42 @@
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpreter/plan.h"
#include "paddle/fluid/framework/new_executor/interpreter/job.h"
#include "paddle/fluid/framework/program_desc.h"
namespace paddle {
namespace framework {
namespace interpreter {
Plan::Plan(const std::vector<std::shared_ptr<Job>>& job_list,
const std::unordered_map<std::string, ProgramDesc*>& type_to_program)
: job_list_(job_list),
type_to_program_(type_to_program),
micro_batch_num_(1) {
for (size_t i = 0; i < job_list_.size(); ++i) {
const auto& job = job_list_[i];
PADDLE_ENFORCE(type_to_program_.find(job->Type()) != type_to_program_.end(),
phi::errors::InvalidArgument(
"The %d-th job (type:%s, micro_batch_id:%d) has no "
"corresponding Program.",
i,
job->Type(),
job->MicroBatchId()));
const std::vector<std::shared_ptr<Job>>& Plan::GetJobList() const {
micro_batch_num_ = std::max(micro_batch_num_, job->MicroBatchId() + 1);
}
}
const std::vector<std::shared_ptr<Job>>& Plan::JobList() const {
return job_list_;
}
const std::unordered_map<std::string, ProgramDesc*>& Plan::GetTypeToProgram()
const {
return type_to_program_;
const ProgramDesc* Plan::Program(const std::string& job_type) const {
return type_to_program_.at(job_type);
}
int64_t Plan::MicroBatchNum() const { return micro_batch_num_; }
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -17,30 +17,34 @@
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/new_executor/interpreter/job.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/phi/core/macros.h"
namespace paddle {
namespace framework {
class ProgramDesc;
class Job;
namespace interpreter {
class Plan final {
public:
Plan(const std::vector<std::shared_ptr<Job>>& job_list,
const std::unordered_map<std::string, ProgramDesc*>& type_to_program)
: job_list_(job_list), type_to_program_(type_to_program) {}
const std::unordered_map<std::string, ProgramDesc*>& type_to_program);
~Plan() = default;
const std::vector<std::shared_ptr<Job>>& GetJobList() const;
const std::unordered_map<std::string, ProgramDesc*>& GetTypeToProgram() const;
const std::vector<std::shared_ptr<Job>>& JobList() const;
private:
DISABLE_COPY_AND_ASSIGN(Plan);
const ProgramDesc* Program(const std::string& job_type) const;
int64_t MicroBatchNum() const;
std::vector<std::shared_ptr<Job>> job_list_;
std::unordered_map<std::string, ProgramDesc*> type_to_program_;
private:
const std::vector<std::shared_ptr<Job>> job_list_;
const std::unordered_map<std::string, ProgramDesc*> type_to_program_;
int64_t micro_batch_num_;
};
} // namespace interpreter
} // namespace framework
} // namespace paddle
......@@ -151,14 +151,14 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
InterpreterCore::InterpreterCore(const platform::Place& place,
const BlockDesc& block,
framework::Scope* scope,
::ir::Program* ir_prog,
std::unique_ptr<::ir::Program> ir_prog,
const ExecutionConfig& execution_config)
: place_(place),
block_(block),
stream_analyzer_(place),
execution_config_(execution_config),
var_scope_(scope),
ir_program_(ir_prog) {
ir_program_(std::move(ir_prog)) {
VLOG(4) << "InterpreterCore(): " << this << " on " << place_;
static_build_ = FLAGS_new_executor_static_build &&
......
......@@ -57,7 +57,7 @@ class InterpreterCore {
InterpreterCore(const platform::Place& place,
const BlockDesc& block,
Scope* scope,
::ir::Program* ir_prog,
std::unique_ptr<::ir::Program> ir_prog,
const ExecutionConfig& execution_config = ExecutionConfig());
~InterpreterCore();
......@@ -201,7 +201,7 @@ class InterpreterCore {
std::vector<HookFunc> hookfuncs_;
// The next only for new IR
::ir::Program* ir_program_{nullptr};
std::unique_ptr<::ir::Program> ir_program_{nullptr};
std::unordered_map<::ir::Value, std::string> value_2_var_name_map_;
};
......
......@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/framework/new_executor/standalone_executor.h"
#include "paddle/fluid/framework/new_executor/feed_fetch_utils.h"
#include "paddle/fluid/framework/new_executor/interpreter/interpreter_util.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
......@@ -25,112 +26,98 @@ PHI_DECLARE_bool(enable_new_ir_in_executor);
namespace paddle {
namespace framework {
StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
const std::vector<ProgramDesc>& programs)
: place_(place), programs_(programs) {
if (FLAGS_enable_new_ir_in_executor) {
for (size_t i = 0; i < programs_.size(); ++i) {
VLOG(6) << "begin to translate" << std::endl;
auto base_progrm = paddle::TranslateLegacyProgramToProgram(programs_[i]);
const interpreter::Plan& plan,
Scope* scope)
: place_(place), plan_(plan), scope_(scope) {
int64_t micro_batch_num = plan_.MicroBatchNum();
for (int64_t i = 0; i < micro_batch_num; ++i) {
micro_batch_scopes_.emplace_back(&scope->NewScope());
}
std::stringstream ss;
ss << "Create " << micro_batch_num << " micro_batch_scopes for scope "
<< scope_ << " : ";
for (Scope* scope : micro_batch_scopes_) {
ss << scope << ", ";
}
VLOG(6) << ss.str();
const auto& jobs = plan_.JobList();
for (const auto& job : jobs) {
const std::string& job_type = job->Type();
std::shared_ptr<ProgramDesc> program =
std::make_shared<ProgramDesc>(*(plan_.Program(job_type)));
SetColAttrForFetchOps(*job, program);
int64_t micro_batch_id = job->MicroBatchId();
PADDLE_ENFORCE(
micro_batch_id >= 0 && micro_batch_id < micro_batch_num,
phi::errors::Unavailable("The micro batch id (%lld) out of bound, "
"which should be in the range of [0, %lld].",
micro_batch_id,
micro_batch_num));
interpreter::ExecutionConfig execution_config;
execution_config.create_local_scope = false;
// TODO(Ruibiao): hack skip gc all vars for multiple jobs, improve it later
if (jobs.size() > 1) {
for (VarDesc* var : program->Block(0).AllVars()) {
execution_config.skip_gc_vars.insert(var->Name());
}
}
if (FLAGS_enable_new_ir_in_executor) {
VLOG(6) << "begin to translate" << std::endl;
auto base_progrm = paddle::TranslateLegacyProgramToProgram(*program);
auto kernel_program =
paddle::dialect::PdOpLowerToKernelPass(base_progrm.get());
ir_programs_.emplace_back(std::move(kernel_program));
interpretercores_.emplace_back(
std::make_unique<InterpreterCore>(place_,
program->Block(0),
scope_,
std::move(kernel_program),
execution_config));
} else {
interpretercores_.emplace_back(
std::make_unique<InterpreterCore>(place_,
program->Block(0),
micro_batch_scopes_[micro_batch_id],
execution_config));
interpretercores_.back()->SetCopyProgram(program);
}
}
}
paddle::framework::FetchList StandaloneExecutor::Run(
Scope* scope,
const std::vector<std::string>& feed_names,
const std::vector<std::string>& fetch_names) {
const std::vector<std::string>& feed_names) {
platform::RecordEvent record_event(
"StandaloneExecutor::run", platform::TracerEventType::UserDefined, 1);
// TODO(Ruibiao): Unified single and multiple program run
if (programs_.size() == 1) { // run single program
VLOG(6) << "Run single program";
auto core = GetInterpreterCore(scope,
programs_.at(0),
feed_names,
fetch_names,
0,
interpreter::ExecutionConfig());
VLOG(4) << "StandaloneExecutor: " << this << ", InterpreterCore: " << core;
return core->Run(feed_names);
} else { // run multiple programs
VLOG(6) << "Run multiple program, programs_.size() " << programs_.size();
FetchList merged_fetch_list;
for (size_t program_idx = 0; program_idx < programs_.size();
++program_idx) {
const ProgramDesc& program = programs_[program_idx];
interpreter::ExecutionConfig execution_config;
execution_config.create_local_scope = false;
// TODO(Ruibiao): hack skip gc for all vars, improve it later
std::set<std::string> skip_gc_vars;
for (VarDesc* var : program.Block(0).AllVars()) {
execution_config.skip_gc_vars.insert(var->Name());
}
// TODO(Ruibiao): ONLY support feeds data in the first program for now
const std::vector<std::string>& real_feed_names =
(program_idx == 0 ? feed_names : std::vector<std::string>());
auto core = GetInterpreterCore(scope,
program,
real_feed_names,
fetch_names,
program_idx,
execution_config);
const FetchList& fetch_list = core->Run(real_feed_names);
std::move(fetch_list.begin(),
fetch_list.end(),
std::back_inserter(merged_fetch_list));
}
return merged_fetch_list;
if (plan_.MicroBatchNum() > 1) {
PADDLE_ENFORCE_EQ(feed_names.size(),
0,
phi::errors::Unimplemented(
"Unsupported feed data for multiple micro_batch, "
"please use non-iterative DataLoader for now."));
}
}
std::shared_ptr<InterpreterCore> StandaloneExecutor::GetInterpreterCore(
Scope* scope,
const ProgramDesc& program,
const std::vector<std::string>& feed_names,
const std::vector<std::string>& fetch_names,
size_t program_idx,
interpreter::ExecutionConfig execution_config) {
std::ostringstream oss;
oss << "prog_idx:" << program_idx << ",";
oss << "feed:";
for (auto& feedname : feed_names) {
oss << feedname << ",";
}
oss << "fetch:";
for (auto& fetchname : fetch_names) {
oss << fetchname << ",";
}
oss << "scope:" << scope;
const auto& jobs = plan_.JobList();
for (size_t job_idx = 0; job_idx < jobs.size(); ++job_idx) {
const auto& job = jobs[job_idx];
const std::string& job_type = job->Type();
auto iter = interpretercores_.find(oss.str());
VLOG(6) << "Run job (" << job_idx << "), type = " << job_type
<< ", micro_batch_id =" << job->MicroBatchId();
if (iter == interpretercores_.end()) {
VLOG(3) << "create interpreter_core for " << oss.str() << " on place "
<< place_;
std::shared_ptr<InterpreterCore> core = nullptr;
if (FLAGS_enable_new_ir_in_executor) {
core = std::make_shared<InterpreterCore>(place_,
program.Block(0),
scope,
ir_programs_[program_idx].get(),
execution_config);
} else {
core = std::make_shared<InterpreterCore>(
place_, program.Block(0), scope, execution_config);
}
interpretercores_.emplace(oss.str(), core);
return core;
interpretercores_[job_idx]->Run(feed_names, /*need_fetch = */ false);
}
// return Fetch Tensors
auto* fetch_var = scope_->FindVar(interpreter::kFetchVarName);
if (fetch_var) {
return std::move(*fetch_var->GetMutable<framework::FetchList>());
} else {
return iter->second;
return {};
}
}
......
......@@ -19,6 +19,7 @@
#include <vector>
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/new_executor/interpreter/plan.h"
#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/program_desc.h"
......@@ -33,33 +34,21 @@ class InterpreterCore;
class StandaloneExecutor {
public:
StandaloneExecutor(const platform::Place& place,
const std::vector<ProgramDesc>& programs);
const interpreter::Plan& plan_,
Scope* scope);
~StandaloneExecutor() {}
// NOTE(zhiqiu): feed_names are only used for caching interpretercore.
// fetch_names are used for caching interpretercore and inserting fetch ops,
// the latter can be moved to python side.
paddle::framework::FetchList Run(Scope* scope,
const std::vector<std::string>& feed_names,
const std::vector<std::string>& fetch_names);
paddle::framework::FetchList Run(const std::vector<std::string>& feed_names);
private:
std::shared_ptr<InterpreterCore> GetInterpreterCore(
Scope* scope,
const ProgramDesc& prog,
const std::vector<std::string>& feed_names,
const std::vector<std::string>& fetch_names,
size_t program_idx,
interpreter::ExecutionConfig execution_config);
const platform::Place place_;
const std::vector<ProgramDesc> programs_;
std::vector<std::unique_ptr<::ir::Program>> ir_programs_;
std::vector<framework::Scope*> microbatch_scopes_;
const interpreter::Plan plan_;
std::vector<framework::Scope*> micro_batch_scopes_;
std::vector<std::unique_ptr<InterpreterCore>> interpretercores_;
std::unordered_map<std::string, std::shared_ptr<InterpreterCore>>
interpretercores_;
Scope* scope_;
};
} // namespace framework
......
......@@ -49,7 +49,7 @@ void BuildScope(ir::Block* block,
auto attr_map = (*it)->attributes();
std::string op_name = (*it)->name();
if (attr_map.count("op_name")) {
auto op_name = attr_map.at("op_name").dyn_cast<ir::StrAttribute>().data();
op_name = attr_map.at("op_name").dyn_cast<ir::StrAttribute>().data();
}
if (op_name == "pd.fetch") {
// fetch is a very special op, with no output
......
......@@ -1157,6 +1157,8 @@ All parameter, weight, gradient are variables in Paddle.
_Scope
.def("_remove_from_pool",
[](Scope &self) { ScopePool::Instance().Remove(&self); })
.def("raw_address",
[](Scope &self) { return reinterpret_cast<uint64_t>(&self); })
.def(
"var",
[](Scope &self, const std::string &name) -> Variable * {
......@@ -1852,35 +1854,39 @@ All parameter, weight, gradient are variables in Paddle.
});
py::class_<framework::StandaloneExecutor>(m, "StandaloneExecutor")
.def(
py::init<const platform::Place &, const std::vector<ProgramDesc> &>())
.def(py::init<const platform::Place &,
const interpreter::Plan &,
Scope *>())
.def("run",
[](StandaloneExecutor &self,
Scope *scope,
std::vector<std::string> feed_names,
std::vector<std::string> fetch_names) {
[](StandaloneExecutor &self, std::vector<std::string> feed_names) {
paddle::framework::FetchList ret;
{
pybind11::gil_scoped_release release;
ret = self.Run(scope, feed_names, fetch_names);
ret = self.Run(feed_names);
}
return py::cast(std::move(ret));
});
py::class_<framework::Job, std::shared_ptr<framework::Job>>(m, "job")
py::class_<framework::interpreter::Job,
std::shared_ptr<framework::interpreter::Job>>(m, "Job")
.def(py::init<const std::string &>(), py::arg("type"))
.def("type", &framework::Job::GetJobType)
.def("micro_batch_id", &framework::Job::GetMicroBatchId)
.def("set_micro_batch_id", &framework::Job::SetMicroBatchId);
py::class_<framework::Plan>(m, "plan")
.def(py::init<const std::vector<std::shared_ptr<framework::Job>> &,
const std::unordered_map<std::string,
framework::ProgramDesc *> &>(),
py::arg("job_list"),
py::arg("type_to_program"))
.def("job_list", &framework::Plan::GetJobList)
.def("type_to_program", &framework::Plan::GetTypeToProgram);
.def("micro_batch_id", &framework::interpreter::Job::MicroBatchId)
.def("type", &framework::interpreter::Job::Type)
.def("set_col_attr_for_fetch_op",
&framework::interpreter::Job::SetColAttrForFetchOp)
.def("set_micro_batch_id", &framework::interpreter::Job::SetMicroBatchId);
py::class_<framework::interpreter::Plan>(m, "Plan")
.def(
py::init<
const std::vector<std::shared_ptr<framework::interpreter::Job>> &,
const std::unordered_map<std::string, framework::ProgramDesc *>
&>(),
py::arg("job_list"),
py::arg("type_to_program"))
.def("job_list", &framework::interpreter::Plan::JobList)
.def("micro_batch_num", &framework::interpreter::Plan::MicroBatchNum)
.def("program", &framework::interpreter::Plan::Program);
m.def("init_gflags", framework::InitGflags);
m.def("init_glog", framework::InitGLOG);
......
......@@ -270,19 +270,19 @@ class PipelineFThenBPass(PassBase):
def _create_job_list(self):
job_list = []
lr_job = core.job("lr")
lr_job = core.Job("lr")
job_list.append(lr_job)
for i in range(self._micro_batch_size):
forward_job = core.job("forward")
forward_job = core.Job("forward")
forward_job.set_micro_batch_id(i)
job_list.append(forward_job)
for i in range(self._micro_batch_size):
backward_job = core.job("backward")
backward_job = core.Job("backward")
backward_job.set_micro_batch_id(i)
job_list.append(backward_job)
opt_job = core.job("optimizer")
opt_job = core.Job("optimizer")
job_list.append(opt_job)
return job_list
......@@ -294,5 +294,5 @@ class PipelineFThenBPass(PassBase):
type_to_program = _program_for_fthenb_and_1f1b(self._program)
job_list = self._create_job_list()
plan = core.plan(job_list, type_to_program)
plan = core.Plan(job_list, type_to_program)
context.set_attr("plan", plan)
......@@ -511,9 +511,11 @@ def _prepare_fleet_executor():
return fleet_exe
def _get_strong_program_cache_key_for_new_exe(program, feed, fetch_list):
return program.desc.cached_hash_str() + _get_program_cache_key(
feed, fetch_list
def _get_strong_program_cache_key_for_new_exe(program, scope, feed, fetch_list):
return (
program.desc.cached_hash_str()
+ str(scope.raw_address())
+ _get_program_cache_key(feed, fetch_list)
)
......@@ -632,14 +634,14 @@ handler = FetchHandlerExample(var_dict=var_dict)
class _StandaloneExecutor:
def __init__(self, place, programs, scope):
def __init__(self, place, plan, scope):
self._place = core.Place()
self._place.set_place(place)
self._programs = programs
self._plan = plan
self._scope = scope
self._new_exe = self._create_new_executor()
def run(self, scope, feed_names, fetch_list, return_numpy=True):
def run(self, feed_names, return_numpy=True):
"""
Args:
feed_names(list): This parameter represents the input names of the model.
......@@ -649,41 +651,17 @@ class _StandaloneExecutor:
(the Tensor specified in the fetch list) to numpy.ndarray. if it is False,
the type of the return value is a list of :code:`LoDTensor`. The default is True.
"""
fetch_list = self._check_fetch(fetch_list)
tensors = self._new_exe.run(
scope, feed_names, fetch_list
)._move_to_list()
tensors = self._new_exe.run(feed_names)._move_to_list()
if return_numpy:
return as_numpy(tensors, copy=True)
else:
return tensors
def _create_new_executor(self):
new_exe = core.StandaloneExecutor(
self._place, [program.desc for program in self._programs]
)
new_exe = core.StandaloneExecutor(self._place, self._plan, self._scope)
return new_exe
def _check_fetch(self, fetch_list):
if fetch_list is None:
fetch_list = []
res = []
for fetch_var in fetch_list:
if isinstance(fetch_var, Variable):
fetch_var = fetch_var.name
elif not isinstance(fetch_var, str):
raise TypeError(
"Required fetch_var shall be str|Variable, but received {}".format(
type(fetch_var).__name__
)
)
res.append(fetch_var)
return res
class _ExecutorCache:
class _CachedData:
......@@ -716,13 +694,16 @@ class _ExecutorCache:
).to_program()
self.key = hash(
_get_strong_program_cache_key_for_new_exe(
self.program._program, feed, fetch_list
self.program._program,
self.scope,
self.feed,
self.fetch_list,
)
)
else:
self.key = hash(
_get_strong_program_cache_key_for_new_exe(
self.program, feed, fetch_list
self.program, self.scope, self.feed, self.fetch_list
)
)
......@@ -850,9 +831,12 @@ class _ExecutorCache:
_apply_inplace_addto_pass(
program, enable_inplace, enable_addto, skip_var_names
)
new_program = program.clone()
new_exe = _StandaloneExecutor(place, [new_program], scope)
new_exe = _StandaloneExecutor(
place,
core.Plan([core.Job("default")], {"default": new_program.desc}),
scope,
)
return new_program, new_exe
......@@ -1614,9 +1598,7 @@ class Executor:
else:
tensor._copy_from(cpu_tensor, self.place)
ret = new_exe.run(
scope, list(feed.keys()), fetch_list, return_numpy
)
ret = new_exe.run(list(feed.keys()), return_numpy)
set_flags(stored_flag)
return ret
......
......@@ -70,7 +70,7 @@ TEST(StandaloneExecutor, run) {
ProgramDesc prog_desc;
InterpreterCore test_core(
place, prog_desc.Block(0), &scope, kernel_program.get());
place, prog_desc.Block(0), &scope, std::move(kernel_program));
test_core.Run({});
......
......@@ -18,8 +18,10 @@
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include "paddle/fluid/framework/new_executor/interpreter/plan.h"
#include "paddle/phi/core/kernel_registry.h"
USE_OP_ITSELF(fill_constant);
......@@ -106,6 +108,9 @@ PD_DECLARE_KERNEL(add_n, GPU, ALL_LAYOUT);
namespace paddle {
namespace framework {
using Job = interpreter::Job;
using Plan = interpreter::Plan;
ProgramDesc load_from_file(const std::string& file_name) {
std::ifstream fin(file_name, std::ios::in | std::ios::binary);
fin.seekg(0, std::ios::end);
......@@ -146,11 +151,22 @@ TEST(StandaloneExecutor, run) {
ProgramDesc main_prog = GetLmMainProgram();
Scope scope;
StandaloneExecutor startup_exec(place,
std::vector<ProgramDesc>{startup_prog});
startup_exec.Run(&scope, {}, {});
StandaloneExecutor exec(place, std::vector<ProgramDesc>{main_prog});
exec.Run(&scope, {}, {});
std::shared_ptr<Job> startup_job = std::make_shared<Job>(Job("startup"));
StandaloneExecutor startup_exec(
place,
Plan(std::vector<std::shared_ptr<Job>>({startup_job}),
std::unordered_map<std::string, ProgramDesc*>(
{{startup_job->Type(), &startup_prog}})),
&scope);
startup_exec.Run({});
std::shared_ptr<Job> main_job = std::make_shared<Job>(Job("main"));
StandaloneExecutor exec(place,
Plan(std::vector<std::shared_ptr<Job>>({main_job}),
std::unordered_map<std::string, ProgramDesc*>(
{{main_job->Type(), &main_prog}})),
&scope);
exec.Run({});
auto start = std::chrono::steady_clock::now();
for (size_t i = 0; i < 10; ++i) {
......@@ -158,7 +174,7 @@ TEST(StandaloneExecutor, run) {
std::cout << i << std::endl;
}
exec.Run(&scope, {}, {});
exec.Run({});
}
auto end = std::chrono::steady_clock::now();
......
......@@ -40,9 +40,7 @@ class TestNewIr(unittest.TestCase):
self.assertEqual(
np.array_equal(
np.array(
paddle.static.global_scope()
.find_var("inner_var_2")
.get_tensor()
paddle.static.global_scope().find_var(z.name).get_tensor()
),
gold_res,
),
......
......@@ -20,18 +20,14 @@ import numpy as np
import paddle
from paddle.distributed.passes.pass_utils import split_program
from paddle.fluid import core
from paddle.fluid.executor import (
_add_feed_fetch_ops,
_as_lodtensor,
_StandaloneExecutor,
check_feed_shape_type,
)
from paddle.fluid.core import Job, Plan
from paddle.fluid.executor import _add_feed_fetch_ops, _StandaloneExecutor
from paddle.nn import TransformerEncoderLayer
paddle.enable_static()
class TestMulitProgramRun(unittest.TestCase):
class TestEncorderMulitMicroBatchRun(unittest.TestCase):
def setUp(self):
self.place_desc = (
paddle.CUDAPlace(0)
......@@ -41,26 +37,72 @@ class TestMulitProgramRun(unittest.TestCase):
self.place = core.Place()
self.place.set_place(self.place_desc)
def build_program(self):
batch_size = 2
src_len = 4
d_model = 128
n_head = 2
self.batch_size = 2
self.src_len = 4
self.d_model = 128
self.n_head = 2
self.run_step = 3
self.enc_input_data, self.attn_mask_data = self.get_random_data(
self.batch_size,
self.src_len,
self.d_model,
self.n_head,
self.run_step,
)
def get_random_data(self, batch_size, src_len, d_model, n_head, run_step):
np.random.seed(2022)
enc_input_data = np.random.rand(
run_step, batch_size, src_len, d_model
).astype(np.float32)
attn_mask_data = np.random.rand(
run_step, batch_size, n_head, src_len, src_len
).astype(np.float32)
return enc_input_data, attn_mask_data
def batch_generator_creator(self, micro_batch_size):
def __reader__():
for i in range(self.run_step):
for offset in range(0, self.batch_size, micro_batch_size):
enc_input = self.enc_input_data[i][
offset : offset + micro_batch_size
]
attn_mask = self.attn_mask_data[i][
offset : offset + micro_batch_size
]
yield enc_input, attn_mask
return __reader__
def build_program(self, micro_batch_size, src_len, d_model, n_head):
startup_program = paddle.static.Program()
main_program = paddle.static.Program()
with paddle.static.program_guard(main_program, startup_program):
enc_input = paddle.static.data(
name="enc_input",
shape=[batch_size, src_len, d_model],
shape=[micro_batch_size, src_len, d_model],
dtype="float32",
)
attn_mask = paddle.static.data(
name="attn_mask",
shape=[batch_size, n_head, src_len, src_len],
shape=[micro_batch_size, n_head, src_len, src_len],
dtype="float32",
)
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[enc_input, attn_mask],
use_double_buffer=False,
capacity=16,
iterable=False,
)
loader.set_batch_generator(
self.batch_generator_creator(micro_batch_size)
)
encoder_layer = TransformerEncoderLayer(
d_model, n_head, dim_feedforward=512
)
......@@ -74,110 +116,116 @@ class TestMulitProgramRun(unittest.TestCase):
enc_output = encoder_layer(enc_output, attn_mask)
np.random.seed(2022)
feed = {
enc_input.name: np.random.rand(
batch_size, src_len, d_model
).astype(np.float32),
attn_mask.name: np.random.rand(
batch_size, n_head, src_len, src_len
).astype(np.float32),
}
fetch_list = [enc_output.name]
return (
startup_program,
main_program,
split_op_indics,
feed,
loader,
fetch_list,
)
def feed_data(self, program, feed, feed_var_name, scope):
# feed var to framework
global_block = program.global_block()
for op in global_block.ops:
if op.desc.type() == 'feed':
feed_target_name = op.desc.output('Out')[0]
cur_feed = feed[feed_target_name]
var = global_block.var(feed_target_name)
if var.dtype != core.VarDesc.VarType.STRINGS:
if not isinstance(cur_feed, core.LoDTensor):
cur_feed = _as_lodtensor(
cur_feed, self.place_desc, var.dtype
)
check_feed_shape_type(var, cur_feed)
idx = op.desc.attr('col')
core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
else:
break
def run_program(
self,
startup_program,
main_program,
feed,
fetch_list,
scope,
run_step,
split_op_indics=None,
):
def avoid_randomness(self, program):
for op in program.block(0).ops:
if op.type == "dropout":
op._set_attr("dropout_prob", 0)
def run_train(self, split=False, micro_batch_num=1):
paddle.seed(2022)
startup_exe = _StandaloneExecutor(self.place, [startup_program], scope)
startup_exe.run(scope, [], [])
scope = paddle.static.Scope()
with paddle.static.scope_guard(scope):
(
startup_program,
main_program,
split_op_indics,
loader,
fetch_list,
) = self.build_program(
self.batch_size // micro_batch_num,
self.src_len,
self.d_model,
self.n_head,
)
self.avoid_randomness(main_program)
startup_exe = _StandaloneExecutor(
self.place,
Plan([Job("startup")], {"startup": startup_program.desc}),
scope,
)
startup_exe.run([])
programs = [main_program]
if split_op_indics is not None:
fetch_op_num = len(fetch_list)
fetch_op_indics = []
if split:
programs, _, _ = split_program(main_program, split_op_indics)
# hack add feed ops in the first program and fetch ops in the last program
programs[0] = _add_feed_fetch_ops(
programs[0], feed, [], "feed", "fetch"
)
# hack add fetch ops in the last program
programs[-1] = _add_feed_fetch_ops(
programs[-1], [], fetch_list, "feed", "fetch"
)
op_num = len(programs[-1].block(0).ops)
fetch_op_indics = list(range(op_num - fetch_op_num, op_num))
else:
programs[0] = _add_feed_fetch_ops(
programs[0], feed, fetch_list, "feed", "fetch"
programs[0], [], fetch_list, "feed", "fetch"
)
op_num = len(programs[0].block(0).ops)
fetch_op_indics = list(range(op_num - fetch_op_num, op_num))
job_list = []
program_num = len(programs)
for micro_batch_id in range(micro_batch_num):
for program_id in range(program_num):
job = Job(f"P{program_id}")
job.set_micro_batch_id(micro_batch_id)
# Set col_attr info for fetch_op to fetch the correct data after running multiple micro batch
if program_id == program_num - 1:
fetch_op_id_to_col_attr = {}
for i in range(fetch_op_num):
job.set_col_attr_for_fetch_op(
fetch_op_indics[i],
i * micro_batch_num + micro_batch_id,
)
job_list.append(job)
type_to_program = {}
for program_id in range(program_num):
type_to_program[f"P{program_id}"] = programs[program_id].desc
self.feed_data(programs[0], feed, "feed", scope)
plan = Plan(job_list, type_to_program)
main_exe = _StandaloneExecutor(self.place, programs, scope)
main_exe = _StandaloneExecutor(self.place, plan, scope)
loader.start()
res = []
for i in range(run_step):
res += main_exe.run(scope, list(feed.keys()), fetch_list)
for i in range(self.run_step):
fetch_res = main_exe.run(feed_names=[])
res.append(
np.array(fetch_res).reshape(
self.batch_size, self.src_len, self.d_model
)
)
return res
def test_multi_program_run(self):
(
startup_program,
main_program,
split_op_indics,
feed,
fetch_list,
) = self.build_program()
run_step = 3
res = self.run_program(
startup_program,
main_program,
feed,
fetch_list,
paddle.static.Scope(),
run_step,
)
splited_res = self.run_program(
startup_program,
main_program,
feed,
fetch_list,
paddle.static.Scope(),
run_step,
split_op_indics,
)
np.testing.assert_array_equal(res, splited_res)
def test_multi_micro_batch_run(self):
last_res = None
for split in [True, False]:
for micro_batch_num in [1, 2]:
res = self.run_train(split, micro_batch_num)
if last_res:
for i in range(len(res)):
np.testing.assert_allclose(
last_res[i], res[i], atol=1e-6, rtol=1e-6
)
last_res = res
if __name__ == "__main__":
......
......@@ -21,9 +21,9 @@ from paddle.fluid import core
class TestStandaloneExecutorPlan(unittest.TestCase):
def test_standalone_executor_plan(self):
micro_batch_id = 0
forward_job = core.job("forward")
backward_job = core.job("backward")
optimizer_job = core.job("optimizer")
forward_job = core.Job("forward")
backward_job = core.Job("backward")
optimizer_job = core.Job("optimizer")
forward_job.set_micro_batch_id(micro_batch_id)
backward_job.set_micro_batch_id(micro_batch_id)
optimizer_job.set_micro_batch_id(micro_batch_id)
......@@ -39,9 +39,10 @@ class TestStandaloneExecutorPlan(unittest.TestCase):
"backward": backward_program.desc,
"optimizer": optimizer_program.desc,
}
plan = core.plan(job_list, type_to_program)
plan = core.Plan(job_list, type_to_program)
self.assertEqual(plan.job_list(), job_list)
self.assertEqual(plan.type_to_program(), type_to_program)
for type in type_to_program.keys():
self.assertEqual(plan.program(type), type_to_program[type])
if __name__ == '__main__':
......
......@@ -44,6 +44,7 @@ disable_wingpu_test="^test_model$|\
^test_dataloader_unkeep_order$|\
^test_multiprocess_dataloader_iterable_dataset_static$|\
^test_fuse_bn_act_pass$|\
^test_fuse_bn_act_pass_static_build$|\
^test_fuse_bn_add_act_pass$|\
^test_gather_op$|\
^test_activation_op$|\
......@@ -165,6 +166,7 @@ disable_win_inference_test="^trt_quant_int8_yolov3_r50_test$|\
^test_sync_batch_norm_op$|\
^test_sync_batch_norm_op_static_build$|\
^test_fuse_bn_act_pass$|\
^test_fuse_bn_act_pass_static_build$|\
^test_fuse_bn_add_act_pass$|\
^test_decoupled_py_reader_data_check$|\
^test_parallel_dygraph_sync_batch_norm$|\
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册