diff --git a/paddle/gserver/CMakeLists.txt b/paddle/gserver/CMakeLists.txt index 5f39167afc34affbea7858fa0794ef52b786a383..b02902543b9bf4b0bbdd4e32a7181d420ea3d8fb 100644 --- a/paddle/gserver/CMakeLists.txt +++ b/paddle/gserver/CMakeLists.txt @@ -73,7 +73,6 @@ if(MOBILE_INFERENCE) list(REMOVE_ITEM GSERVER_SOURCES dataproviders/DataProvider.cpp dataproviders/MultiDataProvider.cpp - dataproviders/ProtoDataProvider.cpp dataproviders/PyDataProvider2.cpp dataproviders/PyDataProvider.cpp) diff --git a/paddle/gserver/dataproviders/DataProvider.cpp b/paddle/gserver/dataproviders/DataProvider.cpp index 0478256f9cd81f4a99eb0cbcbd1a5a21de5cf14b..106cf5b6228e636026ded558d0f591022f1ae586 100644 --- a/paddle/gserver/dataproviders/DataProvider.cpp +++ b/paddle/gserver/dataproviders/DataProvider.cpp @@ -16,8 +16,8 @@ limitations under the License. */ #include #include -#include "ProtoDataProvider.h" #include "paddle/utils/Logging.h" +#include "paddle/utils/Stat.h" #include "paddle/utils/StringUtil.h" #include "paddle/utils/Util.h" @@ -164,8 +164,6 @@ DataProvider* DataProvider::create(const DataConfig& config, REGISTER_DATA_PROVIDER(simple, SimpleDataProvider); REGISTER_DATA_PROVIDER(dummy, DummyDataProvider); -REGISTER_DATA_PROVIDER(proto, ProtoDataProvider); -REGISTER_DATA_PROVIDER(proto_sequence, ProtoSequenceDataProvider); int64_t DataProvider::getNextBatch(int64_t size, DataBatch* batch) { int64_t batchSize = doubleBuffer_ ? getNextBatchFromBuffer(size, batch) diff --git a/paddle/gserver/dataproviders/ProtoDataProvider.cpp b/paddle/gserver/dataproviders/ProtoDataProvider.cpp deleted file mode 100644 index c6f5cab1915b7f41d505c37a7fef762a392bad7f..0000000000000000000000000000000000000000 --- a/paddle/gserver/dataproviders/ProtoDataProvider.cpp +++ /dev/null @@ -1,932 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "ProtoDataProvider.h" -#include -#include -#include -#include "paddle/utils/StringUtil.h" -#include "paddle/utils/Util.h" - -#include "DataProviderGroup.h" -#include "paddle/utils/Logging.h" - -DEFINE_double(memory_threshold_on_load_data, - 1.0, - "stop loading data when memory is not sufficient"); - -namespace paddle { - -REGISTER_DATA_PROVIDER(proto_group, DataProviderGroup); -REGISTER_DATA_PROVIDER(proto_sequence_group, - DataProviderGroup); - -ProtoDataProvider::ProtoDataProvider(const DataConfig& config, - bool useGpu, - bool loadDataAll) - : DataProvider(config, useGpu), sampleNums_(0), currentSequenceIndex_(0) { - if (loadDataAll) { - loadData(config_.files()); - } -} - -void ProtoDataProvider::loadData(const std::vector& fileList) { - for (auto& file : fileList) { - if (FLAGS_memory_threshold_on_load_data < 1.0) { - double memUsage = getMemoryUsage(); - if (memUsage > FLAGS_memory_threshold_on_load_data) { - LOG(INFO) << "memUsage is " << memUsage << ", > " - << FLAGS_memory_threshold_on_load_data - << " therefore SKIP ALL REMAINING file."; - break; - } - } - LOG(INFO) << "load data file " << file; - loadDataFile(file); - } - - if (sequenceStartPositions_.size() == sampleNums_) { - // This means that each sample is one sequence - shuffledSequenceIds_.swap(sequenceStartPositions_); - } else { - sequenceStartPositions_.push_back(sampleNums_); - shuffledSequenceIds_.reserve(sequenceStartPositions_.size() - 1); - for (size_t i = 0; i < sequenceStartPositions_.size() - 1; ++i) { - shuffledSequenceIds_.push_back(i); - } - } - - LOG(INFO) << "read done, num of instance=" << sampleNums_; - showDataStats(); -} - -void ProtoDataProvider::loadData(const std::string& fileName) { - std::vector fileList; - loadFileList(fileName, fileList); - loadData(fileList); -} - -void ProtoDataProvider::checkDataHeader(const DataHeader& header) { - if (header_.slot_defs_size()) { - // header_ is already set. Need to check consistency. - CHECK_EQ(header_.slot_defs_size(), header.slot_defs_size()) - << "Different header"; - for (int i = 0; i < header.slot_defs_size(); ++i) { - CHECK_EQ(header_.slot_defs(i).type(), header.slot_defs(i).type()); - CHECK_EQ(header_.slot_defs(i).dim(), header.slot_defs(i).dim()); - } - return; - } - - // header_ is not set before - CHECK(header.slot_defs_size()) << "Invalid header: no slot is defined"; - int i; - for (i = 0; i < header.slot_defs_size(); ++i) { - if (header.slot_defs(i).type() == SlotDef::INDEX || - header.slot_defs(i).type() == SlotDef::VAR_MDIM_INDEX) { - break; - } - constexpr int kBufLen = 100; - char buf[kBufLen]; - snprintf(buf, kBufLen, "slot%d_nnz", i); - nnzStats_.push_back(getStat(buf)); - } - numVecSlots_ = i; - - // Check that INDEX slots are after VECTOR slots - for (int i = numVecSlots_; i < header.slot_defs_size(); ++i) { - CHECK(header.slot_defs(i).type() == SlotDef::INDEX || - header.slot_defs(i).type() == SlotDef::VAR_MDIM_INDEX); - } - - slots_.clear(); - slots_.reserve(header.slot_defs_size()); - for (int i = 0; i < header.slot_defs_size(); ++i) { - slots_.emplace_back(); - slots_.back().type = header.slot_defs(i).type(); - slots_.back().dim = header.slot_defs(i).dim(); - if (SlotDef::VECTOR_SPARSE_NON_VALUE == header.slot_defs(i).type() || - SlotDef::VECTOR_SPARSE_VALUE == header.slot_defs(i).type()) { - slots_.back().indices.push_back(0); - } - } - - header_ = header; -} - -void ProtoDataProvider::checkSample(const DataSample& sample) { - CHECK_EQ(numVecSlots_, sample.vector_slots_size()); - CHECK(header_.slot_defs_size() == numVecSlots_ + sample.id_slots_size() || - header_.slot_defs_size() == numVecSlots_ + sample.var_id_slots_size()); - for (int i = 0; i < numVecSlots_; ++i) { - uint32_t dim = header_.slot_defs(i).dim(); - switch (header_.slot_defs(i).type()) { - case SlotDef::VECTOR_DENSE: { - CHECK_EQ(static_cast(dim), sample.vector_slots(i).values_size()); - CHECK_EQ(0, sample.vector_slots(i).ids_size()); - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: { - if (0 == sample.vector_slots(i).ids_size()) { - break; - } - CHECK_LT(0, sample.vector_slots(i).ids_size()); - CHECK_EQ(0, sample.vector_slots(i).values_size()); - auto maxId = *std::max_element(sample.vector_slots(i).ids().begin(), - sample.vector_slots(i).ids().end()); - CHECK_GT(dim, maxId); - break; - } - case SlotDef::VECTOR_SPARSE_VALUE: { - if (0 == sample.vector_slots(i).ids_size()) { - CHECK_EQ(0, sample.vector_slots(i).values_size()); - break; - } - CHECK_LT(0, sample.vector_slots(i).values_size()); - CHECK_GE(static_cast(dim), sample.vector_slots(i).values_size()); - CHECK_EQ(sample.vector_slots(i).values_size(), - sample.vector_slots(i).ids_size()); - auto maxId = *std::max_element(sample.vector_slots(i).ids().begin(), - sample.vector_slots(i).ids().end()); - CHECK_GT(dim, maxId); - break; - } - case SlotDef::VAR_MDIM_DENSE: { - if (static_cast(dim) != 0) { - CHECK_EQ(static_cast(dim), sample.vector_slots(i).values_size()); - if (sample.vector_slots(i).dims_size() != 0) { - int totalDim = sample.vector_slots(i).dims(0); - for (int j = 1; j < sample.vector_slots(i).dims_size(); ++j) { - totalDim *= sample.vector_slots(i).dims(j); - } - CHECK_EQ(static_cast(dim), totalDim); - } - } else { - CHECK_NE(sample.vector_slots(i).dims_size(), 0); - int totalDim = sample.vector_slots(i).dims(0); - for (int j = 1; j < sample.vector_slots(i).dims_size(); ++j) { - totalDim *= sample.vector_slots(i).dims(j); - } - CHECK_EQ(totalDim, sample.vector_slots(i).values_size()); - } - break; - } - case SlotDef::STRING: { - CHECK_EQ(static_cast(1), sample.vector_slots(i).strs_size()); - CHECK_EQ(0, sample.vector_slots(i).ids_size()); - CHECK_EQ(0, sample.vector_slots(i).values_size()); - break; - } - default: - LOG(FATAL) << "BUG: Should not reach here"; - } - } - for (int i = numVecSlots_; i < header_.slot_defs_size(); ++i) { - if (header_.slot_defs(i).type() != SlotDef::VAR_MDIM_INDEX) { - uint32_t id = sample.id_slots(i - numVecSlots_); - if (id == -1U) continue; - CHECK_LT(id, header_.slot_defs(i).dim()); - } else { - for (int j = 0; j < sample.var_id_slots(i - numVecSlots_).ids_size(); - ++j) { - uint32_t id = sample.var_id_slots(i - numVecSlots_).ids(j); - CHECK_LT(id, header_.slot_defs(i).dim()); - } - } - } -} - -void ProtoDataProvider::loadDataFile(const std::string& fileName) { - std::ifstream is(fileName); - CHECK(is) << "Fail to open " << fileName; - bool dataCompression = str::endsWith(fileName, ".gz"); - std::unique_ptr reader(new ProtoReader(&is, dataCompression)); - CHECK(reader) << "Fail to create proto data input stream"; - - DataHeader header; - CHECK(reader->read(&header)); - checkDataHeader(header); - - DataSample sample; - do { - if (!reader->read(&sample)) { - break; - } - checkSample(sample); - if (sample.is_beginning()) { - sequenceStartPositions_.push_back(sampleNums_); - } - fillSlots(sample); - ++sampleNums_; - } while (true); - - CHECK(is.eof()) << "Fail to read file"; - reader.reset(nullptr); - is.close(); -} - -// checkSample has done before, no check here -void ProtoDataProvider::fillSlots(const DataSample& sample) { - for (size_t i = 0; i < slots_.size(); ++i) { - auto& slot = slots_[i]; - int dim = slot.dim; - switch (slot.type) { - case SlotDef::VECTOR_DENSE: { - size_t oldSize = slot.denseData.size(); - slot.denseData.resize(oldSize + dim); - const float* values = sample.vector_slots(i).values().data(); -#ifdef PADDLE_TYPE_DOUBLE - std::copy(values, values + dim, slot.denseData.begin() + oldSize); -#else - memcpy(slot.denseData.data() + oldSize, values, sizeof(real) * dim); -#endif - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: { - int slotSize = sample.vector_slots(i).ids_size(); - int subSlotSize = 0; - int id = 0; // the slot id - // find whether this vector_slots has subseq. If not has subseq, - // subSlotSize = 0. - for (id = 0; id < sample.subseq_slots_size(); id++) { - if (sample.subseq_slots(id).slot_id() == i) { - subSlotSize = sample.subseq_slots(id).lens_size(); - break; - } - } - if (subSlotSize && slot.subIndices.size() == 0UL) { - // If has subSeq, the first element of subIndices = 0. - slot.subIndices.push_back(0); - } - if (slotSize == 0UL) { - // if has no id, new indices = old indices. - slot.indices.push_back(slot.indices.back()); - // if has subSeq, new subIndices = old subIndices. - if (slot.subIndices.size()) { - slot.subIndices.push_back(slot.subIndices.back()); - } - break; - } - slot.sparseNonValueData.resize(slot.indices.back() + slotSize); - const unsigned int* ids = sample.vector_slots(i).ids().data(); - memcpy(slot.sparseNonValueData.data() + slot.indices.back(), - ids, - sizeof(*ids) * slotSize); - slot.indices.push_back(slot.indices.back() + slotSize); - if (subSlotSize) { - for (int ii = 0; ii < subSlotSize; ++ii) { - slot.subIndices.push_back(slot.subIndices.back() + - sample.subseq_slots(id).lens(ii)); - } - } - break; - } - case SlotDef::VECTOR_SPARSE_VALUE: { - if (0 == sample.vector_slots(i).ids_size()) { - slot.indices.push_back(slot.indices.back()); - break; - } - int slotSize = sample.vector_slots(i).ids_size(); - slot.sparseFloatValueData.resize(slot.indices.back() + slotSize); - const unsigned int* ids = sample.vector_slots(i).ids().data(); - const float* values = sample.vector_slots(i).values().data(); - for (int ii = 0; ii < slotSize; ++ii) { - slot.sparseFloatValueData[slot.indices.back() + ii].col = ids[ii]; - slot.sparseFloatValueData[slot.indices.back() + ii].value = - values[ii]; - } - slot.indices.push_back(slot.indices.back() + slotSize); - break; - } - case SlotDef::INDEX: { - slot.indexData.push_back(sample.id_slots(i - numVecSlots_)); - break; - } - case SlotDef::VAR_MDIM_DENSE: { - size_t oldSize = slot.varDenseData.size(); - slot.varDenseData.resize(oldSize + 1); - size_t varDim = sample.vector_slots(i).values_size(); - slot.varDenseData[oldSize].data.resize(varDim); - const float* values = sample.vector_slots(i).values().data(); -#ifdef PADDLE_TYPE_DOUBLE - std::copy( - values, values + varDim, slot.varDenseData[oldSize].data.data()); -#else - memcpy(slot.varDenseData[oldSize].data.data(), - values, - sizeof(real) * varDim); -#endif - slot.varDenseData[oldSize].dims.resize( - sample.vector_slots(i).dims_size()); - memcpy(slot.varDenseData[oldSize].dims.data(), - sample.vector_slots(i).dims().data(), - sizeof(uint32_t) * sample.vector_slots(i).dims_size()); - break; - } - case SlotDef::VAR_MDIM_INDEX: { - size_t oldSize = slot.varIndices.size(); - slot.varIndices.resize(oldSize + 1); - size_t varDim = sample.var_id_slots(i - numVecSlots_).ids_size(); - slot.varIndices[oldSize].resize(varDim); - memcpy(slot.varIndices[oldSize].data(), - sample.var_id_slots(i - numVecSlots_).ids().data(), - sizeof(uint32_t) * varDim); - break; - } - case SlotDef::STRING: { - slot.strData.push_back(sample.vector_slots(i).strs(0)); - break; - } - } - } -} - -void ProtoDataProvider::showDataStats() { - std::ostringstream oss; - for (size_t i = 0; i < slots_.size(); ++i) { - auto& slot = slots_[i]; - if (slot.type == SlotDef::VECTOR_SPARSE_NON_VALUE) { - size_t nnz = slot.sparseNonValueData.size(); - oss << "slot" << i << ":avgNNZ=" << ((double)nnz / sampleNums_) << "; "; - } else if (slot.type == SlotDef::VECTOR_SPARSE_VALUE) { - size_t nnz = slot.sparseFloatValueData.size(); - oss << "slot" << i << ":avgNNZ=" << ((double)nnz / sampleNums_) << "; "; - } - } - LOG(INFO) << oss.str(); -} - -void ProtoDataProvider::reset() { - currentSequenceIndex_ = 0; - if (!skipShuffle_) { - shuffle(); - } - - DataProvider::reset(); -} - -void ProtoDataProvider::shuffle() { - std::shuffle(shuffledSequenceIds_.begin(), - shuffledSequenceIds_.end(), - ThreadLocalRandomEngine::get()); -} - -/* - Loop through sequences starting from currentSequenceIndex_ - for at most size samples. For each sequence ranging from [begin, end), - op(begin, end) will be called. - - return the number of sequences scanned -*/ -template -int64_t ProtoDataProvider::sequenceLoop(Op op, int64_t size) { - int64_t sz = 0; - size_t i; - size_t sequenceCount = shuffledSequenceIds_.size(); - if (usageRatio_ < 1.0f) { - sequenceCount = static_cast(sequenceCount * usageRatio_); - } - for (i = currentSequenceIndex_; i < sequenceCount; ++i) { - size_t id = shuffledSequenceIds_[i]; - int64_t begin = sequenceStartPositions_[id]; - int64_t end = sequenceStartPositions_[id + 1]; - int64_t len = end - begin; - if (sz + len > size && sz > 0) break; - sz += len; - op(begin, end); - } - return i - currentSequenceIndex_; -} - -/* - Loop through sequences starting from currentSequenceIndex_ - for at most size samples. For each sample of each sequence at position - pos, op(pos) will be called. - - return the number of sequences scanned -*/ -template -int64_t ProtoDataProvider::sampleLoop(Op op, int64_t size) { - if (iidData()) { - size = std::min(sampleNums_ - currentSequenceIndex_, size); - for (int64_t i = currentSequenceIndex_; i < currentSequenceIndex_ + size; - ++i) { - size_t pos = shuffledSequenceIds_[i]; - op(pos); - } - return size; - } else { - auto f = [op](int64_t begin, int64_t end) { - for (int64_t pos = begin; pos < end; ++pos) { - op(pos); - } - }; - return sequenceLoop(f, size); - } -} - -/* - Loop through sub-sequences starting from currentSequenceIndex_ - for at most size samples. For each sample of each sub-sequence at position - pos, op(pos) will be called. - - return the number of sub-sequences scanned -*/ -template -int64_t ProtoDataProvider::subSampleLoop(Op op, int64_t size, int slot) { - CHECK(iidData()) << "subSampleLoop only accepts iid data"; - size = std::min(sampleNums_ - currentSequenceIndex_, size); - int subSize = 0; - for (int64_t i = currentSequenceIndex_; i < currentSequenceIndex_ + size; - ++i) { - size_t pos = shuffledSequenceIds_[i]; - int64_t* indexs = slots_[slot].indices.data(); - int64_t* subIndexs = slots_[slot].subIndices.data(); - int64_t subSeqStart = 0; - int64_t subSeqEnd = 0; - for (int j = 0; j < (int)slots_[slot].subIndices.size(); j++) { - if (subIndexs[j] == indexs[pos]) { - subSeqStart = j; - if (subIndexs[pos] == subIndexs[pos + 1]) { - subSeqEnd = j + 1; - break; - } - } else if (subIndexs[j] == indexs[pos + 1]) { - subSeqEnd = j; - break; - } - } - for (int j = subSeqStart; j < subSeqEnd; j++) { - op(j); - } - subSize += subSeqEnd - subSeqStart; - } - return subSize; -} - -int64_t ProtoDataProvider::getNextBatchInternal(int64_t size, - DataBatch* batch) { - int64_t numSequences = 0; // actual number of sequences in the batch - - // the number of sequences scanned, including those skipped because too long - int64_t numScannedSeqs = 0; - std::lock_guard guard(lock_); - if (iidData()) { - size = std::min(getSize() - currentSequenceIndex_, size); - numScannedSeqs = numSequences = size; - } else { - int64_t sz = 0; - auto op = [&sz, &numSequences](int64_t begin, int64_t end) { - ++numSequences; - sz += end - begin; - }; - numScannedSeqs = sequenceLoop(op, size); - VLOG_IF(1, numScannedSeqs > numSequences) - << numScannedSeqs - numSequences - << " sequences are skipped because longer than " << size; - size = sz; - } - if (size <= 0) return 0; - - DataBatch& cpuBatch = *cpuBatch_; - std::vector& cpuArguments = cpuBatch.getStreams(); - cpuBatch.setSize(size); - cpuArguments.resize(header_.slot_defs_size()); - - if (!iidData()) { - ICpuGpuVector::resizeOrCreate(cpuArguments[0].sequenceStartPositions, - numSequences + 1, - /* useGpu= */ false); - int* buf = cpuArguments[0].sequenceStartPositions->getMutableData(false); - int pos = 0; - int i = 0; - auto op = [buf, &pos, &i](int64_t begin, int64_t end) { - buf[i] = pos; - pos += end - begin; - ++i; - }; - sequenceLoop(op, size); - buf[i] = size; - for (size_t slot = 1; slot < cpuArguments.size(); ++slot) { - cpuArguments[slot].sequenceStartPositions = - cpuArguments[0].sequenceStartPositions; - } - } - - for (int slot = 0; slot < header_.slot_defs_size(); ++slot) { - size_t dim = header_.slot_defs(slot).dim(); - SlotDef::SlotType slotType = header_.slot_defs(slot).type(); - - std::vector dataPos; - dataPos.reserve(size); - auto op = [this, &dataPos](int64_t pos) { dataPos.push_back(pos); }; - sampleLoop(op, size); - - switch (slotType) { - case SlotDef::VECTOR_DENSE: { - Matrix::resizeOrCreate(cpuArguments[slot].value, - size, - dim, - false, // trans = false - false); // useGpu = false - real* buf = cpuArguments[slot].value->getData(); - for (int i = 0; i < size; ++i) { - memcpy(buf + i * dim, - slots_[slot].denseData.data() + dataPos[i] * dim, - sizeof(real) * dim); - } - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: { - if (!(cpuArguments[slot].value)) { - cpuArguments[slot].value = - Matrix::createSparseMatrix(size, - dim, - size /*DEFAULT_AVG_WIDTH = 1*/, - NO_VALUE, - SPARSE_CSR, - false, - useGpu_); - } - auto mat = cpuArguments[slot].value; - mat->resize(size, dim); - if (std::dynamic_pointer_cast(mat)) { - std::dynamic_pointer_cast(mat)->copyFrom( - dataPos.data(), - slots_[slot].indices.data(), - slots_[slot].sparseNonValueData.data(), - HPPL_STREAM_1); - } else if (std::dynamic_pointer_cast(mat)) { - std::dynamic_pointer_cast(mat)->copyFrom( - dataPos.data(), - slots_[slot].indices.data(), - slots_[slot].sparseNonValueData.data()); - } else { - LOG(FATAL) << "Not Supported"; - } - size_t numElements = 0; - for (auto pos : dataPos) { - numElements += - slots_[slot].indices[pos + 1] - slots_[slot].indices[pos]; - } - nnzStats_[slot]->addSample(numElements); - - break; - } - case SlotDef::VECTOR_SPARSE_VALUE: { - if (!(cpuArguments[slot].value)) { - cpuArguments[slot].value = - Matrix::createSparseMatrix(size, - dim, - size /*DEFAULT_AVG_WIDTH = 1*/, - FLOAT_VALUE, - SPARSE_CSR, - false, - useGpu_); - } - auto mat = cpuArguments[slot].value; - mat->resize(size, dim); - if (std::dynamic_pointer_cast(mat)) { - std::dynamic_pointer_cast(mat)->copyFrom( - dataPos.data(), - slots_[slot].indices.data(), - slots_[slot].sparseFloatValueData.data(), - HPPL_STREAM_1); - } else if (std::dynamic_pointer_cast(mat)) { - std::dynamic_pointer_cast(mat)->copyFrom( - dataPos.data(), - slots_[slot].indices.data(), - slots_[slot].sparseFloatValueData.data()); - } else { - LOG(FATAL) << "Not Supported"; - } - break; - } - case SlotDef::INDEX: { - IVector::resizeOrCreate(cpuArguments[slot].ids, - size, - /* useGpu= */ false); - int* buf = cpuArguments[slot].ids->getData(); - for (int i = 0; i < size; ++i) { - buf[i] = slots_[slot].indexData[dataPos[i]]; - } - break; - } - case SlotDef::VAR_MDIM_DENSE: { - CHECK_EQ(size, 1); - auto mat = cpuArguments[slot].value; - size_t totalDim = slots_[slot].varDenseData[dataPos[0]].data.size(); - - CHECK_EQ(slots_[slot].varDenseData[dataPos[0]].dims.size(), size_t(3)); - size_t height, width, depth, oldWidth; - /* dims[2] is depth, will be changed to dims[0] in future */ - depth = slots_[slot].varDenseData[dataPos[0]].dims[2]; - height = slots_[slot].varDenseData[dataPos[0]].dims[1]; - width = slots_[slot].varDenseData[dataPos[0]].dims[0]; - oldWidth = width; - /* process the undesirable sample */ - if (oldWidth < height) { - width = height; - } - cpuArguments[slot].setFrameHeight(height); - cpuArguments[slot].setFrameWidth(width); - - if (oldWidth < height) { - totalDim = width * height * depth; - } - Matrix::resizeOrCreate(cpuArguments[slot].value, - size, - totalDim, - false, // trans = false - false); // useGpu = false - real* buf = cpuArguments[slot].value->getData(); - cpuArguments[slot].value->zeroMem(); - if (oldWidth < height) { - real* srcBuf = slots_[slot].varDenseData[dataPos[0]].data.data(); - for (size_t i = 0; i < depth; i++) { - for (size_t j = 0; j < height; j++) { - for (size_t k = 0; k < oldWidth; k++) { - buf[i * height * width + j * width + k] = - srcBuf[i * height * oldWidth + j * oldWidth + k]; - } - } - } - } else { - memcpy(buf, - slots_[slot].varDenseData[dataPos[0]].data.data(), - sizeof(real) * totalDim); - } - ICpuGpuVector::resizeOrCreate(cpuArguments[slot].sequenceStartPositions, - size + 1, /* size == 1 currently */ - /* useGpu= */ false); - int* bufStarts = - cpuArguments[slot].sequenceStartPositions->getMutableData(false); - bufStarts[0] = 0; - bufStarts[1] = 1; - break; - } - case SlotDef::VAR_MDIM_INDEX: { - CHECK_EQ(size, 1); - size_t totalDim = slots_[slot].varIndices[dataPos[0]].size(); - IVector::resizeOrCreate(cpuArguments[slot].ids, - totalDim, - /* useGpu= */ false); - int* buf = cpuArguments[slot].ids->getData(); - memcpy(buf, - slots_[slot].varIndices[dataPos[0]].data(), - sizeof(int) * totalDim); - - ICpuGpuVector::resizeOrCreate(cpuArguments[slot].sequenceStartPositions, - size + 1, /* size == 1 currently */ - /* useGpu= */ false); - int* bufStarts = - cpuArguments[slot].sequenceStartPositions->getMutableData(false); - bufStarts[0] = 0; - /* we expand the convolutinal feature map to a sequence data, - * so there should be a corresponding sequence labels */ - bufStarts[1] = totalDim; - break; - } - case SlotDef::STRING: { - if (cpuArguments[slot].strs) { - cpuArguments[slot].strs->resize(size); - } else { - cpuArguments[slot].strs = - std::make_shared>(size); - } - for (int i = 0; i < size; ++i) { - (*cpuArguments[slot].strs)[i] = slots_[slot].strData[dataPos[i]]; - } - break; - } - } - } - - if (useGpu_) { - std::vector& cpuArguments = cpuBatch.getStreams(); - DataBatch& gpuBatch = *gpuBatch_; - std::vector& gpuArguments = gpuBatch.getStreams(); - gpuArguments.resize(cpuArguments.size()); - gpuBatch.setSize(size); - for (int i = 0; i < header_.slot_defs_size(); ++i) { - SlotDef::SlotType slotType = header_.slot_defs(i).type(); - if (SlotDef::VECTOR_SPARSE_VALUE == slotType || - SlotDef::VECTOR_SPARSE_NON_VALUE == slotType) { - gpuArguments[i] = cpuArguments[i]; - gpuArguments[i].sequenceStartPositions = - cpuArguments[i].sequenceStartPositions; - } else { - gpuArguments[i].resizeAndCopyFrom( - cpuArguments[i], useGpu_, HPPL_STREAM_1); - } - } - hl_stream_synchronize(HPPL_STREAM_1); - *batch = gpuBatch; - } else { - *batch = cpuBatch; - } - - currentSequenceIndex_ += numScannedSeqs; - - return batch->getSize(); -} - -ProtoSequenceDataProvider::ProtoSequenceDataProvider(const DataConfig& config, - bool useGpu, - bool loadDataAll) - : ProtoDataProvider(config, useGpu, loadDataAll) {} - -int64_t ProtoSequenceDataProvider::getNextBatchInternal(int64_t size, - DataBatch* batch) { - CHECK(iidData()) << "ProtoSequenceDataProvider only accepts iid data"; - int64_t numSequences = 0; // actual number of sequences in the batch - - // the number of sequences scanned, including those skipped because too long - int64_t numScannedSeqs = 0; - std::lock_guard guard(lock_); - size = std::min(getSize() - currentSequenceIndex_, size); - numScannedSeqs = numSequences = size; - if (size <= 0) return 0; - - DataBatch& cpuBatch = *cpuBatch_; - std::vector& cpuArguments = cpuBatch.getStreams(); - cpuBatch.setSize(size); - cpuArguments.resize(header_.slot_defs_size()); - - for (int slot = 0; slot < header_.slot_defs_size(); ++slot) { - SlotDef::SlotType slotType = header_.slot_defs(slot).type(); - - std::vector dataPos; - dataPos.reserve(size); - auto op = [this, &dataPos](int64_t pos) { dataPos.push_back(pos); }; - sampleLoop(op, size); - - // current slot: sequenceStartPositions - ICpuGpuVector::resizeOrCreate(cpuArguments[slot].sequenceStartPositions, - size + 1, - /* useGpu= */ false); - - switch (slotType) { - case SlotDef::VECTOR_SPARSE_VALUE: - case SlotDef::VAR_MDIM_DENSE: - case SlotDef::VAR_MDIM_INDEX: { - LOG(FATAL) << "ProtoSequenceDataProvider only support" - << " VECTOR_DENSE, VECTOR_SPARSE_NON_VALUE and INDEX slots"; - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: { - // copy to IDS, not value - // pointers used in current slot - sparse_non_value_t* data = slots_[slot].sparseNonValueData.data(); - int64_t* indexs = slots_[slot].indices.data(); - int64_t* seqs = dataPos.data(); - - // current slot: i need size instances. what is the total length? - int totalFeatureInCurrentSlot = 0; - for (int ins = 0; ins < size; ins++) { - int64_t currInsId = seqs[ins]; - totalFeatureInCurrentSlot += - indexs[currInsId + 1] - indexs[currInsId]; - // special: if current instance has NO feature in current slot - if (indexs[currInsId + 1] == indexs[currInsId]) { - totalFeatureInCurrentSlot++; - } - } - // done - - // current slot: ids - IVector::resizeOrCreate(cpuArguments[slot].ids, - totalFeatureInCurrentSlot, - /* useGpu= */ false); - - // where to write - int* currPosOfArgumentId = cpuArguments[slot].ids->getData(); - int* currPosOfArgumentSeqStart = - cpuArguments[slot].sequenceStartPositions->getMutableData(false); - int allSequenceLength = 0; - currPosOfArgumentSeqStart[0] = 0; - // for each instance, copy data and fill sequence positions - for (int instance = 0; instance < size; instance++) { - int64_t currInstanceId = seqs[instance]; - int64_t currInstanceLength = - indexs[currInstanceId + 1] - indexs[currInstanceId]; - sparse_non_value_t* currInstanceData = data + indexs[currInstanceId]; - // write sequenceStartPositions - allSequenceLength += currInstanceLength; - currPosOfArgumentSeqStart[instance + 1] = allSequenceLength; - // copy features - for (int featCopier = 0; featCopier < currInstanceLength; - featCopier++) { - currPosOfArgumentId[featCopier] = currInstanceData[featCopier].col; - } - currPosOfArgumentId += currInstanceLength; - // special: if current instance has NO feature in current slot - if (currInstanceLength == 0) { - allSequenceLength++; - currPosOfArgumentSeqStart[instance + 1] = allSequenceLength; - currPosOfArgumentId[0] = -1; - currPosOfArgumentId++; - } - // done - } - if (slots_[slot].subIndices.size()) { - std::vector dataSubPos; - auto op = [this, &dataSubPos](int64_t pos) { - dataSubPos.push_back(pos); - }; - int subSize = subSampleLoop(op, size, slot); - ICpuGpuVector::resizeOrCreate( - cpuArguments[slot].subSequenceStartPositions, subSize + 1, false); - int* currPosOfArgumentSubSeqStart = - cpuArguments[slot].subSequenceStartPositions->getMutableData( - false); - int64_t* subSeqs = dataSubPos.data(); - int64_t* subIndexs = slots_[slot].subIndices.data(); - int allSubSequenceLength = 0; - currPosOfArgumentSubSeqStart[0] = 0; - // for each instance, compute sub-sequence number - for (int instance = 0; instance < subSize; instance++) { - int64_t currSubInstanceId = subSeqs[instance]; - int64_t currSubInstanceLength = - subIndexs[currSubInstanceId + 1] - subIndexs[currSubInstanceId]; - // write subSequenceStartPositions - allSubSequenceLength += currSubInstanceLength; - currPosOfArgumentSubSeqStart[instance + 1] = allSubSequenceLength; - // special: if current instance has NO feature in current slot - if (currSubInstanceLength == 0) { - allSubSequenceLength++; - currPosOfArgumentSubSeqStart[instance + 1] = allSubSequenceLength; - } - } - cpuArguments[slot].checkSubset(); - } - break; - } - case SlotDef::INDEX: { - // label slot - IVector::resizeOrCreate(cpuArguments[slot].ids, - size, - /* useGpu= */ false); - // fill labels - int* buf = cpuArguments[slot].ids->getData(); - for (int i = 0; i < size; ++i) { - buf[i] = slots_[slot].indexData[dataPos[i]]; - } - // label HAS sequence structure - cpuArguments[slot].sequenceStartPositions->fillSequence(false); - break; - } - case SlotDef::VECTOR_DENSE: { - // copy values - size_t dim = header_.slot_defs(slot).dim(); - Matrix::resizeOrCreate(cpuArguments[slot].value, - size, - dim, - false, // trans = false - false); // useGpu = false - real* buf = cpuArguments[slot].value->getData(); - for (int i = 0; i < size; ++i) { - memcpy(buf + i * dim, - slots_[slot].denseData.data() + dataPos[i] * dim, - sizeof(real) * dim); - } - // sequence structure - cpuArguments[slot].sequenceStartPositions->fillSequence(false); - break; - } - default: { LOG(FATAL) << "should not reach here"; } - } - } - - if (useGpu_) { - std::vector& cpuArguments = cpuBatch.getStreams(); - DataBatch& gpuBatch = *gpuBatch_; - std::vector& gpuArguments = gpuBatch.getStreams(); - gpuArguments.resize(cpuArguments.size()); - gpuBatch.setSize(size); - for (size_t i = 0; i < cpuArguments.size(); ++i) { - gpuArguments[i].resizeAndCopyFrom( - cpuArguments[i], useGpu_, HPPL_STREAM_1); - } - hl_stream_synchronize(HPPL_STREAM_1); - *batch = gpuBatch; - } else { - *batch = cpuBatch; - } - - currentSequenceIndex_ += numScannedSeqs; - return batch->getSize(); -} - -} // namespace paddle diff --git a/paddle/gserver/dataproviders/ProtoDataProvider.h b/paddle/gserver/dataproviders/ProtoDataProvider.h deleted file mode 100644 index 7dd45e062248f20d24c633dd4e1c8b7eebcbfa1b..0000000000000000000000000000000000000000 --- a/paddle/gserver/dataproviders/ProtoDataProvider.h +++ /dev/null @@ -1,179 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#pragma once - -#include - -#include "DataFormat.pb.h" -#include "paddle/utils/Stat.h" - -#include "DataProvider.h" -#include "ProtoReader.h" - -namespace paddle { - -/** - * @brief Provider data from protobuf data file with each sample - * specified by proto message - * - * DataSample defined in DataFormat.proto. - * - * The file format is - * - * header - * - * sample1 - * - * sample2 - * - * ... - * - * sampleN - * - * @note: In the data file, each message is prefixed with its length. - * The read/write of the protbuf are implemented in ProtoReader.h - */ -class ProtoDataProvider : public DataProvider { -public: - ProtoDataProvider(const DataConfig& config, - bool useGpu, - bool loadDataAll = true); - virtual void reset(); - - /** - * @note this size includes the sequences which are skipped because they - * are longer than the batch size. - */ - virtual int64_t getSize() { - int64_t size = sampleNums_; - if (usageRatio_ < 1.0f) { - size = static_cast(size * usageRatio_); - } - return size; - } - virtual void shuffle(); - - void loadData(const std::vector& fileList); - - virtual int64_t getNextBatchInternal(int64_t size, DataBatch* batch); - -protected: - /** - * @brief load protobuf data from a list of file - * @param[in] fileName file name of a file which contains - * a list of file names - */ - void loadData(const std::string& fileName); - - /** - * @brief load protobuf data from file - * @param[in] fileName data file name - */ - void loadDataFile(const std::string& fileName); - /** @brief check data header of each data sample - * @param[in] header data header read from protobuf data - */ - void checkDataHeader(const DataHeader& header); - /** - * @brief fill protobuf data into slot_, - * slot_ is a vector of ProtoSlot in memory. - * @param[in] sample data sample read from protobuf data - */ - void fillSlots(const DataSample& sample); - - /** - * @brief return true if each sample is one sequence, i.e., independent - * of other samples. - */ - inline bool iidData() const { return sequenceStartPositions_.empty(); } - - /** - * @brief check that sample is consistent with header_ - */ - void checkSample(const DataSample& sample); - - template - int64_t sequenceLoop(Op op, int64_t size); - - template - int64_t sampleLoop(Op op, int64_t size); - - template - int64_t subSampleLoop(Op op, int64_t size, int slot); - - void showDataStats(); - -protected: - struct ProtoVarSlot { - std::vector data; - std::vector dims; - }; - - struct ProtoSlot { - SlotDef::SlotType type; - int dim; - std::vector indexData; - std::vector denseData; - std::vector sparseNonValueData; - std::vector sparseFloatValueData; - std::vector indices; - std::vector subIndices; - - std::vector varDenseData; - std::vector> varIndices; - std::vector strData; - }; - DataHeader header_; - int numVecSlots_; - - std::vector slots_; - size_t sampleNums_; - - /** - * The starting position of each sequence in samples. - * The last element should be num of samples. - * If empty, each sample is one sequence. - */ - std::vector sequenceStartPositions_; - - int64_t currentSequenceIndex_; - - // The size should be the number of sequences. - std::vector shuffledSequenceIds_; - - ThreadLocalD cpuBatch_; - ThreadLocalD gpuBatch_; - - RWLock lock_; - std::vector nnzStats_; // stats for number of none-zeros entries -}; - -/** - * @brief Special use for Proto data: instances should contain sparse-non-value - * slots - * and label. - * - * @note ProtoSequenceDataProvider treats each SPARSE SLOT as a SEQUENCE - */ -class ProtoSequenceDataProvider : public ProtoDataProvider { -public: - ProtoSequenceDataProvider(const DataConfig& config, - bool useGpu, - bool loadDataAll = true); - ~ProtoSequenceDataProvider() {} - virtual int64_t getNextBatchInternal(int64_t size, DataBatch* batch); -}; - -} // namespace paddle diff --git a/paddle/gserver/tests/CMakeLists.txt b/paddle/gserver/tests/CMakeLists.txt index aa94ee406e27c86e6d49b6d2b5327a3f86bcacd6..232fa015682b80449391b6fc68c1e17474f43acb 100644 --- a/paddle/gserver/tests/CMakeLists.txt +++ b/paddle/gserver/tests/CMakeLists.txt @@ -58,17 +58,6 @@ if(NOT WITH_DOUBLE) endif() if(NOT MOBILE_INFERENCE) -################### test_ProtoDataProvider ############ - add_unittest_without_exec(test_ProtoDataProvider - test_ProtoDataProvider.cpp) - - # test_ProtoDataProvider will mkdir as same name, - # so if WORKING_DIRECTORY is default directory, then - # mkdir will get error. - add_test(NAME test_ProtoDataProvider - COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test_ProtoDataProvider - WORKING_DIRECTORY ${PADDLE_SOURCE_DIR}/paddle) - ################## test_Evaluator ####################### add_unittest(test_Evaluator test_Evaluator.cpp) diff --git a/paddle/gserver/tests/proto_files.txt b/paddle/gserver/tests/proto_files.txt deleted file mode 100644 index 691b38c7940bd21360eb00384e060554aa4b3e22..0000000000000000000000000000000000000000 --- a/paddle/gserver/tests/proto_files.txt +++ /dev/null @@ -1,2 +0,0 @@ -./test_ProtoDataProvider/data1.bin -./test_ProtoDataProvider/data2.bin diff --git a/paddle/gserver/tests/proto_files_compressed.txt b/paddle/gserver/tests/proto_files_compressed.txt deleted file mode 100644 index 7413c81e185d02e0d03aefa06480b9722357c5eb..0000000000000000000000000000000000000000 --- a/paddle/gserver/tests/proto_files_compressed.txt +++ /dev/null @@ -1,2 +0,0 @@ -./test_ProtoDataProvider/data1.bin.gz -./test_ProtoDataProvider/data2.bin.gz diff --git a/paddle/gserver/tests/test_ProtoDataProvider.cpp b/paddle/gserver/tests/test_ProtoDataProvider.cpp deleted file mode 100644 index af6472619d1840e82787974d265d601b4a406c09..0000000000000000000000000000000000000000 --- a/paddle/gserver/tests/test_ProtoDataProvider.cpp +++ /dev/null @@ -1,732 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include -#include - -#include - -#include "paddle/gserver/dataproviders/ProtoDataProvider.h" -#include "paddle/utils/Util.h" - -#include "paddle/testing/TestUtil.h" - -using namespace std; // NOLINT - -std::vector protoFiles{ - "./test_ProtoDataProvider/data1.bin", "./test_ProtoDataProvider/data2.bin", -}; -std::vector protoFilesCompressed{ - "./test_ProtoDataProvider/data1.bin.gz", - "./test_ProtoDataProvider/data2.bin.gz", -}; - -const char* kTestDir = "./test_ProtoDataProvider"; -const char kProtoFileList[] = "gserver/tests/proto_files.txt"; -const char kProtoFileListCompressed[] = - "gserver/tests/proto_files_compressed.txt"; -const int kSpraseMatrixDim = 1024; - -using namespace paddle; // NOLINT - -void prepareData(DataBatch* batch, - const int* numPerSlotType, - bool iid, - bool useGpu) { - batch->clear(); - int64_t size = uniformRandom(100) + 10; - batch->setSize(size); - - ICpuGpuVectorPtr sequenceStartPositions; - ICpuGpuVectorPtr subSequenceStartPositions; - if (!iid) { - int numSeqs = uniformRandom(10) + 1; - sequenceStartPositions = - ICpuGpuVector::create(numSeqs + 1, /* useGpu= */ false); - int* buf = sequenceStartPositions->getMutableData(false); - subSequenceStartPositions = - ICpuGpuVector::create(numSeqs + 1, /* useGpu= */ false); - int* subBuf = subSequenceStartPositions->getMutableData(false); - int64_t pos = 0; - int maxLen = 2 * size / numSeqs; - for (int i = 0; i < numSeqs; ++i) { - int len = - uniformRandom(min(maxLen, size - pos - numSeqs + i)) + 1; - buf[i] = pos; - subBuf[i] = pos; - pos += len; - VLOG(1) << " len=" << len; - } - buf[numSeqs] = size; - subBuf[numSeqs] = size; - } - - vector& arguments = batch->getStreams(); - for (int i = 0; i < numPerSlotType[SlotDef::VECTOR_DENSE]; ++i) { - int64_t dim = rand() % 10 + 4; // NOLINT rand_r - MatrixPtr mat = Matrix::create(size, dim, /* trans= */ false, false); - mat->randomizeUniform(); - Argument arg; - arg.value = mat; - arg.sequenceStartPositions = sequenceStartPositions; - arguments.push_back(arg); - } - for (int i = 0; i < numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE]; ++i) { - MatrixPtr mat = - makeRandomSparseMatrix(size, kSpraseMatrixDim, false, useGpu); - Argument arg; - arg.value = mat; - arg.sequenceStartPositions = sequenceStartPositions; - arg.subSequenceStartPositions = subSequenceStartPositions; - arguments.push_back(arg); - } - for (int i = 0; i < numPerSlotType[SlotDef::VECTOR_SPARSE_VALUE]; ++i) { - MatrixPtr mat = - makeRandomSparseMatrix(size, kSpraseMatrixDim, true, useGpu); - Argument arg; - arg.value = mat; - arg.sequenceStartPositions = sequenceStartPositions; - arguments.push_back(arg); - } - for (int i = 0; i < numPerSlotType[SlotDef::STRING]; ++i) { - int64_t dim = rand() % 10 + 4; // NOLINT rand_r - SVectorPtr vec = std::make_shared>(); - for (int j = 0; j < size; ++j) { - vec->push_back(randStr(dim)); - } - Argument arg; - arg.strs = vec; - arg.sequenceStartPositions = sequenceStartPositions; - arguments.push_back(arg); - } - for (int i = 0; i < numPerSlotType[SlotDef::INDEX]; ++i) { - int64_t dim = rand() % 10 + 4; // NOLINT rand_r - IVectorPtr vec = IVector::create(size, /* useGpu= */ false); - int* buf = vec->getData(); - for (int j = 0; j < size; ++j) { - buf[j] = uniformRandom(dim); - } - Argument arg; - arg.ids = vec; - arg.sequenceStartPositions = sequenceStartPositions; - arguments.push_back(arg); - } -} - -inline int getSlotDim(const Argument& arg) { - if (arg.value) { - return arg.value->getWidth(); - } else if (arg.ids) { - return arg.ids->getMax() + 1; - } else if (arg.strs) { - return 1; - } - LOG(FATAL) << "Invalid argument"; - return 0; -} - -inline SlotDef::SlotType getSlotType(const Argument& arg) { - if (arg.value) { - auto& m = *arg.value; - auto& type = typeid(m); - if (type == typeid(CpuMatrix) || type == typeid(GpuMatrix)) { - return SlotDef::VECTOR_DENSE; - } - if (type == typeid(CpuSparseMatrix)) { - auto valueType = - std::dynamic_pointer_cast(arg.value)->getValueType(); - if (NO_VALUE == valueType) { - return SlotDef::VECTOR_SPARSE_NON_VALUE; - } else { - return SlotDef::VECTOR_SPARSE_VALUE; - } - } - if (type == typeid(GpuSparseMatrix)) { - auto valueType = - std::dynamic_pointer_cast(arg.value)->getValueType(); - if (NO_VALUE == valueType) { - return SlotDef::VECTOR_SPARSE_NON_VALUE; - } else { - return SlotDef::VECTOR_SPARSE_VALUE; - } - } - - LOG(FATAL) << "Unknown matrix type"; - } - if (arg.ids) return SlotDef::INDEX; - if (arg.strs) return SlotDef::STRING; - LOG(FATAL) << "Invalid argument"; - return SlotDef::VECTOR_DENSE; -} - -void getColRow(const Argument& arg, - int64_t pos, - bool useGpu, - int* colNum, - const int** rowCols, - const real** rowValues) { - SlotDef::SlotType type = getSlotType(arg); - GpuSparseMatrixPtr matGpu; - CpuSparseMatrixPtr matCpu; - if (useGpu) { - matGpu = dynamic_pointer_cast(arg.value); - ASSERT_TRUE(matGpu != NULL); - } else { - matCpu = dynamic_pointer_cast(arg.value); - ASSERT_TRUE(matCpu != NULL); - } - *colNum = useGpu ? matGpu->getColNum(pos) : matCpu->getColNum(pos); - *rowCols = useGpu ? matGpu->getRowCols(pos) : matCpu->getRowCols(pos); - if (type == SlotDef::VECTOR_SPARSE_VALUE) { - *rowValues = useGpu ? matGpu->getRowValues(pos) : matCpu->getRowValues(pos); - } else { - *rowValues = NULL; - } -} - -void makeSample(const vector& arguments, - int64_t pos, - bool isBeginning, - DataSample* sample, - bool useGpu) { - sample->set_is_beginning(isBeginning); - int slotid = 0; - for (auto& arg : arguments) { - SlotDef::SlotType type = getSlotType(arg); - int64_t dim = getSlotDim(arg); - switch (type) { - case SlotDef::VECTOR_DENSE: { - VectorSlot* vecSlot = sample->add_vector_slots(); - auto values = vecSlot->mutable_values(); - values->Reserve(dim); - for (int i = 0; i < dim; ++i) { - values->AddAlreadyReserved( - static_cast(arg.value->getElement(pos, i))); - } - break; - } - case SlotDef::INDEX: { - sample->add_id_slots(arg.ids->get(pos)); - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: { - VectorSlot* vecSlot = sample->add_vector_slots(); - auto ids = vecSlot->mutable_ids(); - int colNum; - const int* rowCols; - const real* rowValues; // nullptr - getColRow(arg, pos, useGpu, &colNum, &rowCols, &rowValues); - ids->Reserve(colNum); - for (int i = 0; i < colNum; ++i) { - ids->AddAlreadyReserved(rowCols[i]); - } - SubseqSlot* subseqSlot = sample->add_subseq_slots(); // subseq - subseqSlot->set_slot_id(slotid); - auto lens = subseqSlot->mutable_lens(); - lens->Add(colNum); - break; - } - case SlotDef::VECTOR_SPARSE_VALUE: { - VectorSlot* vecSlot = sample->add_vector_slots(); - auto values = vecSlot->mutable_values(); - auto ids = vecSlot->mutable_ids(); - int colNum; - const int* rowCols; - const real* rowValues; - getColRow(arg, pos, useGpu, &colNum, &rowCols, &rowValues); - ids->Reserve(colNum); - values->Reserve(colNum); - for (int i = 0; i < colNum; ++i) { - ids->AddAlreadyReserved(rowCols[i]); - values->AddAlreadyReserved(rowValues[i]); - } - break; - } - case SlotDef::VAR_MDIM_DENSE: - case SlotDef::VAR_MDIM_INDEX: { - LOG(FATAL) << "Not implemented"; - break; - } - case SlotDef::STRING: { - VectorSlot* vecSlot = sample->add_vector_slots(); - vecSlot->add_strs((*arg.strs)[pos]); - break; - } - } - slotid++; - } -} - -void writeData(const DataBatch& batch, bool useGpu, bool dataCompression) { - DataHeader header; - const vector& arguments = batch.getStreams(); - for (auto& argument : arguments) { - SlotDef* slotDef = header.add_slot_defs(); - slotDef->set_type(getSlotType(argument)); - slotDef->set_dim(getSlotDim(argument)); - } - VLOG(1) << "header=" << header.DebugString(); - - int64_t totalSeqs = batch.getNumSequences(); - int64_t seq = 0; - ICpuGpuVectorPtr sequenceStartPositions = arguments[0].sequenceStartPositions; - int64_t numWritten = 0; - vector curProtoFiles = - dataCompression ? protoFilesCompressed : protoFiles; - for (size_t i = 0; i < curProtoFiles.size(); ++i) { - int64_t numSeqs = totalSeqs * (i + 1) / curProtoFiles.size() - - totalSeqs * i / curProtoFiles.size(); - ofstream os(curProtoFiles[i]); - CHECK(os) << "Fail to open " << curProtoFiles[i]; - unique_ptr writer(new ProtoWriter(&os, dataCompression)); - CHECK(writer->write(header)); - for (int j = 0; j < numSeqs; ++j, ++seq) { - int64_t begin = seq; - int64_t end = seq + 1; - if (sequenceStartPositions) { - begin = sequenceStartPositions->getElement(seq); - end = sequenceStartPositions->getElement(seq + 1); - } - for (int pos = begin; pos < end; ++pos) { - DataSample sample; - makeSample(arguments, pos, pos == begin, &sample, useGpu); - CHECK(writer->write(sample)); - ++numWritten; - } - } - - writer.reset(nullptr); - os.close(); - } - CHECK_EQ(arguments[0].getBatchSize(), numWritten); -} - -// check that the sample at pos1 in args1 is same as the sample at pos2 in args2 -void checkSample(const vector& args1, - int64_t pos1, - const vector& args2, - int64_t pos2, - bool useGpu) { - EXPECT_EQ(args1.size(), args2.size()); - VLOG(1) << " pos1=" << pos1 << " pos2=" << pos2; - - for (size_t i = 0; i < args1.size(); ++i) { - auto type = getSlotType(args1[i]); - int dim = getSlotDim(args1[i]); - EXPECT_EQ(type, getSlotType(args2[i])); - if (type == SlotDef::INDEX) { - EXPECT_GE(dim, getSlotDim(args2[i])); - } else { - EXPECT_EQ(dim, getSlotDim(args2[i])); - } - switch (type) { - case SlotDef::VECTOR_DENSE: { - for (int j = 0; j < dim; ++j) { - EXPECT_EQ(static_cast(args1[i].value->getElement(pos1, j)), - static_cast(args2[i].value->getElement(pos2, j))); - } - break; - } - case SlotDef::INDEX: { - EXPECT_EQ(args1[i].ids->get(pos1), args2[i].ids->get(pos2)); - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: - case SlotDef::VECTOR_SPARSE_VALUE: { - int colNum1, colNum2; - const int *rowCols1, *rowCols2; - const real *rowValues1, *rowValues2; - getColRow(args1[i], pos1, useGpu, &colNum1, &rowCols1, &rowValues1); - getColRow(args2[i], pos2, useGpu, &colNum2, &rowCols2, &rowValues2); - EXPECT_EQ(colNum1, colNum2); - for (int j = 0; j < colNum1; ++j) { - EXPECT_EQ(rowCols1[j], rowCols2[j]); - if (type == SlotDef::VECTOR_SPARSE_VALUE) { - EXPECT_EQ(rowValues1[j], rowValues2[j]); - } - } - break; - } - case SlotDef::VAR_MDIM_DENSE: - case SlotDef::VAR_MDIM_INDEX: { - LOG(FATAL) << "Not implemented"; - break; - } - case SlotDef::STRING: { - EXPECT_EQ((*args1[i].strs)[pos1], (*args2[i].strs)[pos2]); - break; - } - } - } -} - -void testProtoDataProvider(int* numPerSlotType, - bool iid, - bool async, - bool useGpu, - bool dataCompression, - int numConstantSlots = 0) { - mkDir(kTestDir); - DataBatch data; - - prepareData(&data, numPerSlotType, iid, useGpu); - writeData(data, useGpu, dataCompression); - - DataConfig config; - config.set_type("proto"); - config.set_files(dataCompression ? kProtoFileListCompressed : kProtoFileList); - config.set_async_load_data(async); - - for (int i = 0; i < numConstantSlots; ++i) { - config.add_constant_slots(i + 11); - MatrixPtr w = Matrix::create(data.getSize(), - 1, - /* trans= */ false, - /* useGpu= */ false); - w->assign(config.constant_slots(i)); - data.appendData(w); - } - - unique_ptr dataProvider(DataProvider::create(config, useGpu)); - dataProvider->setSkipShuffle(); - - EXPECT_EQ(data.getSize(), dataProvider->getSize()); - - int64_t batchSize = 10; - DataBatch batch; - - size_t seq1 = 0; - vector& args1 = data.getStreams(); - ICpuGpuVectorPtr sequenceStartPositions1 = args1[0].sequenceStartPositions; - - dataProvider->reset(); - - while (dataProvider->getNextBatch(batchSize, &batch) > 0) { - CHECK_EQ(data.getNumStreams(), batch.getNumStreams()); - vector& args2 = batch.getStreams(); - ICpuGpuVectorPtr sequenceStartPositions2 = args2[0].sequenceStartPositions; - for (auto& arg : args2) { - EXPECT_EQ(iid, !arg.sequenceStartPositions); - } - size_t numSeqs = batch.getNumSequences(); - VLOG(1) << "numSeqs=" << numSeqs; - for (size_t seq2 = 0; seq2 < numSeqs; ++seq1, ++seq2) { - int64_t begin1 = seq1; - int64_t end1 = seq1 + 1; - if (sequenceStartPositions1) { - begin1 = sequenceStartPositions1->getElement(seq1); - end1 = sequenceStartPositions1->getElement(seq1 + 1); - EXPECT_LT(seq1, sequenceStartPositions1->getSize() - 1); - } - - int64_t begin2 = seq2; - int64_t end2 = seq2 + 1; - if (sequenceStartPositions2) { - begin2 = sequenceStartPositions2->getElement(seq2); - end2 = sequenceStartPositions2->getElement(seq2 + 1); - } - VLOG(1) << " begin1=" << begin1 << " end1=" << end1 - << " begin2=" << begin2 << " end2=" << end2; - EXPECT_EQ(end1 - begin1, end2 - begin2); - for (int i = 0; i < end1 - begin1; ++i) { - checkSample(args1, begin1 + i, args2, begin2 + i, useGpu); - } - } - } - - EXPECT_EQ(seq1, (size_t)data.getNumSequences()); - rmDir(kTestDir); -} - -TEST(ProtoDataProvider, test) { - int numSlotsArray[] = {0, 3}; - int numTwoArray[] = {0, 1}; - int numSlotsArraySize = sizeof(numSlotsArray) / sizeof(numSlotsArray[0]); - const int numSlot = 5; - int combination[numSlot] = {0}; - int k = numSlot - 1; - while (k >= 0) { - int numDenseVecSlots = numSlotsArray[combination[0]]; - int numSparseNonValueVecSlots = numSlotsArray[combination[1]]; - int numSparseValueVectorSlots = numSlotsArray[combination[2]]; - int numStrSlots = numSlotsArray[combination[3]]; - int numIdSlots = numSlotsArray[combination[4]]; - // while loop : traverse all cases - k = numSlot - 1; - while (k >= 0) { - if (combination[k] < (numSlotsArraySize - 1)) { - ++combination[k]; - break; - } else { - combination[k] = 0; - --k; - } - } - if (numDenseVecSlots + numSparseNonValueVecSlots + - numSparseValueVectorSlots + numStrSlots + numIdSlots < - 1) - continue; - for (int iid : numTwoArray) { - for (int async : numTwoArray) { - for (int useGpu : numTwoArray) { - for (int dataCompression : numTwoArray) { - if (async && useGpu) { - // Currently in async mode, useGpu is not supported - continue; - } -#ifndef PADDLE_WITH_CUDA - if (useGpu) { - continue; - } -#endif - LOG(INFO) << " numDenseVecSlots=" << numDenseVecSlots - << " numSparseNonValueVecSlots=" - << numSparseNonValueVecSlots - << " numSparseValueVectorSlots=" - << numSparseValueVectorSlots - << " numStrSlots=" << numStrSlots - << " numIdSlots=" << numIdSlots << " iid=" << iid - << " async=" << async << " useGpu=" << useGpu - << " dataCompression=" << dataCompression; - int numPerSlotType[SlotDef::SlotType_ARRAYSIZE] = {0}; - numPerSlotType[SlotDef::VECTOR_DENSE] = numDenseVecSlots; - numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE] = - numSparseNonValueVecSlots; - numPerSlotType[SlotDef::VECTOR_SPARSE_VALUE] = - numSparseValueVectorSlots; - numPerSlotType[SlotDef::INDEX] = numIdSlots; - numPerSlotType[SlotDef::STRING] = numStrSlots; - testProtoDataProvider( - numPerSlotType, iid, async, useGpu, dataCompression); - } // end for (int dataCompression : numTwoArray) - } // end for (int useGpu : numTwoArray) - } // end for (int async : numTwoArray) - } // end for (int iid : numTwoArray) - } // end for (while, traverse all slots) -} - -TEST(ProtoDataProvider, constant_slots) { - int numSlotsArray[] = {0, 3}; - int numTwoArray[] = {0, 1}; - for (int numDenseVecSlots : numSlotsArray) { - for (int numSparseNonValueVecSlots : numSlotsArray) { - if (numDenseVecSlots + numSparseNonValueVecSlots < 1) continue; - for (int numConstantSlots : {1, 2}) { - for (int useGpu : numTwoArray) { - for (int dataCompression : numTwoArray) { -#ifndef PADDLE_WITH_CUDA - if (useGpu) { - continue; - } -#endif - LOG(INFO) << " numDenseVecSlots=" << numDenseVecSlots - << " numSparseNonValueVecSlots=" - << numSparseNonValueVecSlots - << " numConstantSlogs=" << numConstantSlots - << " useGpu=" << useGpu - << " dataCompression=" << dataCompression; - int numPerSlotType[SlotDef::SlotType_ARRAYSIZE] = {0}; - numPerSlotType[SlotDef::VECTOR_DENSE] = numDenseVecSlots; - numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE] = - numSparseNonValueVecSlots; - numPerSlotType[SlotDef::VECTOR_SPARSE_VALUE] = 1; - numPerSlotType[SlotDef::INDEX] = 1; - testProtoDataProvider(numPerSlotType, - /* iid= */ true, - /* async= */ false, - useGpu, - dataCompression, - numConstantSlots); - } // end for (int dataCompression : numTwoArray) - } // end for (int useGpu : numTwoArray) - } // end for (int numConstantSlots : {1, 2}) - } // end for (int numSparseNonValueVecSlots : numSlotsArray) - } // end for (int numDenseVecSlots : numSlotsArray) -} - -void checkSampleSequence(const vector& args1, - const vector& args2, - int64_t offset, - int64_t numSeqs, - bool useGpu) { - // check slot num are equal - EXPECT_EQ(args1.size(), args2.size()); - for (size_t i = 0; i < args1.size(); i++) { - auto type = getSlotType(args1[i]); - // check for args2: sequenceStartPositions vs numSeqs - // (1) size - EXPECT_EQ(args2[i].sequenceStartPositions->getSize(), (size_t)numSeqs + 1); - // (2) content - auto checkArgContent = [&](const Argument& args, int numSeqs) { - for (int j = 0; j <= numSeqs; j++) { - int start_pos = args.sequenceStartPositions->getElement(j); - EXPECT_EQ(start_pos, j); - } - }; - switch (type) { - case SlotDef::INDEX: { - // args1: for label - checkArgContent(args2[i], numSeqs); - // check for args2: ids are equal to args1[offset] - // (1) size - EXPECT_EQ(args2[i].ids->getSize(), (size_t)numSeqs); - // (2) content - for (int j = 0; j < numSeqs; j++) { - EXPECT_EQ(args2[i].ids->get(j), args1[i].ids->get(offset + j)); - } - break; - } - case SlotDef::VECTOR_SPARSE_NON_VALUE: { - // args1: for sparse_non_value - // args2 should put sparse indexes in ids - int colNum1; - const int* rowCols1; - const real* rowValues1; // nullptr - int totalLength = 0; - for (int j = 0; j < numSeqs; j++) { - getColRow( - args1[i], offset + j, useGpu, &colNum1, &rowCols1, &rowValues1); - // (1) lengths - EXPECT_EQ(totalLength, - args2[i].sequenceStartPositions->getElement(j)); - EXPECT_EQ(totalLength, - args2[i].subSequenceStartPositions->getElement(j)); - // (2) content - for (int k = 0; k < colNum1; k++) { - EXPECT_EQ(rowCols1[k], args2[i].ids->get(totalLength + k)); - } - totalLength += colNum1; - if (colNum1 == 0) { - // special case here: we will put a "-1" into ids when column num is - // zero. see ProtoSequenceDataProvider::getNextBatchInternal. - EXPECT_EQ(-1, args2[i].ids->get(totalLength)); - totalLength++; - } - } - EXPECT_EQ(totalLength, - args2[i].sequenceStartPositions->getElement(numSeqs)); - EXPECT_EQ(totalLength, - args2[i].subSequenceStartPositions->getElement(numSeqs)); - break; - } - case SlotDef::VECTOR_DENSE: { - // args1: for dense vector - checkArgContent(args2[i], numSeqs); - // check for args2: values are equal to args1[offset] - // (1) size - EXPECT_EQ(args2[i].value->getHeight(), (size_t)numSeqs); - EXPECT_EQ(args2[i].value->getWidth(), (size_t)getSlotDim(args1[i])); - // (2) content - for (int j = 0; j < numSeqs; j++) { - for (size_t k = 0; k < args2[i].value->getWidth(); k++) { - EXPECT_EQ( - static_cast(args1[i].value->getElement(j + offset, k)), - static_cast(args2[i].value->getElement(j, k))); - } - } - break; - } - default: { EXPECT_EQ(true, false) << "should not reach here"; } - } - } -} - -void testProtoSequenceDataProvider(int* numPerSlotType, - bool async, - bool useGpu) { - mkDir(kTestDir); - DataBatch data; - - prepareData(&data, - numPerSlotType, - /* iid */ true, - useGpu); - writeData(data, useGpu, /* dataCompression */ false); - - DataConfig config; - config.set_type("proto_sequence"); - config.set_files(kProtoFileList); - config.set_async_load_data(async); - - unique_ptr dataProvider(DataProvider::create(config, useGpu)); - dataProvider->setSkipShuffle(); - - EXPECT_EQ(data.getSize(), dataProvider->getSize()); - - int64_t batchSize = 10; - DataBatch batch; - - vector& args1 = data.getStreams(); - ICpuGpuVectorPtr sequenceStartPositions1 = args1[0].sequenceStartPositions; - - dataProvider->reset(); - - size_t args1Offset = 0; - while (dataProvider->getNextBatch(batchSize, &batch) > 0) { - CHECK_EQ(data.getNumStreams(), batch.getNumStreams()); - vector& args2 = batch.getStreams(); - ICpuGpuVectorPtr sequenceStartPositions2 = args2[0].sequenceStartPositions; - for (auto& arg : args1) { - // args1 should not has sequence - EXPECT_EQ(true, !arg.sequenceStartPositions); - } - for (auto& arg : args2) { - // args2 should has sequence - EXPECT_NE(true, !arg.sequenceStartPositions); - } - size_t numSeqs = batch.getNumSequences(); - checkSampleSequence(args1, args2, args1Offset, numSeqs, useGpu); - args1Offset += numSeqs; - } - - EXPECT_EQ(args1Offset, (size_t)data.getNumSequences()); - rmDir(kTestDir); -} - -TEST(ProtoSequenceDataProvider, test) { - int numSlotsArray[] = {0, 3}; - int numTwoArray[] = {0, 1}; - for (int numSparseNonValueVecSlots : numSlotsArray) { - for (int numIdSlots : numSlotsArray) { - for (int numDenseVecSlots : numSlotsArray) { - if (numDenseVecSlots + numSparseNonValueVecSlots + numIdSlots < 1) - continue; - for (int async : numTwoArray) { - for (int useGpu : numTwoArray) { - if (async && useGpu) { - // Currently in async mode, useGpu is not supported - continue; - } -#ifndef PADDLE_WITH_CUDA - if (useGpu) { - continue; - } -#endif - LOG(INFO) << " numDenseVecSlots=" << numDenseVecSlots - << " numSparseNonValueVecSlots=" - << numSparseNonValueVecSlots - << " numIdSlots=" << numIdSlots << " async=" << async - << " useGpu=" << useGpu; - int numPerSlotType[SlotDef::SlotType_ARRAYSIZE] = {0}; - numPerSlotType[SlotDef::VECTOR_DENSE] = numDenseVecSlots; - numPerSlotType[SlotDef::VECTOR_SPARSE_NON_VALUE] = - numSparseNonValueVecSlots; - numPerSlotType[SlotDef::INDEX] = numIdSlots; - testProtoSequenceDataProvider(numPerSlotType, async, useGpu); - } // end for (int useGpu : numTwoArray) - } // end for (int async : numTwoArray) - } // end for (int numDenseVecSlots : numSlotsArray) - } // end for (int numIdSlots : numSlotsArray) - } // end for (int numSparseNonValueVecSlots : numSlotsArray) -}