From 2a0205a5d9ae65a1251ecd7fc7d65cbdc9a60839 Mon Sep 17 00:00:00 2001
From: yuyang18
Date: Mon, 14 May 2018 14:19:29 +0800
Subject: [PATCH] Draft for train by parallel executor

---
 .../test_recognize_digits_conv.py |  7 ++-
 python/paddle/fluid/trainer.py    | 63 ++++++++++++++-----
 2 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
index 1f91f471f22..159eec94874 100644
--- a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
+++ b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
@@ -66,7 +66,8 @@ def train(use_cuda, save_dirname):
         train_func=train_program,
         infer_func=inference_program,
         place=place,
-        optimizer=optimizer)
+        optimizer=optimizer,
+        parallel=True)
 
     def event_handler(event):
         if isinstance(event, fluid.EndEpochEvent):
@@ -95,6 +96,8 @@ def train(use_cuda, save_dirname):
 #                 event.epoch + 1, float(avg_cost), float(acc)))
 #         if math.isnan(float(avg_cost)):
 #             sys.exit("got NaN loss, training failed.")
+        elif isinstance(event, fluid.EndStepEvent):
+            print("Step {0}, Epoch {1}".format(event.step, event.epoch))
 
     train_reader = paddle.batch(
         paddle.reader.shuffle(
@@ -132,4 +135,4 @@ def main(use_cuda):
 
 if __name__ == '__main__':
     # for use_cuda in (False, True):
-    main(use_cuda=False)
+    main(use_cuda=True)
diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py
index 67d8be82d5f..8dd140a92c9 100644
--- a/python/paddle/fluid/trainer.py
+++ b/python/paddle/fluid/trainer.py
@@ -20,6 +20,7 @@ import data_feeder
 import contextlib
 import io
 import unique_name
+import parallel_executor
 
 # optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module
 import optimizer as opt_module
@@ -97,7 +98,9 @@ class Trainer(object):
                  infer_func,
                  optimizer,
                  param_path=None,
-                 place=None):
+                 place=None,
+                 parallel=False):
+        self.parallel = parallel
         # 1. we need to generate a framework.Program by calling
         # program_func. Reference: fluid.program_guard in
         # test_word2vec.py
@@ -112,14 +115,14 @@
 
         with framework.program_guard(self.train_program, self.startup_program):
             program_func_outs = train_func()
-            self.test_outputs = program_func_outs if isinstance(
+            self.train_func_outputs = program_func_outs if isinstance(
                 program_func_outs, list) else [program_func_outs]
             self.test_program = self.train_program.clone()
             if not isinstance(optimizer, opt_module.Optimizer):
                 raise TypeError(
                     "The optimizer should be an instance of Optimizer")
             # The fisrt element of program_func_outs is loss.
-            loss = self.test_outputs[0]
+            loss = self.train_func_outputs[0]
             optimize_ops, params_grads = optimizer.minimize(loss)
 
         self.place = check_and_get_place(place)
@@ -175,12 +178,7 @@
                 'TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
             )
 
-    def train(self,
-              num_epochs,
-              event_handler,
-              reader=None,
-              parallel=False,
-              feed_order=None):
+    def train(self, num_epochs, event_handler, reader=None, feed_order=None):
         """
         Train the model.
 
         Args:
             num_epochs: The number of epoch. An epoch will process all data in reader
             event_handler: The event handler. A function with type (ev:Event)->void
             reader:
-            parallel: True if use multi-CPUs or multi-GPUs
             feed_order: Feeding order of reader.
                 None will following the defining order in program
 
         Returns:
 
         """
-        if parallel:
-            raise NotImplementedError(
-                "Parallel Executor version of trainer is not implemented")
-
         training_role = os.getenv("PADDLE_TRAINING_ROLE", "")
         if training_role == "PSERVER":
             with self._prog_and_scope_guard():
                 exe = executor.Executor(self.place)
                 exe.run()
                 return
-
-        self._train_by_executor(num_epochs, event_handler, reader, feed_order)
+        if self.parallel:
+            self._train_by_parallel_executor(num_epochs, event_handler, reader,
+                                             feed_order)
+        else:
+            self._train_by_executor(num_epochs, event_handler, reader,
+                                    feed_order)
 
     def test(self, reader, feed_order=None):
         """
         Test the model on given test data
 
         Args:
             reader: The reader that yields test data.
             feed_order: Feeding order of reader.
                 None will following the defining order in program
         """
-        return self._test_by_executor(reader, feed_order, self.test_outputs)
+        return self._test_by_executor(reader, feed_order,
+                                      self.train_func_outputs)
 
     def save_params(self, param_path):
         # reference: save_persistables in io.py
@@ -286,6 +284,37 @@
 
         return [x / count for x in accumulated]
 
+    def _train_by_parallel_executor(self, num_epochs, event_handler, reader,
+                                    feed_order):
+        with self._prog_and_scope_guard():
+            pe = self._get_or_create_parallel_executor()
+            feed_var_list = build_feed_var_list(self.train_program, feed_order)
+            feeder = data_feeder.DataFeeder(
+                feed_list=feed_var_list, place=self.place)
+            reader = feeder.decorate_reader(reader, multi_devices=True)
+            for epoch_id in range(num_epochs):
+                event_handler(BeginEpochEvent(epoch_id=epoch_id))
+                for step_id, data in enumerate(reader()):
+                    event_handler(
+                        BeginStepEvent(
+                            epoch_id=epoch_id, step_id=step_id))
+                    pe.run(feed=data, fetch_list=[])
+                    event_handler(
+                        EndStepEvent(
+                            epoch_id=epoch_id, step_id=step_id))
+
+                event_handler(EndEpochEvent(epoch_id=epoch_id))
+
+    def _get_parallel_executor(self):
+        return getattr(self, 'parallel_executor', None)
+
+    def _get_or_create_parallel_executor(self):
+        if self._get_parallel_executor() is None:
+            self.parallel_executor = parallel_executor.ParallelExecutor(
+                use_cuda=isinstance(self.place, core.CUDAPlace),
+                loss_name=self.train_func_outputs[0].name)
+        return self._get_parallel_executor()
+
 
 def build_feed_var_list(program, feed_order):
     if not isinstance(program, framework.Program):
-- 
GitLab
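
Usage sketch (editorial note, not part of the patch): the snippet below illustrates how the `parallel` switch introduced by this draft is meant to be driven from a user script, following the pattern of the recognize_digits test above. It is a minimal stand-in, not code from the commit: the small fully-connected network replaces the conv net of the test, and the Adam learning rate, batch size, buffer size, and feed names 'img'/'label' are taken from the book examples as assumptions.

    import paddle
    import paddle.fluid as fluid

    def inference_program():
        # Simplified stand-in for the conv net in the book example.
        img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
        hidden = fluid.layers.fc(input=img, size=200, act='relu')
        return fluid.layers.fc(input=hidden, size=10, act='softmax')

    def train_program():
        # The first returned element must be the loss; the Trainer minimizes it.
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        prediction = inference_program()
        avg_cost = fluid.layers.mean(
            fluid.layers.cross_entropy(input=prediction, label=label))
        acc = fluid.layers.accuracy(input=prediction, label=label)
        return [avg_cost, acc]

    def event_handler(event):
        # The parallel path still emits Begin/EndStepEvent once per mini-batch,
        # so step-level logging keeps working.
        if isinstance(event, fluid.EndStepEvent):
            print("Step {0}, Epoch {1}".format(event.step, event.epoch))

    trainer = fluid.Trainer(
        train_func=train_program,
        infer_func=inference_program,
        place=fluid.CUDAPlace(0),
        optimizer=fluid.optimizer.Adam(learning_rate=0.001),
        parallel=True)  # route Trainer.train() through ParallelExecutor

    train_reader = paddle.batch(
        paddle.reader.shuffle(
            paddle.dataset.mnist.train(), buf_size=500),
        batch_size=64)

    trainer.train(
        num_epochs=1,
        event_handler=event_handler,
        reader=train_reader,
        feed_order=['img', 'label'])

Note that the draft derives ParallelExecutor's use_cuda flag from the place handed to the Trainer (isinstance(self.place, core.CUDAPlace)), so passing fluid.CPUPlace() with parallel=True would build a CPU-side ParallelExecutor instead.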