提交 9f53aad1 编写于 作者: Q Qiao Longfei

add test for read csv data

上级 fbd6f501
...@@ -76,22 +76,6 @@ static inline void parse_line( ...@@ -76,22 +76,6 @@ static inline void parse_line(
// label slot1:fea_sign slot2:fea_sign slot1:fea_sign // label slot1:fea_sign slot2:fea_sign slot1:fea_sign
static inline void parse_svm_line(const std::string& line) {} static inline void parse_svm_line(const std::string& line) {}
// label,dense_fea,dense_fea,sparse_fea,sparse_fea
static inline void parse_csv_line(const std::string& line,
const DataDesc& data_desc, int64_t* label,
std::vector<float>* dense_datas,
std::vector<int64_t>* sparse_datas) {
std::vector<std::string> ret;
string_split(line, ',', &ret);
*label = std::stol(ret[2]) > 0;
for (auto& idx : data_desc.dense_slot_index_) {
dense_datas->push_back(std::stof(ret[idx]));
}
for (auto& idx : data_desc.sparse_slot_index_) {
sparse_datas->push_back(std::stol(ret[idx]));
}
}
class Reader { class Reader {
public: public:
virtual ~Reader() {} virtual ~Reader() {}
...@@ -250,6 +234,132 @@ void ReadSvmData(const DataDesc& data_desc, std::shared_ptr<Reader> reader, ...@@ -250,6 +234,132 @@ void ReadSvmData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
} }
} }
// label dense_fea,dense_fea sparse_fea,sparse_fea
static inline void parse_csv_line(
const std::string& line, const DataDesc& data_desc, int64_t* label,
std::vector<std::vector<float>>* dense_datas,
std::vector<std::vector<int64_t>>* sparse_datas) {
std::vector<std::string> ret;
string_split(line, ' ', &ret);
*label = std::stol(ret[0]);
dense_datas->resize(data_desc.dense_slot_index_.size());
for (size_t i = 0; i < data_desc.dense_slot_index_.size(); ++i) {
int slot_idx = data_desc.dense_slot_index_[i];
auto& slot_data = ret[slot_idx];
std::vector<std::string> data_in_slot_str;
string_split(ret[slot_idx], ',', &data_in_slot_str);
std::vector<float> data_in_slot;
for (auto& data_str : data_in_slot_str) {
(*dense_datas)[i].push_back(std::stof(data_str));
}
}
sparse_datas->resize(data_desc.sparse_slot_index_.size());
for (size_t i = 0; i < data_desc.sparse_slot_index_.size(); ++i) {
int slot_idx = data_desc.sparse_slot_index_[i];
auto& slot_data = ret[slot_idx];
std::vector<std::string> data_in_slot_str;
string_split(ret[slot_idx], ',', &data_in_slot_str);
std::vector<int64_t> data_in_slot;
for (auto& data_str : data_in_slot_str) {
(*sparse_datas)[i].push_back(std::stol(data_str));
}
}
}
void ReadCsvData(const DataDesc& data_desc, std::shared_ptr<Reader> reader,
std::shared_ptr<LoDTensorBlockingQueue> queue) {
std::string line;
while (reader->HasNext()) {
std::vector<int64_t> batch_label;
batch_label.reserve(data_desc.batch_size_);
std::vector<std::vector<std::vector<float>>> batch_dense_data;
batch_dense_data.reserve(data_desc.batch_size_);
std::vector<std::vector<std::vector<int64_t>>> batch_sparse_data;
batch_sparse_data.reserve(data_desc.batch_size_);
// read batch_size data
for (int i = 0; i < data_desc.batch_size_; ++i) {
if (reader->HasNext()) {
reader->NextLine(&line);
int64_t label;
std::vector<std::vector<float>> dense_datas;
std::vector<std::vector<int64_t>> sparse_datas;
parse_csv_line(line, data_desc, &label, &dense_datas, &sparse_datas);
batch_label.push_back(label);
if (!batch_dense_data.empty()) {
PADDLE_ENFORCE_EQ(batch_dense_data[0].size(), dense_datas.size(),
"dense data should have the same shape");
}
batch_dense_data.push_back(dense_datas);
batch_sparse_data.push_back(sparse_datas);
} else {
break;
}
}
// the order of output data is label, dense_datas, sparse_datas
std::vector<framework::LoDTensor> lod_datas;
// insert label tensor
framework::LoDTensor label_tensor;
auto* label_tensor_data = label_tensor.mutable_data<int64_t>(
framework::make_ddim({static_cast<int64_t>(batch_label.size()), 1}),
platform::CPUPlace());
memcpy(label_tensor_data, batch_label.data(),
batch_label.size() * sizeof(int64_t));
auto dim =
framework::make_ddim({static_cast<int64_t>(batch_label.size()), 1});
lod_datas.push_back(label_tensor);
// insert tensor for each dense_slots
for (size_t i = 0; i < data_desc.dense_slot_index_.size(); ++i) {
framework::LoDTensor lod_tensor;
size_t width = batch_dense_data[0][i].size();
auto* tensor_data = lod_tensor.mutable_data<float>(
framework::make_ddim(
{static_cast<int64_t>(batch_dense_data.size()), // batch_size
static_cast<int64_t>(width)}),
platform::CPUPlace());
for (size_t j = 0; j < batch_dense_data.size(); ++j) {
auto& dense_data_row = batch_dense_data[j][i];
memcpy(tensor_data + j * width, dense_data_row.data(),
width * sizeof(float));
}
lod_datas.push_back(lod_tensor);
}
// insert tensor for each sparse_slots
for (size_t i = 0; i < data_desc.sparse_slot_index_.size(); ++i) {
std::vector<size_t> lod_data{0};
std::vector<int64_t> batch_feasign;
for (size_t row_idx = 0; row_idx < batch_sparse_data.size(); ++row_idx) {
auto& sparse_ids = batch_sparse_data[row_idx][i];
lod_data.push_back(lod_data.back() + sparse_ids.size());
batch_feasign.insert(batch_feasign.end(), sparse_ids.begin(),
sparse_ids.end());
}
framework::LoDTensor lod_tensor;
framework::LoD lod{lod_data};
lod_tensor.set_lod(lod);
int64_t* tensor_data = lod_tensor.mutable_data<int64_t>(
framework::make_ddim({static_cast<int64_t>(batch_feasign.size()), 1}),
platform::CPUPlace());
memcpy(tensor_data, batch_feasign.data(),
batch_feasign.size() * sizeof(int64_t));
lod_datas.push_back(lod_tensor);
}
queue->Push(lod_datas);
VLOG(4) << "push one data, queue_size=" << queue->Size();
}
}
void ReadThread(const std::vector<std::string>& file_list, void ReadThread(const std::vector<std::string>& file_list,
const DataDesc& data_desc, int thread_id, const DataDesc& data_desc, int thread_id,
std::vector<ReaderThreadStatus>* thread_status, std::vector<ReaderThreadStatus>* thread_status,
...@@ -276,6 +386,8 @@ void ReadThread(const std::vector<std::string>& file_list, ...@@ -276,6 +386,8 @@ void ReadThread(const std::vector<std::string>& file_list,
if (data_desc.file_format_ == "svm") { if (data_desc.file_format_ == "svm") {
ReadSvmData(data_desc, reader, queue); ReadSvmData(data_desc, reader, queue);
} else if (data_desc.file_format_ == "csv") {
ReadCsvData(data_desc, reader, queue);
} }
(*thread_status)[thread_id] = Stopped; (*thread_status)[thread_id] = Stopped;
......
...@@ -159,3 +159,71 @@ TEST(CTR_READER, read_data) { ...@@ -159,3 +159,71 @@ TEST(CTR_READER, read_data) {
&reader); &reader);
reader.Shutdown(); reader.Shutdown();
} }
static void GenereteCsvData(const std::string& file_name,
const std::vector<std::string>& data) {
std::ofstream out(file_name.c_str());
PADDLE_ENFORCE(out.good(), "open file %s failed!", file_name);
for (auto& c : data) {
out << c;
}
out.close();
PADDLE_ENFORCE(out.good(), "save file %s failed!", file_name);
}
static void CheckReadCsvOut(const std::vector<LoDTensor>& out) {
ASSERT_EQ(out.size(), 3);
ASSERT_EQ(out[0].dims()[1], 1);
ASSERT_EQ(out[1].dims()[1], 2);
ASSERT_EQ(out[2].dims()[1], 1);
for (size_t i = 0; i < out[0].numel(); ++i) {
int64_t label = out[0].data<int64_t>()[i];
auto& dense_dim = out[1].dims();
for (size_t j = 0; j < dense_dim[1]; ++j) {
ASSERT_EQ(out[1].data<float>()[i * dense_dim[1] + j],
static_cast<float>(label + 0.1));
}
auto& sparse_lod = out[2].lod();
for (size_t j = sparse_lod[0][i]; j < sparse_lod[0][i + 1]; ++j) {
ASSERT_EQ(out[2].data<int64_t>()[j], label);
}
}
}
TEST(CTR_READER, read_csv_data) {
std::string file_name = "test_ctr_reader_data.csv";
const std::vector<std::string> csv_data = {
"0 0.1,0.1 0,0,0,0\n", "1 1.1,1.1 1,1,1,1\n", "2 2.1,2.1 2,2,2,2\n",
"3 3.1,3.1 3,3,3,3\n",
};
GenereteCsvData(file_name, csv_data);
LoDTensorBlockingQueueHolder queue_holder;
int capacity = 64;
queue_holder.InitOnce(capacity, false);
std::shared_ptr<LoDTensorBlockingQueue> queue = queue_holder.GetQueue();
int batch_size = 3;
int thread_num = 1;
std::vector<std::string> file_list;
for (int i = 0; i < thread_num; ++i) {
file_list.push_back(file_name);
}
DataDesc data_desc(batch_size, file_list, "plain", "csv", {1}, {2}, {});
CTRReader reader(queue, thread_num, data_desc);
for (size_t i = 0; i < 2; ++i) {
reader.Start();
std::vector<LoDTensor> out;
while (true) {
reader.ReadNext(&out);
if (out.empty()) {
break;
}
CheckReadCsvOut(out);
}
reader.Shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册