Unverified commit 0012c8d5, authored by zhaoyingli and committed by GitHub

[AutoParallel] remove pyreader, use feed op in pipeline schedule (#56511)

* modify feed_data for dataloader in pipeline parallel mode

* add pre-commit

* remove read op, use feed op

* fix validate batch_size

* tiny fix

* support catch EOFException

* fix conflict

* fix conflict

* fix executor if cond

---------
Co-authored-by: Frida-a <2624653516@qq.com>

Parent: a9c497cd
@@ -23,24 +23,28 @@
 namespace paddle {
 namespace framework {

-void SetColAttrForFetchOps(const interpreter::Job& job,
-                           std::shared_ptr<ProgramDesc> program_desc) {
-  const std::set<std::string>& valid_feed_fetch_op_types = {"fetch",
-                                                            "fetch_v2"};
-
-  const std::vector<int> all_op_ids = job.AllFetchOpIds();
-  for (int op_id : all_op_ids) {
-    int col_attr = job.ColAttrForFetchOp(op_id);
-    OpDesc* op_desc = program_desc->MutableBlock(0)->Op(op_id);
-    PADDLE_ENFORCE(valid_feed_fetch_op_types.find(op_desc->Type()) !=
-                       valid_feed_fetch_op_types.end(),
-                   phi::errors::InvalidArgument(
-                       "Op (%s) corressponding to feed_fetch_op_id (%d) is not "
-                       "in valid_feed_fetch_op_types.",
-                       op_desc->Type(),
-                       op_id));
-
-    op_desc->SetAttr("col", col_attr);
+void SetColAttrForFeedFetchOps(std::shared_ptr<ProgramDesc> program_desc,
+                               const int64_t micro_batch_num,
+                               const int64_t micro_batch_id) {
+  const std::set<std::string>& valid_feed_fetch_op_types = {
+      "fetch", "fetch_v2", "feed"};
+  for (const auto& op_desc : program_desc->MutableBlock(0)->AllOps()) {
+    if (valid_feed_fetch_op_types.find(op_desc->Type()) !=
+        valid_feed_fetch_op_types.end()) {
+      int col = op_desc->GetAttrIfExists<int>("col");
+      PADDLE_ENFORCE_GE(
+          col,
+          0,
+          platform::errors::InvalidArgument(
+              "Expected the column index (the attribute 'col' of "
+              "operator 'Fetch') of current fetching variable to be "
+              "no less than 0. But received column index = %d.",
+              col));
+      int new_col = static_cast<int>(col * micro_batch_num + micro_batch_id);
+      op_desc->SetAttr("col", new_col);
+      VLOG(6) << "Job (" << micro_batch_id << ") Set " << op_desc->Type()
+              << "'s attr col=" << new_col;
+    }
   }
 }
...
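A minimal sketch (illustrative Python, not Paddle code) of the column re-mapping this hunk introduces: a feed/fetch op whose original col is c, running in micro batch m out of n, gets col = c * n + m, so results of the same variable from every micro batch land in distinct, recoverable slots. Variable names below are made up for illustration.

# Illustrative sketch of the re-mapped 'col' layout across micro batches.
micro_batch_num = 4
fetch_vars = ["loss", "acc"]  # original cols 0 and 1

slots = {}
for col, name in enumerate(fetch_vars):
    for micro_batch_id in range(micro_batch_num):
        new_col = col * micro_batch_num + micro_batch_id
        slots[new_col] = (name, micro_batch_id)

print(slots)
# {0: ('loss', 0), 1: ('loss', 1), 2: ('loss', 2), 3: ('loss', 3),
#  4: ('acc', 0), 5: ('acc', 1), 6: ('acc', 2), 7: ('acc', 3)}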
@@ -22,8 +22,9 @@
 namespace paddle {
 namespace framework {

-void SetColAttrForFetchOps(const interpreter::Job& job,
-                           std::shared_ptr<ProgramDesc> program_desc);
+void SetColAttrForFeedFetchOps(std::shared_ptr<ProgramDesc> program_desc,
+                               const int64_t micro_batch_num,
+                               const int64_t micro_batch_id);

 }  // namespace framework
 }  // namespace paddle
@@ -53,7 +53,6 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
       ir_program = plan_.IrProgram(job_type);
     } else {
       program = std::make_shared<ProgramDesc>(*(plan_.Program(job_type)));
-      SetColAttrForFetchOps(*job, program);
     }

     int64_t micro_batch_id = job->MicroBatchId();
@@ -64,6 +63,10 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
                        micro_batch_id,
                        micro_batch_num));

+    if (micro_batch_num > 1 && !FLAGS_enable_new_ir_api) {
+      SetColAttrForFeedFetchOps(program, micro_batch_num, micro_batch_id);
+    }
+
     interpreter::ExecutionConfig execution_config;
     execution_config.create_local_scope = false;
     execution_config.skip_gc_vars = job->SkipGcVars();
@@ -136,14 +139,6 @@ paddle::framework::FetchList StandaloneExecutor::Run(
   platform::RecordEvent record_event(
       "StandaloneExecutor::run", platform::TracerEventType::UserDefined, 1);

-  if (plan_.MicroBatchNum() > 1) {
-    PADDLE_ENFORCE_EQ(feed_names.size(),
-                      0,
-                      phi::errors::Unimplemented(
-                          "Unsupported feed data for multiple micro_batch, "
-                          "please use non-iterative DataLoader for now."));
-  }
-
   const auto& jobs = plan_.JobList();

   std::map<std::string, size_t> type_to_first_id;
@@ -177,8 +172,14 @@ paddle::framework::FetchList StandaloneExecutor::Run(
       interpretercores_[job_idx]->ShareBuildResultsFrom(
           interpretercores_[type_to_first_id[job_type]]);
     }
-    interpretercores_[job_idx]->Run(feed_names, /*need_fetch = */ false);
+    // TODO(zhaoyinglia): use a more general method
+    if (jobs.size() > 1 && job_type != "forward") {
+      const std::vector<std::string> tmp_feed_names = {};
+      interpretercores_[job_idx]->Run(tmp_feed_names, /*need_fetch = */ false);
+    } else {
+      interpretercores_[job_idx]->Run(feed_names, /*need_fetch = */ false);
+    }
   }

   // return Fetch Tensors
   if (FLAGS_enable_new_ir_in_executor) {
...
@@ -944,12 +944,28 @@ class Engine:
         self._inputs_spec, self._labels_spec = self._prepare_data_spec(
             train_data, train_sample_split, batch_size
         )
-        micro_batch_size = self._validate_batch_size(batch_size)
         if not self._has_prepared[self._mode]:
             self._prepare_program(self._mode)
         else:
             self._switch_mode(self._mode)

-        train_dataloader = self._prepare_dataloader_from_generator(
-            dataset=train_data,
-            capacity=70,
+        if auto_utils.use_new_executor():
+            local_batch_size = self._validate_batch_size(batch_size)
+            train_dataloader = self._prepare_dataloader(
+                train_data,
+                return_list=False,
+                batch_size=local_batch_size,
+                epochs=epochs,
+                collate_fn=collate_fn,
+            )
+            steps_per_epoch = (
+                len(train_dataloader)
+                if steps_per_epoch is None
+                else steps_per_epoch
+            )
+        else:
+            micro_batch_size = self._validate_batch_size(batch_size)
+            train_dataloader = self._prepare_dataloader_from_generator(
+                dataset=train_data,
+                capacity=70,
@@ -959,15 +975,19 @@ class Engine:
-            steps_per_epoch=steps_per_epoch,
-            collate_fn=collate_fn,
-        )
+                steps_per_epoch=steps_per_epoch,
+                collate_fn=collate_fn,
+            )
+            steps_per_epoch = train_dataloader._steps
+            local_batch_size = micro_batch_size
+            if self._strategy.pipeline.enable:
+                local_batch_size = micro_batch_size * self._acc_steps

         fetch_names, fetch_indices = self._prepare_fetch(None, mode=self._mode)

         cbks = config_callbacks(
             callbacks,
             engine=self,
-            batch_size=micro_batch_size,
+            batch_size=local_batch_size,
             epochs=epochs,
-            steps=train_dataloader._steps,
+            steps=steps_per_epoch,
             log_freq=log_freq,
             save_freq=save_freq,
             save_dir=save_dir,
@@ -983,20 +1003,27 @@ class Engine:
             logs = {}
             cbks.on_epoch_begin(epoch)
-            for step, _ in enumerate(train_dataloader):
-                with paddle.profiler.utils._nvprof_range(
-                    iter_id=step, start=nvprof_range[0], end=nvprof_range[1]
-                ):
-                    cbks.on_batch_begin('train', step, logs)
-                    try:
-                        outs = self._executor.run(
-                            self.main_program,
-                            fetch_list=fetch_names,
-                            use_program_cache=self._strategy.use_cache,
-                            return_numpy=self._strategy.return_numpy,
-                        )
-                    except core.EOFException:
-                        break
-                    lr = auto_utils.get_lr(self.optimizer)
-                    logs = self._prepare_logger(
-                        outs,
+            for step, data in enumerate(train_dataloader):
+                if auto_utils.use_new_executor():
+                    feeds = self._validate_feed(data)
+                else:
+                    feeds = [{}]
+
+                try:
+                    for micro_feed in feeds:
+                        with paddle.profiler.utils._nvprof_range(
+                            iter_id=step,
+                            start=nvprof_range[0],
+                            end=nvprof_range[1],
+                        ):
+                            cbks.on_batch_begin('train', step, logs)
+                            outs = self._executor.run(
+                                self.main_program,
+                                feed=micro_feed,
+                                fetch_list=fetch_names,
+                                use_program_cache=self._strategy.use_cache,
+                                return_numpy=self._strategy.return_numpy,
+                            )
+                            lr = auto_utils.get_lr(self.optimizer)
+                            logs = self._prepare_logger(
+                                outs,
@@ -1008,6 +1035,13 @@ class Engine:
-                        self._mode,
-                    )
-                    cbks.on_batch_end('train', step, logs)
+                                self._mode,
+                            )
+                            cbks.on_batch_end('train', step, logs)
+                except core.EOFException:
+                    break
+
+                if steps_per_epoch and step >= steps_per_epoch:
+                    if not auto_utils.use_new_executor():
+                        train_dataloader._reset()
+                    break

             if valid_data and (epoch + 1) % valid_freq == 0:
                 val_logs = self.evaluate(
@@ -1540,6 +1574,8 @@ class Engine:
     def _validate_batch_size(self, batch_size):
         if batch_size is None:
             return None
+        if self._strategy.pipeline.enable and auto_utils.use_new_executor():
+            return batch_size
         assert (
             batch_size % self._acc_steps == 0
         ), "Requires batch_size:[{}] to be divisible by acc_steps:[{}].".format(
@@ -1547,6 +1583,25 @@ class Engine:
         )
         return batch_size // self._acc_steps

+    def _validate_feed(self, feed):
+        if feed is None:
+            return [None]
+        # pp with schedule or navie-pp
+        if self._strategy.pipeline.enable or self._acc_steps == 1:
+            return feed
+
+        # split feed data with gradient_merge k_steps
+        feed_names = []
+        split_feeds = []
+        for feed_name, cur_feed in feed[0].items():
+            feed_names.append(feed_name)
+            split_feeds.append(np.split(np.array(cur_feed), self._acc_steps, 0))
+        micro_feeds = []
+        for i in range(self._acc_steps):
+            split_feed = [sf[i] for sf in split_feeds]
+            micro_feeds.append(dict(zip(feed_names, split_feed)))
+        return micro_feeds
+
     def _validate_spec(self, specs):
         specs = auto_utils.to_list(specs)
         if specs is not None:
...
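A standalone sketch of the split that the new _validate_feed performs under gradient merge, with made-up feed names and shapes: each array in the feed dict is split along axis 0 into acc_steps slices, producing one feed dict per accumulation step.

# Illustrative micro-feed split; data and names are assumptions, not Paddle code.
import numpy as np

acc_steps = 2
feed = [{"image": np.zeros((8, 3, 32, 32)), "label": np.zeros((8, 1))}]

feed_names, split_feeds = [], []
for name, value in feed[0].items():
    feed_names.append(name)
    split_feeds.append(np.split(np.array(value), acc_steps, 0))

micro_feeds = [
    dict(zip(feed_names, [sf[i] for sf in split_feeds]))
    for i in range(acc_steps)
]
print([{k: v.shape for k, v in mf.items()} for mf in micro_feeds])
# [{'image': (4, 3, 32, 32), 'label': (4, 1)},
#  {'image': (4, 3, 32, 32), 'label': (4, 1)}]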
@@ -525,25 +525,6 @@ def _add_new_ir_fetch_ops(program, fetch_list, fetch_var_name):
         paddle._ir_ops.fetch(fetch_input, fetch_var_name + str(i), i)


-def _set_micro_batch_fetch(plan):
-    if plan.micro_batch_num() <= 1:
-        return
-
-    valid_fetch_types = ["fetch", "fetch_v2"]
-    for job in plan.job_list():
-        idx_to_col_attr = {}
-        prog = plan.program(job.type())
-        for i in range(prog.block(0).op_size()):
-            op = prog.block(0).op(i)
-            if op.type() in valid_fetch_types:
-                idx_to_col_attr[i] = op.attr('col')
-
-        for idx, col in idx_to_col_attr.items():
-            job.set_col_attr_for_fetch_op(
-                idx, col * plan.micro_batch_num() + job.micro_batch_id()
-            )
-
-
 def _merge_tensors(tensor, micro_batch_num):
     if micro_batch_num <= 1:
         return tensor
@@ -1027,8 +1008,6 @@ class _ExecutorCache:
                 type_to_program = {"default": new_program.desc}
                 plan = core.Plan([default_job], type_to_program)

-                _set_micro_batch_fetch(plan)
-
                 new_exe = _StandaloneExecutor(place, plan, scope)
                 return new_program, new_exe
@@ -1050,8 +1029,6 @@ class _ExecutorCache:
                 type_to_program = {"default": program}
                 plan = core.Plan([default_job], type_to_program)

-                _set_micro_batch_fetch(plan)
-
                 new_exe = _StandaloneExecutor(place, plan, scope)
                 return program, new_exe
@@ -1233,7 +1210,38 @@ class Executor:
                         scope, cur_feed, feed_target_name, idx
                    )
                 else:
-                    core.set_feed_variable(scope, cur_feed, feed_var_name, idx)
+                    micro_cur_feed = [cur_feed]
+                    num_micro_batch = 1
+                    if (
+                        program._pipeline_opt
+                        and "standalone_opt" in program._pipeline_opt
+                    ):
+                        num_micro_batch = program._pipeline_opt[
+                            "standalone_opt"
+                        ]["num_micro_batches"]
+                        batch_size = (
+                            cur_feed.shape()[0]
+                            if callable(cur_feed.shape)
+                            else cur_feed.shape[0]
+                        )
+                        assert batch_size % num_micro_batch == 0
+                        micro_cur_feed = np.split(
+                            np.array(cur_feed), num_micro_batch, 0
+                        )
+                    for i in range(num_micro_batch):
+                        micro_feed = (
+                            _as_lodtensor(
+                                micro_cur_feed[i], self.place, var.dtype
+                            )
+                            if num_micro_batch > 1
+                            else micro_cur_feed[i]
+                        )
+                        core.set_feed_variable(
+                            scope,
+                            micro_feed,
+                            feed_var_name,
+                            idx * num_micro_batch + i,
+                        )
             else:
                 break
...
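A rough sketch (illustrative only, not the executor internals) of how the feed path now slices a whole-batch feed into num_micro_batches pieces and installs piece i at slot idx * num_micro_batch + i, matching the re-mapped col of each micro batch's feed op; the batch values below are assumptions.

# Illustrative placement of split feed pieces at offset feed slots.
import numpy as np

num_micro_batch = 4
idx = 0  # original 'col' of this feed variable
batch = np.arange(8).reshape(8, 1)  # whole local batch

assert batch.shape[0] % num_micro_batch == 0
pieces = np.split(batch, num_micro_batch, 0)
slots = {idx * num_micro_batch + i: piece for i, piece in enumerate(pieces)}
print({slot: piece.ravel().tolist() for slot, piece in slots.items()})
# {0: [0, 1], 1: [2, 3], 2: [4, 5], 3: [6, 7]}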