diff --git a/demo/mnist/api_train_v2.py b/demo/mnist/api_train_v2.py new file mode 100644 index 0000000000000000000000000000000000000000..5e46d510ad35bb25a36e74a892fc211b1844ad03 --- /dev/null +++ b/demo/mnist/api_train_v2.py @@ -0,0 +1,62 @@ +from paddle.trainer_config_helpers import * +from paddle.trainer.PyDataProvider2 import dense_vector, integer_value +import paddle.v2 as paddle +import numpy +import mnist_util + + +def train_reader(): + train_file = './data/raw_data/train' + generator = mnist_util.read_from_mnist(train_file) + for item in generator: + yield item + + +def network_config(): + imgs = data_layer(name='pixel', size=784) + hidden1 = fc_layer(input=imgs, size=200) + hidden2 = fc_layer(input=hidden1, size=200) + inference = fc_layer(input=hidden2, size=10, act=SoftmaxActivation()) + cost = classification_cost( + input=inference, label=data_layer( + name='label', size=10)) + outputs(cost) + + +def main(): + paddle.init(use_gpu=False, trainer_count=1) + topology = parse_network_config(network_config) + parameters = paddle.parameters.create(topology) + for param_name in parameters.keys(): + array = parameters.get(param_name) + array[:] = numpy.random.uniform(low=-1.0, high=1.0, size=array.shape) + parameters.set(parameter_name=param_name, value=array) + + adam_optimizer = paddle.optimizer.Optimizer( + learning_rate=0.01, learning_method=AdamOptimizer()) + + def event_handler(event): + if isinstance(event, paddle.event.EndIteration): + para = parameters.get('___fc_layer_2__.w0') + print "Pass %d, Batch %d, Cost %f, Weight Mean Of Fc 2 is %f" % ( + event.pass_id, event.batch_id, event.cost, para.mean()) + + else: + pass + + trainer = paddle.trainer.SGD(update_equation=adam_optimizer) + + trainer.train(train_data_reader=train_reader, + topology=topology, + parameters=parameters, + event_handler=event_handler, + batch_size=32, # batch size should be refactor in Data reader + data_types={ # data_types will be removed, It should be in + # network topology + 'pixel': dense_vector(784), + 'label': integer_value(10) + }) + + +if __name__ == '__main__': + main() diff --git a/python/paddle/v2/__init__.py b/python/paddle/v2/__init__.py index b2ea87b086101d71e89c33ce7c1f4eb21afade5a..72f1168e94f2a7de627551486b7dd6a5bc92940c 100644 --- a/python/paddle/v2/__init__.py +++ b/python/paddle/v2/__init__.py @@ -11,7 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import optimizer +import parameters +import py_paddle.swig_paddle as api +import trainer +import event + +__all__ = ['optimizer', 'parameters', 'init', 'trainer', 'event'] + + +def init(**kwargs): + args = [] + for key in kwargs.keys(): + args.append('--%s=%s' % (key, str(kwargs[key]))) -__all__ = ['optimizer'] + api.initPaddle(*args) diff --git a/python/paddle/v2/event.py b/python/paddle/v2/event.py new file mode 100644 index 0000000000000000000000000000000000000000..a16cfa91f062a60a141ea8fa962b3ecf6f5f0a22 --- /dev/null +++ b/python/paddle/v2/event.py @@ -0,0 +1,26 @@ +""" +All training events. + +There are: + +* BeginTraining +* EndTraining +* BeginIteration +* EndIteration +* BeginPass +* EndPass + +TODO(yuyang18): Complete it! +""" +__all__ = ['EndIteration'] + + +class EndIteration(object): + """ + Event On One Batch Training Complete. + """ + + def __init__(self, pass_id, batch_id, cost): + self.pass_id = pass_id + self.batch_id = batch_id + self.cost = cost diff --git a/python/paddle/v2/parameters.py b/python/paddle/v2/parameters.py new file mode 100644 index 0000000000000000000000000000000000000000..e5b7dabcb8eb3a845dedea663f978e7a9820495d --- /dev/null +++ b/python/paddle/v2/parameters.py @@ -0,0 +1,265 @@ +import numpy as np + +from paddle.proto.ModelConfig_pb2 import ModelConfig +from paddle.proto.ParameterConfig_pb2 import ParameterConfig +import py_paddle.swig_paddle as api + +__all__ = ['Parameters', 'create'] + + +def create(*topologies): + """ + Create parameter pool by topologies. + + :param topologies: + :return: + """ + pool = Parameters() + for topo in topologies: + if not isinstance(topo, ModelConfig): + raise ValueError( + 'create must pass a topologies which type is ModelConfig') + + for param in topo.parameters: + pool.__append_config__(param) + return pool + + +class Parameters(object): + """ + Parameters is a dictionary contains Paddle's parameter. The key of + Parameters is the name of parameter. The value of Parameters is a plain + :code:`numpy.ndarry` . + + Basically usage is + + .. code-block:: python + + data = paddle.layers.data(...) + ... + out = paddle.layers.fc(...) + + parameters = paddle.parameters.create(out) + + parameter_names = parameters.names() + fc_mat = parameters.get('fc') + print fc_mat + """ + + def __init__(self): + self.__param_conf__ = dict() + self.__gradient_machines__ = [] + self.__tmp_params__ = [] + + def __append_config__(self, param_conf): + """ + Append a parameter configuration. It used to initialize Parameters and + should be invoked only in paddle.parameters.create + + :param param_conf: The parameter configuration in protobuf + :type param_conf: ParameterConfig + :return: Nothing + """ + + if not isinstance(param_conf, ParameterConfig): + raise ValueError("param_conf must be paddle.proto.ParameterConfig") + + if param_conf.name in self.__param_conf__: + raise ValueError("duplicated parameter %s" % param_conf.name) + + self.__param_conf__[param_conf.name] = param_conf + + def keys(self): + """ + keys are the names of each parameter. + :return: list of parameter name + :rtype: list + """ + return self.__param_conf__.keys() + + def names(self): + """ + names of each parameter. + :return: list of parameter name + :rtype: list + """ + return self.keys() + + def has_key(self, key): + """ + has_key return true if there are such parameter name == key + :param key: Parameter name + :type key: basestring + :return: True if contains such key + """ + return key in self.__param_conf__.keys() + + def __iter__(self): + """ + Return an iterator of parameter name. It is used by `for loop` + or `in` operator. + + .. code-block:: python + + parameters = paddle.parameters.create(...) + if "fc_param" in parameters: + print 'OK' + :return: an iterator of parameter name + :rtype: iterator + """ + return iter(self.__param_conf__) + + def __getitem__(self, key): + """ + Get parameter by parameter name. It uses Python dict syntax. + + :note: It will always copy the parameter from C++ side. + :param key: Parameter name + :type key: basestring + :return: parameter value + :rtype: np.ndarray + """ + shape = self.get_shape(key) + + if len(self.__gradient_machines__) == 0: + # create new parameter in python numpy. + return np.ndarray(shape=shape, dtype=np.float32) + else: + for each_gradient_machine in self.__gradient_machines__: + param = __get_parameter_in_gradient_machine__( + each_gradient_machine, key) + # for simplify implementation now, we always copy from C++ + assert isinstance(param, api.Parameter) + val = param.getBuf(api.PARAMETER_VALUE) + assert isinstance(val, api.Vector) + val = val.copyToNumpyArray() + return val + # else continue + + raise RuntimeError("Unexpected branch") + + def get_shape(self, key): + """ + get shape of the parameter. + :param key: parameter name + :type key: basestring + :return: parameter's shape + :rtype: tuple + """ + if not isinstance(key, basestring): + raise ValueError("parameter name should be string") + if not self.has_key(key): + raise ValueError("No such parameter %s" % key) + conf = self.__param_conf__[key] + return tuple(map(int, conf.dims)) + + def __setitem__(self, key, value): + """ + Set parameter by parameter name & value. It use Python dict syntax. + + :note: It will always copy the parameter to C++ side. + :param key: Parameter name + :type key: basestring + :param value: Parameter matrix. + :type value: np.ndarray + :return: Nothing + """ + + if not isinstance(value, np.ndarray): + raise ValueError("Must return ndarray") + value = value.astype(dtype=np.float32) + shape = self.get_shape(key) + if value.shape != shape: + raise ValueError("Value shape mismatch, expect %s, should %s" % + (shape, value.shape)) + + if len(self.__gradient_machines__) == 0: + self.__tmp_params__.append((key, value)) + else: + for each_gradient_machine in self.__gradient_machines__: + __copy_parameter_to_gradient_machine__(each_gradient_machine, + key, value) + + def get(self, parameter_name): + """ + Get parameter by parameter name. + + :note: It will always copy the parameter from C++ side. + :param parameter_name: parameter name + :type parameter_name: basestring + :return: The parameter matrix. + :rtype: np.ndarray + """ + return self.__getitem__(key=parameter_name) + + def set(self, parameter_name, value): + """ + Set parameter by parameter name & matrix. + :param parameter_name: parameter name + :type parameter_name: basestring + :param value: parameter matrix + :type value: np.ndarray + :return: Nothing. + """ + self.__setitem__(key=parameter_name, value=value) + + def append_gradient_machine(self, gradient_machine): + """ + append gradient machine to parameters. This method is used internally in + Trainer.train. + + :param gradient_machine: Paddle C++ GradientMachine object. + :type gradient_machine: api.GradientMachine + :return: + """ + + if not isinstance(gradient_machine, api.GradientMachine): + raise ValueError("gradient_machine should be api.GradientMachine") + + if len(self.__tmp_params__) != 0: + for name, val in self.__tmp_params__: + try: + __copy_parameter_to_gradient_machine__(gradient_machine, + name, val) + except ValueError: + # If no such parameter in gradient machine, then don't copy + pass + self.__gradient_machines__.append(gradient_machine) + + +def __get_parameter_in_gradient_machine__(gradient_machine, name): + """ + + :param gradient_machine: + :type gradient_machine: api.GradientMachine + :param name: + :return: + :rtype: api.Parameter + """ + params = filter(lambda p: p.getName() == name, + gradient_machine.getParameters()) + + if len(params) == 0: + raise ValueError("No such parameter") + elif len(params) > 1: + raise ValueError("Unexpected branch") + else: + return params[0] + + +def __copy_parameter_to_gradient_machine__(gradient_machine, name, arr): + """ + Copy a python ndarray into the gradient machine. + + :param gradient_machine: + :type gradient_machine: api.GradientMachine + :param name: + :param arr: + :type arr: np.ndarray + :return: + :rtype: api.Parameter + """ + param = __get_parameter_in_gradient_machine__(gradient_machine, name) + vec = param.getBuf(api.PARAMETER_VALUE) + assert isinstance(vec, api.Vector) + vec.copyFromNumpyArray(arr.flatten()) diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py new file mode 100644 index 0000000000000000000000000000000000000000..9ba13dc5c8a81f8dcf39260d1a44dcdcc7c22ad5 --- /dev/null +++ b/python/paddle/v2/trainer.py @@ -0,0 +1,185 @@ +import collections + +import py_paddle.swig_paddle as api +from py_paddle import DataProviderConverter + +from paddle.proto.ModelConfig_pb2 import ModelConfig +from . import optimizer as v2_optimizer +from . import parameters as v2_parameters +from . import event as v2_event + +__all__ = ['ITrainer', 'SGD'] + + +def default_event_handler(event): + """ + Default event handler. It will print some log and save mode. + + TODO(yuyang18): Complete it! + :param event: + :return: + """ + pass + + +class ITrainer(object): + """ + The interface of Trainer. The only exposed method is `train`. + """ + + def train(self, + train_data_reader, + topology, + parameters, + test_data_reader=None, + event_handler=None): + """ + train method. + + :param train_data_reader: + :param topology: + :param parameters: + :param test_data_reader: + :param event_handler: + :return: + """ + + raise NotImplementedError() + + +class SGD(ITrainer): + def __init__(self, update_equation): + """ + Simple SGD Trainer. + + :param update_equation: The optimizer object. + :type update_equation: v2_optimizer.Optimizer + """ + if not isinstance(update_equation, v2_optimizer.Optimizer): + raise ValueError("update equation parameter must be " + "paddle.v2.optimizer.Optimizer") + self.__optimizer__ = update_equation + + def train(self, + train_data_reader, + topology, + parameters, + num_passes=1, + test_data_reader=None, + event_handler=None, + batch_size=32, + data_types=None): + """ + Training method. Will train num_passes of input data. + + :param train_data_reader: + :param topology: Network Topology, a protobuf ModelConfig message. + :param parameters: The parameter pools. + :param num_passes: The total train passes. + :param test_data_reader: + :param event_handler: Event handler. A method will be invoked when event + occurred. + :type event_handler: (BaseEvent) => None + :param batch_size: Not important, will be removed after data refactor. + :param data_types: Not important, will be removed after data refactor. + :return: + """ + if event_handler is None: + event_handler = default_event_handler + + __check_train_args__(**locals()) + + gm = api.GradientMachine.createFromConfigProto( + topology, api.CREATE_MODE_NORMAL, self.__optimizer__.enable_types()) + assert isinstance(gm, api.GradientMachine) + parameters.append_gradient_machine(gm) + + updater = self.__optimizer__.create_local_updater() + updater.init(gm) + + gm.start() + out_args = api.Arguments.createArguments(0) + + data_types_lists = [] + for each in topology.input_layer_names: + if each not in data_types: + raise ValueError() + data_types_lists.append(data_types[each]) + + converter = DataProviderConverter(input_types=data_types_lists) + + for pass_id in xrange(num_passes): + updater.startPass() + for batch_id, data_batch in enumerate( + __data_reader_to_batch__(train_data_reader, batch_size, + topology)): + pass_type = updater.startBatch(len(data_batch)) + gm.forwardBackward(converter(data_batch), out_args, pass_type) + for each_param in gm.getParameters(): + updater.update(each_param) + # Get cost. We use numpy to calculate total cost for this batch. + cost_vec = out_args.getSlotValue(0) + cost_vec = cost_vec.copyToNumpyMat() + cost = cost_vec.sum() / len(data_batch) + updater.finishBatch(cost) + event_handler( + v2_event.EndIteration( + pass_id=pass_id, batch_id=batch_id, cost=cost)) + + updater.finishPass() + gm.finish() + + +def __data_reader_to_batch__(reader, batch_size, topology): + """ + This function is not important, and will be removed when data refactored. + """ + + def input_reorder(func): + for item in func(): + retv = [] + for __layer_name__ in topology.input_layer_names: + retv.append(item[__layer_name__]) + yield retv + + return __generator_to_batch__(input_reorder(reader), batch_size=batch_size) + + +def __generator_to_batch__(generator, batch_size): + """ + This function is not important, and will be removed when data refactored. + """ + ret_val = list() + for each_item in generator: + ret_val.append(each_item) + if len(ret_val) == batch_size: + yield ret_val + ret_val = list() + if len(ret_val) != 0: + yield ret_val + + +def __check_train_args__(train_data_reader, topology, parameters, + test_data_reader, event_handler, **kwargs): + """ + Check train function's argument types + """ + if not callable(train_data_reader) or not isinstance(train_data_reader(), + collections.Iterator): + raise ValueError('train_data_reader should be a function, ' + 'which can return a iterator') + + if test_data_reader is not None: + if not callable(test_data_reader) or not isinstance( + test_data_reader(), collections.Iterator): + raise ValueError('test_data_reader should be a function, which can ' + 'return a iterator') + + if not isinstance(topology, ModelConfig): + raise ValueError('topology should be a model config') + + if not isinstance(parameters, v2_parameters.Parameters): + raise ValueError('parameters should be a parameter pool') + + if not callable(event_handler): + raise ValueError('event handler should be a function')