diff --git a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
index 420e6e6e42adc22508c414f2c2d1ba93aedd4753..f252a7e2b98b05d0c64533e9e24a902559be2f39 100644
--- a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
+++ b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py
@@ -62,7 +62,10 @@ def train(use_cuda, train_program, save_dirname):
     optimizer = fluid.optimizer.Adam(learning_rate=0.001)
 
     trainer = fluid.Trainer(
-        train_func=train_program, place=place, optimizer=optimizer)
+        train_func=train_program,
+        place=place,
+        optimizer=optimizer,
+        parallel=True)
 
     def event_handler(event):
         if isinstance(event, fluid.EndEpochEvent):
@@ -87,6 +90,9 @@ def train(use_cuda, train_program, save_dirname):
                     event.epoch + 1, float(avg_cost), float(acc)))
                 if math.isnan(float(avg_cost)):
                     sys.exit("got NaN loss, training failed.")
+        elif isinstance(event, fluid.EndStepEvent):
+            print("Step {0}, Epoch {1} Metrics {2}".format(
+                event.step, event.epoch, map(numpy.array, event.metrics)))
 
     train_reader = paddle.batch(
         paddle.reader.shuffle(
@@ -131,4 +137,4 @@ def main(use_cuda):
 
 if __name__ == '__main__':
     # for use_cuda in (False, True):
-    main(use_cuda=False)
+    main(use_cuda=True)
diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py
index a47af7ccb210fed86c52e0cd6ae0ab683284b2df..d158d586321833fdf046e4e061bfa8460b9a31b5 100644
--- a/python/paddle/fluid/trainer.py
+++ b/python/paddle/fluid/trainer.py
@@ -20,6 +20,7 @@ import data_feeder
 import contextlib
 import io
 import unique_name
+import parallel_executor
 # optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module
 import optimizer as opt_module
 
@@ -48,12 +49,14 @@ class BeginStepEvent(object):
     def __init__(self, epoch_id, step_id):
         self.epoch = epoch_id
         self.step = step_id
+        self.fetch_metrics = True
 
 
 class EndStepEvent(object):
-    def __init__(self, epoch_id, step_id):
+    def __init__(self, epoch_id, step_id, metrics):
         self.epoch = epoch_id
         self.step = step_id
+        self.metrics = metrics
 
 
 def check_and_get_place(place):
@@ -87,12 +90,17 @@ class Trainer(object):
 
     Args:
         train_func(callable): A function which will return loss. The loss must be a scalar.
-        infer_func(callable): A function which will return predict, used to save inference model
         optimizer(optimizer.Optimizer): The optimizer should be an instance of Optimizer
         place: The device place of this trainer.
     """
 
-    def __init__(self, train_func, optimizer, param_path=None, place=None):
+    def __init__(self,
+                 train_func,
+                 optimizer,
+                 param_path=None,
+                 place=None,
+                 parallel=False):
+        self.parallel = parallel
         # 1. we need to generate a framework.Program by calling
         # program_func. Reference: fluid.program_guard in
         # test_word2vec.py
@@ -106,14 +114,14 @@ class Trainer(object):
 
         with framework.program_guard(self.train_program, self.startup_program):
             program_func_outs = train_func()
-            self.test_outputs = program_func_outs if isinstance(
+            self.train_func_outputs = program_func_outs if isinstance(
                 program_func_outs, list) else [program_func_outs]
             self.test_program = self.train_program.clone()
             if not isinstance(optimizer, opt_module.Optimizer):
                 raise TypeError(
                     "The optimizer should be an instance of Optimizer")
             # The fisrt element of program_func_outs is loss.
-            loss = self.test_outputs[0]
+            loss = self.train_func_outputs[0]
             optimize_ops, params_grads = optimizer.minimize(loss)
 
         self.place = check_and_get_place(place)
@@ -202,12 +210,7 @@ class Trainer(object):
                 'TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
             )
 
-    def train(self,
-              num_epochs,
-              event_handler,
-              reader,
-              feed_order,
-              parallel=False):
+    def train(self, num_epochs, event_handler, reader=None, feed_order=None):
         """
         Train the model.
 
@@ -215,25 +218,24 @@ class Trainer(object):
             num_epochs: The number of epoch. An epoch will process all data in reader
             event_handler: The event handler. A function with type (ev:Event)->void
             reader:
-            parallel: True if use multi-CPUs or multi-GPUs
             feed_order: Feeding order of reader. None will following the defining
                 order in program
 
         Returns:
 
         """
-        if parallel:
-            raise NotImplementedError(
-                "Parallel Executor version of trainer is not implemented")
-
         training_role = os.getenv("PADDLE_TRAINING_ROLE", "")
         if training_role == "PSERVER":
             with self._prog_and_scope_guard():
                 exe = executor.Executor(self.place)
                 exe.run()
                 return
-
-        self._train_by_executor(num_epochs, event_handler, reader, feed_order)
+        if self.parallel:
+            self._train_by_parallel_executor(num_epochs, event_handler, reader,
+                                             feed_order)
+        else:
+            self._train_by_executor(num_epochs, event_handler, reader,
+                                    feed_order)
 
     def test(self, reader, feed_order):
         """
@@ -245,7 +247,8 @@
                 order in program
 
         """
-        return self._test_by_executor(reader, feed_order, self.test_outputs)
+        return self._test_by_executor(reader, feed_order,
+                                      self.train_func_outputs)
 
     def save_params(self, param_path):
         # reference: save_persistables in io.py
@@ -279,13 +282,25 @@
             feeder = data_feeder.DataFeeder(
                 feed_list=feed_var_list, place=self.place)
             exe = executor.Executor(self.place)
-            for epoch_id in range(num_epochs):
-                event_handler(BeginEpochEvent(epoch_id))
-                for step_id, data in enumerate(reader()):
-                    event_handler(BeginStepEvent(epoch_id, step_id))
-                    exe.run(feed=feeder.feed(data), fetch_list=[])
-                    event_handler(EndStepEvent(epoch_id, step_id))
-                event_handler(EndEpochEvent(epoch_id))
+            reader = feeder.decorate_reader(reader, multi_devices=False)
+            self._train_by_any_executor(event_handler, exe, num_epochs, reader)
+
+    def _train_by_any_executor(self, event_handler, exe, num_epochs, reader):
+        for epoch_id in range(num_epochs):
+            event_handler(BeginEpochEvent(epoch_id))
+            for step_id, data in enumerate(reader()):
+                begin_event = BeginStepEvent(epoch_id, step_id)
+                event_handler(begin_event)
+                if begin_event.fetch_metrics:
+                    metrics = exe.run(feed=data,
+                                      fetch_list=[
+                                          var.name
+                                          for var in self.train_func_outputs
+                                      ])
+                else:
+                    metrics = exe.run(feed=data, fetch_list=[])
+                event_handler(EndStepEvent(epoch_id, step_id, metrics))
+            event_handler(EndEpochEvent(epoch_id))
 
     def _test_by_executor(self, reader, feed_order, fetch_list):
         with executor.scope_guard(self.scope):
@@ -304,6 +319,28 @@
 
         return [x / count for x in accumulated]
 
+    def _train_by_parallel_executor(self, num_epochs, event_handler, reader,
+                                    feed_order):
+        with self._prog_and_scope_guard():
+            pe = self._get_or_create_parallel_executor()
+            feed_var_list = build_feed_var_list(self.train_program, feed_order)
+            feeder = data_feeder.DataFeeder(
+                feed_list=feed_var_list, place=self.place)
+            reader = feeder.decorate_reader(reader, multi_devices=True)
+            for epoch_id in range(num_epochs):
+                self._train_by_any_executor(event_handler, pe, num_epochs,
+                                            reader)
+
+    def _get_parallel_executor(self):
+        return getattr(self, 'parallel_executor', None)
+
+    def _get_or_create_parallel_executor(self):
+        if self._get_parallel_executor() is None:
+            self.parallel_executor = parallel_executor.ParallelExecutor(
+                use_cuda=isinstance(self.place, core.CUDAPlace),
+                loss_name=self.train_func_outputs[0].name)
+        return self._get_parallel_executor()
+
 
 def build_feed_var_list(program, feed_order):
     if not isinstance(program, framework.Program):
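
A minimal usage sketch of the API changes above (not part of the patch): it assumes the train_program function and a batched train_reader such as those defined in test_recognize_digits_conv.py, and that BeginStepEvent is exported from fluid alongside EndStepEvent. It exercises the new parallel flag, the fetch_metrics switch added to BeginStepEvent, and the metrics now carried by EndStepEvent.

import numpy
import paddle.fluid as fluid


def event_handler(event):
    if isinstance(event, fluid.BeginStepEvent):
        # Only fetch the training metrics every 10 steps; when False,
        # the trainer runs the step with an empty fetch_list.
        event.fetch_metrics = event.step % 10 == 0
    elif isinstance(event, fluid.EndStepEvent):
        print("Step {0}, Epoch {1} Metrics {2}".format(
            event.step, event.epoch, map(numpy.array, event.metrics)))


trainer = fluid.Trainer(
    train_func=train_program,  # assumed: returns [avg_cost, acc]
    optimizer=fluid.optimizer.Adam(learning_rate=0.001),
    place=fluid.CUDAPlace(0),
    parallel=True)  # new flag: train through ParallelExecutor

trainer.train(
    num_epochs=1,
    event_handler=event_handler,
    reader=train_reader,  # assumed: a paddle.batch(...) reader over MNIST
    feed_order=['img', 'label'])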