diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 1e6482e3c18a6fe8289ca301c684cb9834cacb21..bd325bd2574afa9c652a9613bdb3bf0b6a93b4a3 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -21,8 +21,7 @@ import executor from executor import * import trainer -from trainer import Trainer -from trainer import Event +from trainer import * import inferencer from inferencer import Inferencer diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index cc71c2136a6756ff094f6e06b8e200c6a68db06a..a5570b653e73ce0ee25d8f98ff5b193d284da9ba 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -50,8 +50,6 @@ def data(name, dtype(int|float): The type of data : float32, float_16, int etc type(VarType): The output type. By default it is LOD_TENSOR. lod_level(int): The LoD Level. 0 means the input data is not a sequence. - main_program(Program): Name of the main program that calls this - startup_program(Program): Name of the startup program stop_gradient(bool): A boolean that mentions whether gradient should flow. Returns: @@ -74,13 +72,15 @@ def data(name, if append_batch_size: shape = [-1] + shape # append batch size as -1 - return helper.create_global_variable( + data_var = helper.create_global_variable( name=name, shape=shape, dtype=dtype, type=type, stop_gradient=stop_gradient, lod_level=lod_level) + data_var.is_data = True + return data_var class BlockGuardServ(BlockGuard): diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 9ae43b3e93e4b7d337097a25379720c18dfd331c..0a314ddfd7c607a3bc7f7c746c4c4990fc4a52e2 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -28,7 +28,8 @@ from contextlib import contextmanager __all__ = [ 'SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad', 'SGDOptimizer', 'MomentumOptimizer', 'AdagradOptimizer', 'AdamOptimizer', - 'AdamaxOptimizer', 'DecayedAdagradOptimizer', 'Adadelta', 'ModelAverage' + 'AdamaxOptimizer', 'DecayedAdagradOptimizer', 'Adadelta', 'ModelAverage', + 'Optimizer' ] diff --git a/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py b/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py index 272db7b5739b0388201a5e14293c2f076c31ad69..30939cae29ddf7070f3e5d39f33dc88e86c65450 100644 --- a/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py +++ b/python/paddle/fluid/tests/book/word2vec/no_test_word2vec_new_api.py @@ -79,9 +79,9 @@ def inference_network(is_sparse): return predict_word -def train_network(): +def train_network(is_sparse): next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') - predict_word = inference_network() + predict_word = inference_network(is_sparse) cost = fluid.layers.cross_entropy(input=predict_word, label=next_word) avg_cost = fluid.layers.mean(cost) return avg_cost @@ -94,7 +94,8 @@ def train(use_cuda, is_sparse, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() def event_handler(event): - if isinstance(event, fluid.Event.END_EPOCH): + print type(event) + if isinstance(event, fluid.EndEpochEvent): avg_cost = trainer.test(reader=paddle.dataset.imikolov.test( word_dict, N)) @@ -105,10 +106,11 @@ def train(use_cuda, is_sparse, save_path): sys.exit("got NaN loss, training failed.") trainer = fluid.Trainer( - partial(inference_network, is_sparse), + partial(train_network, is_sparse), fluid.optimizer.SGD(learning_rate=0.001), place=place) - trainer.train(train_reader, 100, event_handler) + trainer.train( + reader=train_reader, num_epochs=100, event_handler=event_handler) def infer(use_cuda, save_path): diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index aeda67650205a473486b05fdd5b4364753fb18ca..2362da370a39b6e59f486e2affbb02e21840784f 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -12,44 +12,200 @@ # See the License for the specific language governing permissions and # limitations under the License. +import core +import framework +import executor +import data_feeder +import contextlib + +# optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module +import optimizer as opt_module + __all__ = [ - 'Event', 'Trainer', + 'BeginEpochEvent', + 'EndEpochEvent', + 'BeginStepEvent', + 'EndStepEvent', ] -class Event(object): - BEGIN_EPOCH = 0 - END_EPOCH = 1 - BEGIN_STEP = 2 - END_STEP = 3 +class BeginEpochEvent(object): + def __init__(self, epoch_id): + self.epoch = epoch_id + + +class EndEpochEvent(object): + def __init__(self, epoch_id): + self.epoch = epoch_id - def __init__(self): - self.step = 0 - self.epoch = 0 - self.type = Event.BEGIN_EPOCH + +class BeginStepEvent(object): + def __init__(self, epoch_id, step_id): + self.epoch = epoch_id + self.step = step_id + + +class EndStepEvent(object): + def __init__(self, epoch_id, step_id): + self.epoch = epoch_id + self.step = step_id class Trainer(object): + """ + + Args: + network_func(callable): A function which will return loss. The loss must be a scaler. + optimizer(optimizer.Optimizer): The optimizer should be an instance of Optimizer + params: + place: The device place of this trainer. + """ + def __init__(self, network_func, optimizer, params=None, place=None): # 1. we need to generate a framework.Program by calling # network_func. Reference: fluid.program_guard in # test_word2vec.py + self.scope = self._get_scope_from_params(params) + + self.startup_program = framework.Program() + self.train_program = framework.Program() + + with framework.program_guard(self.train_program, self.startup_program): + loss = network_func() + if not isinstance(optimizer, opt_module.Optimizer): + raise TypeError( + "The optimizer should be an instance of Optimizer") + + optimizer.minimize(loss) + + self.place = Trainer._check_and_get_place(place) # 2. move the default_main_program to self.program and run the # default_startup program on an empty core.Scope() + # Run startup program + if params is None: + exe = executor.Executor(place) + exe.run(self.startup_program, scope=self.scope) # 3. call self.params.add_vars with the initialized scope, it # will add the new vars of the initialized scope into # self.params. - self.network_func = network_func - self.optimizer = optimizer - self.params = params - self.place = place + # TODO(yuyang): This depends on parameters implementation. + # TODO(helin): support distributed training - def train(self, reader, num_epochs, event_handler): - pass + def train(self, + num_epochs, + event_handler, + reader=None, + parallel=False, + feed_order=None): + """ + Train the model. + + Args: + num_epochs: The number of epoch. An epoch will process all data in reader + event_handler: The event handler. A function with type (ev:Event)->void + reader: + parallel: True if use multi-CPUs or multi-GPUs + feed_order: Feeding order of reader. None will following the defining + order in program + + Returns: + + """ + if parallel: + raise NotImplementedError( + "Parallel Executor version of trainer is not implemented") + + self._train_by_executor(num_epochs, event_handler, reader, feed_order) def test(self, reader): pass + + def _get_scope_from_params(self, params): + """ + Get Scope from parameter object. + Args: + params(Parameter|None): The parameter object instance. Could be None. + + Returns: New scope if params is None. Or params.scope() + NOTE: This method is WIP. Not fully implemented. + """ + if params is None: + return core.Scope() # new scope when params is None + else: + raise NotImplementedError("Not implemented right now.") + + @staticmethod + def _check_and_get_place(place): + """ + Check the type of place or get the default place + Args: + place(None|core.CUDAPlace|core.CPUPlace): the place that trainer will be executed on. + + Raises: + TypeError if the type mismatched. + + Returns: + the original place if it is not None. + if fluid is compiled with CUDA, returns CUDAPlace(0) by default. + Otherwise returns CPUPlace by default. + """ + if place is None: + if core.is_compiled_with_cuda(): + return core.CUDAPlace(0) + else: + return core.CPUPlace() + else: + if not isinstance(place, core.CUDAPlace) and not isinstance( + place, core.CPUPlace): + raise TypeError("Place should be either CUDAPlace or CPUPlace") + return place + + @contextlib.contextmanager + def _prog_and_scope_guard(self): + with framework.program_guard( + main_program=self.train_program, + startup_program=self.startup_program): + with executor.scope_guard(self.scope): + yield + + def _train_by_executor(self, num_epochs, event_handler, reader, feed_order): + """ + Train by Executor and single device. + + Args: + num_epochs: + event_handler: + reader: + feed_order: + + Returns: + + """ + with self._prog_and_scope_guard(): + exe = executor.Executor(self.place) + if feed_order is None: + feed_var_list = [ + var + for var in self.train_program.global_block( + ).vars.itervalues() + if hasattr(var, 'is_data') and var.is_data + ] + else: + feed_var_list = [ + self.train_program.global_block().var(var_name) + for var_name in feed_order + ] + + feeder = data_feeder.DataFeeder( + feed_list=feed_var_list, place=self.place) + for epoch_id in range(num_epochs): + event_handler(BeginEpochEvent(epoch_id)) + for step_id, data in enumerate(reader()): + event_handler(BeginStepEvent(epoch_id, step_id)) + exe.run(feed=feeder.feed(data), fetch_list=[]) + event_handler(EndStepEvent(epoch_id, step_id)) + event_handler(EndEpochEvent(epoch_id))