From 90b9cba7fe2b516172f76e56ef45a5263ca432c1 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Tue, 20 Sep 2016 00:51:14 +0800 Subject: [PATCH] Add min_pool_size, Add default value of should_shuffle (#70) * min_pool_size would be infinite by default. * add unittest for min_pool_size * Fix bug in can_over_batch_size * add unittest for can_over_batch_size * Add DEFINE_PROVIDER_EX * Add default value of should_shuffle * When training, the default value of should_shuffle is True. * When testing, the default value of should_shuffle is False. * User a set a provider should_shuffle or not by pass it to `@provider` * should_shuffle can handle a list of value, not just boolean * Add input order mapping by using name * Add unittest * Add check to check input format. * Default is close for speed reason. * User could stop train when check error, or continue train without this train sample. * use deque instead of vector in generators pool, make erase generator faster. * Add chinese/english documentation * Make should shuffle = false in unittest * Add python files to depends. --- CMakeLists.txt | 2 +- doc/ui/data_provider/pydataprovider2.rst | 37 ++-- doc_cn/ui/data_provider/mnist_config.py | 2 + .../ui/data_provider/mnist_provider.dict.py | 25 +++ doc_cn/ui/data_provider/pydataprovider2.rst | 71 ++++++- paddle/gserver/dataproviders/DataProvider.cpp | 10 +- paddle/gserver/dataproviders/DataProvider.h | 39 +++- .../dataproviders/MultiDataProvider.cpp | 10 +- .../gserver/dataproviders/MultiDataProvider.h | 4 +- .../gserver/dataproviders/PyDataProvider2.cpp | 103 +++++++--- paddle/gserver/tests/rnn_data_provider.py | 8 +- paddle/gserver/tests/sequenceGen.py | 18 +- paddle/gserver/tests/test_PyDataProvider2.cpp | 118 +++++++++++ paddle/gserver/tests/test_PyDataProvider2.py | 45 ++++- paddle/trainer/Trainer.cpp | 4 +- paddle/utils/PythonUtil.h | 22 ++- python/CMakeLists.txt | 10 +- python/paddle/trainer/PyDataProvider2.py | 183 +++++++++++++++++- 18 files changed, 631 insertions(+), 80 deletions(-) create mode 100644 doc_cn/ui/data_provider/mnist_provider.dict.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 1bb73f8b98..92c866da8f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 2.8) project(paddle CXX C) set(PADDLE_MAJOR_VERSION 0) set(PADDLE_MINOR_VERSION 8) -set(PADDLE_PATCH_VERSION 0b0) +set(PADDLE_PATCH_VERSION 0b1) set(PADDLE_VERSION ${PADDLE_MAJOR_VERSION}.${PADDLE_MINOR_VERSION}.${PADDLE_PATCH_VERSION}) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake") diff --git a/doc/ui/data_provider/pydataprovider2.rst b/doc/ui/data_provider/pydataprovider2.rst index 152f8a6df6..e105d3be30 100644 --- a/doc/ui/data_provider/pydataprovider2.rst +++ b/doc/ui/data_provider/pydataprovider2.rst @@ -24,7 +24,7 @@ A small part of the original data as an example is shown as below: .. literalinclude:: ../../../doc_cn/ui/data_provider/mnist_train.txt -Each line of the data contains two parts, separated by ';'. The first part is +Each line of the data contains two parts, separated by :code:`;`. The first part is label of an image. The second part contains 28x28 pixel float values. Just write path of the above data into train.list. It looks like this: @@ -74,7 +74,20 @@ you can take this as an example. .. literalinclude:: ../../../doc_cn/ui/data_provider/mnist_config.py -Here we specify training data by 'train.list', and no testing data is specified. +Here we specify training data by :code:`train.list`, and no testing data is specified. +The method which actually provide data is :code:`process`. + +User also can use another style to provide data, which defines the +:code:`data_layer`'s name explicitly when `yield`. For example, +the :code:`dataprovider` is shown as below. + +.. literalinclude:: ../../../doc_cn/ui/data_provider/mnist_provider.dict.py + :linenos: + +If user did't give the :code:`data_layer`'s name, PaddlePaddle will use +the order of :code:`data_layer` definition roughly to determine which feature to +which :code:`data_layer`. This order may be not correct, so TO DEFINE THE +:code:`data_layer`'s NAMES EXPLICITLY IS THE RECOMMANDED WAY TO PROVIDER DATA. Now, this simple example of using PyDataProvider is finished. The only thing that the user should know is how to generte **one sample** from @@ -93,7 +106,7 @@ DataProvider for the sequential model ------------------------------------- A sequence model takes sequences as its input. A sequence is made up of several timesteps. The so-called timestep, is not necessary to have something to do -with 'time'. It can also be explained to that the order of data are taken into +with time. It can also be explained to that the order of data are taken into consideration into model design and training. For example, the sentence can be interpreted as a kind of sequence data in NLP tasks. @@ -155,23 +168,7 @@ Reference @provider +++++++++ -'@provider' is a Python `Decorator`_, it can construct a PyDataProvider in -PaddlePaddle from a user defined function. Its parameters are: - -* `input_types`_ defines format of the data input. -* should_shuffle defines whether to shuffle data or not. By default, it is set - true during training, and false during testing. -* pool_size is the memory pool size (in sample number) in DataProvider. - -1 means no limit. -* can_over_batch_size defines whether PaddlePaddle can store little more - samples than pool_size. It is better to set True to avoid some deadlocks. -* calc_batch_size is a function define how to calculate batch size. This is - usefull in sequential model, that defines batch size is counted upon sequence - or token. By default, each sample or sequence counts to 1 when calculating - batch size. -* cache is a data cache strategy, see `cache`_. -* Init_hook function is invoked once the data provider is initialized, - see `init_hook`_. +.. autofunction:: paddle.trainer.PyDataProvider2.provider input_types +++++++++++ diff --git a/doc_cn/ui/data_provider/mnist_config.py b/doc_cn/ui/data_provider/mnist_config.py index 0f9094cd27..7ba344338c 100644 --- a/doc_cn/ui/data_provider/mnist_config.py +++ b/doc_cn/ui/data_provider/mnist_config.py @@ -4,3 +4,5 @@ define_py_data_sources2(train_list='train.list', test_list=None, module='mnist_provider', obj='process') +img = data_layer(name='pixel', size=784) +label = data_layer(name='label', size=10) diff --git a/doc_cn/ui/data_provider/mnist_provider.dict.py b/doc_cn/ui/data_provider/mnist_provider.dict.py new file mode 100644 index 0000000000..4eab5b1fd3 --- /dev/null +++ b/doc_cn/ui/data_provider/mnist_provider.dict.py @@ -0,0 +1,25 @@ +from paddle.trainer.PyDataProvider2 import * + + +# Define a py data provider +@provider(input_types=[ + dense_vector(28 * 28), + integer_value(10) +]) +def process(settings, filename): # settings is not used currently. + f = open(filename, 'r') # open one of training file + + for line in f: # read each line + label, pixel = line.split(';') + + # get features and label + pixels_str = pixel.split(' ') + + pixels_float = [] + for each_pixel_str in pixels_str: + pixels_float.append(float(each_pixel_str)) + + # give data to paddle. + yield { "pixel": pixels_float, 'label': int(label) } + + f.close() # close file diff --git a/doc_cn/ui/data_provider/pydataprovider2.rst b/doc_cn/ui/data_provider/pydataprovider2.rst index e743e41688..9e1d8c531f 100644 --- a/doc_cn/ui/data_provider/pydataprovider2.rst +++ b/doc_cn/ui/data_provider/pydataprovider2.rst @@ -56,6 +56,14 @@ process函数调用多次 :code:`yield` 即可。 :code:`yield` 是Python的一 这里说明了训练数据是 'train.list',而没有测试数据。引用的DataProvider是 'mnist_provider' 这个模块中的 'process' 函数。 +同时,根据模型配置文件中 :code:`data_layer` 的名字,用户也可以显式指定返回的数据对应关系。例如: + +.. literalinclude:: mnist_provider.dict.py + :linenos: + +如果用户不指定返回数据的对应关系,那么PaddlePaddle会粗略的根据layer的声明顺序, +来确定对应关系。这个对应关系可能不正确。所以推荐使用显式指定返回值和数据对应关系。 + 至此,简单的PyDataProvider样例就说明完毕了。对于用户来说,讲数据发送给PaddlePaddle,仅仅需要 知道如何从 **一个文件** 里面读取 **一条** 样本。而PaddlePaddle进程帮助用户做了 @@ -119,11 +127,13 @@ DataProvider创建的时候执行。这个初始化函数具有如下参数: @provider +++++++++ -'@provider'是一个Python的 `Decorator`_ ,他可以将某一个函数标记成一个PyDataProvider。它包含的参数有: +:code:`@provider` 是一个Python的 `Decorator`_ ,他可以将某一个函数标记成一个PyDataProvider。它包含的参数有: * `input_types`_ 是数据输入格式。具体有哪些格式,参考 `input_types`_ 。 * should_shuffle 是个DataProvider是不是要做shuffle,如果不设置的话,训练的时候默认shuffle, - 测试的时候默认不shuffle + 测试的时候默认不shuffle。 +* min_pool_size 是设置DataProvider在内存中最小暂存的数据条数。这个也是PaddlePaddle所能够保证的shuffle粒度。 + 设置成-1的话,会预先读取全部数据到内存中。 * pool_size 是设置DataProvider在内存中暂存的数据条数。设置成-1的话,即不在乎内存暂存多少条数据。 * can_over_batch_size 表示是否允许Paddle暂存略微多余pool_size的数据。这样做可以避免很多死锁问题。 一般推荐设置成True @@ -131,6 +141,11 @@ DataProvider创建的时候执行。这个初始化函数具有如下参数: 是一个batch size,但是有时为了计算均衡性,可以将一条数据设置成多个batch size * cache 是数据缓存的策略,参考 `cache`_ * init_hook 是初始化时调用的函数,参考 `init_hook`_ +* use_dynamic_order 如果是true的话,可以返回一个dict,key是data_layer的名字,value是特征值。同时,也可以 + 返回一个list或者tuple。如果是false的话,只能够返回list或者tuple +* check 设置成true的话,会根据input_types检查数据的合法性。 +* check_fail_continue 如果设置成true的话,即使在check中数据不合法,也会扔到这条数据,继续训练。 如果 + check是false的话,没有作用。 input_types +++++++++++ @@ -190,3 +205,55 @@ DataProvider提供了两种简单的Cache策略。他们是 * CacheType.NO_CACHE 不缓存任何数据,每次都会从python端读取数据 * CacheType.CACHE_PASS_IN_MEM 第一个pass会从python端读取数据,剩下的pass会直接从内存里 读取数据。 + + +注意事项 +-------- + +可能的内存泄露问题 +++++++++++++++++++ + +PaddlePaddle将train.list中的每一行,都传递给process函数,从而生成多个generator。 +即如果train.list中,有100个训练文件,即会生成100个generator。这个本身不是一个很 +严重的问题。 + +但是,如果在训练时,每一条训练数据都是一个文件,并且,训练数据非常多的情况下,就 +会生成多个generator。每个generator在没有调用的时候,是几乎不占内存的。但是,当调 +用过一次的时候,generator便会存下当前的上下文(Context)。而这个Context可能会非常 +大。并且,generator至少调用两次才会知道是否停止。所以,即使在process里面只会有一 +个yield,也需要两次随机选择到同样的generator的时候,才会释放该段内存。 + +.. code-block:: python + + def func(): + yield 0 + + f = func() # 创建generator + tmp = next(f) # 调用一次,返回0 + tmp = next(f) # 调用第二次的时候,才会Stop Iteration + +而如果按顺序调用这些generator就不会出现这个问题。 + +所以最佳实践推荐不要将每一个样本都放入train.list。而是将样本的地址放入另一个文本 +文件,train.list写入那个文本文件的地址。 或者在python generator的上下文中尽量留 +下非常少的变量引用。例如 + +.. code-block:: python + + def real_process(fn): + # ... read from fn + return result # 当函数返回的时候,python可以解除掉内部变量的引用。 + + def process(fn): + yield real_process(fn) + +这个问题是PyDataProvider读数据时候的逻辑问题,基本上不能整体修正。 + + +内存不够用的情况 +++++++++++++++++ + +PyDataProvider2会尽量使用内存。所以如果对于内存比较小的机器,推荐设置 +:code:`pool_size` 变量,而这个变量推荐大于训练的batch size,并且在内存足够 +的情况下越大越好。 + diff --git a/paddle/gserver/dataproviders/DataProvider.cpp b/paddle/gserver/dataproviders/DataProvider.cpp index ba05b70fe9..c3b4769f76 100644 --- a/paddle/gserver/dataproviders/DataProvider.cpp +++ b/paddle/gserver/dataproviders/DataProvider.cpp @@ -149,9 +149,13 @@ void DoubleBuffer::startAsyncLoad() { taskReadySem_.post(); } -ClassRegistrar DataProvider::registrar_; -DataProvider* DataProvider::create(const DataConfig& config, bool useGpu) { - return registrar_.createByType(config.type(), config, useGpu); +ClassRegistrar +DataProvider::registrar_; + +DataProvider* DataProvider::create(const DataConfig& config, + const ModelConfig& modelConfig, + bool useGpu) { + return registrar_.createByType(config.type(), config, modelConfig, useGpu); } REGISTER_DATA_PROVIDER(simple, SimpleDataProvider); diff --git a/paddle/gserver/dataproviders/DataProvider.h b/paddle/gserver/dataproviders/DataProvider.h index aab5d93fca..534491d70d 100644 --- a/paddle/gserver/dataproviders/DataProvider.h +++ b/paddle/gserver/dataproviders/DataProvider.h @@ -39,15 +39,30 @@ limitations under the License. */ #include "paddle/parameter/Argument.h" namespace paddle { - /** * @def REGISTER_DATA_PROVIDER - * @brief Macro for registering a data provider + * @brief Macro for registering a data provider. The class type should contain + * a consturctor with parameter (DataConfig, bool). */ -#define REGISTER_DATA_PROVIDER(__type_name, __class_name) \ - static InitFunction __reg_type_##__type_name([]() { \ - DataProvider::registrar_.registerClass<__class_name>(#__type_name); \ - }) +#define REGISTER_DATA_PROVIDER(__type_name, __class_name)\ + static InitFunction __reg_type_##__type_name([]() {\ + DataProvider::registrar_.registerClass(\ + #__type_name, \ + [](DataConfig conf, ModelConfig, bool useGpu) -> DataProvider* { \ + DataProvider* dp = new __class_name (conf, useGpu);\ + return dp;\ + });\ +}) + +/** + * @def REGISTER_DATA_PROVIDER_EX + * @brief Macro for registering a data provider, which contains a constructor + * with parameter (DataConfig, ModelConfig, bool). + */ +#define REGISTER_DATA_PROVIDER_EX(__type_name, __class_name) \ + static InitFunction __reg_type_##__type_name([] { \ + DataProvider::registrar_.registerClass<__class_name>(#__type_name); \ +}) class DataBatch; class BufferBatch; @@ -285,10 +300,18 @@ protected: */ class DataProvider { public: - static ClassRegistrar registrar_; + static ClassRegistrar registrar_; static DataProvider* create(const DataConfig& config, + const ModelConfig& modelConfig, bool useGpu = FLAGS_use_gpu); + /** + * @brief create only used for unittest. + */ + inline static DataProvider* create(const DataConfig &config, bool useGpu) { + return create(config, ModelConfig(), useGpu); + } + DataProvider(const DataConfig& config, bool useGpu) : config_(config), skipShuffle_(false), @@ -336,13 +359,13 @@ public: * @note return -1 to indicate unlimited number of samples. */ virtual int64_t getSize() = 0; + /** * @brief Get next batch training samples internally * @param[in] size size of training samples to get * @param[out] batch a batch of training samples * @return actual size of obtained training samples */ - virtual int64_t getNextBatchInternal(int64_t size, DataBatch* batch) = 0; protected: diff --git a/paddle/gserver/dataproviders/MultiDataProvider.cpp b/paddle/gserver/dataproviders/MultiDataProvider.cpp index c3d14a7069..8e4f53978a 100644 --- a/paddle/gserver/dataproviders/MultiDataProvider.cpp +++ b/paddle/gserver/dataproviders/MultiDataProvider.cpp @@ -22,7 +22,9 @@ namespace paddle { using namespace std; -MultiDataProvider::MultiDataProvider(const DataConfig& config, bool useGpu) +MultiDataProvider::MultiDataProvider(const DataConfig& config, + const ModelConfig& modelConfig, + bool useGpu) : DataProvider(config, useGpu) { bool atLeastOneMainDataFlag = false; totalDataRatio_ = 0; @@ -58,7 +60,9 @@ MultiDataProvider::MultiDataProvider(const DataConfig& config, bool useGpu) subConfig.set_async_load_data(false); } subDataProviders_[i] = - std::unique_ptr(DataProvider::create(subConfig, useGpu_)); + std::unique_ptr(DataProvider::create(subConfig, + modelConfig, + useGpu_)); } } @@ -116,6 +120,6 @@ int64_t MultiDataProvider::getNextBatchInternal(int64_t size, return batch->getSize(); } -REGISTER_DATA_PROVIDER(multi, MultiDataProvider); +REGISTER_DATA_PROVIDER_EX(multi, MultiDataProvider); } // namespace paddle diff --git a/paddle/gserver/dataproviders/MultiDataProvider.h b/paddle/gserver/dataproviders/MultiDataProvider.h index 7144212863..b498ba6516 100644 --- a/paddle/gserver/dataproviders/MultiDataProvider.h +++ b/paddle/gserver/dataproviders/MultiDataProvider.h @@ -24,7 +24,9 @@ protected: std::vector> subDataProviders_; public: - MultiDataProvider(const DataConfig& config, bool useGpu); + MultiDataProvider(const DataConfig& config, + const ModelConfig& modelConfig, + bool useGpu); ~MultiDataProvider() {} virtual void reset(); virtual void shuffle(); diff --git a/paddle/gserver/dataproviders/PyDataProvider2.cpp b/paddle/gserver/dataproviders/PyDataProvider2.cpp index 8e51752dc2..0b41f6a02a 100644 --- a/paddle/gserver/dataproviders/PyDataProvider2.cpp +++ b/paddle/gserver/dataproviders/PyDataProvider2.cpp @@ -24,6 +24,27 @@ limitations under the License. */ namespace paddle { +namespace unittest { + +static std::unique_ptr> + OnPoolFilled; + +namespace pydp2 { + +void setOnPoolFilledHook(const std::function& callback) { + OnPoolFilled.reset(new std::function()); + *OnPoolFilled = callback; +} + +void clearOnPoolFilledHook() { + OnPoolFilled.reset(); +} + +} // namespace pydp2 +} // namespace unittest + + + /** * Slot type */ @@ -179,6 +200,7 @@ public: * Ctor */ PyDataProvider2(const DataConfig& config, + const ModelConfig& modelConfig, bool useGpu) :DataProvider(config, useGpu), callingContextCreated_(2) { auto& args = config.load_data_args(); @@ -192,6 +214,12 @@ public: py::DictHelper kwargsDict(kwargs); kwargsDict.setBool("is_train", !config.for_test()); + std::vector inputs; + inputs.reserve(modelConfig.input_layer_names().size()); + std::copy(modelConfig.input_layer_names().begin(), + modelConfig.input_layer_names().end(), + std::back_inserter(inputs)); + kwargsDict.setStringList("input_order", inputs); // kwargs is keyword arguemts to create object. this->createPyDataObj(config.load_data_module(), @@ -199,7 +227,7 @@ public: config.files(), std::move(kwargs)); DBG << "Instance " << instance_.get() << " loaded."; - this->readPyFields(); + this->readPyFields(config.for_test()); DBG << "Py Field Done"; } @@ -253,14 +281,28 @@ private: CHECK_PY(instance_) << "Cannot Create instance"; } - void readPyFields() { + void readPyFields(bool testing) { py::ObjectHelper self(this->instance_); - this->skipShuffle_ = !self.getBoolAttr("should_shuffle"); bool ok; + + this->skipShuffle_ = !self.getBoolAttr("should_shuffle", + &ok /*isBoolType*/); + if (!ok) { + this->skipShuffle_ = testing; // shuffle when is training, skip shuffle + // when is testing. + } + DBG << "Provider Skip Shuffle " << this->skipShuffle_; + this->poolSize_ = self.getIntAttr("pool_size", &ok); if (!ok) { this->poolSize_ = -1UL; } + this->minPoolSize_ = self.getIntAttr("min_pool_size", &ok); + if (!ok) { + this->minPoolSize_ = -1UL; + } + this->minPoolSize_ = std::min(this->poolSize_, this->minPoolSize_); + this->canOverBatchSize_ = self.getBoolAttr("can_over_batch_size"); calcBatchSize_.reset(self.getAttr("calc_batch_size")); @@ -307,7 +349,6 @@ private: } void loadThread() { - callingContexts_.reserve(fileLists_.size()); DBG << "Creating context"; for (auto& filename : fileLists_) { PyGuard g; @@ -332,7 +373,14 @@ private: bool atEnd; data = py::iterNext(callingContexts_[cid], &atEnd); if (atEnd || data == nullptr) { - callingContexts_.erase(callingContexts_.begin() + cid); + if (cid != 0) { + std::swap(callingContexts_[cid], callingContexts_[0]); + cid = 0; + } + { + PyGuard g; + callingContexts_.pop_front(); + } this->pullCV_.notify_all(); continue; } @@ -354,11 +402,7 @@ private: if (this->loadThread_){ // wait poolActualSize < poolSize; std::unique_lock l(mtx_); pushCV_.wait(l, [this, additionalBatchSize] { - if (this->canOverBatchSize_) { - return this->poolActualSize_ < poolSize_; - } else { - return this->poolActualSize_ + additionalBatchSize < poolSize_; - } + return this->poolActualSize_ < poolSize_; }); } @@ -402,7 +446,7 @@ private: private: std::unique_ptr loadThread_; std::atomic exit_; - std::vector callingContexts_; + std::deque callingContexts_; std::deque dataPool_; size_t poolActualSize_; std::condition_variable pushCV_; @@ -413,6 +457,7 @@ private: PyObjectPtr instance_; size_t poolSize_; + size_t minPoolSize_; bool canOverBatchSize_; PyObjectPtr calcBatchSize_; PyObjectPtr generator_; @@ -478,8 +523,13 @@ public: // data pool ready. std::unique_lock l(mtx_); pullCV_.wait(l, [this, &size] { - return this->poolActualSize_ >= size || callingContexts_.empty(); + return this->poolActualSize_ >= std::max(size, this->minPoolSize_) + || callingContexts_.empty(); }); + + if (unittest::OnPoolFilled) { + (*unittest::OnPoolFilled)(this->poolActualSize_); + } } std::deque data; size_t bsize = 0; @@ -495,7 +545,8 @@ public: std::deque& pool = *poolPtr; while (bsize < size && !pool.empty()) { - { // move data from pool to data + { + // move data from pool to data std::lock_guard guard(mtx_); if (skipShuffle_) { size_t i = 0; @@ -505,14 +556,13 @@ public: } else { // when shuffle, use swap to drop only last pool element. size_t i = ThreadLocalRand::rand() % pool.size(); CHECK(pool[i] != nullptr); - if (i != pool.size() - 1) { - std::swap(pool[i], pool.back()); + if (i != 0) { + std::swap(pool[i], pool.front()); } - data.emplace_back(std::move(pool.back())); - pool.pop_back(); + data.emplace_back(std::move(pool.front())); + pool.pop_front(); } - } - { + if (calcBatchSize_) { // custom calc batch size. PyGuard guard; Py_INCREF(data.back().get()); @@ -521,8 +571,17 @@ public: calcBatchSize.getArgs().set(0, data.back()); PyObjectPtr customBatchSize(calcBatchSize()); bool ok; - bsize += py::castInt(customBatchSize.get(), &ok); + size_t tmp = py::castInt(customBatchSize.get(), &ok); CHECK(ok) << "calc_batch_size must return int"; + + if (bsize + tmp > size && !canOverBatchSize_) { + // Put data back. + pool.push_front(std::move(data.back())); + data.pop_back(); + break; + } else { + bsize += tmp; + } } else { bsize += 1; } @@ -598,7 +657,6 @@ public: } else { *batch = cpuBatch; } - return bsize; } }; @@ -606,7 +664,8 @@ public: std::unordered_set PyDataProvider2::gModuleClsPtrs_; PyObjectPtr PyDataProvider2::zeroTuple_(PyTuple_New(0)); -REGISTER_DATA_PROVIDER(py2, PyDataProvider2); +REGISTER_DATA_PROVIDER_EX(py2, PyDataProvider2); + /** * Scanner for dense slot. diff --git a/paddle/gserver/tests/rnn_data_provider.py b/paddle/gserver/tests/rnn_data_provider.py index 85a83554c5..347d5891b9 100644 --- a/paddle/gserver/tests/rnn_data_provider.py +++ b/paddle/gserver/tests/rnn_data_provider.py @@ -19,14 +19,18 @@ data = [ [[[0, 2], [2, 5], [0, 1, 2]], 1], ] + @provider(input_types=[integer_value_sub_sequence(10), - integer_value(2)]) + integer_value(2)], + should_shuffle=False) def process_subseq(settings, file_name): for d in data: yield d + @provider(input_types=[integer_value_sequence(10), - integer_value(2)]) + integer_value(2)], + should_shuffle=False) def process_seq(settings, file_name): for d in data: seq = [] diff --git a/paddle/gserver/tests/sequenceGen.py b/paddle/gserver/tests/sequenceGen.py index cb83d79d78..cbed1f15fc 100644 --- a/paddle/gserver/tests/sequenceGen.py +++ b/paddle/gserver/tests/sequenceGen.py @@ -17,22 +17,26 @@ import sys from paddle.trainer.PyDataProvider2 import * + def hook(settings, dict_file, **kwargs): settings.word_dict = dict_file - settings.input_types = [integer_value_sequence(len(settings.word_dict)), + settings.input_types = [integer_value_sequence(len(settings.word_dict)), integer_value_sequence(3)] settings.logger.info('dict len : %d' % (len(settings.word_dict))) -@provider(init_hook=hook) + +@provider(init_hook=hook, should_shuffle=False) def process(settings, file_name): with open(file_name, 'r') as fdata: for line in fdata: label, comment = line.strip().split('\t') label = int(''.join(label.split())) words = comment.split() - word_slot = [settings.word_dict[w] for w in words if w in settings.word_dict] + word_slot = [settings.word_dict[w] for w in words if + w in settings.word_dict] yield word_slot, [label] + ## for hierarchical sequence network def hook2(settings, dict_file, **kwargs): settings.word_dict = dict_file @@ -40,17 +44,19 @@ def hook2(settings, dict_file, **kwargs): integer_value_sub_sequence(3)] settings.logger.info('dict len : %d' % (len(settings.word_dict))) -@provider(init_hook=hook2) + +@provider(init_hook=hook2, should_shuffle=False) def process2(settings, file_name): with open(file_name) as fdata: label_list = [] word_slot_list = [] for line in fdata: if (len(line)) > 1: - label,comment = line.strip().split('\t') + label, comment = line.strip().split('\t') label = int(''.join(label.split())) words = comment.split() - word_slot = [settings.word_dict[w] for w in words if w in settings.word_dict] + word_slot = [settings.word_dict[w] for w in words if + w in settings.word_dict] label_list.append([label]) word_slot_list.append(word_slot) else: diff --git a/paddle/gserver/tests/test_PyDataProvider2.cpp b/paddle/gserver/tests/test_PyDataProvider2.cpp index 824295eb6e..c5fe31b291 100644 --- a/paddle/gserver/tests/test_PyDataProvider2.cpp +++ b/paddle/gserver/tests/test_PyDataProvider2.cpp @@ -20,6 +20,18 @@ limitations under the License. */ #include "paddle/gserver/dataproviders/DataProvider.h" P_DEFINE_string(train_list, "unittest.list", "file list for unittest"); + +namespace paddle { +namespace unittest { +namespace pydp2 { +extern void setOnPoolFilledHook(const std::function& func); +extern void clearOnPoolFilledHook(); + +} // namespace pydp2 +} // namespace unittest +} // namespace paddle + + const paddle::real epsilon = 1e-5; static inline int64_t readDataBatch( @@ -235,6 +247,112 @@ TEST(PyDataProvider2, index_sub_seq) { } } +TEST(PyDataProvider2, min_pool_size) { + paddle::DataConfig config; + config.set_type("py2"); + config.set_files(FLAGS_train_list.c_str()); + config.set_load_data_module("test_PyDataProvider2"); + config.set_load_data_object("test_min_pool_size"); + config.set_load_data_args(""); + size_t totalData = 1 << 14; + constexpr size_t batchSize = 100; + constexpr size_t minPoolSize = 1000; + paddle::DataBatch batch; + std::unique_ptr provider( + paddle::DataProvider::create(config, false)); + provider->reset(); + + paddle::unittest::pydp2::setOnPoolFilledHook([&](size_t poolSize) { + if (totalData > batchSize) { + CHECK_GE(poolSize, std::min(totalData-batchSize, minPoolSize)); + } + }); + while (true) { + size_t realBatchSize = provider->getNextBatchInternal(batchSize, &batch); + if (realBatchSize) { + totalData -= realBatchSize; + } else { + break; + } + } + paddle::unittest::pydp2::clearOnPoolFilledHook(); +} + +TEST(PyDataProvider2, can_over_batch_size) { + paddle::DataConfig config; + config.set_type("py2"); + config.set_files(FLAGS_train_list.c_str()); + config.set_load_data_module("test_PyDataProvider2"); + config.set_load_data_object("test_can_over_batch_size"); + config.set_load_data_args(""); + paddle::DataBatch batch; + std::unique_ptr provider( + paddle::DataProvider::create(config, false)); + provider->reset(); + constexpr size_t batchSize = 100; + while (true) { + size_t realBatchSize = provider->getNextBatchInternal(batchSize, &batch); + if (realBatchSize) { + CHECK_LE(realBatchSize, batchSize); + } else { + break; + } + } +} + +TEST(PyDataProvider2, input_order) { + paddle::DataConfig config; + config.set_type("py2"); + config.set_files(FLAGS_train_list.c_str()); + config.set_load_data_module("test_PyDataProvider2"); + config.set_load_data_object("test_input_order"); + config.set_load_data_args(""); + + paddle::ModelConfig modelConfig; + *modelConfig.add_input_layer_names() = "input1"; + *modelConfig.add_input_layer_names() = "input2"; + paddle::DataBatch batch; + std::unique_ptr provider( + paddle::DataProvider::create(config, modelConfig, false)); + provider->reset(); + constexpr size_t batchSize = 100; + while (true) { + size_t realBatchSize = provider->getNextBatchInternal(batchSize, &batch); + if (!realBatchSize) { + break; + } + ASSERT_EQ(batch.getStreams().size(), 2); + for (size_t i = 0; i < realBatchSize; ++i) { + ASSERT_EQ(batch.getStream(0).ids->getData()[i], 0); + ASSERT_EQ(batch.getStream(1).ids->getData()[i], 1); + } + } +} + +TEST(PyDataProvider2, test_check) { + paddle::DataConfig config; + config.set_type("py2"); + config.set_files(FLAGS_train_list.c_str()); + config.set_load_data_module("test_PyDataProvider2"); + config.set_load_data_object("test_check"); + config.set_load_data_args(""); + paddle::DataBatch batch; + std::unique_ptr provider( + paddle::DataProvider::create(config, false)); + provider->reset(); + while (true) { + size_t realBatchSize = provider->getNextBatchInternal(100, &batch); + if (!realBatchSize) { + break; + } else { + auto& ivec = batch.getStream(0).ids; + for (size_t i=0; i < ivec->getSize(); ++i) { + CHECK_LT(ivec->getData()[i], 10); + } + } + } +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); paddle::initMain(argc, argv); diff --git a/paddle/gserver/tests/test_PyDataProvider2.py b/paddle/gserver/tests/test_PyDataProvider2.py index a88c48cb4e..145fe85cff 100644 --- a/paddle/gserver/tests/test_PyDataProvider2.py +++ b/paddle/gserver/tests/test_PyDataProvider2.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import random + from paddle.trainer.PyDataProvider2 import * @@ -39,7 +41,8 @@ def test_init_hook(setting, filename): @provider( - input_types=[sparse_binary_vector(30000, seq_type=SequenceType.NO_SEQUENCE)]) + input_types=[ + sparse_binary_vector(30000, seq_type=SequenceType.NO_SEQUENCE)]) def test_sparse_non_value_no_seq(setting, filename): for i in xrange(200): yield [(i + 1) * (j + 1) for j in xrange(10)] @@ -66,3 +69,43 @@ def test_index_sub_seq(setting, filename): for i in xrange(200): yield list(gen_sub_seq(i)) + + +@provider(input_types=[index_slot(100)], min_pool_size=1000) +def test_min_pool_size(setting, filename): + for _ in xrange(1 << 14): + yield random.randint(0, 100 - 1) + + +@provider(input_types=[index_slot(100, seq_type=SequenceType.SEQUENCE)], + can_over_batch_size=False, + calc_batch_size=lambda x: len(x[0])) +def test_can_over_batch_size(setting, filename): + for _ in xrange(1 << 10): + seq_len = random.randint(0, 99) + yield [random.randint(0, 100 - 1) for _ in xrange(seq_len)] + + +@provider(input_types=[index_slot(10), index_slot(10)]) +def test_input_order(setting, filename): + for _ in xrange(1000): + yield { + 'input1': 0, + 'input2': 1 + } + + +@provider(input_types=[index_slot(10)], + check=True, + check_fail_continue=True, + should_shuffle="123") # also test should shuffle +def test_check(settings, filename): + yield_good_value = False + + while not yield_good_value: + for _ in xrange(10000): + i = random.randint(0, 100) + if i < 10: + yield_good_value = True + yield i + diff --git a/paddle/trainer/Trainer.cpp b/paddle/trainer/Trainer.cpp index 2890f5b5d7..c0e5ec3bd6 100644 --- a/paddle/trainer/Trainer.cpp +++ b/paddle/trainer/Trainer.cpp @@ -193,7 +193,7 @@ void Trainer::init(const std::shared_ptr &config, dataProvider_ = dataProvider; if (!dataProvider_ && config_->hasDataConfig()) { - dataProvider_.reset(DataProvider::create(*config_, gpuData)); + dataProvider_.reset(DataProvider::create(*config_, *config_, gpuData)); } if (dataProvider_) { evaluator_.reset(trainerInternal_.getGradientMachine()->makeEvaluator()); @@ -211,7 +211,7 @@ void Trainer::init(const std::shared_ptr &config, testDataProvider_ = testDataProvider; if (!testDataProvider_ && config_->hasTestDataConfig()) { testDataProvider_.reset( - DataProvider::create(config_->getTestDataConfig(), gpuData)); + DataProvider::create(config_->getTestDataConfig(), *config_, gpuData)); } if (testDataProvider_) { tester_.reset(new Tester(config_, createTesterConfig(), diff --git a/paddle/utils/PythonUtil.h b/paddle/utils/PythonUtil.h index 4467fd784e..928db486fa 100644 --- a/paddle/utils/PythonUtil.h +++ b/paddle/utils/PythonUtil.h @@ -175,10 +175,21 @@ public: /** * Get bool attribute. * @param field + * @param [out] isBoolType return true if attribute is bool type. If the + * attribute is not bool type, then an implicit + * conversion will happens, and will return the + * conversion result. + * + * Such as, if the attribute is 1, then the return + * value of function will be true, but the isBoolType + * will return false. * @return */ - bool getBoolAttr(const std::string& field) const { + bool getBoolAttr(const std::string& field, bool* isBoolType = nullptr) const { PyObjectPtr tmp(getAttr(field)); + if (isBoolType) { + *isBoolType = PyBool_Check(tmp.get()); + } return PyObject_IsTrue(tmp.get()); } @@ -258,6 +269,15 @@ public: this->set(key, PyBool_FromLong(b)); } + void setStringList(const std::string& key, + const std::vector& items) { + auto * list = PyList_New(items.size()); + for (size_t i=0; i < items.size(); ++i) { + PyList_SetItem(list, i, PyString_FromString(items[i].c_str())); + } + this->set(key, list); + } + private: inline void checkDict() { CHECK(PyDict_Check(this->dict_)); diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index fd9a003bb0..dce0b90952 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -1,6 +1,14 @@ set(OUTPUT_DIR "${CMAKE_CURRENT_BINARY_DIR}/build") +file(GLOB TRAINER_PY_FILES . ./paddle/trainer/*.py) +file(GLOB HELPERS_PY_FILES . ./paddle/trainer_config_helpers/*.py) +file(GLOB UTILS_PY_FILES . ./paddle/utils/*.py) + +set(PY_FILES paddle/__init__.py + ${TRAINER_PY_FILES} + ${HELPERS_PY_FILES} + ${UTILS_PY_FILES}) set(PADDLE_INTERNAL_PACKAGE "") if (PADDLE_WITH_INTERNAL) @@ -13,7 +21,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/setup.py.in add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp COMMAND ${PYTHON_EXECUTABLE} setup.py bdist_wheel COMMAND ${CMAKE_COMMAND} -E touch ${OUTPUT_DIR}/.timestamp - DEPENDS gen_proto_py) + DEPENDS gen_proto_py ${PY_FILES}) add_custom_target(paddle_python ALL DEPENDS ${OUTPUT_DIR}/.timestamp) diff --git a/python/paddle/trainer/PyDataProvider2.py b/python/paddle/trainer/PyDataProvider2.py index c4f6147393..34f5dd41b7 100644 --- a/python/paddle/trainer/PyDataProvider2.py +++ b/python/paddle/trainer/PyDataProvider2.py @@ -14,6 +14,13 @@ import cPickle import logging +import collections +import functools +import itertools + +logging.basicConfig( + format="[%(levelname)s %(asctime)s %(filename)s:%(lineno)s]" + " %(message)s") class SequenceType(object): @@ -68,30 +75,39 @@ sparse_binary_vector = sparse_non_value_slot sparse_vector = sparse_value_slot integer_value = index_slot + def dense_vector_sequence(dim): return dense_vector(dim, seq_type=SequenceType.SEQUENCE) + def dense_vector_sub_sequence(dim): return dense_vector(dim, seq_type=SequenceType.SUB_SEQUENCE) + def sparse_binary_vector_sequence(dim): return sparse_binary_vector(dim, seq_type=SequenceType.SEQUENCE) + def sparse_binary_vector_sub_sequence(dim): return sparse_binary_vector(dim, seq_type=SequenceType.SUB_SEQUENCE) + def sparse_vector_sequence(dim): return sparse_vector(dim, seq_type=SequenceType.SEQUENCE) + def sparse_vector_sub_sequence(dim): return sparse_vector(dim, seq_type=SequenceType.SUB_SEQUENCE) + def integer_value_sequence(dim): return integer_value(dim, seq_type=SequenceType.SEQUENCE) + def integer_value_sub_sequence(dim): return integer_value(dim, seq_type=SequenceType.SUB_SEQUENCE) + def integer_sequence(dim): return index_slot(dim, seq_type=SequenceType.SEQUENCE) @@ -102,13 +118,97 @@ class SingleSlotWrapper(object): def __call__(self, obj, filename): for item in self.generator(obj, filename): - yield [item] + if isinstance(item, dict): + yield item + else: + yield [item] -def provider(input_types=None, should_shuffle=True, pool_size=-1, +class InputOrderWrapper(object): + def __init__(self, generator, input_order): + self.generator = generator + self.input_order = input_order + + def __call__(self, obj, filename): + for item in self.generator(obj, filename): + if isinstance(item, dict): + yield [item.get(input_name, None) for input_name in + self.input_order] + else: + yield item + + +class CheckWrapper(object): + def __init__(self, generator, input_types, check_fail_continue, logger): + self.generator = generator + self.input_types = input_types + self.check_fail_continue = check_fail_continue + self.logger = logger + + def __call__(self, obj, filename): + for items in self.generator(obj, filename): + try: + assert len(items) == len(self.input_types) + assert len(filter(lambda x: x is None, items)) == 0 + for item, input_type in itertools.izip(items, self.input_types): + callback = functools.partial(CheckWrapper.loop_callback, + input_type) + + for _ in xrange(input_type.seq_type): + callback = functools.partial(CheckWrapper.loop_check, + callback) + callback(item) + + yield items + except AssertionError as e: + self.logger.warning( + "Item (%s) is not fit the input type with error %s" + % (repr(item), repr(e))) + + if self.check_fail_continue: + continue + else: + raise + + @staticmethod + def loop_callback(input_type, each): + assert isinstance(input_type, InputType) + if input_type.type == DataType.Dense: + assert isinstance(each, collections.Sequence) + for d in each: + assert isinstance(d, float) + assert len(each, input_type.dim) + elif input_type.type == DataType.Index: + assert isinstance(each, int) + assert each < input_type.dim + elif input_type.type == DataType.SparseNonValue \ + or input_type.type == DataType.SparseValue: + assert isinstance(each, collections.Sequence) + sparse_id = set() + for k in each: + if input_type.type == DataType.SparseValue: + k, v = k + assert isinstance(v, float) + assert isinstance(k, int) + assert k < input_type.dim + sparse_id.add(k) + assert len(sparse_id) == len(each) + else: + raise RuntimeError("Not support input type") + + @staticmethod + def loop_check(callback, item): + for each in item: + callback(each) + + +def provider(input_types=None, should_shuffle=None, pool_size=-1, + min_pool_size=-1, can_over_batch_size=True, calc_batch_size=None, cache=CacheType.NO_CACHE, + check=False, check_fail_continue=False, + use_dynamic_order=True, init_hook=None, **kwargs): """ Provider decorator. Use it to make a function into PyDataProvider2 object. @@ -130,30 +230,63 @@ def provider(input_types=None, should_shuffle=True, pool_size=-1, :param input_types: Specify the input types, can also be set in init_hook. It is a list of InputType object. For example, input_types= \ [dense_vector(9), integer_value(2)]. - :param should_shuffle: True if data should shuffle. + :type input_types: list|tuple + + :param should_shuffle: True if data should shuffle. Pass None means shuffle + when is training and not to shuffle when is testing. :type should_shuffle: bool + :param pool_size: Max number of sample in data pool. :type pool_size: int + + :param min_pool_size: Set minimal sample in data pool. The PaddlePaddle will + random pick sample in pool. So the min_pool_size + effect the randomize of data. + :type min_pool_size: int + :param can_over_batch_size: True if paddle can return a mini-batch larger than batch size in settings. It is useful when custom calculate one sample's batch_size. It is very danger to set it to false and use calc_batch_size together. Default is false. + :type can_over_batch_size: bool + :param calc_batch_size: a method to calculate each sample's batch size. Default each sample's batch size is 1. But to you can customize each sample's batch size. + :type calc_batch_size: callable + :param cache: Cache strategy of Data Provider. Default is CacheType.NO_CACHE + :type cache: int :param init_hook: Initialize hook. Useful when data provider need load some external data like dictionary. The parameter is (settings, file_list, \*\*kwargs). - - settings\: Is the global settings. User can set - settings.input_types here. - - file_list\: All file names for passed to data provider. - - kwargs: Other keyword arguments passed from + - settings. It is the global settings object. User can set + settings.input_types here. + - file_list. All file names for passed to data provider. + - is_train. Is this data provider used for training or not. + - kwargs. Other keyword arguments passed from trainer_config's args parameter. + :type init_hook: callable + + :param check: Check the yield data format is as same as input_types. Enable + this will make data provide process slow but it is very useful + for debug. Default is disabled. + :type check: bool + + :param check_fail_continue: Continue train or not when check failed. Just + drop the wrong format data when it is True. Has + no effect when check set to False. + :type check_fail_continue: bool + + :param use_dynamic_order: Allow provider to yield a dictionary object, whose + key is a input data layer name, and value is the + feature value. The tuples are still allowed when + use_dynmaic_order is True. + :type use_dynamic_order: bool """ def __wrapper__(generator): @@ -168,12 +301,38 @@ def provider(input_types=None, should_shuffle=True, pool_size=-1, self.slots = kwargs['slots'] self.slots = input_types self.should_shuffle = should_shuffle + + true_table = [1, 't', 'true', 'on'] + false_table = [0, 'f', 'false', 'off'] + if not isinstance(self.should_shuffle, bool) and \ + self.should_shuffle is not None: + + if isinstance(self.should_shuffle, basestring): + self.should_shuffle = self.should_shuffle.lower() + + if self.should_shuffle in true_table: + self.should_shuffle = True + elif self.should_shuffle in false_table: + self.should_shuffle = False + else: + self.logger.warning( + "Could not recognize should_shuffle (%s), " + "just use default value of should_shuffle." + " Please set should_shuffle to bool value or " + "something in %s" % ( + repr(self.should_shuffle), + repr(true_table + false_table))) + self.should_shuffle = None + self.pool_size = pool_size self.can_over_batch_size = can_over_batch_size self.calc_batch_size = calc_batch_size self.file_list = file_list self.generator = generator self.cache = cache + self.min_pool_size = min_pool_size + self.input_order = kwargs['input_order'] + self.check = check if init_hook is not None: init_hook(self, file_list=file_list, **kwargs) if self.input_types is not None: @@ -184,6 +343,15 @@ def provider(input_types=None, should_shuffle=True, pool_size=-1, if len(self.slots) == 1: self.generator = SingleSlotWrapper(self.generator) + if use_dynamic_order: + self.generator = InputOrderWrapper(self.generator, + self.input_order) + if self.check: + self.generator = CheckWrapper(self.generator, + self.slots, + check_fail_continue, + self.logger) + return DataProvider return __wrapper__ @@ -196,3 +364,4 @@ def deserialize_args(args): :return: """ return cPickle.loads(args) + -- GitLab