diff --git a/paddle/fluid/framework/archive.h b/paddle/fluid/framework/archive.h
index 100eb9518f71e76134e1baf4da9d1c569880a2db..73fcc7424e43500d5efc005bf7fb206cbde626b1 100644
--- a/paddle/fluid/framework/archive.h
+++ b/paddle/fluid/framework/archive.h
@@ -168,10 +168,10 @@ class ArchiveBase {
 #else
     if (newsize > Capacity()) {
 #endif
-      Reserve(std::max(Capacity() * 2, newsize));
+      Reserve((std::max)(Capacity() * 2, newsize));
     }
     finish_ = buffer_ + newsize;
-    cursor_ = std::min(cursor_, finish_);
+    cursor_ = (std::min)(cursor_, finish_);
   }

   void Reserve(size_t newcap) {
@@ -207,7 +207,7 @@ class ArchiveBase {
 #else
     if (size > size_t(limit_ - finish_)) {
 #endif
-      Reserve(std::max(Capacity() * 2, Length() + size));
+      Reserve((std::max)(Capacity() * 2, Length() + size));
     }
   }

@@ -311,6 +311,18 @@ class Archive : public ArchiveBase {
     *this >> x;
     return x;
   }
+
+  template <class... ARGS>
+  void Printf(const char* fmt, ARGS&&... args) {
+    size_t temp = Limit() - Finish();
+    int len = snprintf(Finish(), temp, fmt, args...);
+    CHECK(len >= 0);  // NOLINT
+    if ((size_t)len >= temp) {
+      PrepareWrite(len + 1);
+      CHECK(snprintf(Finish(), (size_t)len + 1, fmt, args...) == len);
+    }
+    AdvanceFinish(len);
+  }
 };

diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h
index 644f60dbebf61203c8d811aa8722e0f239018b5d..d186ef1274625827d8e7e0174c6ff8e9475d0dae 100644
--- a/paddle/fluid/framework/channel.h
+++ b/paddle/fluid/framework/channel.h
@@ -40,7 +40,7 @@ class ChannelObject {
   // capacity can be zero
   explicit ChannelObject(size_t capacity) {
-    capacity_ = std::min(MaxCapacity(), capacity);
+    capacity_ = (std::min)(MaxCapacity(), capacity);
   }

   void Clear() {
@@ -192,7 +192,7 @@ class ChannelObject {
   std::condition_variable full_cond_;

   static constexpr size_t MaxCapacity() {
-    return std::numeric_limits<size_t>::max() / 2;
+    return (std::numeric_limits<size_t>::max)() / 2;
   }

   void Notify() {
@@ -289,7 +289,7 @@ template <class T>
 using Channel = std::shared_ptr<ChannelObject<T>>;

 template <class T>
-Channel<T> MakeChannel(size_t capacity = std::numeric_limits<size_t>::max()) {
+Channel<T> MakeChannel(size_t capacity = (std::numeric_limits<size_t>::max)()) {
   return std::make_shared<ChannelObject<T>>(capacity);
 }

@@ -370,7 +370,7 @@ class ChannelWriter {
   void Reset(ChannelObject<T>* channel) {
     CHECK(buffer_.empty()) << "Forgot to flush";
-    CHECK(channel != nullptr) << "Channel can not be nullptr";
+    // CHECK(channel != nullptr) << "Channel can not be nullptr";
     channel_ = channel;
     buffer_.clear();
     failed_ = !channel;
diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc
index f33b4668f01c139c6c8ba7f93c05f7fa3322633f..bfeb29778efd6811ebcd30ca099281b45d01005c 100644
--- a/paddle/fluid/framework/data_feed.cc
+++ b/paddle/fluid/framework/data_feed.cc
@@ -224,6 +224,7 @@ InMemoryDataFeed<T>::InMemoryDataFeed() {
   this->thread_id_ = 0;
   this->thread_num_ = 1;
   this->parse_ins_id_ = false;
+  this->parse_content_ = false;
   this->input_channel_ = nullptr;
   this->output_channel_ = nullptr;
   this->consume_channel_ = nullptr;
@@ -307,6 +308,11 @@ void InMemoryDataFeed<T>::SetThreadNum(int thread_num) {
   thread_num_ = thread_num;
 }

+template <typename T>
+void InMemoryDataFeed<T>::SetParseContent(bool parse_content) {
+  parse_content_ = parse_content;
+}
+
 template <typename T>
 void InMemoryDataFeed<T>::SetParseInsId(bool parse_ins_id) {
   parse_ins_id_ = parse_ins_id;
@@ -766,6 +772,18 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) {
       pos += len + 1;
       VLOG(3) << "ins_id " << instance->ins_id_;
     }
+    if (parse_content_) {
+      int num = strtol(&str[pos], &endptr, 10);
+      CHECK(num == 1);  // NOLINT
+      pos = endptr - str + 1;
+      size_t len = 0;
+      while (str[pos + len] != ' ') {
+        ++len;
+      }
+      instance->content_ = std::string(str + pos, len);
+      pos += len + 1;
+      VLOG(3) << "content " << instance->content_;
+    }
     for (size_t i = 0; i < use_slots_index_.size(); ++i) {
       int idx = use_slots_index_[i];
       int num = strtol(&str[pos], &endptr, 10);
@@ -890,8 +908,14 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec(
   std::vector<std::vector<size_t>> offset(use_slots_.size(),
                                           std::vector<size_t>{0});
   std::vector<bool> visit(use_slots_.size(), false);
+  ins_content_vec_.clear();
+  ins_content_vec_.reserve(ins_vec.size());
+  ins_id_vec_.clear();
+  ins_id_vec_.reserve(ins_vec.size());
   for (size_t i = 0; i < ins_vec.size(); ++i) {
     auto& r = ins_vec[i];
+    ins_id_vec_.push_back(r.ins_id_);
+    ins_content_vec_.push_back(r.content_);
     for (auto& item : r.float_feasigns_) {
       batch_float_feasigns[item.slot()].push_back(item.sign().float_feasign_);
       visit[item.slot()] = true;
diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h
index 5b314905143350b9d547fe08116703ff92dd4203..9ea9be41999145f69a600598e42ee5cce2d64afa 100644
--- a/paddle/fluid/framework/data_feed.h
+++ b/paddle/fluid/framework/data_feed.h
@@ -105,10 +105,18 @@ class DataFeed {
   virtual void SetThreadNum(int thread_num) {}
   // This function will do nothing at default
   virtual void SetParseInsId(bool parse_ins_id) {}
+  virtual void SetParseContent(bool parse_content) {}
   virtual void SetFileListMutex(std::mutex* mutex) {
     mutex_for_pick_file_ = mutex;
   }
   virtual void SetFileListIndex(size_t* file_index) { file_idx_ = file_index; }
+  virtual const std::vector<std::string>& GetInsIdVec() const {
+    return ins_id_vec_;
+  }
+  virtual const std::vector<std::string>& GetInsContentVec() const {
+    return ins_content_vec_;
+  }
+  virtual int GetCurBatchSize() { return batch_size_; }
   virtual void LoadIntoMemory() {
     PADDLE_THROW("This function(LoadIntoMemory) is not implemented.");
   }
@@ -164,6 +172,8 @@ class DataFeed {
   bool finish_set_filelist_;
   bool finish_start_;
   std::string pipe_command_;
+  std::vector<std::string> ins_id_vec_;
+  std::vector<std::string> ins_content_vec_;
   platform::Place place_;
 };

@@ -222,6 +232,7 @@ class InMemoryDataFeed : public DataFeed {
   virtual void SetThreadId(int thread_id);
   virtual void SetThreadNum(int thread_num);
   virtual void SetParseInsId(bool parse_ins_id);
+  virtual void SetParseContent(bool parse_content);
   virtual void LoadIntoMemory();

 protected:
@@ -232,6 +243,7 @@ class InMemoryDataFeed : public DataFeed {
   int thread_id_;
   int thread_num_;
   bool parse_ins_id_;
+  bool parse_content_;
   std::ifstream file_;
   std::shared_ptr<FILE> fp_;
   paddle::framework::ChannelObject<T>* input_channel_;
@@ -426,6 +438,7 @@ struct Record {
   std::vector<FeatureItem> uint64_feasigns_;
   std::vector<FeatureItem> float_feasigns_;
   std::string ins_id_;
+  std::string content_;
 };

 struct RecordCandidate {
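The parse_content branch added to ParseOneInstanceFromPipe above expects the content field to be encoded exactly like ins_id: a leading count that must be 1, then a single space-terminated token, followed by the usual per-slot data (a value count, then that many values). A minimal Python sketch of a conforming input line; the helper name and sample values are illustrative, not part of this patch:

    # Builds one input line in the layout the new parser expects:
    #   "1 <ins_id> 1 <content> <num> <val>*num ... (one group per slot)"
    def make_line(ins_id, content, slots):
        parts = ["1", ins_id, "1", content]  # count (always 1) + token, twice
        for values in slots:                 # per slot: value count, then values
            parts.append(str(len(values)))
            parts.extend(str(v) for v in values)
        return " ".join(parts)

    # Prints "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1", the same shape as the
    # lines written by test_run_with_dump further down.
    print(make_line("a", "a", [[1], [3, 3], [5, 5, 5, 5], [1]]))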
diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc
index a7e12cb817b0848e2b72e9457918af920df01964..e59c176344c202892c7264559ee9eeeccd292842 100644
--- a/paddle/fluid/framework/data_set.cc
+++ b/paddle/fluid/framework/data_set.cc
@@ -48,6 +48,8 @@ DatasetImpl<T>::DatasetImpl() {
   erase_duplicate_feas_ = true;
   keep_unmerged_ins_ = true;
   min_merge_size_ = 2;
+  parse_ins_id_ = false;
+  parse_content_ = false;
 }

 // set filelist, file_idx_ will reset to zero.
@@ -103,6 +105,16 @@ void DatasetImpl<T>::SetChannelNum(int channel_num) {
   channel_num_ = channel_num;
 }

+template <typename T>
+void DatasetImpl<T>::SetParseInsId(bool parse_ins_id) {
+  parse_ins_id_ = parse_ins_id;
+}
+
+template <typename T>
+void DatasetImpl<T>::SetParseContent(bool parse_content) {
+  parse_content_ = parse_content;
+}
+
 template <typename T>
 void DatasetImpl<T>::SetMergeByInsId(
     const std::vector<std::string>& merge_slot_list, bool erase_duplicate_feas,
@@ -378,7 +390,8 @@ void DatasetImpl<T>::CreateReaders() {
     readers_[i]->SetFileListMutex(&mutex_for_pick_file_);
     readers_[i]->SetFileListIndex(&file_idx_);
     readers_[i]->SetFileList(filelist_);
-    readers_[i]->SetParseInsId(merge_by_insid_);
+    readers_[i]->SetParseInsId(parse_ins_id_);
+    readers_[i]->SetParseContent(parse_content_);
     if (input_channel_ != nullptr) {
       readers_[i]->SetInputChannel(input_channel_.get());
     }
diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h
index 7b725a6f2739c439e8edbeba498c1ea77c840af0..047dc0116fea4d0cf0ed24e81d1befdeb310964c 100644
--- a/paddle/fluid/framework/data_set.h
+++ b/paddle/fluid/framework/data_set.h
@@ -58,6 +58,9 @@ class Dataset {
   virtual void SetDataFeedDesc(const std::string& data_feed_desc_str) = 0;
   // set channel num
   virtual void SetChannelNum(int channel_num) = 0;
+  // set parse ins id
+  virtual void SetParseInsId(bool parse_ins_id) = 0;
+  virtual void SetParseContent(bool parse_content) = 0;
   // set merge by ins id
   virtual void SetMergeByInsId(const std::vector<std::string>& merge_slot_list,
                                bool erase_duplicate_feas, int min_merge_size,
@@ -133,6 +136,8 @@ class DatasetImpl : public Dataset {
                            const std::string& fs_ugi);
   virtual void SetDataFeedDesc(const std::string& data_feed_desc_str);
   virtual void SetChannelNum(int channel_num);
+  virtual void SetParseInsId(bool parse_ins_id);
+  virtual void SetParseContent(bool parse_content);
   virtual void SetMergeByInsId(const std::vector<std::string>& merge_slot_list,
                                bool erase_duplicate_feas, int min_merge_size,
                                bool keep_unmerged_ins);
@@ -193,6 +198,8 @@ class DatasetImpl : public Dataset {
   int64_t fleet_send_sleep_seconds_;
   std::vector<std::thread> preload_threads_;
   bool merge_by_insid_;
+  bool parse_ins_id_;
+  bool parse_content_;
   bool erase_duplicate_feas_;
   bool keep_unmerged_ins_;
   int min_merge_size_;
diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index d0c40210a5510d1f7209e8b80bc665b43b9195af..c278980adc93290287ccb89fb874e486aba821cb 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -114,6 +114,8 @@ class DeviceWorker {
   virtual void BindingDataFeedMemory() = 0;
   virtual void SetRootScope(Scope* root_scope);
   virtual void SetDataFeed(DataFeed* data_feed);
+  virtual void SetNeedDump(bool need_dump_field) {}
+  virtual void SetChannelWriter(ChannelObject<std::string>* queue) {}
   virtual void SetPlace(const paddle::platform::Place& place) {
     place_ = place;
   }
@@ -172,6 +174,8 @@ class DownpourWorker : public HogwildWorker {
   virtual void Initialize(const TrainerDesc& desc);
   virtual void TrainFiles();
   virtual void TrainFilesWithProfiler();
+  virtual void SetNeedDump(bool need_dump_field);
+  virtual void SetChannelWriter(ChannelObject<std::string>* queue);

 protected:
   std::shared_ptr<paddle::framework::FleetWrapper> fleet_ptr_;
@@ -183,8 +187,11 @@ class DownpourWorker : public HogwildWorker {
 private:
   bool need_to_push_dense_;
+  bool need_dump_field_;
   bool dump_slot_;
   bool need_to_push_sparse_;
+  std::vector<std::string> dump_fields_;
+  ChannelWriter<std::string> writer_;
   DownpourWorkerParameter param_;
   float scale_datanorm_;
   // just save the value in param_ for easy access
diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc
index 8cd0789c0aeb429827e97804fb8afaed4214a75c..20be90e622346d66ebf30b323e35b8984fdcd31d 100644
--- a/paddle/fluid/framework/dist_multi_trainer.cc
+++ b/paddle/fluid/framework/dist_multi_trainer.cc
@@ -14,6 +14,7 @@ limitations under the License. */

 #include <string>
 #include <vector>
+#include "io/fs.h"
 #include "paddle/fluid/framework/data_feed_factory.h"
 #include "paddle/fluid/framework/data_set.h"
 #include "paddle/fluid/framework/device_worker_factory.h"
@@ -27,6 +28,19 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
   thread_num_ = trainer_desc.thread_num();
   SetDataset(dataset);

+  dump_fields_path_ = trainer_desc.dump_fields_path();
+  dump_converter_ = trainer_desc.dump_converter();
+  need_dump_field_ = false;
+  if (trainer_desc.dump_fields_size() != 0 && dump_fields_path_ != "") {
+    need_dump_field_ = true;
+  }
+  if (need_dump_field_) {
+    auto& file_list = dataset->GetFileList();
+    if (file_list.size() == 0) {
+      need_dump_field_ = false;
+    }
+  }
+  mpi_rank_ = trainer_desc.mpi_rank() / 2;
   const std::vector<paddle::framework::DataFeed*> readers =
       dataset->GetReaders();

@@ -39,6 +53,7 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
     workers_[i]->SetDeviceIndex(i);
     workers_[i]->SetDataFeed(readers[i]);
     workers_[i]->Initialize(trainer_desc);
+    workers_[i]->SetNeedDump(need_dump_field_);
   }

   VLOG(3) << "going to initialize pull dense worker";
@@ -48,7 +63,51 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
   SetDebug(trainer_desc.debug());
 }

+void DistMultiTrainer::DumpWork() {
+#ifdef _LINUX
+  while (1) {
+    std::string out_str;
+    if (!queue_->Get(out_str)) {
+      break;
+    }
+    size_t write_count =
+        fwrite_unlocked(out_str.data(), 1, out_str.length(), fp_.get());
+    if (write_count != out_str.length()) {
+      VLOG(3) << "dump text failed";
+      continue;
+    }
+    write_count = fwrite_unlocked("\n", 1, 1, fp_.get());
+    if (write_count != 1) {
+      VLOG(3) << "dump text failed";
+      continue;
+    }
+  }
+#endif
+}
+
+void DistMultiTrainer::InitDumpEnv() {
+  queue_ = paddle::framework::MakeChannel<std::string>();
+  int err_no = 0;
+  std::string path = string::format_string(
+      "%s/part-%03d", dump_fields_path_.c_str(), mpi_rank_);
+
+  fp_ = fs_open_write(path, &err_no, dump_converter_);
+  for (int i = 0; i < thread_num_; ++i) {
+    workers_[i]->SetChannelWriter(queue_.get());
+  }
+  dump_thread_ = std::thread(&DistMultiTrainer::DumpWork, this);
+}
+
+void DistMultiTrainer::FinalizeDumpEnv() {
+  queue_->Close();
+  dump_thread_.join();
+  queue_.reset();
+}
+
 void DistMultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
+  if (need_dump_field_) {
+    InitDumpEnv();
+  }
   pull_dense_worker_->SetRootScope(root_scope_);
   pull_dense_worker_->Start();
   VLOG(3) << "init other env done.";
@@ -70,6 +129,9 @@ void DistMultiTrainer::Finalize() {
   for (auto& th : threads_) {
     th.join();
   }
+  if (need_dump_field_) {
+    FinalizeDumpEnv();
+  }
   pull_dense_worker_->Stop();
   root_scope_->DropKids();
 }
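InitDumpEnv above fans all workers into one channel and drains it from a single background thread, so each trainer process writes exactly one part file named after its (halved) MPI rank. A rough, hedged Python sketch of that producer/consumer shape; the queue/thread names are illustrative, not Paddle API:

    # Sketch of the dump pipeline's shape; None stands in for channel close.
    import queue
    import threading

    def dump_work(q, path):
        with open(path, "w") as fp:      # the C++ side uses fs_open_write()
            while True:
                line = q.get()
                if line is None:         # queue_->Close() in FinalizeDumpEnv
                    break
                fp.write(line + "\n")

    mpi_rank = 0                         # C++: trainer_desc.mpi_rank() / 2
    path = "part-%03d" % mpi_rank        # C++: "%s/part-%03d" under dump_fields_path_
    q = queue.Queue()
    t = threading.Thread(target=dump_work, args=(q, path))
    t.start()
    q.put("one dumped line")             # workers do: writer_ << ars[i]
    q.put(None)
    t.join()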
diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc
index 2d9412712c06b87798cfdb19cf05ede5b0b23a57..113de8020379bd8e44a736a42a39267d4f0614c3 100644
--- a/paddle/fluid/framework/downpour_worker.cc
+++ b/paddle/fluid/framework/downpour_worker.cc
@@ -15,6 +15,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/device_worker.h"
 #include "paddle/fluid/framework/device_worker_factory.h"
 #include "paddle/fluid/platform/cpu_helper.h"
+#include "paddle/fluid/string/string_helper.h"

 #if defined _WIN32 || defined __APPLE__
 #else
@@ -71,9 +72,89 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   use_cvm_ = desc.use_cvm();
   scale_datanorm_ = desc.scale_datanorm();
   dump_slot_ = desc.dump_slot();
+  dump_fields_.resize(desc.dump_fields_size());
+  for (int i = 0; i < desc.dump_fields_size(); ++i) {
+    dump_fields_[i] = desc.dump_fields(i);
+  }
   adjust_ins_weight_config_ = desc.adjust_ins_weight_config();
 }

+void DownpourWorker::SetChannelWriter(ChannelObject<std::string>* queue) {
+  writer_.Reset(queue);
+}
+
+void DownpourWorker::SetNeedDump(bool need_dump_field) {
+  need_dump_field_ = need_dump_field;
+}
+
+template <typename T>
+std::string PrintLodTensorType(LoDTensor* tensor, int64_t start, int64_t end) {
+  auto count = tensor->numel();
+  if (start < 0 || end > count) {
+    VLOG(3) << "access violation";
+    return "access violation";
+  }
+  std::ostringstream os;
+  for (int64_t i = start; i < end; i++) {
+    os << ":" << tensor->data<T>()[i];
+  }
+  return os.str();
+}
+
+std::string PrintLodTensorIntType(LoDTensor* tensor, int64_t start,
+                                  int64_t end) {
+  auto count = tensor->numel();
+  if (start < 0 || end > count) {
+    VLOG(3) << "access violation";
+    return "access violation";
+  }
+  std::ostringstream os;
+  for (int64_t i = start; i < end; i++) {
+    os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
+  }
+  return os.str();
+}
+
+std::string PrintLodTensor(LoDTensor* tensor, int64_t start, int64_t end) {
+  std::string out_val;
+  if (tensor->type() == proto::VarType::FP32) {
+    out_val = PrintLodTensorType<float>(tensor, start, end);
+  } else if (tensor->type() == proto::VarType::INT64) {
+    out_val = PrintLodTensorIntType(tensor, start, end);
+  } else if (tensor->type() == proto::VarType::FP64) {
+    out_val = PrintLodTensorType<double>(tensor, start, end);
+  } else {
+    out_val = "unsupported type";
+  }
+  return out_val;
+}
+
+std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index) {
+  auto& dims = tensor->dims();
+  if (tensor->lod().size() != 0) {
+    auto& lod = tensor->lod()[0];
+    return {lod[index] * dims[1], lod[index + 1] * dims[1]};
+  } else {
+    return {index * dims[1], (index + 1) * dims[1]};
+  }
+}
+
+bool CheckValidOutput(LoDTensor* tensor, int batch_size) {
+  auto& dims = tensor->dims();
+  if (dims.size() != 2) return false;
+  if (tensor->lod().size() != 0) {
+    auto& lod = tensor->lod()[0];
+    if (lod.size() != batch_size + 1) {
+      return false;
+    }
+  } else {
+    if (dims[0] != batch_size) {
+      return false;
+    }
+  }
+  return true;
+}
+
 void DownpourWorker::CollectLabelInfo(size_t table_idx) {
   uint64_t table_id = static_cast<uint64_t>(
       param_.program_config(0).pull_sparse_table_id(table_idx));
@@ -646,11 +727,52 @@ void DownpourWorker::TrainFiles() {
         pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid);
       }
     }
+    if (need_dump_field_) {
+      int batch_size = device_reader_->GetCurBatchSize();
+      std::vector<std::string> ars(batch_size);
+      for (auto& ar : ars) {
+        ar.clear();
+      }
+      auto& ins_id_vec = device_reader_->GetInsIdVec();
+      auto& ins_content_vec = device_reader_->GetInsContentVec();
+      for (size_t i = 0; i < ins_id_vec.size(); i++) {
+        ars[i] += ins_id_vec[i];
+        ars[i] = ars[i] + "\t" + ins_content_vec[i];
+      }
+      for (auto& field : dump_fields_) {
+        Variable* var = thread_scope_->FindVar(field);
+        if (var == nullptr) {
+          continue;
+        }
+        LoDTensor* tensor = var->GetMutable<LoDTensor>();
+        if (!CheckValidOutput(tensor, batch_size)) {
+          continue;
+        }
+        for (int i = 0; i < batch_size; ++i) {
+          auto output_dim = tensor->dims()[1];
+          std::string output_dimstr =
+              boost::lexical_cast<std::string>(output_dim);
+          ars[i] = ars[i] + "\t" + field + ":" + output_dimstr;
+          auto bound = GetTensorBound(tensor, i);
+          ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
+        }
+      }
+      // #pragma omp parallel for
+      for (size_t i = 0; i < ars.size(); i++) {
+        if (ars[i].length() == 0) {
+          continue;
+        }
+        writer_ << ars[i];
+      }
+    }
     PrintFetchVars();
     thread_scope_->DropKids();
     ++batch_cnt;
   }
+  if (need_dump_field_) {
+    writer_.Flush();
+  }
 }

 }  // end namespace framework
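Put together, every line produced by the dump block in TrainFiles is tab-separated: the instance id, the raw content, then one segment per dumped field, where each segment is field:dim followed by the colon-separated tensor values. A small hedged parser for that layout, inferred from the string concatenation above (field names containing ':' would need escaping; the sample values are illustrative):

    # Parses "ins_id \t content \t field:dim:v1:v2:... \t ..." dump lines.
    def parse_dump_line(line):
        cols = line.rstrip("\n").split("\t")
        ins_id, content, fields = cols[0], cols[1], {}
        for seg in cols[2:]:
            toks = seg.split(":")
            fields[toks[0]] = (int(toks[1]), [float(v) for v in toks[2:]])
        return ins_id, content, fields

    # ('a', 'ctx', {'ctr.tmp_2': (1, [0.517])})
    print(parse_dump_line("a\tctx\tctr.tmp_2:1:0.517"))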
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index 5fe296ff20df74947c206d28aa44f27a45042d81..a4913cb26a9b913c55e14185c11e6383ea9b4b63 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -86,9 +86,21 @@ class DistMultiTrainer : public MultiTrainer {
   virtual void InitOtherEnv(const ProgramDesc& main_program);
   virtual void Run();
   virtual void Finalize();
+  virtual void FinalizeDumpEnv();
+  virtual void InitDumpEnv();
+  virtual void DumpWork();

 protected:
   std::shared_ptr<paddle::framework::PullDenseWorker> pull_dense_worker_;
+  std::thread dump_thread_;
+  std::shared_ptr<FILE> fp_;
+  std::shared_ptr<paddle::framework::ChannelObject<std::string>> queue_;
+
+  bool need_dump_field_;
+  std::string dump_fields_path_;
+  std::string dump_converter_;
+  std::vector<std::string> dump_fields_;
+  int mpi_rank_;
 };

 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto
index d84708918fb74477e544e97ef30c257cbd60944f..e859ced328b0a7843b155af3668c2a49503a49c7 100644
--- a/paddle/fluid/framework/trainer_desc.proto
+++ b/paddle/fluid/framework/trainer_desc.proto
@@ -35,6 +35,10 @@ message TrainerDesc {
   optional bool use_cvm = 8 [ default = false ];
   optional bool dump_slot = 9 [ default = false ];
   optional float scale_datanorm = 10 [ default = -1 ];
+  optional int32 mpi_rank = 11 [ default = -1 ];
+  optional string dump_fields_path = 12;
+  repeated string dump_fields = 13;
+  optional string dump_converter = 14;

   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
diff --git a/paddle/fluid/pybind/data_set_py.cc b/paddle/fluid/pybind/data_set_py.cc
index 3e787822ecd642b94c80e016415c0286ea4d5926..9e114394dd9eb3039a46b10689c12a5fd92c6ab7 100644
--- a/paddle/fluid/pybind/data_set_py.cc
+++ b/paddle/fluid/pybind/data_set_py.cc
@@ -100,6 +100,10 @@ void BindDataset(py::module* m) {
            py::call_guard<py::gil_scoped_release>())
       .def("set_queue_num", &framework::Dataset::SetChannelNum,
            py::call_guard<py::gil_scoped_release>())
+      .def("set_parse_ins_id", &framework::Dataset::SetParseInsId,
+           py::call_guard<py::gil_scoped_release>())
+      .def("set_parse_content", &framework::Dataset::SetParseContent,
+           py::call_guard<py::gil_scoped_release>())
       .def("set_merge_by_lineid", &framework::Dataset::SetMergeByInsId,
            py::call_guard<py::gil_scoped_release>())
       .def("merge_by_lineid", &framework::Dataset::MergeByInsId,
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index d4c8a32d6cf54768521d57e9416b1694054c3f25..499dcdf359ebc6cbfa3bde3669fc80f8d9a5dd61 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -282,6 +282,8 @@ class InMemoryDataset(DatasetBase):
         self.proto_desc.name = "MultiSlotInMemoryDataFeed"
         self.fleet_send_batch_size = None
         self.queue_num = None
+        self.parse_ins_id = False
+        self.parse_content = False
         self.merge_by_lineid = False

     def _prepare_to_run(self):
@@ -297,6 +299,8 @@ class InMemoryDataset(DatasetBase):
         if self.queue_num is None:
             self.queue_num = self.thread_num
         self.dataset.set_queue_num(self.queue_num)
+        self.dataset.set_parse_ins_id(self.parse_ins_id)
+        self.dataset.set_parse_content(self.parse_content)
        self.dataset.set_data_feed_desc(self.desc())
        self.dataset.create_channel()
        self.dataset.create_readers()
@@ -318,6 +322,40 @@ class InMemoryDataset(DatasetBase):
         """
         self.queue_num = queue_num

+    def set_parse_ins_id(self, parse_ins_id):
+        """
+        Set if Dataset needs to parse ins_id
+
+        Args:
+            parse_ins_id(bool): if parse ins_id or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_parse_ins_id(True)
+
+        """
+        self.parse_ins_id = parse_ins_id
+
+    def set_parse_content(self, parse_content):
+        """
+        Set if Dataset needs to parse content
+
+        Args:
+            parse_content(bool): if parse content or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_parse_content(True)
+
+        """
+        self.parse_content = parse_content
+
     def set_fleet_send_batch_size(self, fleet_send_batch_size):
         """
         Set fleet send batch size, default is 80000
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index 4791e85ac886f8dd5cf8101d05d0e3b39fee85ee..1e84365adaebb0a18c5dd2ae83d2a024f217860a 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -347,6 +347,21 @@ class PSLib(Fleet):
             self._fleet_ptr.clear_model()
         self._role_maker._barrier_worker()

+    def clear_model(self):
+        """
+        clear_model() will be called by user. It will clear sparse model.
+
+        Examples:
+            .. code-block:: python
+
+              fleet.clear_model()
+
+        """
+        self._role_maker._barrier_worker()
+        if self._role_maker.is_first_worker():
+            self._fleet_ptr.clear_model()
+        self._role_maker._barrier_worker()
+
     def load_one_table(self, table_id, model_path, **kwargs):
         """
         load pslib model for one table or load params from paddle model
@@ -385,6 +400,7 @@ class PSLib(Fleet):
                   fout.write(my_program.desc.serialize_to_string())

         """
+        self._role_maker._barrier_worker()
         mode = kwargs.get("mode", 0)
         scope = kwargs.get("scope", None)
         model_proto_file = kwargs.get("model_proto_file", None)
@@ -558,7 +574,7 @@ class DownpourOptimizer(DistributedOptimizer):
             parameter_list,
             no_grad_set,
             self._strategy)
-
+        opt_info["mpi_rank"] = fleet._role_maker._get_rank()
         fleet._set_opt_info(opt_info)

         programs = [loss.block.program for loss in losses]
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index b454ecf2ffc612f3a4460eefb01a669720d49e60..2d25f466d51046ad4ef6be7b80d1708a97662e4e 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -248,6 +248,9 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["use_cvm"] = strategy.get("use_cvm", False)
         opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1)
         opt_info["dump_slot"] = False
+        opt_info["dump_converter"] = ""
+        opt_info["dump_fields"] = strategy.get("dump_fields", [])
+        opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "")
         if server._server.downpour_server_param.downpour_table_param[
                 0].accessor.accessor_class == "DownpourCtrAccessor":
             opt_info["dump_slot"] = True
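On the user side the dump is driven entirely by the fleet strategy: optimizer_factory copies dump_fields and dump_fields_path into opt_info, DownpourOptimizer adds mpi_rank, and trainer_factory forwards them all to the TrainerDesc. A hedged configuration sketch; the field names and path are placeholders, not values from this patch:

    # Strategy keys match the ones optimizer_factory reads above.
    strategy = {
        "dump_fields": ["click", "ctr.tmp_2"],    # LoDTensor names in the scope
        "dump_fields_path": "./dump_fields_dir",  # each rank writes part-%03d here
    }
    # Then, roughly (placeholder optimizer/loss):
    #   optimizer = fleet.distributed_optimizer(adam, strategy)
    #   optimizer.minimize(loss)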
+ """ + with open("test_run_with_dump_a.txt", "w") as f: + data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_run_with_dump_b.txt", "w") as f: + data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data( + name=slot, shape=[1], dtype="int64", lod_level=1) + slots_vars.append(var) + + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist( + ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]) + dataset.set_parse_ins_id(True) + dataset.set_parse_content(True) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + dataset.load_into_memory() + dataset.set_fea_eval(10000, True) + dataset.local_shuffle() + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + for i in range(2): + try: + exe.train_from_dataset(fluid.default_main_program(), dataset) + except ImportError as e: + pass + except Exception as e: + self.assertTrue(False) + + os.remove("./test_run_with_dump_a.txt") + os.remove("./test_run_with_dump_b.txt") + def test_dataset_config(self): """ Testcase for dataset configuration. """ dataset = fluid.core.Dataset("MultiSlotDataset") diff --git a/python/paddle/fluid/tests/unittests/test_trainer_desc.py b/python/paddle/fluid/tests/unittests/test_trainer_desc.py new file mode 100644 index 0000000000000000000000000000000000000000..f2724ea22b006c786576a3a3a2d02e99a43722b7 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_trainer_desc.py @@ -0,0 +1,50 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +TestCases for TrainerDesc, +including config, etc. +""" + +from __future__ import print_function +import paddle.fluid as fluid +import numpy as np +import os +import shutil +import unittest + + +class TestTrainerDesc(unittest.TestCase): + """ TestCases for TrainerDesc. """ + + def test_config(self): + """ + Testcase for python config. 
+ """ + trainer_desc = fluid.trainer_desc.TrainerDesc() + trainer_desc._set_dump_fields(["a", "b"]) + trainer_desc._set_mpi_rank(1) + trainer_desc._set_dump_fields_path("path") + + dump_fields = trainer_desc.proto_desc.dump_fields + mpi_rank = trainer_desc.proto_desc.mpi_rank + dump_fields_path = trainer_desc.proto_desc.dump_fields_path + self.assertEqual(len(dump_fields), 2) + self.assertEqual(dump_fields[0], "a") + self.assertEqual(dump_fields[1], "b") + self.assertEqual(mpi_rank, 1) + self.assertEqual(dump_fields_path, "path") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py index 97bd52d5f15c9591eb3337fe3fd0eeb3233385d4..21522da46d4b588b41764b37653c29782edb17e1 100644 --- a/python/paddle/fluid/trainer_desc.py +++ b/python/paddle/fluid/trainer_desc.py @@ -81,6 +81,19 @@ class TrainerDesc(object): def _set_dump_slot(self, dump_slot): self.proto_desc.dump_slot = dump_slot + def _set_mpi_rank(self, mpi_rank): + self.proto_desc.mpi_rank = mpi_rank + + def _set_dump_fields(self, dump_fields): + for field in dump_fields: + self.proto_desc.dump_fields.append(field) + + def _set_dump_fields_path(self, path): + self.proto_desc.dump_fields_path = path + + def _set_dump_converter(self, converter): + self.proto_desc.dump_converter = converter + def _set_adjust_ins_weight(self, config_dict): self.proto_desc.adjust_ins_weight_config.need_adjust = \ config_dict.get("need_adjust", False) diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index 519672f7c8c24568c38385c58753ee73ba71a89e..5f312ea075ba7b3f30441645a46bb43b5d882bd5 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -41,6 +41,10 @@ class TrainerFactory(object): trainer._set_use_cvm(opt_info["use_cvm"]) trainer._set_scale_datanorm(opt_info["scale_datanorm"]) trainer._set_dump_slot(opt_info["dump_slot"]) + trainer._set_mpi_rank(opt_info["mpi_rank"]) + trainer._set_dump_fields(opt_info["dump_fields"]) + trainer._set_dump_fields_path(opt_info["dump_fields_path"]) + trainer._set_dump_converter(opt_info["dump_converter"]) trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"]) trainer._set_device_worker(device_worker) return trainer