Data Providers

Data Provider

Defines

REGISTER_DATA_PROVIDER(__type_name, __class_name)

Macro for registering a data provider.
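
The macro is used at namespace scope in the provider's source file; for example, PyDataProvider2 registers itself as py2 further below. A custom provider would presumably be registered the same way (MyDataProvider and the type name my_txt are hypothetical, used only for illustration):

  // Hypothetical registration: after this, DataProvider::create() can build
  // a MyDataProvider whenever the configured provider type is "my_txt".
  REGISTER_DATA_PROVIDER(my_txt, MyDataProvider)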

namespace paddle

Typedefs

typedef std::shared_ptr<DataBatch> DataBatchPtr
typedef std::shared_ptr<BufferBatch> BufferBatchPtr
typedef std::shared_ptr<DataProvider> DataProviderPtr
typedef Queue<BufferBatch *> BufferBatchQueue
class BufferBatch

Public Functions

BufferBatch()
~BufferBatch()
void setDataBatch(DataBatch *batchData)
DataBatch *getDataBatch()
void setCuStream(hl_stream_t stream)
hl_stream_t getCuStream() const
void setCuEvent(hl_event_t event)
hl_event_t getCuEvent() const
void createCuEvent()
void syncEvent()
void swap(BufferBatch *bufBatch)
void clone(DataBatch *srcBatch, bool useGpu)

Protected Attributes

DataBatch *batchData_
hl_stream_t hlStream_
hl_event_t hlEvent_
class DataBatch

Public Functions

DataBatch()
int64_t getSize() const
int64_t getNumSequences() const
void setSize(int64_t size)
int64_t getNumStreams() const
const Argument &getStream(int i) const
std::vector<Argument> &getStreams()
std::vector<Argument> getStreams() const
void clear()
void appendData(MatrixPtr data)

The order in which each data stream is appended must match the order specified in stream_names of DataConfig. The stream_names can be obtained using DataProvider::getStreamNames().

void appendData(const MatrixPtr &data, const ICpuGpuVectorPtr &sequenceStartPositions)

The order in which each data stream is appended must match the order specified in stream_names of DataConfig. The stream_names can be obtained using DataProvider::getStreamNames().

void appendLabel(IVectorPtr label, MatrixPtr value = nullptr)
void appendUserDefinedPtr(UserDefinedVectorPtr ptr)
void appendArguments(const std::vector<Argument> &argus, int size, int dataId)
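
A minimal sketch of assembling a DataBatch inside a provider, following the stream-order requirement described for appendData() above. The Matrix::create/IVector::create factories and the batchSize/sampleDim variables are assumptions used only for illustration:

  DataBatch cpuBatch;
  cpuBatch.clear();

  // Streams must be appended in the order given by DataConfig.stream_names
  // (see DataProvider::getStreamNames()).
  MatrixPtr input = Matrix::create(batchSize, sampleDim,
                                   /* trans= */ false, /* useGpu= */ false);
  IVectorPtr label = IVector::create(batchSize, /* useGpu= */ false);
  // ... fill input and label from the underlying data source ...

  cpuBatch.appendData(input);   // first stream: dense input
  cpuBatch.appendLabel(label);  // second stream: labels
  cpuBatch.setSize(batchSize);  // number of samples in this batch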

Protected Attributes

int64_t size_
std::vector<Argument> data_
class DataProvider
#include <DataProvider.h>

DataProvider supplies data for training. It can supply multiple streams of data. For typical supervised training, there are two streams: one is for the input, one is for the label.

Subclassed by paddle::DataProviderGroup< T >, paddle::DummyDataProvider, paddle::MultiDataProvider, paddle::ProtoDataProvider, paddle::PyDataProvider, paddle::PyDataProvider2, paddle::SimpleDataProviderBase

Public Functions

DataProvider(const DataConfig &config, bool useGpu)
virtual ~DataProvider()
const DataConfig &getConfig() const
void setSkipShuffle()
int64_t getNextBatch(int64_t size, DataBatch *batch)
virtual void shuffle() = 0

Shuffle the data set

virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

virtual int64_t getSize() = 0

Return the number of training samples in the data set. Return -1 to indicate an unlimited number of samples.

virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch) = 0
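
A minimal sketch of the calling sequence implied by reset() and getNextBatch() above; config is an already-populated DataConfig and the batch size of 128 is arbitrary. Treating the return value of getNextBatch() as the number of samples delivered (0 once the pass is exhausted) is an assumption used here only to illustrate the documented contract:

  DataProviderPtr provider(DataProvider::create(config, /* useGpu= */ false));
  provider->reset();  // must be called before any getNextBatch()

  DataBatch batch;
  while (provider->getNextBatch(/* size= */ 128, &batch) > 0) {
    // ... feed batch.getStreams() to the trainer ...
  }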

Public Static Functions

DataProvider *create(const DataConfig &config, bool useGpu = FLAGS_use_gpu)

Public Static Attributes

ClassRegistrar<DataProvider, DataConfig, bool> registrar_

Protected Functions

int64_t getNextBatchFromBuffer(int64_t size, DataBatch *batch)
void initAsyncLoader()

Protected Attributes

DataConfig config_
bool skipShuffle_
float usageRatio_
bool useGpu_
std::unique_ptr<DoubleBuffer> doubleBuffer_
ThreadLocal<std::vector<MatrixPtr>> constantSlots_
class DoubleBuffer

Public Functions

DoubleBuffer(DataProvider *dataPool, bool useGpu, int64_t batchSize = 0)
virtual ~DoubleBuffer()
void removeOneBatch(DataBatch *dataBatch)
void setBatchSize(int64_t newBatchSize)
int64_t getBatchSize()
void startAsyncLoad()
void finishAsyncLoad()
void setPending(bool pending)

Protected Functions

virtual void asyncLoadBatch()
void insertOneBatch(DataBatch *batch)

Protected Attributes

DataProvider *dataPool_
bool useGpu_
int32_t batchSize_
ThreadLocal<BufferBatchPtr> usingBatch_
BufferBatchQueue *dataQueue_
BufferBatchQueue *bufferQueue_
std::unique_ptr<std::thread> asyncLoader_
Semaphore taskReadySem_
bool stopping_
bool pending_
class DummyDataProvider
#include <DataProvider.h>

A data provider which does nothing. It only serves to provide necessary configuration such as stream_names.

Inherits from paddle::DataProvider

Public Functions

DummyDataProvider(const DataConfig &config, bool useGpu)
virtual void shuffle()

Shuffle the data set

virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

virtual int64_t getSize()

Return the number of training samples in the data set. Return -1 to indicate an unlimited number of samples.

virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch)
class SimpleDataProvider

Inherits from paddle::SimpleDataProviderBase

Public Functions

SimpleDataProvider(const DataConfig &config, bool useGpu)
~SimpleDataProvider()
virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

Protected Functions

void loadData(const std::string &fileName)
void loadDataFile(const std::string &fileName)
virtual int64_t fillBufferImp(real *data, int *label, int *info, int64_t size)

Fill at most size samples into data and label.

Each input is stored in contiguous memory locations in data.

data[n * sampleDim_] .. data[n * sampleDim_ + sampleDim_ - 1] is for the input of the n-th sample.

label[n] is the label for the n-th sample.

Protected Attributes

size_t currentSampleIndex_
std::vector<int> labels_
std::vector<real> data_
class SimpleDataProviderBase

Inherits from paddle::DataProvider

Subclassed by paddle::SimpleDataProvider

Public Functions

SimpleDataProviderBase(const DataConfig &config, bool useGpu, bool withInfo)
~SimpleDataProviderBase()
virtual void shuffle()

Shuffle the data set

virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

virtual int64_t getSize()

Return the number of training samples in the data set. Return -1 to indicate an unlimited number of samples.

virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch)
int64_t fillBuffer()

Protected Functions

virtual int64_t fillBufferImp(real *data, int *label, int *info, int64_t size) = 0

Fill at most size samples into data and label.

Each input is stored in contiguous memory locations in data.

data[n * sampleDim_] .. data[n * sampleDim_ + sampleDim_ - 1] is for the input of the n-th sample.

label[n] is the label for the n-th sample.
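
A hedged sketch of an override honoring this layout. MyProvider, remainingSamples(), nextFeature(), and nextLabel() are hypothetical names used only to show where the data would come from:

  int64_t MyProvider::fillBufferImp(real* data, int* label, int* info,
                                    int64_t size) {
    // Fill at most `size` samples; `data` has room for size * sampleDim_
    // reals and `label` for size ints.
    int64_t filled = std::min(size, remainingSamples());
    for (int64_t n = 0; n < filled; ++n) {
      for (int64_t d = 0; d < sampleDim_; ++d) {
        data[n * sampleDim_ + d] = nextFeature(n, d);
      }
      label[n] = nextLabel(n);
      if (info) info[n] = 0;  // only meaningful when withInfo_ is set
    }
    return filled;  // number of samples actually filled
  }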

Protected Attributes

int64_t sampleDim_
int64_t bufferCapacity_
int64_t sampleNumInBuf_
int64_t nextItemIndex_
bool withInfo_
CpuMatrixPtr hInputDataBuf_
CpuIVectorPtr hInputLabelBuf_
CpuIVectorPtr hInputInfoBuf_
ThreadLocal<MatrixPtr> dataBatch_
ThreadLocal<IVectorPtr> labelBatch_
ThreadLocal<IVectorPtr> infoBatch_
RWLock lock_
namespace paddle

Enums

enum SlotType

Slot type

Values:

ST_DENSE = 0
ST_NON_SPARSE_VALUE = 1
ST_SPARSE_VALUE = 2
ST_INDEX = 3
enum SeqType

Sequence type

Values:

SQT_NONE = 0
SQT_SEQ
SQT_SUBSEQ
enum CacheType

Cache Type.

Values:

NO_CACHE = 0
CACHE_PASS_IN_MEM = 1

Functions

std::ostream &operator<<(std::ostream &os, const SlotHeader &header)
REGISTER_DATA_PROVIDER(py2, PyDataProvider2)
class CacheOnePassInMemory

Cache One Pass In Memory strategy.

In the first pass, data is loaded from Python and stored in memory. In the remaining passes, data is loaded from memory.

Inherits from paddle::IPyDataProviderCache
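
A rough illustration of the behaviour this implies for reset(), assuming the first call reports that data must come from Python and later calls report that it can be replayed from memory:

  CacheOnePassInMemory cache;
  bool readFromPython = cache.reset();  // first pass: presumably true
  // ... pass 1: data pulled from Python and retained by the cache ...
  readFromPython = cache.reset();       // later passes: presumably false
  // ... pass 2+: data served from load() instead of Python ...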

Public Functions

CacheOnePassInMemory()
virtual bool reset()

Invoked when DataProvider::reset() is called.

Return
true if data is read from Python.

virtual void drop(std::deque<PyObjectPtr> *data)

Invoked when these data have been used by the DataProvider and need to be cleared.

Note
The implementing class must clear this data array. Alternatively, if you want to delete the PyObjectPtr later, you should make sure the paddle process has only one active thread calling Python code (otherwise use PyGuard).
Parameters
  • data -

    used data.

virtual std::deque<PyObjectPtr> *load()

Return all the data in the cache.

Private Members

std::unique_ptr<std::deque<PyObjectPtr>> objPool_
std::unique_ptr<std::deque<PyObjectPtr>> droppedPool_
class DenseScanner

Scanner for dense slot.

Inherits from paddle::IFieldScanner

Public Functions

DenseScanner(SlotHeader *ptr)
virtual void prepare(Argument &argument, PyObject *obj)

Prepare.

Parameters
  • argument -

    target argument

  • obj -

    each timestep of a sample.

virtual void finishPrepare(Argument &argument)

Finish Prepare step.

virtual void fill(Argument &argument, PyObject *obj)

Fill argument from obj.

Parameters
  • argument -

  • obj -

Private Members

size_t height_
class IFieldScanner

FieldScanner Interface.

It reads a Python object and fills each slot of the argument. There are two steps, prepare and fill. The scanner allocates memory during the prepare step and fills data into the argument during the fill step.

Subclassed by paddle::DenseScanner, paddle::IndexScanner, paddle::SequenceScanner, paddle::SparseNonValueScanner
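
A sketch of how a caller drives the two-step protocol described above; scanner is an IFieldScanner obtained from IFieldScanner::create(), and objs (a container of PyObjectPtr samples) is illustrative:

  // Pass 1: allocate memory for every object, then finalize.
  scanner->startPrepare(argument);
  for (auto& obj : objs) scanner->prepare(argument, obj.get());
  scanner->finishPrepare(argument);

  // Pass 2: copy the data into the argument, then finalize.
  scanner->startFill(argument);
  for (auto& obj : objs) scanner->fill(argument, obj.get());
  scanner->finishFill(argument);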

Public Functions

DISABLE_COPY(IFieldScanner)
IFieldScanner(SlotHeader *headerPtr)

Ctor.

Parameters
  • headerPtr -

the slot header that the scanner belongs to.

virtual ~IFieldScanner()
virtual void startPrepare(Argument &argument)

Start prepare step.

virtual void prepare(Argument &argument, PyObject *obj)

Prepare step.

Note
obj could be a single timestep of a sample or a whole sample, depending on which scanner it is.

virtual void finishPrepare(Argument &argument)

Finish Prepare step.

virtual void startFill(Argument &argument)

Start fill step.

virtual void fill(Argument &argument, PyObject *obj)

Fill step.

Note
obj could be a single timestep of a sample or a whole sample, depending on which scanner it is.

virtual void finishFill(Argument &argument)

Finish fill step.

Public Static Functions

IFieldScanner *create(SlotHeader *header)

Factory method. Creates a scanner from a header. The final scanner may combine many scanners.

Note
Fatal if the header is not supported.

Protected Attributes

SlotHeader *headerPtr_
class IndexScanner

Scanner for index slot

Inherits from paddle::IFieldScanner

Public Functions

IndexScanner(SlotHeader *ptr)
virtual void prepare(Argument &argument, PyObject *obj)

Prepare memory space.

Note
obj is a single timestep of a sample.

virtual void finishPrepare(Argument &argument)

Finish Prepare step.

virtual void fill(Argument &argument, PyObject *obj)

Fill one index to argument.

Private Members

size_t cnt_
class IPyDataProviderCache

Py Data Provider Cache Interface.

Subclassed by paddle::CacheOnePassInMemory, paddle::NoCacheStrategy

Public Functions

virtual ~IPyDataProviderCache()
virtual bool reset() = 0

Invoked when DataProvider::reset() is called.

Return
true if data is read from Python.

virtual void drop(std::deque<PyObjectPtr> *data) = 0

Invoked when these data have been used by the DataProvider and need to be cleared.

Note
The implementing class must clear this data array. Alternatively, if you want to delete the PyObjectPtr later, you should make sure the paddle process has only one active thread calling Python code (otherwise use PyGuard).
Parameters
  • data -

    used data.

virtual std::deque<PyObjectPtr> *load() = 0

Return all the data in the cache.

Public Static Functions

IPyDataProviderCache *create(CacheType ct)

Factory method. Converts a CacheType to an IPyDataProviderCache*.
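
For example, the strategy matching CACHE_PASS_IN_MEM from the CacheType enum above would presumably be obtained through this factory:

  std::unique_ptr<IPyDataProviderCache> cache(
      IPyDataProviderCache::create(CACHE_PASS_IN_MEM));
  if (cache->reset()) {
    // true: data should be (re)read from Python for this pass.
  }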

class NoCacheStrategy

No Cache Strategy. Destroys old data immediately and loads data from Python on every pass.

Inherits from paddle::IPyDataProviderCache

Public Functions

virtual bool reset()

Invoked when DataProvider::reset() is called.

Return
true if data is read from Python.

virtual void drop(std::deque<PyObjectPtr> *data)

Invoked when these data have been used by the DataProvider and need to be cleared.

Note
The implementing class must clear this data array. Alternatively, if you want to delete the PyObjectPtr later, you should make sure the paddle process has only one active thread calling Python code (otherwise use PyGuard).
Parameters
  • data -

    used data.

virtual std::deque<PyObjectPtr> *load()

Return all the data in the cache.

class PyDataProvider2

PyDataProvider2.

For usage, please refer to the Python module ‘paddle.trainer.PyDataProvider2’.

Here, we start a thread to read data. Data reading is fully asynchronous, and cache strategies are supported.

Inherits from paddle::DataProvider

Public Functions

PyDataProvider2(const DataConfig &config, bool useGpu)

Ctor

virtual ~PyDataProvider2()

Dtor

Note
The loading thread is stopped when destructing.

virtual void reset()

Reset the PyDataProvider. This may start the reading thread.

virtual void shuffle()

Shuffle. Does nothing, because PyDataProvider shuffles implicitly by randomly selecting data from the data pool.

virtual int64_t getSize()

The size is not limited.

virtual int64_t getNextBatchInternal(int64_t size_, DataBatch *batch)

Load a batch of data.

Private Functions

void createPyDataObj(const std::string &model, const std::string &className, const std::string &fileListName, PyObjectPtr &&kwargs)
void readPyFields()
PyObjectPtr loadPyFileLists(const std::string &fileListName)
void loadThread()
void resetImpl(bool startNewThread)

Private Members

std::unique_ptr<std::thread> loadThread_
std::atomic<bool> exit_
std::vector<PyObjectPtr> callingContexts_
std::deque<PyObjectPtr> dataPool_
size_t poolActualSize_
std::condition_variable pushCV_
std::condition_variable pullCV_
std::mutex mtx_
ThreadBarrier callingContextCreated_
std::unique_ptr<IPyDataProviderCache> cache_
PyObjectPtr instance_
size_t poolSize_
bool canOverBatchSize_
PyObjectPtr calcBatchSize_
PyObjectPtr generator_
std::vector<std::string> fileLists_
std::vector<SlotHeader> headers_

Private Static Attributes

PyObjectPtr zeroTuple_
std::unordered_set<uintptr_t> gModuleClsPtrs_
class PositionRandom

Public Functions

PositionRandom(bool skipRand)
size_t operator()(size_t len)

Private Members

std::default_random_engine &eng_
std::unique_ptr<std::uniform_int_distribution<size_t>> dist_
bool skipRand_
class SequenceScanner

Sequence Scanner. Scanner for sequence or sub-sequence.

Inherits from paddle::IFieldScanner

Public Functions

SequenceScanner(std::unique_ptr<IFieldScanner> &&innerScanner, const std::function<ICpuGpuVectorPtr&(Argument&)> &getSeqStartPos)

Ctor

Parameters
  • innerScanner -

    inner scanner for each timestep or sub-sequence.

  • getSeqStartPos -

A callback, (Argument) => ICpuGpuVectorPtr, that returns a sequence start position or a sub-sequence start position.

virtual void startPrepare(Argument &argument)

Start prepare. Invoke inner->startPrepare too.

virtual void prepare(Argument &argument, PyObject *obj)

Prepare. obj is a list or tuple; inner_->prepare is invoked for each element of the sequence obj.

virtual void finishPrepare(Argument &argument)

Finish prepare. Invokes inner_->finishPrepare too.

virtual void startFill(Argument &argument)

Start fill. Invokes inner->startFill too.

virtual void fill(Argument &argument, PyObject *obj)

Fill. obj is a tuple or list; inner->fill is invoked for each element of the sequence obj, and seqStartPos is set at the same time. seqStartPos is calculated by the getSeqStartPos callback passed to the ctor.

virtual void finishFill(Argument &argument)

Finish fill. Invokes inner->finishFill too.

Protected Functions

size_t getSize(PyObject *obj)

Private Members

std::unique_ptr<IFieldScanner> inner_
size_t cnt_
std::function<ICpuGpuVectorPtr&(Argument&)> getSeqStartPos_
struct SlotHeader

Public Members

size_t dim
SlotType slotType
SeqType seqType
class SparseNonValueScanner

Inherits from paddle::IFieldScanner

Subclassed by paddle::SparseValueScanner

Public Functions

SparseNonValueScanner(SlotHeader *ptr)
virtual void prepare(Argument &argument, PyObject *obj)

Prepare memory space

Note
obj is a timestep of one sample.

virtual void finishPrepare(Argument &argument)

Finish Prepare step.

virtual void startFill(Argument &argument)

Start fill step.

virtual void fill(Argument &argument, PyObject *obj)

Fill one sparse vector to argument.

Note
obj is a timestep of one sample.

Protected Functions

virtual void setData(int *col, real *dat, PyObject *obj)

Set a single sparse index and value.

Parameters
  • col -

    sparse index

  • dat -

    sparse value

  • obj -

Python object. For sparse_non_value it is a PyInt or PyLong; for sparse_value it is a tuple (int, float).

Protected Attributes

size_t nnz_
size_t height_
class SparseValueScanner

Inherits from paddle::SparseNonValueScanner

Public Functions

SparseValueScanner(SlotHeader *ptr)
virtual void finishPrepare(Argument &argument)

Finish Prepare step.

Protected Functions

virtual void setData(int *col, real *dat, PyObject *obj)

Set a single sparse index and value.

Parameters
  • col -

    sparse index

  • dat -

    sparse value

  • obj -

Python object. For sparse_non_value it is a PyInt or PyLong; for sparse_value it is a tuple (int, float).
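
A hypothetical unpacking of the sparse_value case, not the library's actual implementation; it assumes obj is the documented (int, float) tuple and uses the CPython 2 API directly:

  void setData(int* col, real* dat, PyObject* obj) {
    // obj is expected to be a 2-tuple: (index, value).
    *col = static_cast<int>(PyInt_AsLong(PyTuple_GetItem(obj, 0)));
    *dat = static_cast<real>(PyFloat_AsDouble(PyTuple_GetItem(obj, 1)));
  }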

namespace paddle
template <class T>
class DataProviderGroup

Inherits from paddle::DataProvider

Public Functions

DataProviderGroup(const DataConfig &config, bool useGpu)
~DataProviderGroup()
virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

virtual void shuffle()

Shuffle the data set

virtual int64_t getSize()

Return the number of training samples in the data set. Return -1 to indicate an unlimited number of samples.

virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch)

Protected Types

typedef T ProviderType
typedef std::shared_ptr<ProviderType> ProviderPtrType

Protected Attributes

ProviderPtrType provider_
std::vector<std::string> fileList_
std::mutex lock_
std::unique_ptr<MultiThreadWorker<ProviderType>> loader_

Private Functions

void startLoader()
void stopLoader()
void forceStopLoader()
std::shared_ptr<T> loadFile(const std::vector<std::string> &fileList)
namespace paddle
class MultiDataProvider

Inherits from paddle::DataProvider

Public Functions

MultiDataProvider(const DataConfig &config, bool useGpu)
~MultiDataProvider()
virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

virtual void shuffle()

Shuffle the data set

virtual int64_t getSize()

Return the number of training samples in the data set. Return -1 to indicate an unlimited number of samples.

virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch)
bool isTestMode() const

Protected Attributes

std::vector<std::unique_ptr<DataProvider>> subDataProviders_

Private Members

int totalDataRatio_
bool isTestMode_

Proto Data Provider

namespace paddle
class ProtoDataProvider
#include <ProtoDataProvider.h>

Data file with each sample specified by proto message DataSample defined in DataFormat.proto.

The file format is

header

sample1

sample2

...

sampleN

Note
In the data file, each message is prefixed with its length. The reading and writing of the protobuf messages are implemented in ProtoReader.h.
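
A hedged sketch of producing and consuming a file in this layout with ProtoWriter/ProtoReader (documented further below). DataHeader and DataSample come from DataFormat.proto; the file name and the samples container are illustrative, and field population is omitted:

  // Writing: a header followed by length-prefixed samples.
  std::ofstream ofs("train.bin", std::ios::binary);
  ProtoWriter writer(&ofs, /* dataCompression= */ false);
  DataHeader header;
  // ... fill in header fields (slot definitions, etc.) ...
  writer.write(header);
  for (const DataSample& sample : samples) {
    writer.write(sample);
  }

  // Reading back: read() is assumed to return false at end of stream.
  std::ifstream ifs("train.bin", std::ios::binary);
  ProtoReader reader(&ifs, /* dataCompression= */ false);
  DataHeader readHeader;
  reader.read(&readHeader);
  DataSample readSample;
  while (reader.read(&readSample)) {
    // ... handle one sample ...
  }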

Inherits from paddle::DataProvider

Subclassed by paddle::ProtoSequenceDataProvider

Public Functions

ProtoDataProvider(const DataConfig &config, bool useGpu, bool loadDataAll = true)
virtual void reset()

reset() must be called before any calls to getNextBatch(). It resets all index values. IMPORTANT: a subclass's reset() should always call the base class's reset() at the end of the function.

virtual int64_t getSize()

Note
this size includes the sequences which are skipped because they are longer than the batch size.

virtual void shuffle()

Shuffle the data set

void loadData(const std::vector<std::string> &fileList)
virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch)

Protected Functions

void loadData(const std::string &fileName)
void loadDataFile(const std::string &fileName)
void checkDataHeader(const DataHeader &header)
void fillSlots(const DataSample &sample)
bool iidData() const

Return true if each sample is one sequence, i.e., independent of other samples.

void checkSample(const DataSample &sample)
template <class Op>
int64_t sequenceLoop(Op op, int64_t size)
template <class Op>
int64_t sampleLoop(Op op, int64_t size)
template <class Op>
int64_t subSampleLoop(Op op, int64_t size, int slot)
void showDataStats()

Protected Attributes

DataHeader header_
int numVecSlots_
std::vector<ProtoSlot> slots_
size_t sampleNums_
std::vector<size_t> sequenceStartPositions_

The starting position of each sequence in samples. The last element should be the number of samples (e.g., three sequences of lengths 2, 3, and 1 over 6 samples are stored as {0, 2, 5, 6}). If empty, each sample is one sequence.

int64_t currentSequenceIndex_
std::vector<size_t> shuffledSequenceIds_
ThreadLocalD<DataBatch> cpuBatch_
ThreadLocalD<DataBatch> gpuBatch_
RWLock lock_
std::vector<StatPtr> nnzStats_
struct ProtoSlot

Public Members

SlotDef::SlotType type
int dim
std::vector<int> indexData
std::vector<real> denseData
std::vector<sparse_non_value_t> sparseNonValueData
std::vector<sparse_float_value_t> sparseFloatValueData
std::vector<int64_t> indices
std::vector<int64_t> subIndices
std::vector<ProtoVarSlot> varDenseData
std::vector<std::vector<int>> varIndices
std::vector<std::string> strData
struct ProtoVarSlot

Public Members

std::vector<real> data
std::vector<int> dims
class ProtoSequenceDataProvider
#include <ProtoDataProvider.h>

Special use for Proto data: instances should contain sparse-non-value slots and a label. ProtoSequenceDataProvider treats each SPARSE SLOT as a SEQUENCE.

Inherits from paddle::ProtoDataProvider

Public Functions

ProtoSequenceDataProvider(const DataConfig &config, bool useGpu, bool loadDataAll = true)
~ProtoSequenceDataProvider()
virtual int64_t getNextBatchInternal(int64_t size, DataBatch *batch)
namespace paddle
class ProtoReader
#include <ProtoReader.h>

ProtoReader/ProtoWriter are used to read/write a sequence of protobuf messages from/to i/ostream.

Public Functions

ProtoReader(std::istream *s, bool dataCompression = false)
bool read(google::protobuf::MessageLite *msg)

read one message

Protected Attributes

std::unique_ptr<google::protobuf::io::ZeroCopyInputStream> istreamInput_
std::unique_ptr<google::protobuf::io::GzipInputStream> gzipInput_
std::unique_ptr<google::protobuf::io::CodedInputStream> codedInput_
bool dataCompression_
int approximateReadedBytes_

This variable doesn't store the exact number of bytes read by the CodedInputStream object since it was constructed. Instead, it stores an approximate count, because the API gives us no way to tell how many bytes the object has read.

Note
This code depends on protobuf 2.4.0. There is nothing like CodedInputStream::CurrentPosition() in protobuf 2.5.0 to tell us how many bytes the object has read so far, therefore we count the bytes ourselves.

Protected Static Attributes

const int kDefaultTotalBytesLimit

This is the maximum number of bytes that this CodedInputStream will read before refusing to continue.

const int kMaxLimitBytes

If the reader has read more than 55MB (<< 64MB) of data, we reset the CodedInputStream object. This helps avoid the 64MB warning, which would cause ParseFromCodedStream to fail.

class ProtoWriter

Public Functions

ProtoWriter(std::ostream *s, bool dataCompression = false)
bool write(const google::protobuf::MessageLite &msg)

write one message.

Protected Attributes

std::unique_ptr<google::protobuf::io::ZeroCopyOutputStream> ostreamOutput_
std::unique_ptr<google::protobuf::io::GzipOutputStream> gzipOutput_
std::unique_ptr<google::protobuf::io::CodedOutputStream> codedOutput_