From b1c458d0444f20c0fcb8865db32edb6f2972cb0b Mon Sep 17 00:00:00 2001
From: Thunderbrook <52529258+Thunderbrook@users.noreply.github.com>
Date: Tue, 6 Jul 2021 19:50:43 +0800
Subject: [PATCH] add so parser (#33969)

* add delta score, scale show

* so parser

* windows

* windows
---
 paddle/fluid/framework/data_feed.cc    | 68 ++++++++++++++++++
 paddle/fluid/framework/data_feed.h     | 95 ++++++++++++++++++++++++++
 paddle/fluid/framework/data_feed.proto |  1 +
 python/paddle/fluid/dataset.py         | 17 +++++
 4 files changed, 181 insertions(+)

diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index 7b91d545b54..cc4609a740f 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -31,6 +31,11 @@ USE_INT_STAT(STAT_total_feasign_num_in_mem);
 namespace paddle {
 namespace framework {
 
+DLManager& global_dlmanager_pool() {
+  static DLManager manager;
+  return manager;
+}
+
 void RecordCandidateList::ReSize(size_t length) {
   mutex_.lock();
   capacity_ = length;
@@ -366,6 +371,10 @@ void InMemoryDataFeed<T>::SetParseInsId(bool parse_ins_id) {
 template <typename T>
 void InMemoryDataFeed<T>::LoadIntoMemory() {
 #ifdef _LINUX
+  if (!so_parser_name_.empty()) {
+    LoadIntoMemoryFromSo();
+    return;
+  }
   VLOG(3) << "LoadIntoMemory() begin, thread_id=" << thread_id_;
   std::string filename;
   while (this->PickOneFile(&filename)) {
@@ -408,6 +417,51 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
 #endif
 }
 
+template <typename T>
+void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
+#ifdef _LINUX
+  VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
+
+  string::LineFileReader reader;
+  paddle::framework::CustomParser* parser =
+      global_dlmanager_pool().Load(so_parser_name_, slot_conf_);
+
+  std::string filename;
+  while (this->PickOneFile(&filename)) {
+    VLOG(3) << "PickOneFile, filename=" << filename
+            << ", thread_id=" << thread_id_;
+    int err_no = 0;
+    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+    CHECK(this->fp_ != nullptr);
+    __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
+
+    paddle::framework::ChannelWriter<T> writer(input_channel_);
+    T instance;
+    platform::Timer timeline;
+    timeline.Start();
+
+    while (1) {
+      if (!reader.getline(&*(fp_.get()))) {
+        break;
+      } else {
+        const char* str = reader.get();
+        ParseOneInstanceFromSo(str, &instance, parser);
+      }
+
+      writer << std::move(instance);
+      instance = T();
+    }
+
+    writer.Flush();
+    timeline.Pause();
+    VLOG(3) << "LoadIntoMemoryFromSo() read all lines, file=" << filename
+            << ", cost time=" << timeline.ElapsedSec()
+            << " seconds, thread_id=" << thread_id_;
+  }
+  VLOG(3) << "LoadIntoMemoryFromSo() end, thread_id=" << thread_id_;
+#endif
+}
+
 // explicit instantiation
 template class InMemoryDataFeed<Record>;
 
@@ -827,16 +881,23 @@ void MultiSlotInMemoryDataFeed::Init(
   inductive_shape_index_.resize(all_slot_num);
   use_slots_.clear();
   use_slots_is_dense_.clear();
+  slot_conf_.resize(all_slot_num);
   for (size_t i = 0; i < all_slot_num; ++i) {
     const auto& slot = multi_slot_desc.slots(i);
     all_slots_[i] = slot.name();
     all_slots_type_[i] = slot.type();
     use_slots_index_[i] = slot.is_used() ? use_slots_.size() : -1;
+
+    slot_conf_[i].name = slot.name();
+    slot_conf_[i].type = slot.type();
+    slot_conf_[i].use_slots_index = use_slots_index_[i];
+
     total_dims_without_inductive_[i] = 1;
     inductive_shape_index_[i] = -1;
     if (slot.is_used()) {
       use_slots_.push_back(all_slots_[i]);
       use_slots_is_dense_.push_back(slot.is_dense());
+      slot_conf_[i].use_slots_is_dense = slot.is_dense();
       std::vector<int> local_shape;
       if (slot.is_dense()) {
         for (int j = 0; j < slot.shape_size(); ++j) {
@@ -869,6 +930,7 @@ void MultiSlotInMemoryDataFeed::Init(
   }
   visit_.resize(all_slot_num, false);
   pipe_command_ = data_feed_desc.pipe_command();
+  so_parser_name_ = data_feed_desc.so_parser_name();
   finish_init_ = true;
   input_type_ = data_feed_desc.input_type();
 }
@@ -887,6 +949,12 @@ void MultiSlotInMemoryDataFeed::GetMsgFromLogKey(const std::string& log_key,
   *rank = (uint32_t)strtoul(rank_str.c_str(), NULL, 16);
 }
 
+void MultiSlotInMemoryDataFeed::ParseOneInstanceFromSo(const char* str,
+                                                       Record* instance,
+                                                       CustomParser* parser) {
+  parser->ParseOneInstance(str, instance);
+}
+
 bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) {
 #ifdef _LINUX
   thread_local string::LineFileReader reader;
diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h
index ec79005dfec..04a5b9b4d3a 100644
--- a/paddle/fluid/framework/data_feed.h
+++ b/paddle/fluid/framework/data_feed.h
@@ -117,6 +117,94 @@ using PvInstance = PvInstanceObject*;
 
 inline PvInstance make_pv_instance() { return new PvInstanceObject(); }
 
+struct SlotConf {
+  std::string name;
+  std::string type;
+  int use_slots_index;
+  int use_slots_is_dense;
+};
+
+class CustomParser {
+ public:
+  CustomParser() {}
+  virtual ~CustomParser() {}
+  virtual void Init(const std::vector<SlotConf>& slots) = 0;
+  virtual void ParseOneInstance(const char* str, Record* instance) = 0;
+};
+
+typedef paddle::framework::CustomParser* (*CreateParserObjectFunc)();
+
+class DLManager {
+  struct DLHandle {
+    void* module;
+    paddle::framework::CustomParser* parser;
+  };
+
+ public:
+  DLManager() {}
+
+  ~DLManager() {
+#ifdef _LINUX
+    std::lock_guard<std::mutex> lock(mutex_);
+    for (auto it = handle_map_.begin(); it != handle_map_.end(); ++it) {
+      delete it->second.parser;
+      dlclose(it->second.module);
+    }
+#endif
+  }
+
+  bool Close(const std::string& name) {
+#ifdef _LINUX
+    auto it = handle_map_.find(name);
+    if (it == handle_map_.end()) {
+      return true;
+    }
+    delete it->second.parser;
+    dlclose(it->second.module);
+#endif
+    VLOG(0) << "Not implement in windows";
+    return false;
+  }
+
+  paddle::framework::CustomParser* Load(const std::string& name,
+                                        std::vector<SlotConf>& conf) {
+#ifdef _LINUX
+    std::lock_guard<std::mutex> lock(mutex_);
+    DLHandle handle;
+    std::map<std::string, DLHandle>::iterator it = handle_map_.find(name);
+    if (it != handle_map_.end()) {
+      return it->second.parser;
+    }
+
+    handle.module = dlopen(name.c_str(), RTLD_NOW);
+    if (handle.module == nullptr) {
+      VLOG(0) << "Create so of " << name << " fail";
+      return nullptr;
+    }
+
+    CreateParserObjectFunc create_parser_func =
+        (CreateParserObjectFunc)dlsym(handle.module, "CreateParserObject");
+    handle.parser = create_parser_func();
+    handle.parser->Init(conf);
+    handle_map_.insert({name, handle});
+
+    return handle.parser;
+#endif
+    VLOG(0) << "Not implement in windows";
+    return nullptr;
+  }
+
+  paddle::framework::CustomParser* ReLoad(const std::string& name,
+                                          std::vector<SlotConf>& conf) {
+    Close(name);
+    return Load(name, conf);
+  }
+
+ private:
+  std::mutex mutex_;
+  std::map<std::string, DLHandle> handle_map_;
+};
+
 class DataFeed {
  public:
   DataFeed() {
@@ -252,6 +340,8 @@ class DataFeed {
   bool finish_set_filelist_;
   bool finish_start_;
   std::string pipe_command_;
+  std::string so_parser_name_;
+  std::vector<SlotConf> slot_conf_;
   std::vector<std::string> ins_id_vec_;
   std::vector<std::string> ins_content_vec_;
   platform::Place place_;
@@ -324,10 +414,13 @@ class InMemoryDataFeed : public DataFeed {
   virtual void SetEnablePvMerge(bool enable_pv_merge);
   virtual void SetCurrentPhase(int current_phase);
   virtual void LoadIntoMemory();
+  virtual void LoadIntoMemoryFromSo();
 
  protected:
   virtual bool ParseOneInstance(T* instance) = 0;
   virtual bool ParseOneInstanceFromPipe(T* instance) = 0;
+  virtual void ParseOneInstanceFromSo(const char* str, T* instance,
+                                      CustomParser* parser) {}
   virtual void PutToFeedVec(const std::vector<T>& ins_vec) = 0;
 
   int thread_id_;
@@ -688,6 +781,8 @@ class MultiSlotInMemoryDataFeed : public InMemoryDataFeed<Record> {
  protected:
   virtual bool ParseOneInstance(Record* instance);
   virtual bool ParseOneInstanceFromPipe(Record* instance);
+  virtual void ParseOneInstanceFromSo(const char* str, Record* instance,
+                                      CustomParser* parser);
   virtual void PutToFeedVec(const std::vector<Record>& ins_vec);
   virtual void GetMsgFromLogKey(const std::string& log_key, uint64_t* search_id,
                                 uint32_t* cmatch, uint32_t* rank);
diff --git a/paddle/fluid/framework/data_feed.proto b/paddle/fluid/framework/data_feed.proto
index 8bbbd06e7ef..c1149ed7518 100644
--- a/paddle/fluid/framework/data_feed.proto
+++ b/paddle/fluid/framework/data_feed.proto
@@ -33,4 +33,5 @@ message DataFeedDesc {
   optional string rank_offset = 6;
   optional int32 pv_batch_size = 7 [ default = 32 ];
   optional int32 input_type = 8 [ default = 0 ];
+  optional string so_parser_name = 9;
 }
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index ea9c2ea7550..8d20dd99447 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -95,6 +95,23 @@ class DatasetBase(object):
         """
         self.proto_desc.pipe_command = pipe_command
 
+    def set_so_parser_name(self, so_parser_name):
+        """
+        Set the .so parser name of the current dataset.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_so_parser_name("./abc.so")
+
+        Args:
+            so_parser_name(str): path of the .so parser library
+
+        """
+        self.proto_desc.so_parser_name = so_parser_name
+
     def set_rank_offset(self, rank_offset):
        """
        Set rank_offset for merge_pv. It set the message of Pv.
-- 
GitLab
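Editor's note, for context only (not part of the patch): DLManager::Load() dlopen()s the file passed to set_so_parser_name(), resolves the CreateParserObject symbol, and calls Init() with the slot configuration before ParseOneInstanceFromSo() forwards each input line to ParseOneInstance(). A minimal sketch of what such a parser library might look like follows. MyParser, the whitespace-split sample format, and the build command are illustrative assumptions; the code that fills Record is deliberately omitted because it depends on the Record layout declared in data_feed.h.

// my_parser.cc -- hypothetical example of a custom parser shared object.
// Assumed build command: g++ -std=c++14 -fPIC -shared my_parser.cc -o abc.so
// (plus the Paddle include paths needed for data_feed.h).
#include <sstream>
#include <string>
#include <vector>

#include "paddle/fluid/framework/data_feed.h"

class MyParser : public paddle::framework::CustomParser {
 public:
  void Init(const std::vector<paddle::framework::SlotConf>& slots) override {
    // Keep the slot layout handed over by MultiSlotInMemoryDataFeed::Init().
    slots_ = slots;
  }

  void ParseOneInstance(const char* str,
                        paddle::framework::Record* instance) override {
    // One text line per instance; split on whitespace (assumed sample format).
    std::istringstream line(str);
    std::string token;
    std::vector<std::string> tokens;
    while (line >> token) {
      tokens.push_back(token);
    }
    // Converting tokens into feasigns on *instance is elided here because it
    // depends on the Record definition in data_feed.h.
    (void)instance;
  }

 private:
  std::vector<paddle::framework::SlotConf> slots_;
};

// DLManager::Load() looks up exactly this symbol with dlsym(), so it needs C
// linkage to keep the name unmangled.
extern "C" paddle::framework::CustomParser* CreateParserObject() {
  return new MyParser();
}

On the Python side such a library would then be wired in with dataset.set_so_parser_name("./abc.so"), matching the docstring example added in dataset.py above.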