// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "paddle/fluid/operators/reader/ctr_reader.h" #include #include #include #include #include #include #include #include #include namespace paddle { namespace operators { namespace reader { static inline void string_split(const std::string& s, const char delimiter, std::vector* output) { size_t start = 0; size_t end = s.find_first_of(delimiter); while (end <= std::string::npos) { output->emplace_back(s.substr(start, end - start)); if (end == std::string::npos) { break; } start = end + 1; end = s.find_first_of(delimiter, start); } } static inline void parse_line( const std::string& line, const std::unordered_map& slot_to_index, int64_t* label, std::unordered_map>* slot_to_data) { std::vector ret; string_split(line, ' ', &ret); *label = std::stoi(ret[2]) > 0; for (size_t i = 3; i < ret.size(); ++i) { const std::string& item = ret[i]; std::vector feasign_and_slot; string_split(item, ':', &feasign_and_slot); if (feasign_and_slot.size() == 2 && slot_to_index.find(feasign_and_slot[1]) != slot_to_index.end()) { int64_t feasign = std::strtoll(feasign_and_slot[0].c_str(), NULL, 10); (*slot_to_data)[feasign_and_slot[1]].push_back(feasign); } } // NOTE:: if the slot has no value, then fill [0] as it's data. for (auto& item : slot_to_index) { if (slot_to_data->find(item.first) == slot_to_data->end()) { (*slot_to_data)[item.first].push_back(0); } } } // label slot1:fea_sign slot2:fea_sign slot1:fea_sign static inline void parse_svm_line(const std::string& line) {} // label,dense_fea,dense_fea,sparse_fea,sparse_fea static inline void parse_csv_line(const std::string& line, const std::vector& dense_slots, const std::vector& sparse_slots, int64_t* label, std::vector* dense_datas, std::vector* sparse_datas) { std::vector ret; string_split(line, ',', &ret); *label = std::stoi(ret[2]) > 0; } class Reader { public: virtual ~Reader() {} virtual bool HasNext() = 0; virtual void NextLine(std::string* line) = 0; }; class GzipReader : public Reader { public: explicit GzipReader(const std::string& file_name) : gzstream_(file_name.c_str()) {} ~GzipReader() {} bool HasNext() override { return gzstream_.peek() != EOF; } void NextLine(std::string* line) override { std::getline(gzstream_, *line); } private: igzstream gzstream_; }; class PlainFileReader : public Reader { public: explicit PlainFileReader(const std::string& file_name) : myfile_(file_name.c_str()) {} ~PlainFileReader() {} bool HasNext() override { return myfile_.peek() != EOF; } void NextLine(std::string* line) override { std::getline(myfile_, *line); } private: std::ifstream myfile_; }; template class MultiFileReader : public Reader { public: explicit MultiFileReader(const std::vector& file_list) { for (auto& file : file_list) { readers_.emplace_back(std::make_shared(file)); } } bool HasNext() override { if (current_reader_index_ >= readers_.size()) { return false; } if (!readers_[current_reader_index_]->HasNext()) { current_reader_index_++; return HasNext(); } return true; } void NextLine(std::string* line) override { readers_[current_reader_index_]->NextLine(line); } private: std::vector> readers_; size_t current_reader_index_ = 0; }; void MonitorThread(std::vector* thread_status, std::shared_ptr queue) { VLOG(3) << "monitor thread in"; bool reader_thread_is_running = true; while (reader_thread_is_running) { VLOG(3) << "reader_thread_is_running"; reader_thread_is_running = false; for (size_t i = 0; i < (*thread_status).size(); ++i) { if ((*thread_status)[i] == Running) { VLOG(3) << "reader is running!"; reader_thread_is_running = true; } } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } VLOG(3) << "all reader thread is stopped, push empty data into queue"; queue->Push({}); VLOG(3) << "monitor thread exited"; } void ReadThread(const std::vector& file_list, const DataDesc& data_desc, int thread_id, std::vector* thread_status, std::shared_ptr queue) { VLOG(3) << "[" << thread_id << "]" << " reader thread start! thread_id = " << thread_id; for (auto& file : file_list) { VLOG(3) << "[" << thread_id << "]" << " file " << file; } (*thread_status)[thread_id] = Running; VLOG(3) << "set status to running"; std::unordered_map slot_to_index; for (size_t i = 0; i < data_desc.sparse_slot_ids_.size(); ++i) { slot_to_index[data_desc.sparse_slot_ids_[i]] = i; } std::string line; std::vector>> batch_data; std::vector batch_label; std::unique_ptr reader; if (data_desc.file_type_ == "gzip") { reader.reset(new MultiFileReader(file_list)); } else if (data_desc.file_type_ == "plain") { reader.reset(new MultiFileReader(file_list)); } else { PADDLE_THROW("do not support file format %s", data_desc.file_type_); } VLOG(3) << "reader inited"; while (reader->HasNext()) { batch_data.clear(); batch_data.reserve(data_desc.batch_size_); batch_label.clear(); batch_label.reserve(data_desc.batch_size_); // read batch_size data for (int i = 0; i < data_desc.batch_size_; ++i) { if (reader->HasNext()) { reader->NextLine(&line); std::unordered_map> slot_to_data; int64_t label; parse_line(line, slot_to_index, &label, &slot_to_data); batch_data.push_back(slot_to_data); batch_label.push_back(label); } else { break; } } std::vector lod_datas; // first insert tensor for each sparse_slots for (auto& slot : data_desc.sparse_slot_ids_) { std::vector lod_data{0}; std::vector batch_feasign; for (size_t i = 0; i < batch_data.size(); ++i) { auto& feasign = batch_data[i][slot]; lod_data.push_back(lod_data.back() + feasign.size()); batch_feasign.insert(batch_feasign.end(), feasign.begin(), feasign.end()); } framework::LoDTensor lod_tensor; framework::LoD lod{lod_data}; lod_tensor.set_lod(lod); int64_t* tensor_data = lod_tensor.mutable_data( framework::make_ddim({1, static_cast(batch_feasign.size())}), platform::CPUPlace()); memcpy(tensor_data, batch_feasign.data(), batch_feasign.size() * sizeof(int64_t)); lod_datas.push_back(lod_tensor); } // insert label tensor framework::LoDTensor label_tensor; auto* label_tensor_data = label_tensor.mutable_data( framework::make_ddim({1, static_cast(batch_label.size())}), platform::CPUPlace()); memcpy(label_tensor_data, batch_label.data(), batch_label.size() * sizeof(int64_t)); lod_datas.push_back(label_tensor); queue->Push(lod_datas); VLOG(4) << "push one data, queue_size=" << queue->Size(); } (*thread_status)[thread_id] = Stopped; VLOG(3) << "set status to stopped, thread " << thread_id << " exited"; } } // namespace reader } // namespace operators } // namespace paddle