diff --git a/cmake/flags.cmake b/cmake/flags.cmake index cc59309ee7efabcff167a0a80a6ad370f159e99b..dbad6be3f41b3f565d6bf275633d07198491ff3d 100644 --- a/cmake/flags.cmake +++ b/cmake/flags.cmake @@ -64,7 +64,9 @@ set(COMMON_FLAGS -Wdelete-non-virtual-dtor -Wno-unused-parameter -Wno-error=literal-suffix - -Wno-error=unused-local-typedefs) + -Wno-error=unused-local-typedefs + -Wno-error=unused-function # Warnings in Numpy Header. +) foreach(flag ${COMMON_FLAGS}) safe_set_cflag(CMAKE_C_FLAGS ${flag}) diff --git a/demo/mnist/data/get_mnist_data.sh b/demo/mnist/data/get_mnist_data.sh index c3ef99445049d0326bc2888f2e6cd47f7940d879..9099b5ab6fb85d86d346a7ad819538fbd013c6ff 100755 --- a/demo/mnist/data/get_mnist_data.sh +++ b/demo/mnist/data/get_mnist_data.sh @@ -1,6 +1,6 @@ #!/usr/bin/env sh # This scripts downloads the mnist data and unzips it. - +set -e DIR="$( cd "$(dirname "$0")" ; pwd -P )" rm -rf "$DIR/raw_data" mkdir "$DIR/raw_data" diff --git a/paddle/gserver/dataproviders/DataProvider.cpp b/paddle/gserver/dataproviders/DataProvider.cpp index c3b4769f7612b76f5c467fee66826f0e84a6e787..8cefbb30ada46d1ff1b0a4952dde0aeafb5419b1 100644 --- a/paddle/gserver/dataproviders/DataProvider.cpp +++ b/paddle/gserver/dataproviders/DataProvider.cpp @@ -57,7 +57,8 @@ void BufferBatch::clone(DataBatch* srcBatch, bool useGpu) { } } -DoubleBuffer::DoubleBuffer(DataProvider* dataPool, bool useGpu, +DoubleBuffer::DoubleBuffer(DataProvider *dataPool, + bool useGpu, int64_t batchSize) { batchSize_ = batchSize; dataPool_ = dataPool; @@ -110,6 +111,9 @@ void DoubleBuffer::removeOneBatch(DataBatch* dataBatch) { } void DoubleBuffer::insertOneBatch(DataBatch* batch) { + while (!bufferQueue_->waitNotEmptyFor(2 /* seconds */)) { // time out + if (stopping_) return; + } BufferBatch* bufBatch = bufferQueue_->dequeue(); // clone and copy the data from an Threadlocal Variable bufBatch->clone(batch, useGpu_); @@ -138,7 +142,7 @@ void DoubleBuffer::asyncLoadBatch() { actualSize = dataPool_->getNextBatchInternal(batchSize_, &newBatch); } insertOneBatch(&newBatch); - } while (actualSize > 0); + } while (actualSize > 0 && !stopping_); } } diff --git a/paddle/gserver/dataproviders/DataProvider.h b/paddle/gserver/dataproviders/DataProvider.h index c24546374abf3d93dc0ee87f84a00eff46d78829..112e45de1cb232097ed63b120d5ac631b37952e9 100644 --- a/paddle/gserver/dataproviders/DataProvider.h +++ b/paddle/gserver/dataproviders/DataProvider.h @@ -259,7 +259,9 @@ typedef Queue BufferBatchQueue; class DoubleBuffer { public: - DoubleBuffer(DataProvider* dataPool, bool useGpu, int64_t batchSize = 0); + DoubleBuffer(DataProvider* dataPool, + bool useGpu, + int64_t batchSize = 0); virtual ~DoubleBuffer(); void removeOneBatch(DataBatch* dataBatch); @@ -349,7 +351,6 @@ public: */ virtual void reset() { if (doubleBuffer_ != nullptr) { - LOG(INFO) << "the double-buffer is starting ..."; doubleBuffer_->startAsyncLoad(); } } diff --git a/paddle/gserver/dataproviders/PyDataProvider2.cpp b/paddle/gserver/dataproviders/PyDataProvider2.cpp index e3e472ac166c271940308e1a8efa917c286b962a..c464d01fdefd1fbbd65a7798441c53b6aed89ce6 100644 --- a/paddle/gserver/dataproviders/PyDataProvider2.cpp +++ b/paddle/gserver/dataproviders/PyDataProvider2.cpp @@ -18,9 +18,16 @@ limitations under the License. */ #include #include #include +#include +#include +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION +#include #include "DataProvider.h" + #include "paddle/utils/PythonUtil.h" +#include "paddle/utils/Locks.h" +#include "paddle/utils/Stat.h" namespace paddle { @@ -202,7 +209,10 @@ public: PyDataProvider2(const DataConfig& config, const ModelConfig& modelConfig, bool useGpu) - :DataProvider(config, useGpu), callingContextCreated_(2) { + :DataProvider(config, useGpu), + callingContextCreated_(2) { + if (PyArray_API == NULL) + import_array(); auto& args = config.load_data_args(); PyObjectPtr kwargs = PyObjectPtr(PyDict_New()); if (!args.empty()) { @@ -454,6 +464,7 @@ private: std::condition_variable pushCV_; std::condition_variable pullCV_; std::mutex mtx_; + ThreadBarrier callingContextCreated_; std::unique_ptr cache_; @@ -496,8 +507,8 @@ public: * Resetting the PyDataProvider. May start reading thread here. */ virtual void reset() { - DataProvider::reset(); resetImpl(true); + DataProvider::reset(); } /** @@ -518,6 +529,7 @@ public: * Loading a batch of data. */ int64_t getNextBatchInternal(int64_t size_, DataBatch *batch) { + REGISTER_TIMER("PyDP2.getNextBatchInternal") CHECK_GE(size_, 0); size_t size = (size_t) size_; if (loadThread_) { // loading from thread should wait for data pool ready. @@ -698,10 +710,22 @@ public: */ virtual void fill(Argument &argument, PyObject *obj) { real* dat = argument.value->getData() + height_ * headerPtr_->dim; - py::SequenceHelper s(obj); - // TODO(yuyang18): Here we can use AVX or SSE to accelerate memory copy. - for (size_t i=0; i < headerPtr_->dim; ++i) { - dat[i] = (real) s.getDouble(i); + if (PyArray_Check(obj)) { + auto dtype = PyArray_DTYPE((PyArrayObject*)obj); + if (dtype->type == 'f' && dtype->elsize == sizeof(real)) { + real * data = (real*)PyArray_DATA((PyArrayObject*)obj); + auto sz = PyArray_SIZE((PyArrayObject*)obj); + std::copy(data, data + sz, dat); + } else { + LOG(FATAL) << "You should yield float" << sizeof(real) * 8 + << " array"; + } + } else { + py::SequenceHelper s(obj); + // TODO(yuyang18): Here we can use AVX or SSE to accelerate memory copy. + for (size_t i=0; i < headerPtr_->dim; ++i) { + dat[i] = (real) s.getDouble(i); + } } ++height_; } diff --git a/paddle/utils/Queue.h b/paddle/utils/Queue.h index d73f27d7fafd6c3e3122a2dfc8d7dcd7a68b387a..f952cf58778dee0565a8e88ef0015d51dc295428 100644 --- a/paddle/utils/Queue.h +++ b/paddle/utils/Queue.h @@ -135,6 +135,21 @@ public: queueCV_.wait(lock, [this]() { return numElements_ == 0; }); } + /** + * @brief wait queue is not empty at most for some seconds. + * @param seconds wait time limit. + * @return true if queue is not empty. false if timeout. + */ + bool waitNotEmptyFor(int seconds) { + std::unique_lock lock(queueLock_); + return queueCV_.wait_for( + lock, + std::chrono::seconds(seconds), + [this] { + return numElements_ != 0; + }); + } + private: std::deque elements_; int numElements_; diff --git a/python/paddle/trainer_config_helpers/data_sources.py b/python/paddle/trainer_config_helpers/data_sources.py index 8ada3903dc06befb82e47ba36a34e13865f6484b..3b5c17a271f02b4a7506c4b2ffc10d3759dd97c7 100644 --- a/python/paddle/trainer_config_helpers/data_sources.py +++ b/python/paddle/trainer_config_helpers/data_sources.py @@ -84,6 +84,7 @@ def define_py_data_source(file_list, cls, module, data.load_data_module = load_data_module data.load_data_object = load_data_object data.load_data_args = load_data_args + data.async_load_data = True return data data_cls = py_data2