提交 b7df7f9e 编写于 作者: L Luo Tao

remove unused ProtoDataProvider related code

上级 8b30e2ab
...@@ -73,7 +73,6 @@ if(MOBILE_INFERENCE) ...@@ -73,7 +73,6 @@ if(MOBILE_INFERENCE)
list(REMOVE_ITEM GSERVER_SOURCES list(REMOVE_ITEM GSERVER_SOURCES
dataproviders/DataProvider.cpp dataproviders/DataProvider.cpp
dataproviders/MultiDataProvider.cpp dataproviders/MultiDataProvider.cpp
dataproviders/ProtoDataProvider.cpp
dataproviders/PyDataProvider2.cpp dataproviders/PyDataProvider2.cpp
dataproviders/PyDataProvider.cpp) dataproviders/PyDataProvider.cpp)
......
...@@ -16,8 +16,8 @@ limitations under the License. */ ...@@ -16,8 +16,8 @@ limitations under the License. */
#include <unistd.h> #include <unistd.h>
#include <algorithm> #include <algorithm>
#include "ProtoDataProvider.h"
#include "paddle/utils/Logging.h" #include "paddle/utils/Logging.h"
#include "paddle/utils/Stat.h"
#include "paddle/utils/StringUtil.h" #include "paddle/utils/StringUtil.h"
#include "paddle/utils/Util.h" #include "paddle/utils/Util.h"
...@@ -164,8 +164,6 @@ DataProvider* DataProvider::create(const DataConfig& config, ...@@ -164,8 +164,6 @@ DataProvider* DataProvider::create(const DataConfig& config,
REGISTER_DATA_PROVIDER(simple, SimpleDataProvider); REGISTER_DATA_PROVIDER(simple, SimpleDataProvider);
REGISTER_DATA_PROVIDER(dummy, DummyDataProvider); REGISTER_DATA_PROVIDER(dummy, DummyDataProvider);
REGISTER_DATA_PROVIDER(proto, ProtoDataProvider);
REGISTER_DATA_PROVIDER(proto_sequence, ProtoSequenceDataProvider);
int64_t DataProvider::getNextBatch(int64_t size, DataBatch* batch) { int64_t DataProvider::getNextBatch(int64_t size, DataBatch* batch) {
int64_t batchSize = doubleBuffer_ ? getNextBatchFromBuffer(size, batch) int64_t batchSize = doubleBuffer_ ? getNextBatchFromBuffer(size, batch)
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "ProtoDataProvider.h"
#include <algorithm>
#include <fstream>
#include <istream>
#include "paddle/utils/StringUtil.h"
#include "paddle/utils/Util.h"
#include "DataProviderGroup.h"
#include "paddle/utils/Logging.h"
// Command-line flag consulted by ProtoDataProvider::loadData(): when it is set
// below 1.0, loading stops before the next file as soon as getMemoryUsage()
// reports a value above this threshold. The default of 1.0 disables the check.
DEFINE_double(memory_threshold_on_load_data,
1.0,
"stop loading data when memory is not sufficient");
namespace paddle {
// Register the DataProviderGroup wrappers of both proto providers under the
// identifiers proto_group and proto_sequence_group.
// NOTE(review): presumably these identifiers are the provider type names looked
// up by DataProvider::create -- confirm against the macro definition.
REGISTER_DATA_PROVIDER(proto_group, DataProviderGroup<ProtoDataProvider>);
REGISTER_DATA_PROVIDER(proto_sequence_group,
DataProviderGroup<ProtoSequenceDataProvider>);
/**
 * Build a provider with no samples loaded yet. When loadDataAll is true the
 * data named by config_.files() is loaded right away; otherwise the caller is
 * expected to invoke loadData() itself.
 */
ProtoDataProvider::ProtoDataProvider(const DataConfig& config,
                                     bool useGpu,
                                     bool loadDataAll)
    : DataProvider(config, useGpu), sampleNums_(0), currentSequenceIndex_(0) {
  if (!loadDataAll) {
    return;
  }
  loadData(config_.files());
}
/**
 * Load every file in fileList, then build the sequence bookkeeping.
 *
 * Loading stops early when the memory_threshold_on_load_data flag is below
 * 1.0 and getMemoryUsage() exceeds it. Afterwards, if every sample started
 * its own sequence, the start positions are moved into shuffledSequenceIds_
 * (leaving sequenceStartPositions_ empty); otherwise a terminating position
 * is appended and one id per sequence is added to shuffledSequenceIds_.
 */
void ProtoDataProvider::loadData(const std::vector<std::string>& fileList) {
  for (const auto& path : fileList) {
    const double threshold = FLAGS_memory_threshold_on_load_data;
    if (threshold < 1.0) {
      const double used = getMemoryUsage();
      if (used > threshold) {
        LOG(INFO) << "memUsage is " << used << ", > " << threshold
                  << " therefore SKIP ALL REMAINING file.";
        break;
      }
    }
    LOG(INFO) << "load data file " << path;
    loadDataFile(path);
  }

  if (sequenceStartPositions_.size() != sampleNums_) {
    // Real sequences: close the last one and enumerate sequence ids.
    sequenceStartPositions_.push_back(sampleNums_);
    const size_t numSequences = sequenceStartPositions_.size() - 1;
    shuffledSequenceIds_.reserve(numSequences);
    for (size_t seq = 0; seq < numSequences; ++seq) {
      shuffledSequenceIds_.push_back(seq);
    }
  } else {
    // Each sample is its own sequence.
    shuffledSequenceIds_.swap(sequenceStartPositions_);
  }

  LOG(INFO) << "read done, num of instance=" << sampleNums_;
  showDataStats();
}
void ProtoDataProvider::loadData(const std::string& fileName) {
std::vector<std::string> fileList;
loadFileList(fileName, fileList);
loadData(fileList);
}
/**
 * Validate a file header and, for the first header seen, initialise the
 * per-slot storage.
 *
 * If header_ already has slots, the new header must match it slot by slot
 * (type and dim) and nothing else happens. Otherwise the header must define
 * at least one slot, all INDEX / VAR_MDIM_INDEX slots must come after the
 * other slots, one "slot<N>_nnz" stat is created per leading non-index slot,
 * and slots_ is rebuilt from the header.
 */
void ProtoDataProvider::checkDataHeader(const DataHeader& header) {
  if (header_.slot_defs_size()) {
    // header_ is already set. Need to check consistency.
    CHECK_EQ(header_.slot_defs_size(), header.slot_defs_size())
        << "Different header";
    for (int i = 0; i < header.slot_defs_size(); ++i) {
      CHECK_EQ(header_.slot_defs(i).type(), header.slot_defs(i).type());
      CHECK_EQ(header_.slot_defs(i).dim(), header.slot_defs(i).dim());
    }
    return;
  }

  // First header: it has to describe at least one slot.
  CHECK(header.slot_defs_size()) << "Invalid header: no slot is defined";

  // Count the leading non-index slots, creating an nnz stat for each.
  int leading = 0;
  while (leading < header.slot_defs_size()) {
    const auto kind = header.slot_defs(leading).type();
    if (kind == SlotDef::INDEX || kind == SlotDef::VAR_MDIM_INDEX) {
      break;
    }
    constexpr int kBufLen = 100;
    char buf[kBufLen];
    snprintf(buf, kBufLen, "slot%d_nnz", leading);
    nnzStats_.push_back(getStat(buf));
    ++leading;
  }
  numVecSlots_ = leading;

  // Check that INDEX slots are after VECTOR slots
  for (int i = numVecSlots_; i < header.slot_defs_size(); ++i) {
    CHECK(header.slot_defs(i).type() == SlotDef::INDEX ||
          header.slot_defs(i).type() == SlotDef::VAR_MDIM_INDEX);
  }

  // Rebuild the in-memory slots; sparse slots start with a 0 row offset.
  slots_.clear();
  slots_.reserve(header.slot_defs_size());
  for (int idx = 0; idx < header.slot_defs_size(); ++idx) {
    const auto& def = header.slot_defs(idx);
    slots_.emplace_back();
    auto& slot = slots_.back();
    slot.type = def.type();
    slot.dim = def.dim();
    const bool isSparse = SlotDef::VECTOR_SPARSE_NON_VALUE == def.type() ||
                          SlotDef::VECTOR_SPARSE_VALUE == def.type();
    if (isSparse) {
      slot.indices.push_back(0);
    }
  }
  header_ = header;
}
/**
 * @brief Validate one DataSample against header_. Every violation trips a
 *        CHECK (or LOG(FATAL) for an unexpected slot type).
 * @param[in] sample sample read from the protobuf data file
 */
void ProtoDataProvider::checkSample(const DataSample& sample) {
// One vector slot is required for each of the leading non-index header slots.
CHECK_EQ(numVecSlots_, sample.vector_slots_size());
// The remaining header slots must be matched in number by either id_slots or
// var_id_slots.
CHECK(header_.slot_defs_size() == numVecSlots_ + sample.id_slots_size() ||
header_.slot_defs_size() == numVecSlots_ + sample.var_id_slots_size());
for (int i = 0; i < numVecSlots_; ++i) {
uint32_t dim = header_.slot_defs(i).dim();
switch (header_.slot_defs(i).type()) {
case SlotDef::VECTOR_DENSE: {
// Dense: exactly dim values and no ids.
CHECK_EQ(static_cast<int>(dim), sample.vector_slots(i).values_size());
CHECK_EQ(0, sample.vector_slots(i).ids_size());
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE: {
// An empty id list is accepted without further checks.
if (0 == sample.vector_slots(i).ids_size()) {
break;
}
// Ids only (no values), and every id must be smaller than dim.
CHECK_LT(0, sample.vector_slots(i).ids_size());
CHECK_EQ(0, sample.vector_slots(i).values_size());
auto maxId = *std::max_element(sample.vector_slots(i).ids().begin(),
sample.vector_slots(i).ids().end());
CHECK_GT(dim, maxId);
break;
}
case SlotDef::VECTOR_SPARSE_VALUE: {
// No ids means there must be no values either.
if (0 == sample.vector_slots(i).ids_size()) {
CHECK_EQ(0, sample.vector_slots(i).values_size());
break;
}
// One value per id, at most dim entries, every id smaller than dim.
CHECK_LT(0, sample.vector_slots(i).values_size());
CHECK_GE(static_cast<int>(dim), sample.vector_slots(i).values_size());
CHECK_EQ(sample.vector_slots(i).values_size(),
sample.vector_slots(i).ids_size());
auto maxId = *std::max_element(sample.vector_slots(i).ids().begin(),
sample.vector_slots(i).ids().end());
CHECK_GT(dim, maxId);
break;
}
case SlotDef::VAR_MDIM_DENSE: {
if (static_cast<int>(dim) != 0) {
// Fixed total size: the value count must equal dim, and if dims are given
// their product must equal dim too.
CHECK_EQ(static_cast<int>(dim), sample.vector_slots(i).values_size());
if (sample.vector_slots(i).dims_size() != 0) {
int totalDim = sample.vector_slots(i).dims(0);
for (int j = 1; j < sample.vector_slots(i).dims_size(); ++j) {
totalDim *= sample.vector_slots(i).dims(j);
}
CHECK_EQ(static_cast<int>(dim), totalDim);
}
} else {
// dim == 0: dims are mandatory and their product must equal the value count.
CHECK_NE(sample.vector_slots(i).dims_size(), 0);
int totalDim = sample.vector_slots(i).dims(0);
for (int j = 1; j < sample.vector_slots(i).dims_size(); ++j) {
totalDim *= sample.vector_slots(i).dims(j);
}
CHECK_EQ(totalDim, sample.vector_slots(i).values_size());
}
break;
}
case SlotDef::STRING: {
// Exactly one string, and neither ids nor values.
CHECK_EQ(static_cast<int>(1), sample.vector_slots(i).strs_size());
CHECK_EQ(0, sample.vector_slots(i).ids_size());
CHECK_EQ(0, sample.vector_slots(i).values_size());
break;
}
default:
LOG(FATAL) << "BUG: Should not reach here";
}
}
// Index slots: every id must be smaller than the slot's dim.
for (int i = numVecSlots_; i < header_.slot_defs_size(); ++i) {
if (header_.slot_defs(i).type() != SlotDef::VAR_MDIM_INDEX) {
uint32_t id = sample.id_slots(i - numVecSlots_);
// An id of -1U is exempt from the range check.
// NOTE(review): presumably a "no label" sentinel -- confirm with the writers
// of the data files.
if (id == -1U) continue;
CHECK_LT(id, header_.slot_defs(i).dim());
} else {
for (int j = 0; j < sample.var_id_slots(i - numVecSlots_).ids_size();
++j) {
uint32_t id = sample.var_id_slots(i - numVecSlots_).ids(j);
CHECK_LT(id, header_.slot_defs(i).dim());
}
}
}
}
/**
 * Read one protobuf data file: a DataHeader followed by DataSample messages
 * until the reader reports no more. Files whose name ends in ".gz" are read
 * with compression enabled. Each sample is validated, recorded as a sequence
 * start when is_beginning() is set, appended to slots_ and counted.
 */
void ProtoDataProvider::loadDataFile(const std::string& fileName) {
  std::ifstream is(fileName);
  CHECK(is) << "Fail to open " << fileName;

  const bool gzipped = str::endsWith(fileName, ".gz");
  std::unique_ptr<ProtoReader> reader(new ProtoReader(&is, gzipped));
  CHECK(reader) << "Fail to create proto data input stream";

  DataHeader header;
  CHECK(reader->read(&header));
  checkDataHeader(header);

  DataSample sample;
  while (reader->read(&sample)) {
    checkSample(sample);
    if (sample.is_beginning()) {
      sequenceStartPositions_.push_back(sampleNums_);
    }
    fillSlots(sample);
    ++sampleNums_;
  }

  // A failed read is only acceptable at the end of the file.
  CHECK(is.eof()) << "Fail to read file";

  // Destroy the reader before closing the stream it reads from.
  reader.reset(nullptr);
  is.close();
}
/**
 * @brief Append the content of one sample to the in-memory slots_.
 *
 * No validation is done here; checkSample() is expected to have been called
 * on the sample first.
 * @param[in] sample sample read from the protobuf data file
 */
void ProtoDataProvider::fillSlots(const DataSample& sample) {
for (size_t i = 0; i < slots_.size(); ++i) {
auto& slot = slots_[i];
int dim = slot.dim;
switch (slot.type) {
case SlotDef::VECTOR_DENSE: {
// Append dim values to the flat dense buffer.
size_t oldSize = slot.denseData.size();
slot.denseData.resize(oldSize + dim);
const float* values = sample.vector_slots(i).values().data();
#ifdef PADDLE_TYPE_DOUBLE
// Element-wise copy instead of memcpy when PADDLE_TYPE_DOUBLE is defined.
std::copy(values, values + dim, slot.denseData.begin() + oldSize);
#else
memcpy(slot.denseData.data() + oldSize, values, sizeof(real) * dim);
#endif
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE: {
int slotSize = sample.vector_slots(i).ids_size();
int subSlotSize = 0;
int id = 0; // the slot id
// Look for a subseq_slots entry that refers to this slot. If there is none,
// subSlotSize stays 0 and id ends up equal to subseq_slots_size().
for (id = 0; id < sample.subseq_slots_size(); id++) {
if (sample.subseq_slots(id).slot_id() == i) {
subSlotSize = sample.subseq_slots(id).lens_size();
break;
}
}
if (subSlotSize && slot.subIndices.size() == 0UL) {
// First sub-sequence seen for this slot: subIndices starts at 0.
slot.subIndices.push_back(0);
}
if (slotSize == 0UL) {
// Sample without ids: repeat the previous offset (an empty row).
slot.indices.push_back(slot.indices.back());
// If sub-sequences are tracked, record an empty one as well.
if (slot.subIndices.size()) {
slot.subIndices.push_back(slot.subIndices.back());
}
break;
}
// Append the ids after the current end offset, then record the new end.
slot.sparseNonValueData.resize(slot.indices.back() + slotSize);
const unsigned int* ids = sample.vector_slots(i).ids().data();
memcpy(slot.sparseNonValueData.data() + slot.indices.back(),
ids,
sizeof(*ids) * slotSize);
slot.indices.push_back(slot.indices.back() + slotSize);
if (subSlotSize) {
// subIndices accumulates the sub-sequence lengths as running offsets.
for (int ii = 0; ii < subSlotSize; ++ii) {
slot.subIndices.push_back(slot.subIndices.back() +
sample.subseq_slots(id).lens(ii));
}
}
break;
}
case SlotDef::VECTOR_SPARSE_VALUE: {
// Sample without ids: repeat the previous offset (an empty row).
if (0 == sample.vector_slots(i).ids_size()) {
slot.indices.push_back(slot.indices.back());
break;
}
// Append (col, value) pairs after the current end offset.
int slotSize = sample.vector_slots(i).ids_size();
slot.sparseFloatValueData.resize(slot.indices.back() + slotSize);
const unsigned int* ids = sample.vector_slots(i).ids().data();
const float* values = sample.vector_slots(i).values().data();
for (int ii = 0; ii < slotSize; ++ii) {
slot.sparseFloatValueData[slot.indices.back() + ii].col = ids[ii];
slot.sparseFloatValueData[slot.indices.back() + ii].value =
values[ii];
}
slot.indices.push_back(slot.indices.back() + slotSize);
break;
}
case SlotDef::INDEX: {
// id_slots only holds the index slots, hence the numVecSlots_ offset.
slot.indexData.push_back(sample.id_slots(i - numVecSlots_));
break;
}
case SlotDef::VAR_MDIM_DENSE: {
// One ProtoVarSlot per sample, holding its values and its dims.
size_t oldSize = slot.varDenseData.size();
slot.varDenseData.resize(oldSize + 1);
size_t varDim = sample.vector_slots(i).values_size();
slot.varDenseData[oldSize].data.resize(varDim);
const float* values = sample.vector_slots(i).values().data();
#ifdef PADDLE_TYPE_DOUBLE
std::copy(
values, values + varDim, slot.varDenseData[oldSize].data.data());
#else
memcpy(slot.varDenseData[oldSize].data.data(),
values,
sizeof(real) * varDim);
#endif
slot.varDenseData[oldSize].dims.resize(
sample.vector_slots(i).dims_size());
memcpy(slot.varDenseData[oldSize].dims.data(),
sample.vector_slots(i).dims().data(),
sizeof(uint32_t) * sample.vector_slots(i).dims_size());
break;
}
case SlotDef::VAR_MDIM_INDEX: {
// One id vector per sample.
size_t oldSize = slot.varIndices.size();
slot.varIndices.resize(oldSize + 1);
size_t varDim = sample.var_id_slots(i - numVecSlots_).ids_size();
slot.varIndices[oldSize].resize(varDim);
memcpy(slot.varIndices[oldSize].data(),
sample.var_id_slots(i - numVecSlots_).ids().data(),
sizeof(uint32_t) * varDim);
break;
}
case SlotDef::STRING: {
slot.strData.push_back(sample.vector_slots(i).strs(0));
break;
}
}
}
}
/**
 * Log, for every sparse slot, the average number of stored entries per
 * sample ("slot<i>:avgNNZ=<avg>; "). Non-sparse slots are not reported.
 *
 * When no sample has been loaded the average is reported as 0 instead of
 * dividing by zero, which would put NaN into the log.
 */
void ProtoDataProvider::showDataStats() {
  std::ostringstream oss;
  for (size_t i = 0; i < slots_.size(); ++i) {
    const auto& slot = slots_[i];
    size_t nnz = 0;
    if (slot.type == SlotDef::VECTOR_SPARSE_NON_VALUE) {
      nnz = slot.sparseNonValueData.size();
    } else if (slot.type == SlotDef::VECTOR_SPARSE_VALUE) {
      nnz = slot.sparseFloatValueData.size();
    } else {
      continue;
    }
    // Guard the empty data set: 0 / 0 in floating point yields NaN.
    const double avgNnz =
        sampleNums_ == 0 ? 0.0 : static_cast<double>(nnz) / sampleNums_;
    oss << "slot" << i << ":avgNNZ=" << avgNnz << "; ";
  }
  LOG(INFO) << oss.str();
}
void ProtoDataProvider::reset() {
currentSequenceIndex_ = 0;
if (!skipShuffle_) {
shuffle();
}
DataProvider::reset();
}
/**
 * Randomly permute shuffledSequenceIds_ using the engine returned by
 * ThreadLocalRandomEngine::get().
 */
void ProtoDataProvider::shuffle() {
  auto&& engine = ThreadLocalRandomEngine::get();
  std::shuffle(
      std::begin(shuffledSequenceIds_), std::end(shuffledSequenceIds_), engine);
}
/*
 Walk the shuffled sequences starting at currentSequenceIndex_, calling
 op(begin, end) with the sample range [begin, end) of each one.

 Sequences are taken until adding the next one would push the running sample
 count above size; the first sequence is always taken, whatever its length.
 When usageRatio_ < 1 only that fraction of the sequences is considered.

 Returns the number of sequences visited.
*/
template <class Op>
int64_t ProtoDataProvider::sequenceLoop(Op op, int64_t size) {
  size_t limit = shuffledSequenceIds_.size();
  if (usageRatio_ < 1.0f) {
    limit = static_cast<int64_t>(limit * usageRatio_);
  }

  int64_t taken = 0;  // samples handed to op so far
  size_t cursor = currentSequenceIndex_;
  while (cursor < limit) {
    const size_t seqId = shuffledSequenceIds_[cursor];
    int64_t begin = sequenceStartPositions_[seqId];
    int64_t end = sequenceStartPositions_[seqId + 1];
    const int64_t length = end - begin;
    if (taken > 0 && taken + length > size) {
      break;
    }
    taken += length;
    op(begin, end);
    ++cursor;
  }
  return cursor - currentSequenceIndex_;
}
/*
 Call op(pos) for every sample position of the next batch.

 For sequence data this delegates to sequenceLoop() and visits every sample
 of every selected sequence; the return value is the number of sequences
 visited. For iid data (one sample per sequence) it visits up to size
 entries of shuffledSequenceIds_ starting at currentSequenceIndex_ and
 returns how many were visited.
*/
template <class Op>
int64_t ProtoDataProvider::sampleLoop(Op op, int64_t size) {
  if (!iidData()) {
    auto visitRange = [op](int64_t begin, int64_t end) {
      for (int64_t pos = begin; pos < end; ++pos) {
        op(pos);
      }
    };
    return sequenceLoop(visitRange, size);
  }

  // iid data: clamp to the samples that are left.
  size = std::min<int64_t>(sampleNums_ - currentSequenceIndex_, size);
  int64_t idx = currentSequenceIndex_;
  while (idx < currentSequenceIndex_ + size) {
    size_t pos = shuffledSequenceIds_[idx];
    op(pos);
    ++idx;
  }
  return size;
}
/*
Loop through the next (at most) size samples starting from
currentSequenceIndex_ and, for each of them, call op(j) for every
sub-sequence index j of the given slot that belongs to the sample.
Only iid data is accepted (enforced by a CHECK).
Returns the total number of sub-sequences passed to op.
*/
template <class Op>
int64_t ProtoDataProvider::subSampleLoop(Op op, int64_t size, int slot) {
CHECK(iidData()) << "subSampleLoop only accepts iid data";
// Clamp to the number of samples that are left.
size = std::min<int64_t>(sampleNums_ - currentSequenceIndex_, size);
int subSize = 0;
for (int64_t i = currentSequenceIndex_; i < currentSequenceIndex_ + size;
++i) {
size_t pos = shuffledSequenceIds_[i];
int64_t* indexs = slots_[slot].indices.data();
int64_t* subIndexs = slots_[slot].subIndices.data();
int64_t subSeqStart = 0;
int64_t subSeqEnd = 0;
// Scan subIndices for the entries whose offsets equal the sample's start
// offset (indexs[pos]) and end offset (indexs[pos + 1]); these delimit the
// sample's sub-sequences [subSeqStart, subSeqEnd).
for (int j = 0; j < (int)slots_[slot].subIndices.size(); j++) {
if (subIndexs[j] == indexs[pos]) {
subSeqStart = j;
// NOTE(review): this test indexes subIndices with the sample position `pos`,
// while every other access in this loop uses the sub-sequence index `j`.
// It looks like it was meant to detect an empty sample / sub-sequence and
// may read past the end of subIndices -- confirm the intended condition.
if (subIndexs[pos] == subIndexs[pos + 1]) {
subSeqEnd = j + 1;
break;
}
} else if (subIndexs[j] == indexs[pos + 1]) {
subSeqEnd = j;
break;
}
}
for (int j = subSeqStart; j < subSeqEnd; j++) {
op(j);
}
subSize += subSeqEnd - subSeqStart;
}
return subSize;
}
/**
 * @brief Assemble the next batch of at most size samples into *batch.
 *
 * The batch is first built in the CPU-side cpuBatch_; when useGpu_ is set it
 * is then transferred into gpuBatch_ and that one is returned instead.
 * currentSequenceIndex_ is advanced by the number of sequences scanned.
 *
 * @param[in]  size  requested number of samples
 * @param[out] batch receives the assembled batch
 * @return number of samples in the batch, or 0 when nothing is left
 */
int64_t ProtoDataProvider::getNextBatchInternal(int64_t size,
DataBatch* batch) {
int64_t numSequences = 0; // actual number of sequences in the batch
// the number of sequences scanned, including those skipped because too long
int64_t numScannedSeqs = 0;
// lock_ is held until the function returns.
std::lock_guard<RWLock> guard(lock_);
if (iidData()) {
// One sample per sequence: just clamp to what is left.
size = std::min<int64_t>(getSize() - currentSequenceIndex_, size);
numScannedSeqs = numSequences = size;
} else {
// Dry run over the sequences to count how many sequences and samples
// will go into this batch.
int64_t sz = 0;
auto op = [&sz, &numSequences](int64_t begin, int64_t end) {
++numSequences;
sz += end - begin;
};
numScannedSeqs = sequenceLoop(op, size);
VLOG_IF(1, numScannedSeqs > numSequences)
<< numScannedSeqs - numSequences
<< " sequences are skipped because longer than " << size;
size = sz;
}
if (size <= 0) return 0;
DataBatch& cpuBatch = *cpuBatch_;
std::vector<Argument>& cpuArguments = cpuBatch.getStreams();
cpuBatch.setSize(size);
cpuArguments.resize(header_.slot_defs_size());
if (!iidData()) {
// Second pass over the same sequences: write the batch-relative start
// offset of each sequence, followed by the total size.
ICpuGpuVector::resizeOrCreate(cpuArguments[0].sequenceStartPositions,
numSequences + 1,
/* useGpu= */ false);
int* buf = cpuArguments[0].sequenceStartPositions->getMutableData(false);
int pos = 0;
int i = 0;
auto op = [buf, &pos, &i](int64_t begin, int64_t end) {
buf[i] = pos;
pos += end - begin;
++i;
};
sequenceLoop(op, size);
buf[i] = size;
// All other slots share slot 0's sequenceStartPositions.
for (size_t slot = 1; slot < cpuArguments.size(); ++slot) {
cpuArguments[slot].sequenceStartPositions =
cpuArguments[0].sequenceStartPositions;
}
}
for (int slot = 0; slot < header_.slot_defs_size(); ++slot) {
size_t dim = header_.slot_defs(slot).dim();
SlotDef::SlotType slotType = header_.slot_defs(slot).type();
// dataPos collects the position of every sample that goes into the batch.
std::vector<int64_t> dataPos;
dataPos.reserve(size);
auto op = [this, &dataPos](int64_t pos) { dataPos.push_back(pos); };
sampleLoop(op, size);
switch (slotType) {
case SlotDef::VECTOR_DENSE: {
// Copy one row of dim values per sample into a size x dim matrix.
Matrix::resizeOrCreate(cpuArguments[slot].value,
size,
dim,
false, // trans = false
false); // useGpu = false
real* buf = cpuArguments[slot].value->getData();
for (int i = 0; i < size; ++i) {
memcpy(buf + i * dim,
slots_[slot].denseData.data() + dataPos[i] * dim,
sizeof(real) * dim);
}
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE: {
// The sparse matrix is created once (on the device selected by useGpu_)
// and reused on later calls.
if (!(cpuArguments[slot].value)) {
cpuArguments[slot].value =
Matrix::createSparseMatrix(size,
dim,
size /*DEFAULT_AVG_WIDTH = 1*/,
NO_VALUE,
SPARSE_CSR,
false,
useGpu_);
}
auto mat = cpuArguments[slot].value;
mat->resize(size, dim);
// Fill the rows selected by dataPos, using the GPU or CPU implementation
// depending on the matrix's dynamic type.
if (std::dynamic_pointer_cast<GpuSparseMatrix>(mat)) {
std::dynamic_pointer_cast<GpuSparseMatrix>(mat)->copyFrom(
dataPos.data(),
slots_[slot].indices.data(),
slots_[slot].sparseNonValueData.data(),
HPPL_STREAM_1);
} else if (std::dynamic_pointer_cast<CpuSparseMatrix>(mat)) {
std::dynamic_pointer_cast<CpuSparseMatrix>(mat)->copyFrom(
dataPos.data(),
slots_[slot].indices.data(),
slots_[slot].sparseNonValueData.data());
} else {
LOG(FATAL) << "Not Supported";
}
// Record the number of stored entries of this batch in the slot's stat.
size_t numElements = 0;
for (auto pos : dataPos) {
numElements +=
slots_[slot].indices[pos + 1] - slots_[slot].indices[pos];
}
nnzStats_[slot]->addSample(numElements);
break;
}
case SlotDef::VECTOR_SPARSE_VALUE: {
// Same as the non-value case, but with FLOAT_VALUE entries and no stat.
if (!(cpuArguments[slot].value)) {
cpuArguments[slot].value =
Matrix::createSparseMatrix(size,
dim,
size /*DEFAULT_AVG_WIDTH = 1*/,
FLOAT_VALUE,
SPARSE_CSR,
false,
useGpu_);
}
auto mat = cpuArguments[slot].value;
mat->resize(size, dim);
if (std::dynamic_pointer_cast<GpuSparseMatrix>(mat)) {
std::dynamic_pointer_cast<GpuSparseMatrix>(mat)->copyFrom(
dataPos.data(),
slots_[slot].indices.data(),
slots_[slot].sparseFloatValueData.data(),
HPPL_STREAM_1);
} else if (std::dynamic_pointer_cast<CpuSparseMatrix>(mat)) {
std::dynamic_pointer_cast<CpuSparseMatrix>(mat)->copyFrom(
dataPos.data(),
slots_[slot].indices.data(),
slots_[slot].sparseFloatValueData.data());
} else {
LOG(FATAL) << "Not Supported";
}
break;
}
case SlotDef::INDEX: {
// One id per sample.
IVector::resizeOrCreate(cpuArguments[slot].ids,
size,
/* useGpu= */ false);
int* buf = cpuArguments[slot].ids->getData();
for (int i = 0; i < size; ++i) {
buf[i] = slots_[slot].indexData[dataPos[i]];
}
break;
}
case SlotDef::VAR_MDIM_DENSE: {
// Only batches of a single sample with exactly three dims are supported.
CHECK_EQ(size, 1);
auto mat = cpuArguments[slot].value;
size_t totalDim = slots_[slot].varDenseData[dataPos[0]].data.size();
CHECK_EQ(slots_[slot].varDenseData[dataPos[0]].dims.size(), size_t(3));
size_t height, width, depth, oldWidth;
/* dims[2] is depth, will be changed to dims[0] in future */
depth = slots_[slot].varDenseData[dataPos[0]].dims[2];
height = slots_[slot].varDenseData[dataPos[0]].dims[1];
width = slots_[slot].varDenseData[dataPos[0]].dims[0];
oldWidth = width;
/* process the undesirable sample */
// A sample narrower than it is tall is widened to width == height; the
// added columns stay zero (see zeroMem below).
if (oldWidth < height) {
width = height;
}
cpuArguments[slot].setFrameHeight(height);
cpuArguments[slot].setFrameWidth(width);
if (oldWidth < height) {
totalDim = width * height * depth;
}
Matrix::resizeOrCreate(cpuArguments[slot].value,
size,
totalDim,
false, // trans = false
false); // useGpu = false
real* buf = cpuArguments[slot].value->getData();
cpuArguments[slot].value->zeroMem();
if (oldWidth < height) {
// Copy element by element, leaving the extra columns at zero.
real* srcBuf = slots_[slot].varDenseData[dataPos[0]].data.data();
for (size_t i = 0; i < depth; i++) {
for (size_t j = 0; j < height; j++) {
for (size_t k = 0; k < oldWidth; k++) {
buf[i * height * width + j * width + k] =
srcBuf[i * height * oldWidth + j * oldWidth + k];
}
}
}
} else {
memcpy(buf,
slots_[slot].varDenseData[dataPos[0]].data.data(),
sizeof(real) * totalDim);
}
// The single sample forms one sequence of length 1.
ICpuGpuVector::resizeOrCreate(cpuArguments[slot].sequenceStartPositions,
size + 1, /* size == 1 currently */
/* useGpu= */ false);
int* bufStarts =
cpuArguments[slot].sequenceStartPositions->getMutableData(false);
bufStarts[0] = 0;
bufStarts[1] = 1;
break;
}
case SlotDef::VAR_MDIM_INDEX: {
// Only batches of a single sample are supported; its ids form one
// sequence of length totalDim.
CHECK_EQ(size, 1);
size_t totalDim = slots_[slot].varIndices[dataPos[0]].size();
IVector::resizeOrCreate(cpuArguments[slot].ids,
totalDim,
/* useGpu= */ false);
int* buf = cpuArguments[slot].ids->getData();
memcpy(buf,
slots_[slot].varIndices[dataPos[0]].data(),
sizeof(int) * totalDim);
ICpuGpuVector::resizeOrCreate(cpuArguments[slot].sequenceStartPositions,
size + 1, /* size == 1 currently */
/* useGpu= */ false);
int* bufStarts =
cpuArguments[slot].sequenceStartPositions->getMutableData(false);
bufStarts[0] = 0;
/* we expand the convolutional feature map to a sequence data,
* so there should be a corresponding sequence labels */
bufStarts[1] = totalDim;
break;
}
case SlotDef::STRING: {
// One string per sample.
if (cpuArguments[slot].strs) {
cpuArguments[slot].strs->resize(size);
} else {
cpuArguments[slot].strs =
std::make_shared<std::vector<std::string>>(size);
}
for (int i = 0; i < size; ++i) {
(*cpuArguments[slot].strs)[i] = slots_[slot].strData[dataPos[i]];
}
break;
}
}
}
if (useGpu_) {
std::vector<Argument>& cpuArguments = cpuBatch.getStreams();
DataBatch& gpuBatch = *gpuBatch_;
std::vector<Argument>& gpuArguments = gpuBatch.getStreams();
gpuArguments.resize(cpuArguments.size());
gpuBatch.setSize(size);
for (int i = 0; i < header_.slot_defs_size(); ++i) {
SlotDef::SlotType slotType = header_.slot_defs(i).type();
if (SlotDef::VECTOR_SPARSE_VALUE == slotType ||
SlotDef::VECTOR_SPARSE_NON_VALUE == slotType) {
// Sparse slots were created with useGpu_ above, so the Argument is
// assigned as is instead of being copied to the device.
gpuArguments[i] = cpuArguments[i];
gpuArguments[i].sequenceStartPositions =
cpuArguments[i].sequenceStartPositions;
} else {
gpuArguments[i].resizeAndCopyFrom(
cpuArguments[i], useGpu_, HPPL_STREAM_1);
}
}
// Wait for the work queued on HPPL_STREAM_1 before handing out the batch.
hl_stream_synchronize(HPPL_STREAM_1);
*batch = gpuBatch;
} else {
*batch = cpuBatch;
}
currentSequenceIndex_ += numScannedSeqs;
return batch->getSize();
}
/**
 * @brief Forward all arguments unchanged to the ProtoDataProvider
 *        constructor; no additional state is set up here.
 */
ProtoSequenceDataProvider::ProtoSequenceDataProvider(const DataConfig& config,
bool useGpu,
bool loadDataAll)
: ProtoDataProvider(config, useGpu, loadDataAll) {}
/**
 * @brief Assemble the next batch, giving every slot its own
 *        sequenceStartPositions.
 *
 * Only iid data is accepted (enforced by a CHECK). Supported slot types are
 * VECTOR_DENSE, VECTOR_SPARSE_NON_VALUE and INDEX; the other handled types
 * abort with LOG(FATAL). For a sparse non-value slot the ids of each sample
 * are emitted as one sequence in Argument::ids.
 *
 * @param[in]  size  requested number of samples
 * @param[out] batch receives the assembled batch
 * @return number of samples in the batch, or 0 when nothing is left
 */
int64_t ProtoSequenceDataProvider::getNextBatchInternal(int64_t size,
DataBatch* batch) {
CHECK(iidData()) << "ProtoSequenceDataProvider only accepts iid data";
int64_t numSequences = 0; // actual number of sequences in the batch
// the number of sequences scanned, including those skipped because too long
int64_t numScannedSeqs = 0;
// lock_ is held until the function returns.
std::lock_guard<RWLock> guard(lock_);
// Clamp to the number of samples that are left.
size = std::min<int64_t>(getSize() - currentSequenceIndex_, size);
numScannedSeqs = numSequences = size;
if (size <= 0) return 0;
DataBatch& cpuBatch = *cpuBatch_;
std::vector<Argument>& cpuArguments = cpuBatch.getStreams();
cpuBatch.setSize(size);
cpuArguments.resize(header_.slot_defs_size());
for (int slot = 0; slot < header_.slot_defs_size(); ++slot) {
SlotDef::SlotType slotType = header_.slot_defs(slot).type();
// dataPos collects the position of every sample that goes into the batch.
std::vector<int64_t> dataPos;
dataPos.reserve(size);
auto op = [this, &dataPos](int64_t pos) { dataPos.push_back(pos); };
sampleLoop(op, size);
// current slot: sequenceStartPositions
ICpuGpuVector::resizeOrCreate(cpuArguments[slot].sequenceStartPositions,
size + 1,
/* useGpu= */ false);
switch (slotType) {
case SlotDef::VECTOR_SPARSE_VALUE:
case SlotDef::VAR_MDIM_DENSE:
case SlotDef::VAR_MDIM_INDEX: {
LOG(FATAL) << "ProtoSequenceDataProvider only support"
<< " VECTOR_DENSE, VECTOR_SPARSE_NON_VALUE and INDEX slots";
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE: {
// copy to IDS, not value
// pointers used in current slot
sparse_non_value_t* data = slots_[slot].sparseNonValueData.data();
int64_t* indexs = slots_[slot].indices.data();
int64_t* seqs = dataPos.data();
// First pass: total number of ids needed for the requested samples.
int totalFeatureInCurrentSlot = 0;
for (int ins = 0; ins < size; ins++) {
int64_t currInsId = seqs[ins];
totalFeatureInCurrentSlot +=
indexs[currInsId + 1] - indexs[currInsId];
// special: a sample with no id in this slot still occupies one entry
// (the -1 placeholder written below)
if (indexs[currInsId + 1] == indexs[currInsId]) {
totalFeatureInCurrentSlot++;
}
}
// done
// current slot: ids
IVector::resizeOrCreate(cpuArguments[slot].ids,
totalFeatureInCurrentSlot,
/* useGpu= */ false);
// where to write
int* currPosOfArgumentId = cpuArguments[slot].ids->getData();
int* currPosOfArgumentSeqStart =
cpuArguments[slot].sequenceStartPositions->getMutableData(false);
int allSequenceLength = 0;
currPosOfArgumentSeqStart[0] = 0;
// for each instance, copy data and fill sequence positions
for (int instance = 0; instance < size; instance++) {
int64_t currInstanceId = seqs[instance];
int64_t currInstanceLength =
indexs[currInstanceId + 1] - indexs[currInstanceId];
sparse_non_value_t* currInstanceData = data + indexs[currInstanceId];
// write sequenceStartPositions
allSequenceLength += currInstanceLength;
currPosOfArgumentSeqStart[instance + 1] = allSequenceLength;
// copy features
for (int featCopier = 0; featCopier < currInstanceLength;
featCopier++) {
currPosOfArgumentId[featCopier] = currInstanceData[featCopier].col;
}
currPosOfArgumentId += currInstanceLength;
// special: a sample with no id becomes a length-1 sequence holding the
// placeholder id -1
if (currInstanceLength == 0) {
allSequenceLength++;
currPosOfArgumentSeqStart[instance + 1] = allSequenceLength;
currPosOfArgumentId[0] = -1;
currPosOfArgumentId++;
}
// done
}
if (slots_[slot].subIndices.size()) {
// The slot has sub-sequences: collect their indices with subSampleLoop
// and build subSequenceStartPositions from their lengths.
std::vector<int64_t> dataSubPos;
auto op = [this, &dataSubPos](int64_t pos) {
dataSubPos.push_back(pos);
};
int subSize = subSampleLoop(op, size, slot);
ICpuGpuVector::resizeOrCreate(
cpuArguments[slot].subSequenceStartPositions, subSize + 1, false);
int* currPosOfArgumentSubSeqStart =
cpuArguments[slot].subSequenceStartPositions->getMutableData(
false);
int64_t* subSeqs = dataSubPos.data();
int64_t* subIndexs = slots_[slot].subIndices.data();
int allSubSequenceLength = 0;
currPosOfArgumentSubSeqStart[0] = 0;
// for each sub-sequence, accumulate its length into the start positions
for (int instance = 0; instance < subSize; instance++) {
int64_t currSubInstanceId = subSeqs[instance];
int64_t currSubInstanceLength =
subIndexs[currSubInstanceId + 1] - subIndexs[currSubInstanceId];
// write subSequenceStartPositions
allSubSequenceLength += currSubInstanceLength;
currPosOfArgumentSubSeqStart[instance + 1] = allSubSequenceLength;
// special: an empty sub-sequence is counted with length 1, matching the
// placeholder id written for empty samples above
if (currSubInstanceLength == 0) {
allSubSequenceLength++;
currPosOfArgumentSubSeqStart[instance + 1] = allSubSequenceLength;
}
}
cpuArguments[slot].checkSubset();
}
break;
}
case SlotDef::INDEX: {
// label slot
IVector::resizeOrCreate(cpuArguments[slot].ids,
size,
/* useGpu= */ false);
// fill labels
int* buf = cpuArguments[slot].ids->getData();
for (int i = 0; i < size; ++i) {
buf[i] = slots_[slot].indexData[dataPos[i]];
}
// label HAS sequence structure
// NOTE(review): fillSequence presumably writes 0, 1, 2, ... so that every
// sample is a length-1 sequence -- confirm against ICpuGpuVector.
cpuArguments[slot].sequenceStartPositions->fillSequence(false);
break;
}
case SlotDef::VECTOR_DENSE: {
// copy values
size_t dim = header_.slot_defs(slot).dim();
Matrix::resizeOrCreate(cpuArguments[slot].value,
size,
dim,
false, // trans = false
false); // useGpu = false
real* buf = cpuArguments[slot].value->getData();
for (int i = 0; i < size; ++i) {
memcpy(buf + i * dim,
slots_[slot].denseData.data() + dataPos[i] * dim,
sizeof(real) * dim);
}
// sequence structure
cpuArguments[slot].sequenceStartPositions->fillSequence(false);
break;
}
default: { LOG(FATAL) << "should not reach here"; }
}
}
if (useGpu_) {
// Copy every slot to the GPU batch on HPPL_STREAM_1 and wait for the
// stream before handing out the batch.
std::vector<Argument>& cpuArguments = cpuBatch.getStreams();
DataBatch& gpuBatch = *gpuBatch_;
std::vector<Argument>& gpuArguments = gpuBatch.getStreams();
gpuArguments.resize(cpuArguments.size());
gpuBatch.setSize(size);
for (size_t i = 0; i < cpuArguments.size(); ++i) {
gpuArguments[i].resizeAndCopyFrom(
cpuArguments[i], useGpu_, HPPL_STREAM_1);
}
hl_stream_synchronize(HPPL_STREAM_1);
*batch = gpuBatch;
} else {
*batch = cpuBatch;
}
currentSequenceIndex_ += numScannedSeqs;
return batch->getSize();
}
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <vector>
#include "DataFormat.pb.h"
#include "paddle/utils/Stat.h"
#include "DataProvider.h"
#include "ProtoReader.h"
namespace paddle {
/**
 * @brief Provides data from protobuf data files in which each sample is
 *        specified by a proto message.
 *
 * DataSample is defined in DataFormat.proto.
 *
 * The file format is
 *
 *    header
 *
 *    sample1
 *
 *    sample2
 *
 *    ...
 *
 *    sampleN
 *
 * @note: In the data file, each message is prefixed with its length.
 *        Reading and writing of the protobuf messages is implemented in
 *        ProtoReader.h
 */
class ProtoDataProvider : public DataProvider {
public:
/**
 * @param[in] config      data configuration, passed on to DataProvider
 * @param[in] useGpu      passed on to DataProvider
 * @param[in] loadDataAll when true, the data is loaded in the constructor
 */
ProtoDataProvider(const DataConfig& config,
bool useGpu,
bool loadDataAll = true);
virtual void reset();
/**
 * @return the number of loaded samples, scaled by usageRatio_ when that is
 *         below 1.
 * @note this size includes the sequences which are skipped because they
 *       are longer than the batch size.
 */
virtual int64_t getSize() {
int64_t size = sampleNums_;
if (usageRatio_ < 1.0f) {
size = static_cast<int64_t>(size * usageRatio_);
}
return size;
}
virtual void shuffle();
/**
 * @brief load protobuf data from the given data files
 * @param[in] fileList names of the data files to load
 */
void loadData(const std::vector<std::string>& fileList);
virtual int64_t getNextBatchInternal(int64_t size, DataBatch* batch);
protected:
/**
 * @brief load protobuf data from a list of files
 * @param[in] fileName file name of a file which contains
 *                     a list of file names
 */
void loadData(const std::string& fileName);
/**
 * @brief load protobuf data from file
 * @param[in] fileName data file name
 */
void loadDataFile(const std::string& fileName);
/**
 * @brief check the data header of a data file
 * @param[in] header data header read from protobuf data
 */
void checkDataHeader(const DataHeader& header);
/**
 * @brief fill protobuf data into slots_,
 *        slots_ is a vector of ProtoSlot in memory.
 * @param[in] sample data sample read from protobuf data
 */
void fillSlots(const DataSample& sample);
/**
 * @brief return true if each sample is one sequence, i.e., independent
 *        of other samples. This is the case when sequenceStartPositions_
 *        is empty.
 */
inline bool iidData() const { return sequenceStartPositions_.empty(); }
/**
 * @brief check that sample is consistent with header_
 */
void checkSample(const DataSample& sample);
// Batch iteration helpers; see the definitions for the exact contract of
// op and of the return value.
template <class Op>
int64_t sequenceLoop(Op op, int64_t size);
template <class Op>
int64_t sampleLoop(Op op, int64_t size);
template <class Op>
int64_t subSampleLoop(Op op, int64_t size, int slot);
// Log the average number of stored entries per sample of each sparse slot.
void showDataStats();
protected:
// One VAR_MDIM_DENSE sample: its values and its dims.
struct ProtoVarSlot {
std::vector<real> data;
std::vector<int> dims;
};
// In-memory storage of one slot over all loaded samples. Which of the
// containers is used depends on type.
struct ProtoSlot {
SlotDef::SlotType type;
int dim;
std::vector<int> indexData; // INDEX: one id per sample
std::vector<real> denseData; // VECTOR_DENSE: dim values per sample
std::vector<sparse_non_value_t> sparseNonValueData; // VECTOR_SPARSE_NON_VALUE
std::vector<sparse_float_value_t> sparseFloatValueData; // VECTOR_SPARSE_VALUE
std::vector<int64_t> indices; // sparse slots: running end offset per sample
std::vector<int64_t> subIndices; // running offsets of sub-sequences
std::vector<ProtoVarSlot> varDenseData; // VAR_MDIM_DENSE
std::vector<std::vector<int>> varIndices; // VAR_MDIM_INDEX
std::vector<std::string> strData; // STRING
};
DataHeader header_;
// Number of leading header slots that are not INDEX / VAR_MDIM_INDEX.
int numVecSlots_;
std::vector<ProtoSlot> slots_;
// Number of samples loaded so far.
size_t sampleNums_;
/**
 * The starting position of each sequence in samples.
 * The last element should be num of samples.
 * If empty, each sample is one sequence.
 */
std::vector<size_t> sequenceStartPositions_;
// Index into shuffledSequenceIds_ of the next sequence to hand out.
int64_t currentSequenceIndex_;
// The size should be the number of sequences.
std::vector<size_t> shuffledSequenceIds_;
ThreadLocalD<DataBatch> cpuBatch_;
ThreadLocalD<DataBatch> gpuBatch_;
// Held by getNextBatchInternal() while a batch is assembled.
RWLock lock_;
std::vector<StatPtr> nnzStats_; // stats for the number of non-zero entries
};
/**
* @brief Special use for Proto data: instances should contain sparse-non-value
* slots
* and label.
*
* @note ProtoSequenceDataProvider treats each SPARSE SLOT as a SEQUENCE
*/
class ProtoSequenceDataProvider : public ProtoDataProvider {
public:
/**
* @brief Takes the same parameters, with the same default, as the
* ProtoDataProvider constructor.
*/
ProtoSequenceDataProvider(const DataConfig& config,
bool useGpu,
bool loadDataAll = true);
// Empty destructor; this class declares no data members of its own.
~ProtoSequenceDataProvider() {}
/**
* @brief Overrides ProtoDataProvider::getNextBatchInternal with the
* sequence-oriented batch layout; the definition is out of line.
* @param[in] size requested batch size
* @param[out] batch destination batch
*/
virtual int64_t getNextBatchInternal(int64_t size, DataBatch* batch);
};
} // namespace paddle
...@@ -58,17 +58,6 @@ if(NOT WITH_DOUBLE) ...@@ -58,17 +58,6 @@ if(NOT WITH_DOUBLE)
endif() endif()
if(NOT MOBILE_INFERENCE) if(NOT MOBILE_INFERENCE)
################### test_ProtoDataProvider ############
# Build the test binary only; it is registered by hand below because it needs
# a non-default working directory.
add_unittest_without_exec(test_ProtoDataProvider
test_ProtoDataProvider.cpp)
# The test creates a directory named test_ProtoDataProvider inside its working
# directory. With the default WORKING_DIRECTORY that name would collide with
# the test executable of the same name and the mkdir would fail, so the test
# is run from ${PADDLE_SOURCE_DIR}/paddle instead.
add_test(NAME test_ProtoDataProvider
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test_ProtoDataProvider
WORKING_DIRECTORY ${PADDLE_SOURCE_DIR}/paddle)
################## test_Evaluator ####################### ################## test_Evaluator #######################
add_unittest(test_Evaluator add_unittest(test_Evaluator
test_Evaluator.cpp) test_Evaluator.cpp)
......
./test_ProtoDataProvider/data1.bin
./test_ProtoDataProvider/data2.bin
./test_ProtoDataProvider/data1.bin.gz
./test_ProtoDataProvider/data2.bin.gz
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <memory>
#include <string>
#include <gtest/gtest.h>
#include "paddle/gserver/dataproviders/ProtoDataProvider.h"
#include "paddle/utils/Util.h"
#include "paddle/testing/TestUtil.h"
using namespace std; // NOLINT
// Data files written by writeData() when dataCompression is false.
std::vector<string> protoFiles{
"./test_ProtoDataProvider/data1.bin", "./test_ProtoDataProvider/data2.bin",
};
// Data files written by writeData() when dataCompression is true.
std::vector<string> protoFilesCompressed{
"./test_ProtoDataProvider/data1.bin.gz",
"./test_ProtoDataProvider/data2.bin.gz",
};
// Scratch directory created before and removed after each test run.
const char* kTestDir = "./test_ProtoDataProvider";
// Values handed to DataConfig::set_files(); presumably text files listing the
// data file paths above -- TODO confirm. The paths are relative to the test's
// working directory.
const char kProtoFileList[] = "gserver/tests/proto_files.txt";
const char kProtoFileListCompressed[] =
"gserver/tests/proto_files_compressed.txt";
// Column count of the random sparse matrices built in prepareData().
// NOTE(review): the identifier is misspelled ("Sprase"); it is kept as is
// because other functions in this file refer to it.
const int kSpraseMatrixDim = 1024;
using namespace paddle; // NOLINT
/**
 * @brief Populate @p batch with randomly generated streams.
 *
 * The batch gets a random size in [10, 109] when uniformRandom(n) yields
 * values below n (assumed -- TODO confirm). Streams are appended in this
 * order: dense, sparse-non-value, sparse-value, string, index.
 *
 * @param[out] batch          batch to clear and refill
 * @param[in]  numPerSlotType number of streams to create per slot type,
 *                            indexed by SlotDef::SlotType
 * @param[in]  iid            when false, random sequence start positions are
 *                            generated and attached to every stream
 * @param[in]  useGpu         forwarded to makeRandomSparseMatrix()
 */
void prepareData(DataBatch* batch,
                 const int* numPerSlotType,
                 bool iid,
                 bool useGpu) {
  batch->clear();
  int64_t sampleCount = uniformRandom(100) + 10;
  batch->setSize(sampleCount);

  ICpuGpuVectorPtr sequenceStartPositions;
  ICpuGpuVectorPtr subSequenceStartPositions;
  if (!iid) {
    int seqCount = uniformRandom(10) + 1;
    sequenceStartPositions =
        ICpuGpuVector::create(seqCount + 1, /* useGpu= */ false);
    int* seqStarts = sequenceStartPositions->getMutableData(false);
    subSequenceStartPositions =
        ICpuGpuVector::create(seqCount + 1, /* useGpu= */ false);
    int* subSeqStarts = subSequenceStartPositions->getMutableData(false);

    int64_t cursor = 0;
    int lenCap = 2 * sampleCount / seqCount;
    for (int s = 0; s < seqCount; ++s) {
      // Leave at least one sample for each of the sequences still to come.
      int64_t available = sampleCount - cursor - seqCount + s;
      int len = uniformRandom(min<int64_t>(lenCap, available)) + 1;
      seqStarts[s] = cursor;
      subSeqStarts[s] = cursor;
      cursor += len;
      VLOG(1) << " len=" << len;
    }
    // The final entry closes the last sequence at the batch size.
    seqStarts[seqCount] = sampleCount;
    subSeqStarts[seqCount] = sampleCount;
  }

  vector<Argument>& arguments = batch->getStreams();
  // Attaches the shared sequence positions and appends the stream.
  auto appendStream = [&](Argument& stream) {
    stream.sequenceStartPositions = sequenceStartPositions;
    arguments.push_back(stream);
  };

  const int denseCount = numPerSlotType[SlotDef::VECTOR_DENSE];
  for (int n = 0; n < denseCount; ++n) {
    int64_t width = rand() % 10 + 4;  // NOLINT rand_r
    MatrixPtr dense =
        Matrix::create(sampleCount, width, /* trans= */ false, false);
    dense->randomizeUniform();
    Argument stream;
    stream.value = dense;
    appendStream(stream);
  }

  const int sparseNonValueCount =
      numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE];
  for (int n = 0; n < sparseNonValueCount; ++n) {
    Argument stream;
    stream.value =
        makeRandomSparseMatrix(sampleCount, kSpraseMatrixDim, false, useGpu);
    // Only this slot type also carries sub-sequence positions.
    stream.subSequenceStartPositions = subSequenceStartPositions;
    appendStream(stream);
  }

  const int sparseValueCount = numPerSlotType[SlotDef::VECTOR_SPARSE_VALUE];
  for (int n = 0; n < sparseValueCount; ++n) {
    Argument stream;
    stream.value =
        makeRandomSparseMatrix(sampleCount, kSpraseMatrixDim, true, useGpu);
    appendStream(stream);
  }

  const int stringCount = numPerSlotType[SlotDef::STRING];
  for (int n = 0; n < stringCount; ++n) {
    int64_t strLen = rand() % 10 + 4;  // NOLINT rand_r
    SVectorPtr strings = std::make_shared<std::vector<std::string>>();
    for (int row = 0; row < sampleCount; ++row) {
      strings->push_back(randStr(strLen));
    }
    Argument stream;
    stream.strs = strings;
    appendStream(stream);
  }

  const int indexCount = numPerSlotType[SlotDef::INDEX];
  for (int n = 0; n < indexCount; ++n) {
    int64_t idRange = rand() % 10 + 4;  // NOLINT rand_r
    IVectorPtr idVec = IVector::create(sampleCount, /* useGpu= */ false);
    int* idData = idVec->getData();
    for (int row = 0; row < sampleCount; ++row) {
      idData[row] = uniformRandom(idRange);
    }
    Argument stream;
    stream.ids = idVec;
    appendStream(stream);
  }
}
/**
 * @brief Dimension of the slot represented by @p arg.
 *
 * Matrix streams report their width, id streams report max id + 1 and string
 * streams report 1. An argument with none of value/ids/strs set is a fatal
 * error.
 */
inline int getSlotDim(const Argument& arg) {
  if (arg.value) return arg.value->getWidth();
  if (arg.ids) return arg.ids->getMax() + 1;
  if (arg.strs) return 1;
  LOG(FATAL) << "Invalid argument";
  return 0;
}
/**
 * @brief Derive the slot type of @p arg from the fields it has set.
 *
 * A matrix value is classified by its dynamic type: dense CPU/GPU matrices
 * map to VECTOR_DENSE, sparse ones to VECTOR_SPARSE_NON_VALUE or
 * VECTOR_SPARSE_VALUE depending on their value type. Otherwise ids map to
 * INDEX and strs to STRING. Anything else is a fatal error.
 */
inline SlotDef::SlotType getSlotType(const Argument& arg) {
  if (arg.value) {
    const auto& matrix = *arg.value;
    const auto& dynamicType = typeid(matrix);

    const bool isDense =
        dynamicType == typeid(CpuMatrix) || dynamicType == typeid(GpuMatrix);
    if (isDense) {
      return SlotDef::VECTOR_DENSE;
    }
    if (dynamicType == typeid(CpuSparseMatrix)) {
      auto cpuSparse = std::dynamic_pointer_cast<CpuSparseMatrix>(arg.value);
      return NO_VALUE == cpuSparse->getValueType()
                 ? SlotDef::VECTOR_SPARSE_NON_VALUE
                 : SlotDef::VECTOR_SPARSE_VALUE;
    }
    if (dynamicType == typeid(GpuSparseMatrix)) {
      auto gpuSparse = std::dynamic_pointer_cast<GpuSparseMatrix>(arg.value);
      return NO_VALUE == gpuSparse->getValueType()
                 ? SlotDef::VECTOR_SPARSE_NON_VALUE
                 : SlotDef::VECTOR_SPARSE_VALUE;
    }
    LOG(FATAL) << "Unknown matrix type";
  }
  if (arg.ids) {
    return SlotDef::INDEX;
  }
  if (arg.strs) {
    return SlotDef::STRING;
  }
  LOG(FATAL) << "Invalid argument";
  return SlotDef::VECTOR_DENSE;
}
void getColRow(const Argument& arg,
int64_t pos,
bool useGpu,
int* colNum,
const int** rowCols,
const real** rowValues) {
SlotDef::SlotType type = getSlotType(arg);
GpuSparseMatrixPtr matGpu;
CpuSparseMatrixPtr matCpu;
if (useGpu) {
matGpu = dynamic_pointer_cast<GpuSparseMatrix>(arg.value);
ASSERT_TRUE(matGpu != NULL);
} else {
matCpu = dynamic_pointer_cast<CpuSparseMatrix>(arg.value);
ASSERT_TRUE(matCpu != NULL);
}
*colNum = useGpu ? matGpu->getColNum(pos) : matCpu->getColNum(pos);
*rowCols = useGpu ? matGpu->getRowCols(pos) : matCpu->getRowCols(pos);
if (type == SlotDef::VECTOR_SPARSE_VALUE) {
*rowValues = useGpu ? matGpu->getRowValues(pos) : matCpu->getRowValues(pos);
} else {
*rowValues = NULL;
}
}
void makeSample(const vector<Argument>& arguments,
int64_t pos,
bool isBeginning,
DataSample* sample,
bool useGpu) {
sample->set_is_beginning(isBeginning);
int slotid = 0;
for (auto& arg : arguments) {
SlotDef::SlotType type = getSlotType(arg);
int64_t dim = getSlotDim(arg);
switch (type) {
case SlotDef::VECTOR_DENSE: {
VectorSlot* vecSlot = sample->add_vector_slots();
auto values = vecSlot->mutable_values();
values->Reserve(dim);
for (int i = 0; i < dim; ++i) {
values->AddAlreadyReserved(
static_cast<float>(arg.value->getElement(pos, i)));
}
break;
}
case SlotDef::INDEX: {
sample->add_id_slots(arg.ids->get(pos));
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE: {
VectorSlot* vecSlot = sample->add_vector_slots();
auto ids = vecSlot->mutable_ids();
int colNum;
const int* rowCols;
const real* rowValues; // nullptr
getColRow(arg, pos, useGpu, &colNum, &rowCols, &rowValues);
ids->Reserve(colNum);
for (int i = 0; i < colNum; ++i) {
ids->AddAlreadyReserved(rowCols[i]);
}
SubseqSlot* subseqSlot = sample->add_subseq_slots(); // subseq
subseqSlot->set_slot_id(slotid);
auto lens = subseqSlot->mutable_lens();
lens->Add(colNum);
break;
}
case SlotDef::VECTOR_SPARSE_VALUE: {
VectorSlot* vecSlot = sample->add_vector_slots();
auto values = vecSlot->mutable_values();
auto ids = vecSlot->mutable_ids();
int colNum;
const int* rowCols;
const real* rowValues;
getColRow(arg, pos, useGpu, &colNum, &rowCols, &rowValues);
ids->Reserve(colNum);
values->Reserve(colNum);
for (int i = 0; i < colNum; ++i) {
ids->AddAlreadyReserved(rowCols[i]);
values->AddAlreadyReserved(rowValues[i]);
}
break;
}
case SlotDef::VAR_MDIM_DENSE:
case SlotDef::VAR_MDIM_INDEX: {
LOG(FATAL) << "Not implemented";
break;
}
case SlotDef::STRING: {
VectorSlot* vecSlot = sample->add_vector_slots();
vecSlot->add_strs((*arg.strs)[pos]);
break;
}
}
slotid++;
}
}
/**
 * @brief Serialize @p batch into the test data files.
 *
 * A DataHeader describing every stream is written first to each file,
 * followed by that file's share of the sequences. The sequences are split as
 * evenly as possible over the files of protoFiles (or protoFilesCompressed
 * when @p dataCompression is true).
 *
 * @param[in] batch           batch to serialize; must have at least one stream
 * @param[in] useGpu          forwarded to makeSample()
 * @param[in] dataCompression selects the file set and is passed to ProtoWriter
 */
void writeData(const DataBatch& batch, bool useGpu, bool dataCompression) {
  DataHeader header;
  const vector<Argument>& arguments = batch.getStreams();
  // arguments[0] is dereferenced below, so fail loudly on an empty batch.
  CHECK(!arguments.empty()) << "writeData requires at least one stream";
  for (const auto& argument : arguments) {
    SlotDef* slotDef = header.add_slot_defs();
    slotDef->set_type(getSlotType(argument));
    slotDef->set_dim(getSlotDim(argument));
  }
  VLOG(1) << "header=" << header.DebugString();

  const int64_t totalSeqs = batch.getNumSequences();
  int64_t seq = 0;
  ICpuGpuVectorPtr sequenceStartPositions = arguments[0].sequenceStartPositions;
  int64_t numWritten = 0;
  // Bind by reference; the file lists are only read here.
  const vector<string>& curProtoFiles =
      dataCompression ? protoFilesCompressed : protoFiles;

  for (size_t i = 0; i < curProtoFiles.size(); ++i) {
    // Share of sequences for file i; the remainders are spread across files.
    const int64_t numSeqs = totalSeqs * (i + 1) / curProtoFiles.size() -
                            totalSeqs * i / curProtoFiles.size();
    // The payload is binary (length-prefixed protobuf, optionally
    // compressed), so the stream must not perform text-mode translation.
    ofstream os(curProtoFiles[i], std::ios::out | std::ios::binary);
    CHECK(os) << "Fail to open " << curProtoFiles[i];
    unique_ptr<ProtoWriter> writer(new ProtoWriter(&os, dataCompression));
    CHECK(writer->write(header));
    for (int64_t j = 0; j < numSeqs; ++j, ++seq) {
      // Without sequence positions every sample is its own sequence.
      int64_t begin = seq;
      int64_t end = seq + 1;
      if (sequenceStartPositions) {
        begin = sequenceStartPositions->getElement(seq);
        end = sequenceStartPositions->getElement(seq + 1);
      }
      for (int64_t pos = begin; pos < end; ++pos) {
        DataSample sample;
        makeSample(arguments, pos, pos == begin, &sample, useGpu);
        CHECK(writer->write(sample));
        ++numWritten;
      }
    }
    // Destroy the writer before closing the stream it writes to.
    writer.reset(nullptr);
    os.close();
  }
  CHECK_EQ(arguments[0].getBatchSize(), numWritten);
}
// check that the sample at pos1 in args1 is same as the sample at pos2 in args2
void checkSample(const vector<Argument>& args1,
int64_t pos1,
const vector<Argument>& args2,
int64_t pos2,
bool useGpu) {
EXPECT_EQ(args1.size(), args2.size());
VLOG(1) << " pos1=" << pos1 << " pos2=" << pos2;
for (size_t i = 0; i < args1.size(); ++i) {
auto type = getSlotType(args1[i]);
int dim = getSlotDim(args1[i]);
EXPECT_EQ(type, getSlotType(args2[i]));
if (type == SlotDef::INDEX) {
EXPECT_GE(dim, getSlotDim(args2[i]));
} else {
EXPECT_EQ(dim, getSlotDim(args2[i]));
}
switch (type) {
case SlotDef::VECTOR_DENSE: {
for (int j = 0; j < dim; ++j) {
EXPECT_EQ(static_cast<float>(args1[i].value->getElement(pos1, j)),
static_cast<float>(args2[i].value->getElement(pos2, j)));
}
break;
}
case SlotDef::INDEX: {
EXPECT_EQ(args1[i].ids->get(pos1), args2[i].ids->get(pos2));
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE:
case SlotDef::VECTOR_SPARSE_VALUE: {
int colNum1, colNum2;
const int *rowCols1, *rowCols2;
const real *rowValues1, *rowValues2;
getColRow(args1[i], pos1, useGpu, &colNum1, &rowCols1, &rowValues1);
getColRow(args2[i], pos2, useGpu, &colNum2, &rowCols2, &rowValues2);
EXPECT_EQ(colNum1, colNum2);
for (int j = 0; j < colNum1; ++j) {
EXPECT_EQ(rowCols1[j], rowCols2[j]);
if (type == SlotDef::VECTOR_SPARSE_VALUE) {
EXPECT_EQ(rowValues1[j], rowValues2[j]);
}
}
break;
}
case SlotDef::VAR_MDIM_DENSE:
case SlotDef::VAR_MDIM_INDEX: {
LOG(FATAL) << "Not implemented";
break;
}
case SlotDef::STRING: {
EXPECT_EQ((*args1[i].strs)[pos1], (*args2[i].strs)[pos2]);
break;
}
}
}
}
void testProtoDataProvider(int* numPerSlotType,
bool iid,
bool async,
bool useGpu,
bool dataCompression,
int numConstantSlots = 0) {
mkDir(kTestDir);
DataBatch data;
prepareData(&data, numPerSlotType, iid, useGpu);
writeData(data, useGpu, dataCompression);
DataConfig config;
config.set_type("proto");
config.set_files(dataCompression ? kProtoFileListCompressed : kProtoFileList);
config.set_async_load_data(async);
for (int i = 0; i < numConstantSlots; ++i) {
config.add_constant_slots(i + 11);
MatrixPtr w = Matrix::create(data.getSize(),
1,
/* trans= */ false,
/* useGpu= */ false);
w->assign(config.constant_slots(i));
data.appendData(w);
}
unique_ptr<DataProvider> dataProvider(DataProvider::create(config, useGpu));
dataProvider->setSkipShuffle();
EXPECT_EQ(data.getSize(), dataProvider->getSize());
int64_t batchSize = 10;
DataBatch batch;
size_t seq1 = 0;
vector<Argument>& args1 = data.getStreams();
ICpuGpuVectorPtr sequenceStartPositions1 = args1[0].sequenceStartPositions;
dataProvider->reset();
while (dataProvider->getNextBatch(batchSize, &batch) > 0) {
CHECK_EQ(data.getNumStreams(), batch.getNumStreams());
vector<Argument>& args2 = batch.getStreams();
ICpuGpuVectorPtr sequenceStartPositions2 = args2[0].sequenceStartPositions;
for (auto& arg : args2) {
EXPECT_EQ(iid, !arg.sequenceStartPositions);
}
size_t numSeqs = batch.getNumSequences();
VLOG(1) << "numSeqs=" << numSeqs;
for (size_t seq2 = 0; seq2 < numSeqs; ++seq1, ++seq2) {
int64_t begin1 = seq1;
int64_t end1 = seq1 + 1;
if (sequenceStartPositions1) {
begin1 = sequenceStartPositions1->getElement(seq1);
end1 = sequenceStartPositions1->getElement(seq1 + 1);
EXPECT_LT(seq1, sequenceStartPositions1->getSize() - 1);
}
int64_t begin2 = seq2;
int64_t end2 = seq2 + 1;
if (sequenceStartPositions2) {
begin2 = sequenceStartPositions2->getElement(seq2);
end2 = sequenceStartPositions2->getElement(seq2 + 1);
}
VLOG(1) << " begin1=" << begin1 << " end1=" << end1
<< " begin2=" << begin2 << " end2=" << end2;
EXPECT_EQ(end1 - begin1, end2 - begin2);
for (int i = 0; i < end1 - begin1; ++i) {
checkSample(args1, begin1 + i, args2, begin2 + i, useGpu);
}
}
}
EXPECT_EQ(seq1, (size_t)data.getNumSequences());
rmDir(kTestDir);
}
// Runs testProtoDataProvider() for every combination of slot counts (each of
// the five slot types gets 0 or 3 streams, except the all-zero case) crossed
// with iid, async, useGpu and dataCompression.
TEST(ProtoDataProvider, test) {
  const int numSlotsArray[] = {0, 3};
  const int numTwoArray[] = {0, 1};
  const int radix = sizeof(numSlotsArray) / sizeof(numSlotsArray[0]);
  const int numSlot = 5;

  // Number of digit strings of length numSlot in base `radix`.
  int totalCombinations = 1;
  for (int d = 0; d < numSlot; ++d) {
    totalCombinations *= radix;
  }

  for (int code = 0; code < totalCombinations; ++code) {
    // Decode `code` into per-slot-type indices; the last digit varies
    // fastest, so combinations are visited in counting order.
    int digits[numSlot];
    for (int d = numSlot - 1, rest = code; d >= 0; --d) {
      digits[d] = rest % radix;
      rest /= radix;
    }
    const int numDenseVecSlots = numSlotsArray[digits[0]];
    const int numSparseNonValueVecSlots = numSlotsArray[digits[1]];
    const int numSparseValueVectorSlots = numSlotsArray[digits[2]];
    const int numStrSlots = numSlotsArray[digits[3]];
    const int numIdSlots = numSlotsArray[digits[4]];

    const int totalSlots = numDenseVecSlots + numSparseNonValueVecSlots +
                           numSparseValueVectorSlots + numStrSlots +
                           numIdSlots;
    if (totalSlots < 1) {
      continue;  // a batch needs at least one stream
    }

    for (int iid : numTwoArray) {
      for (int async : numTwoArray) {
        for (int useGpu : numTwoArray) {
          if (async && useGpu) {
            // Currently in async mode, useGpu is not supported
            continue;
          }
#ifndef PADDLE_WITH_CUDA
          if (useGpu) {
            continue;
          }
#endif
          for (int dataCompression : numTwoArray) {
            LOG(INFO) << " numDenseVecSlots=" << numDenseVecSlots
                      << " numSparseNonValueVecSlots="
                      << numSparseNonValueVecSlots
                      << " numSparseValueVectorSlots="
                      << numSparseValueVectorSlots
                      << " numStrSlots=" << numStrSlots
                      << " numIdSlots=" << numIdSlots << " iid=" << iid
                      << " async=" << async << " useGpu=" << useGpu
                      << " dataCompression=" << dataCompression;

            int numPerSlotType[SlotDef::SlotType_ARRAYSIZE] = {0};
            numPerSlotType[SlotDef::VECTOR_DENSE] = numDenseVecSlots;
            numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE] =
                numSparseNonValueVecSlots;
            numPerSlotType[SlotDef::VECTOR_SPARSE_VALUE] =
                numSparseValueVectorSlots;
            numPerSlotType[SlotDef::INDEX] = numIdSlots;
            numPerSlotType[SlotDef::STRING] = numStrSlots;
            testProtoDataProvider(
                numPerSlotType, iid, async, useGpu, dataCompression);
          }  // dataCompression
        }    // useGpu
      }      // async
    }        // iid
  }          // slot-count combinations
}
// Exercises the "proto" provider with 1 or 2 constant slots configured.
// Dense and sparse-non-value slot counts are each 0 or 3 (not both zero);
// one sparse-value slot and one index slot are always present. Data is iid
// and loaded synchronously.
TEST(ProtoDataProvider, constant_slots) {
  int numSlotsArray[] = {0, 3};
  int numTwoArray[] = {0, 1};
  for (int numDenseVecSlots : numSlotsArray) {
    for (int numSparseNonValueVecSlots : numSlotsArray) {
      if (numDenseVecSlots + numSparseNonValueVecSlots < 1) continue;
      for (int numConstantSlots : {1, 2}) {
        for (int useGpu : numTwoArray) {
#ifndef PADDLE_WITH_CUDA
          // GPU cases cannot run in a build without CUDA.
          if (useGpu) {
            continue;
          }
#endif
          for (int dataCompression : numTwoArray) {
            // The label previously read "numConstantSlogs".
            LOG(INFO) << " numDenseVecSlots=" << numDenseVecSlots
                      << " numSparseNonValueVecSlots="
                      << numSparseNonValueVecSlots
                      << " numConstantSlots=" << numConstantSlots
                      << " useGpu=" << useGpu
                      << " dataCompression=" << dataCompression;

            int numPerSlotType[SlotDef::SlotType_ARRAYSIZE] = {0};
            numPerSlotType[SlotDef::VECTOR_DENSE] = numDenseVecSlots;
            numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE] =
                numSparseNonValueVecSlots;
            numPerSlotType[SlotDef::VECTOR_SPARSE_VALUE] = 1;
            numPerSlotType[SlotDef::INDEX] = 1;
            testProtoDataProvider(numPerSlotType,
                                  /* iid= */ true,
                                  /* async= */ false,
                                  useGpu,
                                  dataCompression,
                                  numConstantSlots);
          }  // end for (int dataCompression : numTwoArray)
        }    // end for (int useGpu : numTwoArray)
      }      // end for (int numConstantSlots : {1, 2})
    }        // end for (int numSparseNonValueVecSlots : numSlotsArray)
  }          // end for (int numDenseVecSlots : numSlotsArray)
}
void checkSampleSequence(const vector<Argument>& args1,
const vector<Argument>& args2,
int64_t offset,
int64_t numSeqs,
bool useGpu) {
// check slot num are equal
EXPECT_EQ(args1.size(), args2.size());
for (size_t i = 0; i < args1.size(); i++) {
auto type = getSlotType(args1[i]);
// check for args2: sequenceStartPositions vs numSeqs
// (1) size
EXPECT_EQ(args2[i].sequenceStartPositions->getSize(), (size_t)numSeqs + 1);
// (2) content
auto checkArgContent = [&](const Argument& args, int numSeqs) {
for (int j = 0; j <= numSeqs; j++) {
int start_pos = args.sequenceStartPositions->getElement(j);
EXPECT_EQ(start_pos, j);
}
};
switch (type) {
case SlotDef::INDEX: {
// args1: for label
checkArgContent(args2[i], numSeqs);
// check for args2: ids are equal to args1[offset]
// (1) size
EXPECT_EQ(args2[i].ids->getSize(), (size_t)numSeqs);
// (2) content
for (int j = 0; j < numSeqs; j++) {
EXPECT_EQ(args2[i].ids->get(j), args1[i].ids->get(offset + j));
}
break;
}
case SlotDef::VECTOR_SPARSE_NON_VALUE: {
// args1: for sparse_non_value
// args2 should put sparse indexes in ids
int colNum1;
const int* rowCols1;
const real* rowValues1; // nullptr
int totalLength = 0;
for (int j = 0; j < numSeqs; j++) {
getColRow(
args1[i], offset + j, useGpu, &colNum1, &rowCols1, &rowValues1);
// (1) lengths
EXPECT_EQ(totalLength,
args2[i].sequenceStartPositions->getElement(j));
EXPECT_EQ(totalLength,
args2[i].subSequenceStartPositions->getElement(j));
// (2) content
for (int k = 0; k < colNum1; k++) {
EXPECT_EQ(rowCols1[k], args2[i].ids->get(totalLength + k));
}
totalLength += colNum1;
if (colNum1 == 0) {
// special case here: we will put a "-1" into ids when column num is
// zero. see ProtoSequenceDataProvider::getNextBatchInternal.
EXPECT_EQ(-1, args2[i].ids->get(totalLength));
totalLength++;
}
}
EXPECT_EQ(totalLength,
args2[i].sequenceStartPositions->getElement(numSeqs));
EXPECT_EQ(totalLength,
args2[i].subSequenceStartPositions->getElement(numSeqs));
break;
}
case SlotDef::VECTOR_DENSE: {
// args1: for dense vector
checkArgContent(args2[i], numSeqs);
// check for args2: values are equal to args1[offset]
// (1) size
EXPECT_EQ(args2[i].value->getHeight(), (size_t)numSeqs);
EXPECT_EQ(args2[i].value->getWidth(), (size_t)getSlotDim(args1[i]));
// (2) content
for (int j = 0; j < numSeqs; j++) {
for (size_t k = 0; k < args2[i].value->getWidth(); k++) {
EXPECT_EQ(
static_cast<float>(args1[i].value->getElement(j + offset, k)),
static_cast<float>(args2[i].value->getElement(j, k)));
}
}
break;
}
default: { EXPECT_EQ(true, false) << "should not reach here"; }
}
}
}
/**
 * @brief Round-trip test for the "proto_sequence" data provider.
 *
 * Generates random iid data, writes it uncompressed, reads it back in
 * batches of 10 and checks every batch with checkSampleSequence().
 *
 * @param[in] numPerSlotType number of streams per SlotDef::SlotType
 * @param[in] async          value for DataConfig async_load_data
 * @param[in] useGpu         passed to data generation and the provider
 */
void testProtoSequenceDataProvider(int* numPerSlotType,
                                   bool async,
                                   bool useGpu) {
  mkDir(kTestDir);
  DataBatch data;
  prepareData(&data,
              numPerSlotType,
              /* iid */ true,
              useGpu);
  writeData(data, useGpu, /* dataCompression */ false);

  DataConfig config;
  config.set_type("proto_sequence");
  config.set_files(kProtoFileList);
  config.set_async_load_data(async);

  unique_ptr<DataProvider> dataProvider(DataProvider::create(config, useGpu));
  dataProvider->setSkipShuffle();
  EXPECT_EQ(data.getSize(), dataProvider->getSize());

  int64_t batchSize = 10;
  DataBatch batch;
  vector<Argument>& args1 = data.getStreams();
  // The reference data was generated as iid, so it must not carry sequence
  // information. It is not modified below, so checking once is enough.
  for (auto& arg : args1) {
    EXPECT_EQ(true, !arg.sequenceStartPositions);
  }

  dataProvider->reset();
  size_t args1Offset = 0;  // first reference sample of the current batch
  while (dataProvider->getNextBatch(batchSize, &batch) > 0) {
    CHECK_EQ(data.getNumStreams(), batch.getNumStreams());
    vector<Argument>& args2 = batch.getStreams();
    for (auto& arg : args2) {
      // Every stream produced by the provider must be a sequence.
      EXPECT_NE(true, !arg.sequenceStartPositions);
    }
    size_t numSeqs = batch.getNumSequences();
    checkSampleSequence(args1, args2, args1Offset, numSeqs, useGpu);
    args1Offset += numSeqs;
  }
  // All reference samples must have been consumed.
  EXPECT_EQ(args1Offset, (size_t)data.getNumSequences());
  rmDir(kTestDir);
}
// Runs testProtoSequenceDataProvider() for every combination of 0 or 3
// sparse-non-value, index and dense slots (except all zero), crossed with
// async and useGpu.
TEST(ProtoSequenceDataProvider, test) {
  const int numSlotsArray[] = {0, 3};
  const int numTwoArray[] = {0, 1};

  for (int numSparseNonValueVecSlots : numSlotsArray) {
    for (int numIdSlots : numSlotsArray) {
      for (int numDenseVecSlots : numSlotsArray) {
        const int totalSlots =
            numDenseVecSlots + numSparseNonValueVecSlots + numIdSlots;
        if (totalSlots < 1) {
          continue;  // a batch needs at least one stream
        }

        for (int async : numTwoArray) {
          for (int useGpu : numTwoArray) {
            // Currently in async mode, useGpu is not supported
            const bool unsupported = async && useGpu;
            if (unsupported) {
              continue;
            }
#ifndef PADDLE_WITH_CUDA
            if (useGpu) {
              continue;
            }
#endif
            LOG(INFO) << " numDenseVecSlots=" << numDenseVecSlots
                      << " numSparseNonValueVecSlots="
                      << numSparseNonValueVecSlots
                      << " numIdSlots=" << numIdSlots << " async=" << async
                      << " useGpu=" << useGpu;

            int numPerSlotType[SlotDef::SlotType_ARRAYSIZE] = {0};
            numPerSlotType[SlotDef::INDEX] = numIdSlots;
            numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE] =
                numSparseNonValueVecSlots;
            numPerSlotType[SlotDef::VECTOR_DENSE] = numDenseVecSlots;
            testProtoSequenceDataProvider(numPerSlotType, async, useGpu);
          }  // useGpu
        }    // async
      }      // numDenseVecSlots
    }        // numIdSlots
  }          // numSparseNonValueVecSlots
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册