diff --git a/.travis.yml b/.travis.yml index 5d82d9729b75ef493a0bd03921c453f9a519c8cd..4fb2ca938795bb6a69f7d7991aee9f7386947bf2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ cache: - $HOME/third_party - $HOME/.ccache - $HOME/.cache/pip + - $HOME/Library/Caches/Homebrew sudo: required dist: trusty os: diff --git a/demo/mnist/api_train_v2.py b/demo/mnist/api_train_v2.py index 59486ed1b3ba494a20b06b7ef7027fc3e86c043c..b5cc74ce67dfc8e1afa65bd52f5ec600260032ce 100644 --- a/demo/mnist/api_train_v2.py +++ b/demo/mnist/api_train_v2.py @@ -25,8 +25,7 @@ def main(): act=paddle.activation.Softmax()) cost = paddle.layer.classification_cost(input=inference, label=label) - topology = paddle.layer.parse_network(cost) - parameters = paddle.parameters.create(topology) + parameters = paddle.parameters.create(cost) for param_name in parameters.keys(): array = parameters.get(param_name) array[:] = numpy.random.uniform(low=-1.0, high=1.0, size=array.shape) @@ -46,7 +45,7 @@ def main(): trainer = paddle.trainer.SGD(update_equation=adam_optimizer) trainer.train(train_data_reader=train_reader, - topology=topology, + topology=cost, parameters=parameters, event_handler=event_handler, batch_size=32, # batch size should be refactor in Data reader diff --git a/doc/design/reader/README.md b/doc/design/reader/README.md new file mode 100644 index 0000000000000000000000000000000000000000..8f7abf12f733542734efe91111f365a34aa4b15b --- /dev/null +++ b/doc/design/reader/README.md @@ -0,0 +1,144 @@ +# Python Data Reader Design Doc + +Paddle reads data from data reader during training. It will be passed into `paddle.train` as a parameter. + +## Data Reader Interface + +Data reader is a function with no parameter that creates a iterable (anything can be used in `for x in iterable`): + +``` +iterable = data_reader() +``` + +Element produced for the iterable should be a **single** entry of data, **not** a mini batch. That entry of data could be a single item, or a tuple of items. Item should be of [supported type](http://www.paddlepaddle.org/doc/ui/data_provider/pydataprovider2.html?highlight=dense_vector#input-types) (e.g., numpy 1d array of float32, int, list of int) + +An example implementation for single item data reader: + +```python +def data_reader_fake_image(): + while True: + yield numpy.random.uniform(-1, 1, size=20*20) +``` + +An example implementation for multiple item data reader: +```python +def data_reader_fake_image_and_label(): + while True: + yield numpy.random.uniform(-1, 1, size=20*20), False +``` + +## Usage + +data reader, mapping from item(s) read to data layer, batch size and number of total pass will be passed into `paddle.train`: + +```python +# two data layer is created: +image_layer = paddle.layer.data("image", ...) +label_layer = paddle.layer.data("label", ...) + +# ... + +paddle.train(paddle.dataset.mnist, {"image":0, "label":1}, 128, 10, ...) +``` + +## Data Reader Decorators + +Data reader decorators takes a single or multiple data reader, returns a new data reader. It is similar to a [python decorator](https://wiki.python.org/moin/PythonDecorators), but it does not use `@` syntax. + +Since we have a strict interface for data readers (no parameter, return a single data item). Data reader can be used flexiable via data reader decorators. Following are a few examples: + +### Prefetch Data + +Since reading data may take time and training can not proceed without data. It is generally a good idea to prefetch data. + +Use `paddle.reader.buffered` to prefetch data: + +```python +buffered_reader = paddle.reader.buffered(paddle.dataset.mnist, 100) +``` + +`buffered_reader` will try to buffer (prefetch) `100` data entries. + +### Compose Multiple Data Readers + +For example, we want to use a source of real images (reusing mnist dataset), and a source of fake images as input for [Generative Adversarial Networks](https://arxiv.org/abs/1406.2661). + +We can do: + +```python +def data_reader_fake_image(): + while True: + yield numpy.random.uniform(-1, 1, size=20*20) + +def data_reader_bool(t): + while True: + yield t + +true_reader = lambda : data_reader_bool(True) +false_reader = lambda : data_reader_bool(False) + +reader = paddle.reader.combine(paddle.dataset.mnist, data_reader_fake_image, true_reader, false_reader) +# Skipped 1 because paddle.dataset.mnist produces two items per data entry. +# And we don't care second item at this time. +paddle.train(reader, {"true_image":0, "fake_image": 2, "true_label": 3, "false_label": 4}, ...) +``` + +### Shuffle + +Given shuffle buffer size `n`, `paddle.reader.shuffle` will return a data reader that buffers `n` data entries and shuffle them before a data entry is read. + +Example: +```python +reader = paddle.reader.shuffle(paddle.dataset.mnist, 512) +``` + +## Q & A + +### Why return only a single entry, but not a mini batch? + +If a mini batch is returned, data reader need to take care of batch size. But batch size is a concept for training, it makes more sense for user to specify batch size as a parameter for `train`. + +Practically, always return a single entry make reusing existing data reader much easier (e.g., if existing data reader return not a single entry but 3 entries, training code will be more complex because it need to handle cases like batch size 2). + +### Why use a dictionary but not a list to provide mapping? + +We decided to use dictionary (`{"image":0, "label":1}`) instead of list (`["image", "label"]`) is because that user can easily resue item (e.g., using `{"image_a":0, "image_b":0, "label":1}`) or skip item (e.g., using `{"image_a":0, "label":2}`). + +### How to create custom data reader + +```python +def image_reader(image_path, label_path): + f = open(image_path) + l = open(label_path) + images = numpy.fromfile( + f, 'ubyte', count=n * 28 * 28).reshape((n, 28 * 28)).astype('float32') + images = images / 255.0 * 2.0 - 1.0 + labels = numpy.fromfile(l, 'ubyte', count=n).astype("int") + for i in xrange(n): + yield images[i, :], labels[i] # a single entry of data is created each time + f.close() + +# use python lambda to change image_reader into a function with no parameters. +reader = lambda : image_reader("/path/to/image_file", "/path/to/label_file") +paddle.train(reader, {"image":0, "label":1}, ...) +``` + +### How is `paddle.train` implemented + +An example implementation of paddle.train could be: + +```python +def minibatch_decorater(reader, minibatch_size): + def ret(): + r = reader() + buf = [r.next() for x in xrange(minibatch_size)] + while len(buf) > 0: + yield buf + buf = [r.next() for x in xrange(minibatch_size)] + return ret + +def train(reader, mapping, batch_size, total_pass): + for pass_idx in range(total_pass): + for mini_batch in minibatch_decorater(reader): # this loop will never end in online learning. + do_forward_backward(mini_batch, mapping) +``` diff --git a/paddle/gserver/evaluators/Evaluator.cpp b/paddle/gserver/evaluators/Evaluator.cpp index 42a877954b6c3ece3f3b728e4af036f367ba37e9..89f95438019d8c6a038c9c8204b6ae391f47196e 100644 --- a/paddle/gserver/evaluators/Evaluator.cpp +++ b/paddle/gserver/evaluators/Evaluator.cpp @@ -1012,93 +1012,18 @@ static InitFunction __reg_type_auc_sum__([]() { * * The config file api is value_printer_evaluator. */ -class ValuePrinter : public Evaluator { +class ValuePrinter : public NotGetableEvaluator { public: virtual void eval(const NeuralNetwork& nn) { - layerOutputs_.clear(); for (const std::string& name : config_.input_layers()) { - auto& argu = nn.getLayer(name)->getOutput(); - layerOutputs_[name] = std::unordered_map(); - auto& out = layerOutputs_[name]; - argu.getValueString(&out); - for (auto field : {"value", "id", "sequence pos", "sub-sequence pos"}) { - auto it = out.find(field); - if (it != out.end()) { - LOG(INFO) << "layer=" << name << " " << field << ":\n" << it->second; - } - } + nn.getLayer(name)->getOutput().printValueString(LOG(INFO), + "layer=" + name + " "); } } virtual void updateSamplesNum(const std::vector& arguments) {} virtual real evalImp(std::vector& arguments) { return 0; } - -private: - std::unordered_map> - layerOutputs_; - - // Evaluator interface -public: - void getNames(std::vector* names) { - for (auto layerIt = layerOutputs_.begin(); layerIt != layerOutputs_.end(); - ++layerIt) { - for (auto it = layerIt->second.begin(); it != layerIt->second.end(); - ++it) { - names->push_back(config_.name() + "." + layerIt->first + "." + - it->second); - } - } - } - - real getValue(const std::string& name, Error* err) const { - (void)(name); - if (err != nullptr) { - *err = Error( - "ValuePrinter do not support getValue, use getValueString instead."); - } - return .0f; - } - std::string getValueStr(const std::string& name, Error* err) const { - std::vector buffer; - str::split(name, '.', &buffer); - if (buffer.size() < 2) { - if (err != nullptr) { - *err = Error("No such key %s", name.c_str()); - } - return ""; - } - auto fieldName = buffer[buffer.size() - 1]; - auto layerName = buffer[buffer.size() - 2]; - auto layerIt = layerOutputs_.find(layerName); - if (layerIt == layerOutputs_.end()) { - if (err != nullptr) { - *err = Error("No such layer %s", layerName.c_str()); - } - return ""; - } - - auto fieldIt = layerIt->second.find(fieldName); - if (fieldIt == layerIt->second.end()) { - if (err != nullptr) { - *err = Error("No such value field %s", fieldName.c_str()); - } - return ""; - } - - return fieldIt->second; - } - std::string getType(const std::string& name, Error* err) const { - Error localErr; - if (err == nullptr) { - err = &localErr; - } - this->getValueStr(name, err); - if (!err->isOK()) { - return ""; - } - return "value_printer"; - } }; REGISTER_EVALUATOR(value_printer, ValuePrinter); diff --git a/paddle/gserver/layers/PrintLayer.cpp b/paddle/gserver/layers/PrintLayer.cpp index f1f3dd412c67a9570831a3299905487c5ddacc6b..de198af111be4200dd1b240f6de9464e3f43b06d 100644 --- a/paddle/gserver/layers/PrintLayer.cpp +++ b/paddle/gserver/layers/PrintLayer.cpp @@ -19,25 +19,17 @@ namespace paddle { class PrintLayer : public Layer { public: explicit PrintLayer(const LayerConfig& config) : Layer(config) {} - void forward(PassType passType) override; - void backward(const UpdateCallback& callback) override {} -}; -void PrintLayer::forward(PassType passType) { - Layer::forward(passType); - for (size_t i = 0; i != inputLayers_.size(); ++i) { - auto& argu = getInput(i); - const std::string& name = inputLayers_[i]->getName(); - std::unordered_map out; - argu.getValueString(&out); - for (auto field : {"value", "id", "sequence pos", "sub-sequence pos"}) { - auto it = out.find(field); - if (it != out.end()) { - LOG(INFO) << "layer=" << name << " " << field << ":\n" << it->second; - } + void forward(PassType passType) override { + Layer::forward(passType); + for (size_t i = 0; i != inputLayers_.size(); ++i) { + getInput(i).printValueString(LOG(INFO), + "layer=" + inputLayers_[i]->getName() + " "); } } -} + + void backward(const UpdateCallback& callback) override {} +}; REGISTER_LAYER(print, PrintLayer); diff --git a/paddle/gserver/layers/SequenceConcatLayer.cpp b/paddle/gserver/layers/SequenceConcatLayer.cpp index b4677687a6cc7755fdb7584a9524de9b65a0c627..599706eb419ede72dbd6f4c8c74e57f5f9965388 100644 --- a/paddle/gserver/layers/SequenceConcatLayer.cpp +++ b/paddle/gserver/layers/SequenceConcatLayer.cpp @@ -168,13 +168,17 @@ void SequenceConcatLayer::backward(const UpdateCallback& callback) { size_t rightNumIns = 0; for (size_t seqId = 0; seqId < numSequences1; ++seqId) { leftNumIns = starts1[seqId + 1] - starts1[seqId]; - inputGrad1->subMatrix(starts1[seqId], leftNumIns) - ->add(*(outputGrad->subMatrix(offset, leftNumIns))); + if (inputGrad1) { + inputGrad1->subMatrix(starts1[seqId], leftNumIns) + ->add(*(outputGrad->subMatrix(offset, leftNumIns))); + } offset += leftNumIns; rightNumIns = starts2[seqId + 1] - starts2[seqId]; - inputGrad2->subMatrix(starts2[seqId], rightNumIns) - ->add(*(outputGrad->subMatrix(offset, rightNumIns))); + if (inputGrad2) { + inputGrad2->subMatrix(starts2[seqId], rightNumIns) + ->add(*(outputGrad->subMatrix(offset, rightNumIns))); + } offset += rightNumIns; } } diff --git a/paddle/parameter/Argument.cpp b/paddle/parameter/Argument.cpp index e9de0f66987603fdfd3c84853b531330f1852896..7a343cca33f5b420be6192231ac73ca1c2da5fb9 100644 --- a/paddle/parameter/Argument.cpp +++ b/paddle/parameter/Argument.cpp @@ -628,6 +628,18 @@ void Argument::getValueString( } } +void Argument::printValueString(std::ostream& stream, + const std::string& prefix) const { + std::unordered_map out; + getValueString(&out); + for (auto field : {"value", "id", "sequence pos", "sub-sequence pos"}) { + auto it = out.find(field); + if (it != out.end()) { + stream << prefix << field << ":\n" << it->second; + } + } +} + void Argument::subArgFrom(const Argument& input, size_t offset, size_t height, diff --git a/paddle/parameter/Argument.h b/paddle/parameter/Argument.h index c751dbb855d31d6b6a765de97aca2f5ba2aa3586..178c068b93ac5fc1e06200984f14da86069cf7e4 100644 --- a/paddle/parameter/Argument.h +++ b/paddle/parameter/Argument.h @@ -305,6 +305,15 @@ struct Argument { * @param out [out]: the return values. */ void getValueString(std::unordered_map* out) const; + + /** + * @brief printValueString will print the argument's output in order of + * 'value', 'id', 'sequence pos', 'sub-sequence pos'. + * @param stream: Output stream + * @param prefix: line prefix for printing. + */ + void printValueString(std::ostream& stream, + const std::string& prefix = "") const; }; } // namespace paddle diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index ee7a5bff84ca96ef1010fa7430356722f807fb0f..357637e20346f8e1179d3a28ff580722cdfcccff 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -24,6 +24,7 @@ add_custom_target(paddle_python ALL DEPENDS ${OUTPUT_DIR}/.timestamp) add_subdirectory(paddle/trainer_config_helpers/tests) +add_subdirectory(paddle/reader/tests) install(DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/dist/ DESTINATION opt/paddle/share/wheels diff --git a/python/paddle/reader/__init__.py b/python/paddle/reader/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..493b410e8299ebe167be43ead1401a6ab245a631 --- /dev/null +++ b/python/paddle/reader/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# It would be too lengthy to require our users to prefix decorators with `decorator`. +# For example, we want the following line +# +# r = paddle.reader.decorator.bufferd(paddle.reader.creator.text("hello.txt")) +# +# to be a shorter version: +# +# r = paddle.reader.buffered(paddle.reader.creator.text("hello.txt")) +from decorator import * diff --git a/python/paddle/reader/decorator.py b/python/paddle/reader/decorator.py new file mode 100644 index 0000000000000000000000000000000000000000..f0ddb0ff812b15ede21e6965c7c8857f12716fa0 --- /dev/null +++ b/python/paddle/reader/decorator.py @@ -0,0 +1,60 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = ['buffered'] + +from Queue import Queue +from threading import Thread + + +def buffered(reader, size): + """Creates a buffered data reader. + + The buffered data reader will read and save data entries into a buffer. + Reading from the buffered data reader will proceed as long as the buffer + is not empty. + + Args: + reader: the data reader to read from. + size: max buffer size. + + Returns: + The buffered data reader. + """ + + class EndSignal(): + pass + + end = EndSignal() + + def read_worker(r, q): + for d in r: + q.put(d) + q.put(end) + + def create_reader(): + r = reader() + q = Queue(maxsize=size) + t = Thread( + target=read_worker, args=( + r, + q, )) + t.daemon = True + t.start() + e = q.get() + while e != end: + yield e + e = q.get() + + return create_reader diff --git a/python/paddle/reader/tests/CMakeLists.txt b/python/paddle/reader/tests/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..502c897d8946a838847c1c23b1236358c58c088e --- /dev/null +++ b/python/paddle/reader/tests/CMakeLists.txt @@ -0,0 +1,4 @@ +add_test(NAME reader_decorator_test + COMMAND ${PROJ_ROOT}/paddle/.set_python_path.sh -d ${PROJ_ROOT}/python/ + ${PYTHON_EXECUTABLE} ${PROJ_ROOT}/python/paddle/reader/tests/decorator_test.py + WORKING_DIRECTORY ${PROJ_ROOT}/python/paddle) diff --git a/python/paddle/reader/tests/decorator_test.py b/python/paddle/reader/tests/decorator_test.py new file mode 100644 index 0000000000000000000000000000000000000000..879d1d9c1d0e0650d347b5c44e36771a0c15390e --- /dev/null +++ b/python/paddle/reader/tests/decorator_test.py @@ -0,0 +1,50 @@ +# Copyright PaddlePaddle contributors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +import paddle.reader +import time + + +def reader_10(dur): + for i in range(10): + time.sleep(dur) + yield i + + +class TestBuffered(unittest.TestCase): + def test_read(self): + for size in range(20): + b = paddle.reader.buffered(lambda: reader_10(0), size) + c = 0 + for i in b(): + self.assertEqual(i, c) + c += 1 + self.assertEqual(c, 10) + + def test_buffering(self): + # read have 30ms delay. + b = paddle.reader.buffered(lambda: reader_10(0.03), 10) + last_time = time.time() + for idx, i in enumerate(b()): + elapsed_time = time.time() - last_time + if i == 0: + time.sleep(0.3) + else: + # read time should be short, meaning already buffered. + self.assertLess(elapsed_time, 0.01) + last_time = time.time() + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/v2/parameters.py b/python/paddle/v2/parameters.py index e5b7dabcb8eb3a845dedea663f978e7a9820495d..ea504d5104716d157add87ed3f6e31ea69e0a3f0 100644 --- a/python/paddle/v2/parameters.py +++ b/python/paddle/v2/parameters.py @@ -1,27 +1,27 @@ import numpy as np - -from paddle.proto.ModelConfig_pb2 import ModelConfig -from paddle.proto.ParameterConfig_pb2 import ParameterConfig +from . import layer as v2_layer import py_paddle.swig_paddle as api +from paddle.proto.ParameterConfig_pb2 import ParameterConfig __all__ = ['Parameters', 'create'] -def create(*topologies): +def create(*layers): """ - Create parameter pool by topologies. + Create parameter pool by layers. In paddle, layer can be represent a + model config. - :param topologies: + :param layers: :return: """ - pool = Parameters() - for topo in topologies: - if not isinstance(topo, ModelConfig): + for layer in layers: + if not isinstance(layer, v2_layer.Layer): raise ValueError( - 'create must pass a topologies which type is ModelConfig') - - for param in topo.parameters: - pool.__append_config__(param) + 'create must pass a topologies which type is paddle.layer.Layer') + model_config = v2_layer.parse_network(*layers) + pool = Parameters() + for param in model_config.parameters: + pool.__append_config__(param) return pool diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 9ba13dc5c8a81f8dcf39260d1a44dcdcc7c22ad5..4365bd41e7073bce4112e5813dbf1517856c06f5 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -1,12 +1,13 @@ import collections import py_paddle.swig_paddle as api +from paddle.proto.ModelConfig_pb2 import ModelConfig from py_paddle import DataProviderConverter -from paddle.proto.ModelConfig_pb2 import ModelConfig +from . import event as v2_event +from . import layer as v2_layer from . import optimizer as v2_optimizer from . import parameters as v2_parameters -from . import event as v2_event __all__ = ['ITrainer', 'SGD'] @@ -73,7 +74,7 @@ class SGD(ITrainer): Training method. Will train num_passes of input data. :param train_data_reader: - :param topology: Network Topology, a protobuf ModelConfig message. + :param topology: Network Topology, use one or more Layers to represent it. :param parameters: The parameter pools. :param num_passes: The total train passes. :param test_data_reader: @@ -87,6 +88,8 @@ class SGD(ITrainer): if event_handler is None: event_handler = default_event_handler + topology = v2_layer.parse_network(topology) + __check_train_args__(**locals()) gm = api.GradientMachine.createFromConfigProto(