From 094d29aaadbdf17ede7d7950676ee0e91ff3b865 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Wed, 15 Feb 2017 15:57:02 +0800 Subject: [PATCH] Draft for parameters --- demo/mnist/api_train_v2.py | 29 +++-- python/paddle/v2/parameters.py | 212 +++++++++++++++++++-------------- python/paddle/v2/trainer.py | 104 ++-------------- 3 files changed, 148 insertions(+), 197 deletions(-) diff --git a/demo/mnist/api_train_v2.py b/demo/mnist/api_train_v2.py index d0ccc2c5aa..8618a8f211 100644 --- a/demo/mnist/api_train_v2.py +++ b/demo/mnist/api_train_v2.py @@ -23,29 +23,32 @@ def network_config(): outputs(cost) -def event_handler(event): - if isinstance(event, paddle.trainer.CompleteTrainOneBatch): - print "Pass %d, Batch %d, Cost %f" % (event.pass_id, event.batch_id, - event.cost) - else: - pass - - def main(): paddle.init(use_gpu=False, trainer_count=1) model_config = parse_network_config(network_config) - pool = paddle.parameters.create(model_config) - for param_name in pool.get_names(): - array = pool.get_parameter(param_name) + parameters = paddle.parameters.create(model_config) + for param_name in parameters.keys(): + array = parameters[param_name] array[:] = numpy.random.uniform(low=-1.0, high=1.0, size=array.shape) + parameters[param_name] = array + + adam_optimizer = paddle.optimizer.Optimizer( + learning_rate=0.01, learning_method=AdamOptimizer()) + + def event_handler(event): + if isinstance(event, paddle.trainer.CompleteTrainOneBatch): + para = parameters['___fc_layer_2__.w0'] + print "Pass %d, Batch %d, Cost %f, Weight Mean Of Fc 2 is %f" % ( + event.pass_id, event.batch_id, event.cost, para.mean()) - adam_optimizer = paddle.optimizer.Adam(learning_rate=1e-3) + else: + pass trainer = paddle.trainer.SGDTrainer(update_equation=adam_optimizer) trainer.train(train_data_reader=train_reader, topology=model_config, - parameters=pool, + parameters=parameters, event_handler=event_handler, batch_size=32, # batch size should be refactor in Data reader data_types={ # data_types will be removed, It should be in diff --git a/python/paddle/v2/parameters.py b/python/paddle/v2/parameters.py index 892c68a38b..a30bf5d363 100644 --- a/python/paddle/v2/parameters.py +++ b/python/paddle/v2/parameters.py @@ -2,110 +2,144 @@ import numpy as np from paddle.proto.ModelConfig_pb2 import ModelConfig from paddle.proto.ParameterConfig_pb2 import ParameterConfig +import py_paddle.swig_paddle as api -__all__ = ['IParameterPool', 'create', 'ParameterFlag'] +__all__ = ['Parameters', 'create'] -class ParameterFlag(object): - """ - The flag for IParameterPool.get_parameter. If writeable, operation on return - numpy array will also apply to Paddle parameter. But it will be slower in - GPU mode. +def create(*topologies): """ - READ_ONLY = 0x01 - WRITE_ONLY = 0x02 - READ_WRITE = READ_ONLY | WRITE_ONLY - + Create parameter pool by topologies. -class IParameterPool(object): + :param topologies: + :return: """ - Interface of Parameter Pool. The parameter pool is a dictionary of - parameters. User can modify parameter or customize parameter value - by `get_parameter`. - - .. 
code-block:: python + pool = Parameters() + for topo in topologies: + if not isinstance(topo, ModelConfig): + raise ValueError( + 'create must pass a topologies which type is ModelConfig') - pool = paddle.parameters.create(topo1, topo2) + for param in topo.parameters: + pool.append_config(param) + return pool - embedding = pool.get_parameter("embedding") - assert isinstance(embedding, numpy.ndarray) - print embedding[1:] - """ - def get_parameter(self, name, flag=ParameterFlag.READ_WRITE): - """ - Get a parameter by name. - - :param name: parameter name. - :type name: basestring - :param flag: the flag for return value. readable or writable. - :type flag: int - :return: The parameter value - :rtype: np.ndarray - """ - raise NotImplementedError() - - def get_names(self): - """ - Get all parameter names - :return: all parameter names - :rtype: list - """ - raise NotImplementedError() - - -class NumpyParameterPool(IParameterPool): +class Parameters(object): def __init__(self): - self.__param_configs__ = dict() - self.__params__ = dict() - - def append(self, conf): - if not isinstance(conf, ParameterConfig): - raise ValueError("conf must be ParameterConfig") - - if not conf.IsInitialized(): - raise ValueError("conf is not initialized") - - self.__param_configs__[conf.name] = conf - self.__params__[conf.name] = None - - def get_config(self, name): - if name not in self.__param_configs__: - raise ValueError("parameter %s is not appended" % name) - - return self.__param_configs__[name] - - def get_parameter(self, name, *args, **kwargs): - if name not in self.__params__: - raise ValueError("parameter %s is not appended" % name) + self.__param_conf__ = dict() + self.__gradient_machines__ = [] + self.__tmp_params__ = [] + + def append_config(self, param_conf): + if not isinstance(param_conf, ParameterConfig): + raise ValueError("param_conf must be paddle.proto.ParameterConfig") + + if param_conf.name in self.__param_conf__: + raise ValueError("duplicated parameter %s" % param_conf.name) + + self.__param_conf__[param_conf.name] = param_conf + + def keys(self): + return self.__param_conf__.keys() + + def names(self): + return self.keys() + + def has_key(self, key): + return key in self.__param_conf__.keys() + + def __getitem__(self, key): + shape = self.get_shape(key) + + if len(self.__gradient_machines__) == 0: + # create new parameter in python numpy. 
+ return np.ndarray(shape=shape, dtype=np.float32) + else: + for each_gradient_machine in self.__gradient_machines__: + param = __get_parameter_in_gradient_machine__( + each_gradient_machine, key) + # for simplify implementation now, we always copy from C++ + assert isinstance(param, api.Parameter) + val = param.getBuf(api.PARAMETER_VALUE) + assert isinstance(val, api.Vector) + return val.copyToNumpyArray().reshape(shape=shape) + # else continue + + raise RuntimeError("Unexpected branch") + + def get_shape(self, key): + if not isinstance(key, basestring): + raise ValueError("parameter name should be string") + if not self.has_key(key): + raise ValueError("No such parameter %s" % key) + conf = self.__param_conf__[key] + return map(int, conf.dims) + + def __setitem__(self, key, value): + if not isinstance(value, np.ndarray): + raise ValueError("Must return ndarray") + value = value.astype(dtype=np.float32) + shape = self.get_shape(key) + if not reduce(lambda a, b: a and b, + map(lambda x: x[0] == x[1], zip(value.shape, shape))): + raise ValueError("Value shape mismatch, expect %s, should %s" % + (shape, value.shape)) + + if len(self.__gradient_machines__) == 0: + self.__tmp_params__.append((key, value)) + else: + for each_gradient_machine in self.__gradient_machines__: + __copy_parameter_to_gradient_machine__(each_gradient_machine, + key, value) + + def append_gradient_machine(self, gradient_machine): + if not isinstance(gradient_machine, api.GradientMachine): + raise ValueError("gradient_machine should be api.GradientMachine") + + if len(self.__tmp_params__) != 0: + for name, val in self.__tmp_params__: + try: + __copy_parameter_to_gradient_machine__(gradient_machine, + name, val) + except ValueError: + # If no such parameter in gradient machine, then don't copy + pass + + +def __get_parameter_in_gradient_machine__(gradient_machine, name): + """ - param = self.__params__[name] - if param is None: - shape = self.__param_configs__[name].dims - if len(shape) == 0: - raise ValueError("parameter %s is no shape" % name) - param = np.ndarray( - shape=[int(item) for item in shape], dtype='float32') - self.__params__[name] = param - return param + :param gradient_machine: + :type gradient_machine: api.GradientMachine + :param name: + :return: + :rtype: api.Parameter + """ + params = filter(lambda p: p.getName() == name, + gradient_machine.getParameters()) - def get_names(self): - return self.__param_configs__.keys() + if len(params) == 0: + raise ValueError("No such parameter") + elif len(params) > 1: + raise ValueError("Unexpected branch") + else: + return params[0] -def create(*topologies): +def __copy_parameter_to_gradient_machine__(gradient_machine, name, arr): """ - Create parameter pool by topologies. + Copy a python ndarray into the gradient machine. 
- :param topologies: + :param gradient_machine: + :type gradient_machine: api.GradientMachine + :param name: + :param arr: + :type arr: np.ndarray :return: + :rtype: api.Parameter """ - pool = NumpyParameterPool() - for topo in topologies: - if not isinstance(topo, ModelConfig): - raise ValueError( - 'create must pass a topologies which type is ModelConfig') - - for param in topo.parameters: - pool.append(param) - return pool + param = __get_parameter_in_gradient_machine__(gradient_machine, name) + vec = param.getBuf(api.PARAMETER_VALUE) + assert isinstance(vec, api.Vector) + vec.copyFromNumpyArray(arr.flatten()) diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 41dfb522db..baed7d0025 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -1,11 +1,12 @@ import collections -from paddle.proto.ModelConfig_pb2 import ModelConfig -from paddle.proto.ParameterConfig_pb2 import ParameterConfig -from . import parameters as v2_parameters -from . import optimizer as v2_optimizer + import py_paddle.swig_paddle as api from py_paddle import DataProviderConverter +from paddle.proto.ModelConfig_pb2 import ModelConfig +from . import optimizer as v2_optimizer +from . import parameters as v2_parameters + __all__ = ['ITrainer', 'SGDTrainer', 'CompleteTrainOneBatch', 'BaseEvent'] @@ -21,11 +22,10 @@ class CompleteTrainOneBatch(BaseEvent): Event On One Batch Training Complete. """ - def __init__(self, pass_id, batch_id, cost, parameters): + def __init__(self, pass_id, batch_id, cost): self.pass_id = pass_id self.batch_id = batch_id self.cost = cost - self.parameters = parameters def default_event_handler(event): @@ -42,57 +42,6 @@ class ITrainer(object): raise NotImplementedError() -class LazyParameterPool(v2_parameters.IParameterPool): - """ - Lazy Parameter Pool stores a reference to GradientMachine. User could invoke - `get_parameter` if needed, but the operation is lazy. It means the parameter - will only fetched from GPU or Parameter Server if `get_parameter` is - invoked. Also, set flag = writable will make a extra host2device copy after - reading/modifying parameter. - - This class is not exposed to User. User should treat this class as a normal - IParameterPool. - - See IParameterPool for usage documentation. 
- - :type __gradient_machine__: api.GradientMachine - """ - - def get_parameter(self, name, flag=v2_parameters.ParameterFlag.READ_WRITE): - param = filter(lambda x: x.getName() == name, - self.__gradient_machine__.getParameters()) - if len(param) == 0: - raise ValueError("Cannot found parameter with name %s" % name) - elif len(param) > 1: - raise RuntimeError("Unexpected branch") - else: - conf = param[0].getConfig().toProto() - param = param[0].getBuf(api.PARAMETER_VALUE) - assert isinstance(param, api.Vector) - assert isinstance(conf, ParameterConfig) - - shape = map(int, conf.dims) - if api.isUsingGpu(): - arr = param.copyToNumpyArray().reshape(shape) - if flag & v2_parameters.ParameterFlag.WRITE_ONLY: - self.need_copy = True - self.arrays[name] = arr - else: - arr = param.toNumpyArrayInplace().reshape(shape) - return arr - - def get_names(self): - return [ - param.getName() - for param in self.__gradient_machine__.getParameters() - ] - - def __init__(self, gradient_machine): - self.__gradient_machine__ = gradient_machine - self.need_copy = False - self.arrays = dict() - - class SGDTrainer(ITrainer): def __init__(self, update_equation): """ @@ -137,7 +86,7 @@ class SGDTrainer(ITrainer): gm = api.GradientMachine.createFromConfigProto( topology, api.CREATE_MODE_NORMAL, self.__optimizer__.enable_types()) assert isinstance(gm, api.GradientMachine) - __copy_parameter_from_pool__(gm, parameters) + parameters.append_gradient_machine(gm) updater = self.__optimizer__.create_local_updater() updater.init(gm) @@ -167,16 +116,9 @@ class SGDTrainer(ITrainer): cost_vec = cost_vec.copyToNumpyMat() cost = cost_vec.sum() / len(data_batch) updater.finishBatch(cost) - pool = LazyParameterPool(gradient_machine=gm) event_handler( CompleteTrainOneBatch( - pass_id=pass_id, - batch_id=batch_id, - cost=cost, - parameters=pool)) - - if pool.need_copy: - __copy_parameter_from_lazy_pool__(gm, pool) + pass_id=pass_id, batch_id=batch_id, cost=cost)) updater.finishPass() gm.finish() @@ -211,34 +153,6 @@ def __generator_to_batch__(generator, batch_size): yield ret_val -def __copy_parameter_from_lazy_pool__(gm, pool): - assert isinstance(pool, LazyParameterPool) - for each_param_name in pool.arrays.keys(): - param = filter(lambda x: x.getName() == each_param_name, - gm.getParameters()) - assert len(param) == 1 - param = param[0] - param.getBuf(api.PARAMETER_VALUE).copyFromNumpyArray(pool.arrays[ - each_param_name].flatten().astype('float32')) - - -def __copy_parameter_from_pool__(gm, pool): - """ - - :param gm: - :type gm: api.GradientMachine - :param pool: - :type pool: v2_parameters.IParameterPool - :return: - """ - assert isinstance(pool, v2_parameters.IParameterPool) - for each_param in gm.getParameters(): - name = each_param.getName() - param = pool.get_parameter(name, v2_parameters.ParameterFlag.READ_ONLY) - each_param.getBuf(api.PARAMETER_VALUE).copyFromNumpyArray(param.flatten( - ).astype('float32')) - - def __check_train_args__(train_data_reader, topology, parameters, test_data_reader, event_handler, **kwargs): """ @@ -258,7 +172,7 @@ def __check_train_args__(train_data_reader, topology, parameters, if not isinstance(topology, ModelConfig): raise ValueError('topology should be a model config') - if not isinstance(parameters, v2_parameters.IParameterPool): + if not isinstance(parameters, v2_parameters.Parameters): raise ValueError('parameters should be a parameter pool') if not callable(event_handler): -- GitLab
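
Usage note (illustrative, not part of the patch): the Parameters object added above acts as a dictionary keyed by parameter name with read-copy / write-back semantics — values fetched with `parameters[name]` are copies, so modifications must be assigned back, as the mnist demo does with `parameters[param_name] = array`. The following is a minimal, self-contained NumPy sketch of just those semantics under assumed names (the parameter name and shape are made-up examples); it does not import the draft paddle.v2 API itself.

    # Sketch of the copy-out / copy-in behaviour of Parameters.__getitem__ and
    # __setitem__ in this patch, using a plain dict of NumPy arrays.
    import numpy as np

    shape = (784, 10)  # example dims, as they would come from a ParameterConfig
    store = {"___fc_layer_0__.w0": np.zeros(shape, dtype=np.float32)}  # hypothetical name

    def get_param(name):
        # Reading always returns a copy, mirroring the copy from the C++ buffer.
        return store[name].copy()

    def set_param(name, value):
        # Writing converts to float32 and checks the shape before copying in.
        value = np.asarray(value, dtype=np.float32)
        if value.shape != store[name].shape:
            raise ValueError("Value shape mismatch, expect %s, got %s" %
                             (store[name].shape, value.shape))
        store[name] = value

    w = get_param("___fc_layer_0__.w0")
    w[:] = np.random.uniform(low=-1.0, high=1.0, size=w.shape)
    set_param("___fc_layer_0__.w0", w)  # explicit write-back, like the demo's
                                        # `parameters[param_name] = array`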