Commit 2a0205a5 authored by yuyang18

Draft for train by parallel executor

Parent 8c7d2e29
@@ -66,7 +66,8 @@ def train(use_cuda, save_dirname):
         train_func=train_program,
         infer_func=inference_program,
         place=place,
-        optimizer=optimizer)
+        optimizer=optimizer,
+        parallel=True)

     def event_handler(event):
         if isinstance(event, fluid.EndEpochEvent):
@@ -95,6 +96,8 @@ def train(use_cuda, save_dirname):
             # event.epoch + 1, float(avg_cost), float(acc)))
             # if math.isnan(float(avg_cost)):
             # sys.exit("got NaN loss, training failed.")
+        elif isinstance(event, fluid.EndStepEvent):
+            print("Step {0}, Epoch {1}".format(event.step, event.epoch))

     train_reader = paddle.batch(
         paddle.reader.shuffle(
@@ -132,4 +135,4 @@ def main(use_cuda):

 if __name__ == '__main__':
     # for use_cuda in (False, True):
-    main(use_cuda=False)
+    main(use_cuda=True)
@@ -20,6 +20,7 @@ import data_feeder
 import contextlib
 import io
 import unique_name
+import parallel_executor

 # optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module
 import optimizer as opt_module
@@ -97,7 +98,9 @@ class Trainer(object):
                  infer_func,
                  optimizer,
                  param_path=None,
-                 place=None):
+                 place=None,
+                 parallel=False):
+        self.parallel = parallel
         # 1. we need to generate a framework.Program by calling
         # program_func. Reference: fluid.program_guard in
         # test_word2vec.py
@@ -112,14 +115,14 @@ class Trainer(object):
         with framework.program_guard(self.train_program, self.startup_program):
             program_func_outs = train_func()
-            self.test_outputs = program_func_outs if isinstance(
+            self.train_func_outputs = program_func_outs if isinstance(
                 program_func_outs, list) else [program_func_outs]
             self.test_program = self.train_program.clone()
             if not isinstance(optimizer, opt_module.Optimizer):
                 raise TypeError(
                     "The optimizer should be an instance of Optimizer")
             # The fisrt element of program_func_outs is loss.
-            loss = self.test_outputs[0]
+            loss = self.train_func_outputs[0]
             optimize_ops, params_grads = optimizer.minimize(loss)

         self.place = check_and_get_place(place)
@@ -175,12 +178,7 @@ class Trainer(object):
                 'TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
             )

-    def train(self,
-              num_epochs,
-              event_handler,
-              reader=None,
-              parallel=False,
-              feed_order=None):
+    def train(self, num_epochs, event_handler, reader=None, feed_order=None):
         """
         Train the model.

@@ -188,25 +186,24 @@ class Trainer(object):
            num_epochs: The number of epoch. An epoch will process all data in reader
            event_handler: The event handler. A function with type (ev:Event)->void
            reader:
-           parallel: True if use multi-CPUs or multi-GPUs
            feed_order: Feeding order of reader. None will following the defining
                order in program

        Returns:

        """
-       if parallel:
-           raise NotImplementedError(
-               "Parallel Executor version of trainer is not implemented")
-
        training_role = os.getenv("PADDLE_TRAINING_ROLE", "")
        if training_role == "PSERVER":
            with self._prog_and_scope_guard():
                exe = executor.Executor(self.place)
                exe.run()
                return

-       self._train_by_executor(num_epochs, event_handler, reader, feed_order)
+       if self.parallel:
+           self._train_by_parallel_executor(num_epochs, event_handler, reader,
+                                            feed_order)
+       else:
+           self._train_by_executor(num_epochs, event_handler, reader,
+                                   feed_order)

    def test(self, reader, feed_order=None):
        """
@@ -218,7 +215,8 @@ class Trainer(object):
                order in program
        """
-       return self._test_by_executor(reader, feed_order, self.test_outputs)
+       return self._test_by_executor(reader, feed_order,
+                                     self.train_func_outputs)

    def save_params(self, param_path):
        # reference: save_persistables in io.py
@@ -286,6 +284,37 @@ class Trainer(object):
         return [x / count for x in accumulated]

+    def _train_by_parallel_executor(self, num_epochs, event_handler, reader,
+                                    feed_order):
+        with self._prog_and_scope_guard():
+            pe = self._get_or_create_parallel_executor()
+            feed_var_list = build_feed_var_list(self.train_program, feed_order)
+            feeder = data_feeder.DataFeeder(
+                feed_list=feed_var_list, place=self.place)
+            reader = feeder.decorate_reader(reader, multi_devices=True)
+            for epoch_id in range(num_epochs):
+                event_handler(BeginEpochEvent(epoch_id=epoch_id))
+                for step_id, data in enumerate(reader()):
+                    event_handler(
+                        BeginStepEvent(
+                            epoch_id=epoch_id, step_id=step_id))
+                    pe.run(feed=data, fetch_list=[])
+                    event_handler(
+                        EndStepEvent(
+                            epoch_id=epoch_id, step_id=step_id))
+                event_handler(EndEpochEvent(epoch_id=epoch_id))
+
+    def _get_parallel_executor(self):
+        return getattr(self, 'parallel_executor', None)
+
+    def _get_or_create_parallel_executor(self):
+        if self._get_parallel_executor() is None:
+            self.parallel_executor = parallel_executor.ParallelExecutor(
+                use_cuda=isinstance(self.place, core.CUDAPlace),
+                loss_name=self.train_func_outputs[0].name)
+        return self._get_parallel_executor()
+

 def build_feed_var_list(program, feed_order):
     if not isinstance(program, framework.Program):
......
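For readers who want to see what the new `_train_by_parallel_executor` wraps, below is a hedged, standalone sketch that drives `fluid.ParallelExecutor` with a multi-device reader directly, mirroring the calls added above (`ParallelExecutor(use_cuda=..., loss_name=...)`, `decorate_reader(..., multi_devices=True)`, `pe.run(feed=..., fetch_list=[])`). The small network and the MNIST reader are illustrative placeholders, not code from this commit.

```python
import paddle
import paddle.fluid as fluid

# Illustrative network; avg_cost plays the role of train_func_outputs[0].
img = fluid.layers.data(name='img', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
prediction = fluid.layers.fc(input=img, size=10, act='softmax')
avg_cost = fluid.layers.mean(
    fluid.layers.cross_entropy(input=prediction, label=label))
fluid.optimizer.SGD(learning_rate=0.001).minimize(avg_cost)

place = fluid.CUDAPlace(0)
# Parameters must be initialized by a plain Executor before the
# ParallelExecutor replicates the program across devices.
fluid.Executor(place).run(fluid.default_startup_program())

# loss_name tells ParallelExecutor which variable is the loss so gradients can
# be scaled across devices, matching loss_name=self.train_func_outputs[0].name
# in the Trainer.
pe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name)

# multi_devices=True makes the decorated reader yield one sub-batch per
# device, which is the shape of feed that pe.run(feed=...) expects.
feeder = fluid.DataFeeder(feed_list=[img, label], place=place)
reader = feeder.decorate_reader(
    paddle.batch(paddle.dataset.mnist.train(), batch_size=64),
    multi_devices=True)

for step_id, data in enumerate(reader()):
    # fetch_list is left empty here, as in the draft; the loss could instead be
    # fetched with fetch_list=[avg_cost.name].
    pe.run(feed=data, fetch_list=[])
    print("Step {0}".format(step_id))
```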