diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index 5b2d96e941ba2fbdbd63ccf13032624b0b5048d1..50b474ea2fe9713166b75c6906552ca2551e88d7 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -20,7 +20,10 @@ limitations under the License. */
 #include <memory>
 #include <mutex>  // NOLINT
 #include <string>
-#include <thread>  // NOLINT
+#include <thread>         // NOLINT
+#include <unordered_map>  // NOLINT
+#include <unordered_set>  // NOLINT
+#include <utility>        // NOLINT
 #include <vector>
 
 #include "paddle/fluid/framework/data_feed.h"
@@ -194,6 +197,10 @@ class DownpourWorker : public HogwildWorker {
   void PushGradients();
   void CollectLabelInfo(size_t table_id);
   void AdjustInsWeight();
+  void DumpParam();
+  void CopySparseTable();
+  void CopyDenseTable();
+  void CopyDenseVars();
 
  private:
   bool need_to_push_dense_;
@@ -211,6 +218,8 @@ class DownpourWorker : public HogwildWorker {
   std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
   std::map<uint64_t, std::vector<std::string>> dense_value_names_;
   std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
+  // actually pushed feasign of each table
+  std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
   // feasign
   std::map<uint64_t, std::vector<uint64_t>> features_;
@@ -232,6 +241,12 @@ class DownpourWorker : public HogwildWorker {
   std::vector<float> nid_show_;
   // check nan and inf during training
   std::vector<std::string> check_nan_var_names_;
+  // copy table
+  CopyTableConfig copy_table_config_;
+  std::map<uint64_t, uint64_t> table_dependency_;
+  std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
+  std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
+  std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
 };
 
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc
index 784f6abb4490f38213f72756eaf39b2cf70b4b35..13209237684571ec89b8a81867aeadbe86cb1744 100644
--- a/paddle/fluid/framework/downpour_worker.cc
+++ b/paddle/fluid/framework/downpour_worker.cc
@@ -44,6 +44,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
       sparse_grad_names_[table_id][j] = table.sparse_grad_name(j);
     }
     label_var_name_[table_id] = table.label_var_name();
+    sparse_push_keys_[table_id] = std::vector<uint64_t>();
   }
 
   for (int i = 0; i < param_.dense_table_size(); ++i) {
@@ -84,6 +85,29 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
   for (int i = 0; i < desc.check_nan_var_names_size(); ++i) {
     check_nan_var_names_.push_back(desc.check_nan_var_names(i));
   }
+  copy_table_config_ = desc.copy_table_config();
+  for (int i = 0; i < copy_table_config_.src_sparse_tables_size(); ++i) {
+    uint64_t src_table = copy_table_config_.src_sparse_tables(i);
+    uint64_t dest_table = copy_table_config_.dest_sparse_tables(i);
+    VLOG(3) << "copy_sparse_tables_ push back " << src_table << "->"
+            << dest_table;
+    copy_sparse_tables_.push_back(std::make_pair(src_table, dest_table));
+  }
+  for (int i = 0; i < copy_table_config_.src_dense_tables_size(); ++i) {
+    uint64_t src_table = copy_table_config_.src_dense_tables(i);
+    uint64_t dest_table = copy_table_config_.dest_dense_tables(i);
+    VLOG(3) << "copy_dense_tables_ push back " << src_table << "->"
+            << dest_table;
+    copy_dense_tables_.push_back(std::make_pair(src_table, dest_table));
+  }
+  for (auto& m : copy_table_config_.table_denpendency_map()) {
+    if (sparse_key_names_.find(m.key()) != sparse_key_names_.end()) {
+      // currently only support one dependency
+      for (auto& value : m.values()) {
+        table_dependency_[m.key()] = value;
+      }
+    }
+  }
 }
 
 void DownpourWorker::SetChannelWriter(ChannelObject<std::string>* queue) {
@@ -191,6 +215,14 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
     LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
     CHECK(tensor != nullptr) << "tensor of var "
                              << sparse_key_names_[table_id][i] << " is null";
+
+    // skip slots which do not have embedding
+    Variable* emb_var =
+        thread_scope_->FindVar(sparse_value_names_[table_id][i]);
+    if (emb_var == nullptr) {
+      continue;
+    }
+
     int64_t* ids = tensor->data<int64_t>();
     size_t fea_idx = 0;
     // tensor->lod()[0].size() == batch_size + 1
@@ -237,6 +269,9 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
   int64_t* ids = tensor->data<int64_t>();
   int len = tensor->numel();
   Variable* var_emb = thread_scope_->FindVar(emb_slot_name);
+  if (var_emb == nullptr) {
+    continue;
+  }
   LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
   float* ptr = tensor_emb->mutable_data<float>({len, table.emb_dim()},
                                                platform::CPUPlace());
@@ -368,6 +403,102 @@ void DownpourWorker::AdjustInsWeight() {
 #endif
 }
 
+void DownpourWorker::CopySparseTable() {
+  for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
+    int64_t src_table = copy_sparse_tables_[i].first;
+    int64_t dest_table = copy_sparse_tables_[i].second;
+    int32_t feanum = 0;
+    if (src_table == dest_table) {
+      continue;
+    } else if (!copy_table_config_.sparse_copy_by_feasign()) {
+      if (feasign_set_.find(src_table) == feasign_set_.end()) {
+        continue;
+      } else if (feasign_set_[src_table].size() == 0) {
+        continue;
+      }
+      feanum = fleet_ptr_->CopyTable(src_table, dest_table);
+    } else {
+      std::vector<uint64_t> fea_vec(feasign_set_[src_table].begin(),
+                                    feasign_set_[src_table].end());
+      feanum = fleet_ptr_->CopyTableByFeasign(src_table, dest_table, fea_vec);
+      fea_vec.clear();
+      std::vector<uint64_t>().swap(fea_vec);
+    }
+    VLOG(3) << "copy feasign from table " << src_table << " to table "
+            << dest_table << ", feasign num=" << feanum;
+    feasign_set_[src_table].clear();
+    std::unordered_set<uint64_t>().swap(feasign_set_[src_table]);
+  }
+  feasign_set_.clear();
+}
+
+void DownpourWorker::CopyDenseTable() {
+  if (thread_id_ != 0) {
+    return;
+  }
+  thread_local std::vector<std::future<int32_t>> pull_dense_status;
+  for (size_t i = 0; i < copy_dense_tables_.size(); ++i) {
+    uint64_t src_table = copy_dense_tables_[i].first;
+    uint64_t dest_table = copy_dense_tables_[i].second;
+    if (src_table == dest_table) {
+      continue;
+    }
+    int32_t dim = fleet_ptr_->CopyTable(src_table, dest_table);
+    VLOG(3) << "copy param from table " << src_table << " to table "
+            << dest_table << ", dim=" << dim;
+    if (copy_table_config_.dense_pull_after_copy()) {
+      VLOG(3) << "dense pull after copy, table=" << dest_table;
+      pull_dense_status.resize(0);
+      fleet_ptr_->PullDenseVarsAsync(*root_scope_, dest_table,
+                                     dense_value_names_[dest_table],
+                                     &pull_dense_status);
+      for (auto& t : pull_dense_status) {
+        t.wait();
+        auto status = t.get();
+        if (status != 0) {
+          LOG(WARNING) << "pull dense after copy table failed,"
+                       << " table=" << dest_table;
+        }
+      }
+    }
+  }
+}
+
+void DownpourWorker::CopyDenseVars() {
+  if (thread_id_ != 0) {
+    return;
+  }
+  for (int i = 0; i < copy_table_config_.src_var_list_size(); ++i) {
+    auto& src_var_name = copy_table_config_.src_var_list(i);
+    auto& dest_var_name = copy_table_config_.dest_var_list(i);
+    if (src_var_name == dest_var_name) {
+      continue;
+    }
+    VLOG(3) << "copy dense var from " << src_var_name << " to "
+            << dest_var_name;
+    Variable* src_var = thread_scope_->FindVar(src_var_name);
+    CHECK(src_var != nullptr) << src_var_name << " not found";  // NOLINT
+    LoDTensor* src_tensor = src_var->GetMutable<LoDTensor>();
+    CHECK(src_tensor != nullptr) << src_var_name
+                                 << " tensor is null";  // NOLINT
+    float* src_data = src_tensor->data<float>();
+
+    Variable* dest_var = thread_scope_->FindVar(dest_var_name);
+    CHECK(dest_var != nullptr) << dest_var_name << " not found";  // NOLINT
found"; // NOLINT + LoDTensor* dest_tensor = dest_var->GetMutable(); + CHECK(dest_tensor != nullptr) << dest_var_name + << " tensor is null"; // NOLINT + float* dest_data = dest_tensor->data(); + + CHECK(src_tensor->numel() == dest_tensor->numel()) + << "tensor numel not equal," << src_tensor->numel() << " vs " + << dest_tensor->numel(); + for (int i = 0; i < src_tensor->numel(); i++) { + dest_data[i] = src_data[i]; + } + } +} + void DownpourWorker::TrainFilesWithProfiler() { VLOG(3) << "Begin to train files with profiler"; platform::SetNumThreads(1); @@ -401,6 +532,7 @@ void DownpourWorker::TrainFilesWithProfiler() { double fill_sparse_time = 0.0; double push_sparse_time = 0.0; double push_dense_time = 0.0; + double copy_table_time = 0.0; int cur_batch; int batch_cnt = 0; uint64_t total_inst = 0; @@ -409,6 +541,27 @@ void DownpourWorker::TrainFilesWithProfiler() { timeline.Pause(); read_time += timeline.ElapsedSec(); total_time += timeline.ElapsedSec(); + + timeline.Start(); + if (copy_table_config_.need_copy()) { + VLOG(3) << "copy_sparse_tables_.size " << copy_sparse_tables_.size(); + if (copy_table_config_.sparse_copy_by_feasign()) { + for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) { + uint64_t tid = copy_sparse_tables_[i].first; + feasign_set_[tid].insert(sparse_push_keys_[tid].begin(), + sparse_push_keys_[tid].end()); + } + } + if (batch_cnt % copy_table_config_.batch_num() == 0) { + CopySparseTable(); + CopyDenseTable(); + CopyDenseVars(); + } + } + timeline.Pause(); + copy_table_time += timeline.ElapsedSec(); + total_time += timeline.ElapsedSec(); + VLOG(3) << "program config size: " << param_.program_config_size(); for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size(); ++i) { @@ -422,9 +575,9 @@ void DownpourWorker::TrainFilesWithProfiler() { } } timeline.Start(); - fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid, - sparse_key_names_[tid], &features_[tid], - &feature_values_[tid], table.fea_dim()); + fleet_ptr_->PullSparseVarsSync( + *thread_scope_, tid, sparse_key_names_[tid], &features_[tid], + &feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]); timeline.Pause(); pull_sparse_time += timeline.ElapsedSec(); total_time += timeline.ElapsedSec(); @@ -504,7 +657,7 @@ void DownpourWorker::TrainFilesWithProfiler() { *thread_scope_, tid, features_[tid], feature_labels_[tid], sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(), &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_, - dump_slot_); + dump_slot_, &sparse_push_keys_[tid]); timeline.Pause(); push_sparse_time += timeline.ElapsedSec(); total_time += timeline.ElapsedSec(); @@ -605,6 +758,7 @@ void DownpourWorker::TrainFilesWithProfiler() { collect_label_time / batch_cnt); fprintf(stderr, "adjust ins weight time: %fs\n", adjust_ins_weight_time / batch_cnt); + fprintf(stderr, "copy table time: %fs\n", copy_table_time / batch_cnt); fprintf(stderr, "mean read time: %fs\n", read_time / batch_cnt); fprintf(stderr, "IO percent: %f\n", read_time / total_time * 100); fprintf(stderr, "op run percent: %f\n", op_sum_time / total_time * 100); @@ -612,6 +766,8 @@ void DownpourWorker::TrainFilesWithProfiler() { pull_sparse_time / total_time * 100); fprintf(stderr, "adjust ins weight time percent: %f\n", adjust_ins_weight_time / total_time * 100); + fprintf(stderr, "copy table time percent: %f\n", + copy_table_time / total_time * 100); fprintf(stderr, "collect label time percent: %f\n", collect_label_time / total_time * 100); fprintf(stderr, "fill sparse time percent: %f\n", @@ 
@@ -625,6 +781,11 @@ void DownpourWorker::TrainFilesWithProfiler() {
     }
     timeline.Start();
   }
+  if (copy_table_config_.need_copy()) {
+    CopySparseTable();
+    CopyDenseTable();
+    CopyDenseVars();
+  }
 }
 
 void DownpourWorker::TrainFiles() {
@@ -634,6 +795,20 @@ void DownpourWorker::TrainFiles() {
   int batch_cnt = 0;
   int cur_batch;
   while ((cur_batch = device_reader_->Next()) > 0) {
+    if (copy_table_config_.need_copy()) {
+      if (copy_table_config_.sparse_copy_by_feasign()) {
+        for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
+          uint64_t tid = copy_sparse_tables_[i].first;
+          feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
+                                   sparse_push_keys_[tid].end());
+        }
+      }
+      if (batch_cnt % copy_table_config_.batch_num() == 0) {
+        CopySparseTable();
+        CopyDenseTable();
+        CopyDenseVars();
+      }
+    }
     // pull sparse here
     for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
          ++i) {
@@ -646,9 +821,9 @@ void DownpourWorker::TrainFiles() {
           break;
         }
       }
-      fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid,
-                                     sparse_key_names_[tid], &features_[tid],
-                                     &feature_values_[tid], table.fea_dim());
+      fleet_ptr_->PullSparseVarsSync(
+          *thread_scope_, tid, sparse_key_names_[tid], &features_[tid],
+          &feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]);
       CollectLabelInfo(i);
       FillSparseValue(i);
       auto nid_iter = std::find(sparse_value_names_[tid].begin(),
@@ -707,7 +882,7 @@ void DownpourWorker::TrainFiles() {
           *thread_scope_, tid, features_[tid], feature_labels_[tid],
           sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
           &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
-          dump_slot_);
+          dump_slot_, &sparse_push_keys_[tid]);
     }
   }
@@ -811,6 +986,11 @@ void DownpourWorker::TrainFiles() {
   if (need_dump_field_) {
     writer_.Flush();
   }
+  if (copy_table_config_.need_copy()) {
+    CopySparseTable();
+    CopyDenseTable();
+    CopyDenseVars();
+  }
 }
 
 }  // end namespace framework
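[Editor's note] Both training loops above gate the copy on `batch_cnt % copy_table_config_.batch_num() == 0` and, when `sparse_copy_by_feasign` is set, first fold the keys pushed this batch (`sparse_push_keys_`) into `feasign_set_`. Below is a minimal standalone Python sketch of that control flow; the names (`cfg`, `copy_fns`, `sparse_pairs`) are illustrative stand-ins for the worker state, not Paddle API.

```python
# Sketch of the per-batch copy trigger in DownpourWorker::TrainFiles.
def run_batches(batches, cfg, sparse_push_keys, feasign_set, copy_fns):
    batch_cnt = 0
    for _ in batches:
        if cfg["need_copy"]:
            if cfg["sparse_copy_by_feasign"]:
                # remember which feasigns were actually pushed per src table
                for src, _dest in cfg["sparse_pairs"]:
                    feasign_set.setdefault(src, set()).update(
                        sparse_push_keys.get(src, ()))
            if batch_cnt % cfg["batch_num"] == 0:
                # CopySparseTable, CopyDenseTable, CopyDenseVars
                for fn in copy_fns:
                    fn()
        # ... pull sparse, run forward/backward, push gradients ...
        batch_cnt += 1
```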
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc
index 22a9b79d7fcba6894073052c3c211358dece96ec..bef9c85e6d2ba8e815b2bcbbf8f74404f3ba0571 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.cc
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -40,28 +40,6 @@ const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100;
 std::shared_ptr<FleetWrapper> FleetWrapper::s_instance_ = NULL;
 bool FleetWrapper::is_initialized_ = false;
 
-#ifdef PADDLE_WITH_PSLIB
-template <class AR>
-paddle::ps::Archive<AR>& operator<<(paddle::ps::Archive<AR>& ar,
-                                    const MultiSlotType& ins) {
-  ar << ins.GetType();
-  ar << ins.GetOffset();
-  ar << ins.GetFloatData();
-  ar << ins.GetUint64Data();
-  return ar;
-}
-
-template <class AR>
-paddle::ps::Archive<AR>& operator>>(paddle::ps::Archive<AR>& ar,
-                                    MultiSlotType& ins) {
-  ar >> ins.MutableType();
-  ar >> ins.MutableOffset();
-  ar >> ins.MutableFloatData();
-  ar >> ins.MutableUint64Data();
-  return ar;
-}
-#endif
-
 #ifdef PADDLE_WITH_PSLIB
 std::shared_ptr<paddle::distributed::PSlib> FleetWrapper::pslib_ptr_ = NULL;
 #endif
@@ -159,14 +137,16 @@ void FleetWrapper::CreateClient2ClientConnection() {
 void FleetWrapper::PullSparseVarsSync(
     const Scope& scope, const uint64_t table_id,
     const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
-    std::vector<std::vector<float>>* fea_values, int fea_value_dim) {
+    std::vector<std::vector<float>>* fea_values, int fea_value_dim,
+    const std::vector<std::string>& var_emb_names) {
 #ifdef PADDLE_WITH_PSLIB
   std::vector<::std::future<int32_t>> pull_sparse_status;
   pull_sparse_status.resize(0);
   fea_keys->clear();
   fea_keys->resize(0);
   fea_keys->reserve(MAX_FEASIGN_NUM);
-  for (auto name : var_names) {
+  for (size_t var_index = 0; var_index < var_names.size(); ++var_index) {
+    const std::string& name = var_names[var_index];
     Variable* var = scope.FindVar(name);
     if (var == nullptr) {
       continue;
     }
@@ -175,6 +155,14 @@ void FleetWrapper::PullSparseVarsSync(
     CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
     int64_t* ids = tensor->data<int64_t>();
     int len = tensor->numel();
+
+    // skip slots which do not have embedding
+    const std::string& emb_name = var_emb_names[var_index];
+    Variable* emb_var = scope.FindVar(emb_name);
+    if (emb_var == nullptr) {
+      continue;
+    }
+
     for (auto i = 0u; i < len; ++i) {
       if (ids[i] == 0u) {
         continue;
       }
@@ -314,7 +302,8 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
     const std::vector<std::string>& sparse_grad_names, const int emb_dim,
     std::vector<std::vector<float>>* push_values,
     std::vector<::std::future<int32_t>>* push_sparse_status,
-    const int batch_size, const bool use_cvm, const bool dump_slot) {
+    const int batch_size, const bool use_cvm, const bool dump_slot,
+    std::vector<uint64_t>* sparse_push_keys) {
 #ifdef PADDLE_WITH_PSLIB
   int offset = 2;
   int slot_offset = 0;
@@ -332,12 +321,15 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
   }
   CHECK_GE(grad_dim, 0);
 
+  sparse_push_keys->clear();
+  sparse_push_keys->reserve(fea_keys.size() + 1);
   push_values->resize(fea_keys.size() + 1);
   for (auto& t : *push_values) {
     t.resize(emb_dim + offset + slot_offset);
   }
   uint64_t fea_idx = 0u;
-  for (size_t i = 0; i < sparse_key_names.size(); ++i) {
+  for (size_t i = 0;
+       i < sparse_key_names.size() && i < sparse_grad_names.size(); ++i) {
     Variable* var = scope.FindVar(sparse_key_names[i]);
     if (var == nullptr) {
       continue;
     }
@@ -376,6 +368,7 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
         g += emb_dim;
         continue;
       }
+      sparse_push_keys->push_back(ids[id_idx]);
       CHECK(fea_idx < (*push_values).size());
       CHECK(fea_idx < fea_labels.size());
@@ -396,17 +389,43 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
       fea_idx++;
     }
   }
-  CHECK(fea_idx == fea_keys.size()) << "fea_idx: " << fea_idx
-                                    << "features size: " << fea_keys.size();
+  // slots whose embedding has been stop gradient or
+  // not involved in forward-backward
+  uint64_t no_grad_fea_num = 0u;
+  for (size_t i = sparse_grad_names.size(); i < sparse_key_names.size(); ++i) {
+    Variable* var = scope.FindVar(sparse_key_names[i]);
+    if (var == nullptr) {
+      continue;
+    }
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    if (tensor == nullptr) {
+      LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
+      exit(-1);
+    }
+    int len = tensor->numel();
+    int64_t* ids = tensor->data<int64_t>();
+    for (auto id_idx = 0u; id_idx < len; ++id_idx) {
+      if (ids[id_idx] == 0) {
+        continue;
+      }
+      ++no_grad_fea_num;
+    }
+  }
+  CHECK(fea_idx + no_grad_fea_num == fea_keys.size())
+      << "fea_idx: " << fea_idx << " no_grad_fea_num: " << no_grad_fea_num
+      << " features size: " << fea_keys.size();
+  CHECK(fea_idx == sparse_push_keys->size());
+  if (fea_idx == 0) {
+    return;
+  }
   std::vector<float*> push_g_vec;
-  for (auto i = 0u; i < fea_keys.size(); ++i) {
+  for (auto i = 0u; i < sparse_push_keys->size(); ++i) {
     push_g_vec.push_back((*push_values)[i].data());
   }
   auto status = pslib_ptr_->_worker_ptr->push_sparse(
-      table_id, fea_keys.data(), (const float**)push_g_vec.data(),
-      fea_keys.size());
+      table_id, sparse_push_keys->data(), (const float**)push_g_vec.data(),
+      sparse_push_keys->size());
   push_sparse_status->push_back(std::move(status));
-
 #endif
 }
@@ -530,12 +549,12 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) {
 #endif
 }
 
-double FleetWrapper::GetCacheThreshold() {
+double FleetWrapper::GetCacheThreshold(int table_id) {
 #ifdef PADDLE_WITH_PSLIB
   double cache_threshold = 0.0;
   auto ret = pslib_ptr_->_worker_ptr->flush();
   ret.wait();
-  ret = pslib_ptr_->_worker_ptr->get_cache_threshold(0, cache_threshold);
+  ret = pslib_ptr_->_worker_ptr->get_cache_threshold(table_id, cache_threshold);
   ret.wait();
   if (cache_threshold < 0) {
     LOG(ERROR) << "get cache threshold failed";
@@ -569,7 +588,8 @@ void FleetWrapper::CacheShuffle(int table_id, const std::string& path,
 int32_t FleetWrapper::SaveCache(int table_id, const std::string& path,
                                 const int mode) {
 #ifdef PADDLE_WITH_PSLIB
-  auto ret = pslib_ptr_->_worker_ptr->save_cache(0, path, std::to_string(mode));
+  auto ret =
+      pslib_ptr_->_worker_ptr->save_cache(table_id, path, std::to_string(mode));
   ret.wait();
   int32_t feasign_cnt = ret.get();
   if (feasign_cnt == -1) {
@@ -688,40 +708,6 @@ std::future<int32_t> FleetWrapper::SendClientToClientMsg(
   return std::future<int32_t>();
 }
 
-template <typename T>
-void FleetWrapper::Serialize(const std::vector<T*>& t, std::string* str) {
-#ifdef PADDLE_WITH_PSLIB
-  paddle::ps::BinaryArchive ar;
-  for (size_t i = 0; i < t.size(); ++i) {
-    ar << *(t[i]);
-  }
-  *str = std::string(ar.buffer(), ar.length());
-#else
-  VLOG(0) << "FleetWrapper::Serialize does nothing when no pslib";
-#endif
-}
-
-template <typename T>
-void FleetWrapper::Deserialize(std::vector<T>* t, const std::string& str) {
-#ifdef PADDLE_WITH_PSLIB
-  if (str.length() == 0) {
-    return;
-  }
-  paddle::ps::BinaryArchive ar;
-  ar.set_read_buffer(const_cast<char*>(str.c_str()), str.length(), nullptr);
-  if (ar.cursor() == ar.finish()) {
-    return;
-  }
-  while (ar.cursor() < ar.finish()) {
-    t->push_back(ar.get<T>());
-  }
-  CHECK(ar.cursor() == ar.finish());
-  VLOG(3) << "Deserialize size " << t->size();
-#else
-  VLOG(0) << "FleetWrapper::Deserialize does nothing when no pslib";
-#endif
-}
-
 std::default_random_engine& FleetWrapper::LocalRandomEngine() {
   struct engine_wrapper_t {
     std::default_random_engine engine;
@@ -740,10 +726,43 @@ std::default_random_engine& FleetWrapper::LocalRandomEngine() {
   return r.engine;
 }
 
-template void FleetWrapper::Serialize<std::vector<MultiSlotType>>(
-    const std::vector<std::vector<MultiSlotType>*>&, std::string*);
-template void FleetWrapper::Deserialize<std::vector<MultiSlotType>>(
-    std::vector<std::vector<MultiSlotType>>*, const std::string&);
+int32_t FleetWrapper::CopyTable(const uint64_t src_table_id,
+                                const uint64_t dest_table_id) {
+#ifdef PADDLE_WITH_PSLIB
+  auto ret = pslib_ptr_->_worker_ptr->copy_table(src_table_id, dest_table_id);
+  ret.wait();
+  int32_t feasign_cnt = ret.get();
+  if (feasign_cnt == -1) {
+    LOG(ERROR) << "copy table failed";
+    sleep(sleep_seconds_before_fail_exit_);
+    exit(-1);
+  }
+  return feasign_cnt;
+#else
+  VLOG(0) << "FleetWrapper::CopyTable does nothing when no pslib";
+  return 0;
+#endif
+}
+
+int32_t FleetWrapper::CopyTableByFeasign(
+    const uint64_t src_table_id, const uint64_t dest_table_id,
+    const std::vector<uint64_t>& feasign_list) {
+#ifdef PADDLE_WITH_PSLIB
+  auto ret = pslib_ptr_->_worker_ptr->copy_table_by_feasign(
+      src_table_id, dest_table_id, feasign_list.data(), feasign_list.size());
+  ret.wait();
+  int32_t feasign_cnt = ret.get();
+  if (feasign_cnt == -1) {
+    LOG(ERROR) << "copy table by feasign failed";
+    sleep(sleep_seconds_before_fail_exit_);
+    exit(-1);
+  }
+  return feasign_cnt;
+#else
+  VLOG(0) << "FleetWrapper::CopyTableByFeasign does nothing when no pslib";
+  return 0;
+#endif
+}
 
 }  // end namespace framework
 }  // end namespace paddle
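[Editor's note] `CopyTable` and `CopyTableByFeasign` block on the pslib futures and exit on failure. They are exposed to Python later in this patch (fleet_wrapper_py.cc); a hedged usage sketch, assuming an initialized PSLib fleet and existing table ids 1 and 2 (the feasign values are illustrative):

```python
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

# copy the whole sparse table 1 into table 2; returns the feasign count
feanum = fleet._fleet_ptr.copy_table(1, 2)

# copy only the listed feasigns from table 1 into table 2
feanum = fleet._fleet_ptr.copy_table_by_feasign(1, 2, [1001, 1002, 1003])
```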
diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h
index 4aa626340d4af44f95f6d15850f47d9d55fffb79..73247748b0b99803e3427d7dee8fc8644bc0850d 100644
--- a/paddle/fluid/framework/fleet/fleet_wrapper.h
+++ b/paddle/fluid/framework/fleet/fleet_wrapper.h
@@ -67,31 +67,38 @@ class FleetWrapper {
     client2client_max_retry_ = 3;
   }
 
+  // set client to client communication config
   void SetClient2ClientConfig(int request_timeout_ms, int connect_timeout_ms,
                               int max_retry);
 
-  // Pull sparse variables from server in Sync mode
-  // Param: scope, table_id, var_names, fea_keys
+  // Pull sparse variables from server in sync mode
+  // Param: scope, table_id, var_names, fea_keys, fea_dim
   // Param: fea_values
   void PullSparseVarsSync(const Scope& scope, const uint64_t table_id,
                           const std::vector<std::string>& var_names,
                           std::vector<uint64_t>* fea_keys,
                           std::vector<std::vector<float>>* fea_values,
-                          int fea_dim);
+                          int fea_dim,
+                          const std::vector<std::string>& var_emb_names);
 
+  // pull dense variables from server in sync mode
   void PullDenseVarsSync(const Scope& scope, const uint64_t table_id,
                          const std::vector<std::string>& var_names);
 
+  // pull dense variables from server in async mode
+  // Param: scope, table_id, var_names
+  // Param: pull_dense_status
   void PullDenseVarsAsync(
       const Scope& scope, const uint64_t table_id,
       const std::vector<std::string>& var_names,
      std::vector<::std::future<int32_t>>* pull_dense_status);
 
+  // push dense parameters(not gradients) to server in sync mode
   void PushDenseParamSync(const Scope& scope, const uint64_t table_id,
                           const std::vector<std::string>& var_names);
 
   // Push dense variables to server in async mode
-  // Param: scope, table_id, var_names,
+  // Param: scope, table_id, var_names, scale_datanorm, batch_size
   // Param: push_sparse_status
   void PushDenseVarsAsync(
       const Scope& scope, const uint64_t table_id,
@@ -99,13 +106,14 @@ class FleetWrapper {
       std::vector<::std::future<int32_t>>* push_sparse_status,
       float scale_datanorm, int batch_size);
 
+  // push dense variables to server in sync mode
   void PushDenseVarsSync(Scope* scope, const uint64_t table_id,
                          const std::vector<std::string>& var_names);
 
-  // Push sparse variables with labels to server in Async mode
+  // Push sparse variables with labels to server in async mode
   // This is specially designed for click/show stats in server
-  // Param: scope, table_id, var_grad_names,
-  //        fea_keys, fea_labels, sparse_grad_names
+  // Param: scope, table_id, fea_keys, fea_labels, sparse_key_names,
+  //        sparse_grad_names, batch_size, use_cvm, dump_slot
   // Param: push_values, push_sparse_status
   void PushSparseVarsWithLabelAsync(
       const Scope& scope, const uint64_t table_id,
@@ -115,7 +123,8 @@ class FleetWrapper {
       const std::vector<std::string>& sparse_grad_names, const int emb_dim,
       std::vector<std::vector<float>>* push_values,
       std::vector<::std::future<int32_t>>* push_sparse_status,
-      const int batch_size, const bool use_cvm, const bool dump_slot);
+      const int batch_size, const bool use_cvm, const bool dump_slot,
+      std::vector<uint64_t>* sparse_push_keys);
 
   // Push sparse variables to server in Async mode
   // Param: scope, table_id, fea_keys, sparse_grad_names
@@ -130,12 +139,17 @@ class FleetWrapper {
       std::vector<::std::future<int32_t>>* push_sparse_status);
   */
 
+  // init server
   void InitServer(const std::string& dist_desc, int index);
+  // init trainer
   void InitWorker(const std::string& dist_desc,
                   const std::vector<uint64_t>& host_sign_list, int node_num,
                   int index);
+  // stop server
   void StopServer();
+  // run server
   uint64_t RunServer();
+  // gather server ip
   void GatherServers(const std::vector<uint64_t>& host_sign_list, int node_num);
   // gather client ip
   void GatherClients(const std::vector<uint64_t>& host_sign_list);
@@ -143,7 +157,6 @@ class FleetWrapper {
   std::vector<std::string> GetClientsInfo();
   // create client to client connection
   void CreateClient2ClientConnection();
-
   // flush all push requests
   void ClientFlush();
 
   // load from paddle model
@@ -162,37 +175,42 @@ class FleetWrapper {
   // mode = 0, save all feature
   // mode = 1, save delta feature, which means save diff
   void SaveModel(const std::string& path, const int mode);
-
-  double GetCacheThreshold();
+  // get save cache threshold
+  double GetCacheThreshold(int table_id);
+  // shuffle cache model between servers
   void CacheShuffle(int table_id, const std::string& path, const int mode,
                     const double cache_threshold);
+  // save cache model
+  // cache model can speed up online predict
   int32_t SaveCache(int table_id, const std::string& path, const int mode);
-
+  // copy feasign key/value from src_table_id to dest_table_id
+  int32_t CopyTable(const uint64_t src_table_id, const uint64_t dest_table_id);
+  // copy the feasigns in feasign_list from src_table_id to dest_table_id
+  int32_t CopyTableByFeasign(const uint64_t src_table_id,
+                             const uint64_t dest_table_id,
+                             const std::vector<uint64_t>& feasign_list);
+  // clear all models, release their memory
   void ClearModel();
-
+  // shrink sparse table
   void ShrinkSparseTable(int table_id);
+  // shrink dense table
   void ShrinkDenseTable(int table_id, Scope* scope,
                         std::vector<std::string> var_list, float decay,
                         int emb_dim);
 
-  // register client to client communication
   typedef std::function<int32_t(int, int, const std::string&)> MsgHandlerFunc;
+  // register client to client communication
   int RegisterClientToClientMsgHandler(int msg_type, MsgHandlerFunc handler);
   // send client to client message
   std::future<int32_t> SendClientToClientMsg(int msg_type, int to_client_id,
                                              const std::string& msg);
-
-  template <typename T>
-  void Serialize(const std::vector<T*>& t, std::string* str);
-  template <typename T>
-  void Deserialize(std::vector<T>* t, const std::string& str);
 
+  // FleetWrapper singleton
   static std::shared_ptr<FleetWrapper> GetInstance() {
     if (NULL == s_instance_) {
       s_instance_.reset(new paddle::framework::FleetWrapper());
     }
     return s_instance_;
   }
-
   // this performs better than rand_r, especially large data
   std::default_random_engine& LocalRandomEngine();
diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto
index 11261e9e1774825761116d9fcc951b2b56d0cdc4..5212c09b65c15e33e3e022fdbe8584686767cdba 100644
--- a/paddle/fluid/framework/trainer_desc.proto
+++ b/paddle/fluid/framework/trainer_desc.proto
@@ -39,10 +39,13 @@ message TrainerDesc {
   optional string dump_fields_path = 12;
   repeated string dump_fields = 13;
   optional string dump_converter = 14;
-
+  repeated string dump_param = 15;
   optional int32 mpi_size = 16 [ default = -1 ];
   optional int32 dump_file_num = 17 [ default = 16 ];
   repeated string check_nan_var_names = 18;
+  optional CopyTableConfig copy_table_config = 19;
+  // adjust ins weight
+  optional AdjustInsWeightConfig adjust_ins_weight_config = 20;
 
   // device worker parameters
   optional HogwildWorkerParameter hogwild_param = 101;
@@ -51,8 +54,6 @@ message TrainerDesc {
   optional SectionWorkerParameter section_param = 104;
   // datafeed desc
   optional DataFeedDesc data_desc = 201;
-  // adjust ins weight
-  optional AdjustInsWeightConfig adjust_ins_weight_config = 301;
 }
 
 message HogwildWorkerParameter { repeated string skip_ops = 1; }
@@ -107,6 +108,29 @@ message AdjustInsWeightConfig {
   optional string ins_weight_slot = 5 [ default = "" ];
 }
 
+message TableDependencyMap {
+  required int32 key = 1;
+  repeated int32 values = 2;
+}
+
+message CopyTableConfig {
+  optional bool need_copy = 1 [ default = false ];
+  optional int32 batch_num = 2 [ default = 100 ];
+  repeated int32 src_sparse_tables = 3;
+  repeated int32 dest_sparse_tables = 4;
+  repeated int32 src_dense_tables = 5;
+  repeated int32 dest_dense_tables = 6;
+  repeated string src_var_list = 7;
+  repeated string dest_var_list = 8;
+  // when dest dense table has no grad, should pull explicitly
+  optional bool dense_pull_after_copy = 9 [ default = false ];
+  // copy feasigns or copy the whole table
+  optional bool sparse_copy_by_feasign = 10 [ default = true ];
+  // table dependency for pull/push
+  optional bool enable_dependency = 11 [ default = false ];
+  repeated TableDependencyMap table_denpendency_map = 12;
+}
+
 message ProgramConfig {
   required string program_id = 1;
   repeated int32 push_sparse_table_id = 2;
diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc
index e7c7750c27de617ba8f339302ffa0fde95a794af..31268f5e1826a6be63a23cbe29e8a960b1ac5705 100644
--- a/paddle/fluid/pybind/fleet_wrapper_py.cc
+++ b/paddle/fluid/pybind/fleet_wrapper_py.cc
@@ -67,7 +67,10 @@ void BindFleetWrapper(py::module* m) {
            &framework::FleetWrapper::LoadFromPaddleModel)
       .def("load_model_one_table", &framework::FleetWrapper::LoadModelOneTable)
       .def("set_client2client_config",
-           &framework::FleetWrapper::SetClient2ClientConfig);
+           &framework::FleetWrapper::SetClient2ClientConfig)
+      .def("copy_table", &framework::FleetWrapper::CopyTable)
+      .def("copy_table_by_feasign",
+           &framework::FleetWrapper::CopyTableByFeasign);
 }  // end FleetWrapper
 }  // end namespace pybind
 }  // end namespace paddle
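[Editor's note] A sketch of how the new `CopyTableConfig` message might be filled in, assuming the generated module `paddle.fluid.proto.trainer_desc_pb2` is importable; the field names follow the proto above.

```python
from paddle.fluid.proto import trainer_desc_pb2

desc = trainer_desc_pb2.TrainerDesc()
cfg = desc.copy_table_config
cfg.need_copy = True
cfg.batch_num = 100                 # trigger a copy every 100 batches
cfg.src_sparse_tables.append(0)     # sparse table 0 -> 1
cfg.dest_sparse_tables.append(1)
cfg.src_dense_tables.append(2)      # dense table 2 -> 3
cfg.dest_dense_tables.append(3)
cfg.sparse_copy_by_feasign = True   # copy pushed feasigns, not the whole table
cfg.dense_pull_after_copy = True    # pull dest dense table after copying
```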
- """ + """Init.""" super(Hogwild, self).__init__() def _gen_worker_desc(self, trainer_desc): @@ -140,23 +137,29 @@ class DownpourSGD(DeviceWorker): trainer_desc.device_worker_name = "DownpourWorker" pull_thread = trainer_desc.pull_dense_param pull_thread.device_num = trainer_desc.thread_num - for i in self._fleet_desc.trainer_param.dense_table: + if opt_info.get("program_id_to_worker") is None: + raise ValueError("opt_info must have program_id_to_worker") + prog_id_to_worker = opt_info["program_id_to_worker"] + if prog_id_to_worker.get(program_id) is None: + raise ValueError("%s not found in program_id_to_worker" % + program_id) + worker = opt_info["program_id_to_worker"][program_id] + for i in worker.get_desc().dense_table: if i.table_id in dense_table_set: dense_table = pull_thread.dense_table.add() dense_table.dense_value_name.extend(i.dense_variable_name) dense_table.table_id = \ i.table_id - sparse_len = len(self._fleet_desc.trainer_param.sparse_table) + sparse_len = len(worker.get_desc().sparse_table) for i in range(sparse_len): sparse_table = downpour.sparse_table.add() - sparse_table.table_id = \ - self._fleet_desc.trainer_param.sparse_table[i].table_id - sparse_table.sparse_key_name.extend( - self._fleet_desc.trainer_param.sparse_table[i].slot_key) - sparse_table.sparse_value_name.extend( - self._fleet_desc.trainer_param.sparse_table[i].slot_value) - sparse_table.sparse_grad_name.extend( - self._fleet_desc.trainer_param.sparse_table[i].slot_gradient) + sparse_table.table_id = worker.get_desc().sparse_table[i].table_id + sparse_table.sparse_key_name.extend(worker.get_desc().sparse_table[ + i].slot_key) + sparse_table.sparse_value_name.extend(worker.get_desc() + .sparse_table[i].slot_value) + sparse_table.sparse_grad_name.extend(worker.get_desc().sparse_table[ + i].slot_gradient) if opt_info["use_cvm"]: sparse_table.emb_dim = \ self._fleet_desc.server_param.downpour_server_param.downpour_table_param[ @@ -173,28 +176,24 @@ class DownpourSGD(DeviceWorker): for i in opt_info["stat_var_names"]: downpour.stat_var_names.extend([i]) - for i in self._fleet_desc.trainer_param.dense_table: + for i in worker.get_desc().dense_table: if i.table_id in dense_table_set: dense_table = downpour.dense_table.add() dense_table.table_id = i.table_id dense_table.dense_value_name.extend(i.dense_variable_name) dense_table.dense_grad_name.extend( i.dense_gradient_variable_name) - downpour.skip_ops.extend(self._fleet_desc.trainer_param.skip_op) + downpour.skip_ops.extend(worker.get_desc().skip_op) if self._infer: downpour.push_dense = False downpour.push_sparse = False class Section(DeviceWorker): - """ - SectionWorker - """ + """SectionWorker.""" def __init__(self): - """ - Init. - """ + """Init.""" super(Section, self).__init__() def _gen_worker_desc(self, trainer_desc): diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index b8f185329b18e2c7440e3cb1e43567559f81d86b..c6d62b1d02742a8167f48eb69e2691f2db6a20b8 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -10,6 +10,7 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 # See the License for the specific language governing permissions and
+"""Definition of PSLib."""
 
 import os
 import sys
@@ -25,6 +26,8 @@ from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
 
 
 class PSLib(Fleet):
+    """PSLib class."""
+
     def __init__(self):
         super(PSLib, self).__init__(Mode.PSLIB)
         self._opt_info = None
@@ -89,7 +92,10 @@ class PSLib(Fleet):
         # barrier for init model
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            tables = self._dist_desc.trainer_param.dense_table
+            tables = []
+            for tp in self._dist_desc.trainer_param:
+                for i in tp.dense_table:
+                    tables.append(i)
             for prog, scope in zip(self._main_programs, self._scopes):
                 prog_id = str(id(prog))
                 prog_conf = self._opt_info['program_configs'][prog_id]
@@ -244,7 +250,9 @@ class PSLib(Fleet):
                        3 means save batch model.
 
         Example:
-            >>> fleet.save_persistables(dirname="/you/path/to/model", mode = 0)
+            .. code-block:: python
+
+              fleet.save_persistables(dirname="/you/path/to/model", mode = 0)
 
         """
         mode = kwargs.get("mode", 0)
@@ -260,35 +268,43 @@ class PSLib(Fleet):
         when using fleet, it will save sparse cache table
 
         Args:
+            executor(Executor): fluid executor
            dirname(str): save path. It can be hdfs/afs path or local path
            main_program(Program): fluid program, default None
            kwargs: use define property, current support following
                mode(int): define for feature extension in the future,
-                          currently no use, will pass a default value 0
+                           currently no use, will pass a default value 0
+                table_id(int): which table to save cache, default is 0
+
+        Returns:
+            feasign_num(int): cache feasign num
 
         Example:
             .. code-block:: python
-            >>> fleet.save_cache_model(None, dirname="/you/path/to/model", mode = 0)
+
+              fleet.save_cache_model(None, dirname="/you/path/to/model", mode = 0)
 
         """
         mode = kwargs.get("mode", 0)
+        table_id = kwargs.get("table_id", 0)
         self._fleet_ptr.client_flush()
         self._role_maker._barrier_worker()
         cache_threshold = 0.0
 
         if self._role_maker.is_first_worker():
-            cache_threshold = self._fleet_ptr.get_cache_threshold()
+            cache_threshold = self._fleet_ptr.get_cache_threshold(table_id)
         #check cache threshold right or not
         self._role_maker._barrier_worker()
 
         if self._role_maker.is_first_worker():
-            self._fleet_ptr.cache_shuffle(0, dirname, mode, cache_threshold)
+            self._fleet_ptr.cache_shuffle(table_id, dirname, mode,
+                                          cache_threshold)
 
         self._role_maker._barrier_worker()
 
         feasign_num = -1
         if self._role_maker.is_first_worker():
-            feasign_num = self._fleet_ptr.save_cache(0, dirname, mode)
+            feasign_num = self._fleet_ptr.save_cache(table_id, dirname, mode)
 
         self._role_maker._barrier_worker()
         return feasign_num
@@ -304,8 +320,12 @@ class PSLib(Fleet):
         """
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            for i in self._opt_info["fleet_desc"].trainer_param.sparse_table:
-                self._fleet_ptr.shrink_sparse_table(i.table_id)
+            tables = []
+            for tp in self._opt_info["fleet_desc"].trainer_param:
+                for i in tp.sparse_table:
+                    tables.append(i.table_id)
+            for i in list(set(tables)):
+                self._fleet_ptr.shrink_sparse_table(i)
         self._role_maker._barrier_worker()
 
     def shrink_dense_table(self, decay, emb_dim=11, scope=None, table_id=None):
@@ -330,19 +350,20 @@ class PSLib(Fleet):
             scope = fluid.global_scope()
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():
-            for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
-                if table_id is not None and table_id != i.table_id:
-                    continue
-                var_list = [var for var in i.dense_variable_name]
-                skip = False
-                for var in var_list:
-                    if scope.find_var(var) is None:
-                        skip = True
-                        break
-                if skip:
-                    continue
-                self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
-                                                   decay, emb_dim)
+            for tp in self._opt_info["fleet_desc"].trainer_param:
+                for i in tp.dense_table:
+                    if table_id is not None and table_id != i.table_id:
+                        continue
+                    var_list = [var for var in i.dense_variable_name]
+                    skip = False
+                    for var in var_list:
+                        if scope.find_var(var) is None:
+                            skip = True
+                            break
+                    if skip:
+                        continue
+                    self._fleet_ptr.shrink_dense_table(i.table_id, scope,
+                                                       var_list, decay, emb_dim)
         self._role_maker._barrier_worker()
 
     def clear_model(self):
@@ -476,20 +497,21 @@ class PSLib(Fleet):
             if ret != 0:
                 raise RuntimeError("download model proto file failed")
             model_proto_file = dest
-            for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
-                if table_id is not None and table_id != i.table_id:
-                    continue
-                table_var_names = [var for var in i.dense_variable_name]
-                skip = False
-                for var in table_var_names:
-                    if scope.find_var(var) is None:
-                        skip = True
-                        break
-                if skip:
-                    continue
-                self._fleet_ptr.load_from_paddle_model(
-                    scope, table_id, var_names, model_path, model_proto_file,
-                    table_var_names, load_combine)
+            for tp in self._opt_info["fleet_desc"].trainer_param:
+                for i in tp.dense_table:
+                    if table_id is not None and table_id != i.table_id:
+                        continue
+                    table_var_names = [var for var in i.dense_variable_name]
+                    skip = False
+                    for var in table_var_names:
+                        if scope.find_var(var) is None:
+                            skip = True
+                            break
+                    if skip:
+                        continue
+                    self._fleet_ptr.load_from_paddle_model(
+                        scope, table_id, var_names, model_path,
+                        model_proto_file, table_var_names, load_combine)
         self._role_maker._barrier_worker()
 
     def _set_opt_info(self, opt_info):
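[Editor's note] With the `table_id` kwarg added above, the cache of a table other than 0 can now be saved; usage sketch, assuming an initialized fleet and a cache-enabled DownpourSparseTable with id 1:

```python
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

# flushes pending pushes, computes the cache threshold on table 1,
# cache-shuffles, then saves; returns the number of cached feasigns
feasign_num = fleet.save_cache_model(
    None, dirname="/your/path/to/model", mode=0, table_id=1)
```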
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
index e789b856ee51ba8b657a36576cbae054f723f47e..a283c0185338ec23255e60583d94e545c829195f 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
@@ -10,13 +10,15 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
+"""Definition of Server and Worker."""
 
 from . import ps_pb2 as pslib
 
 
 class Server(object):
     """
-    A Server basic class.
+    A Server basic class;
+    it is a base class and does not have an implementation.
     """
@@ -26,6 +28,7 @@ class Server(object):
 class Worker(object):
     """
     A Worker basic class.
+    It is a base class and does not have an implementation.
     """
 
     def __init__(self):
@@ -76,7 +79,8 @@ class DownpourServer(Server):
             'sparse_weight_bounds', 'sparse_embedx_dim', 'sparse_embedx_threshold', 'sparse_nonclk_coeff', \
             'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \
             'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
-            'sparse_converter', 'sparse_deconverter']
+            'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
+            'sparse_cache_file_num']
 
         for key in strategy:
             if key not in support_sparse_key_list:
@@ -95,6 +99,12 @@ class DownpourServer(Server):
             table.table_class = table_class
 
         if table_class == 'DownpourSparseTable':
+            table.enable_sparse_table_cache = strategy.get(
+                'sparse_enable_cache', True)
+            table.sparse_table_cache_rate = strategy.get('sparse_cache_rate',
+                                                         0.00055)
+            table.sparse_table_cache_file_num = strategy.get(
+                'sparse_cache_file_num', 16)
             table.compress_in_save = strategy.get('sparse_compress_in_save',
                                                   True)
             table.shard_num = strategy.get('sparse_shard_num', 1000)
@@ -169,7 +179,10 @@ class DownpourServer(Server):
         """
         Args:
             table_id(int): id of sparse params table
-            strategy(dict): the dense config dict.
+            param_var(list): param vars
+            grad_var(list): param grad vars
+            strategy(dict): the dense config dict
+            sparse_table_names(list): sparse table names
         Returns:
             return None
         """
@@ -230,7 +243,11 @@ class DownpourServer(Server):
         """
         Args:
             table_id(int): id of datanorm table
-            strategy(dict): the datanorm config dict.
+            learning_rate(float): the learning rate used to update parameters
+            param_var(list): param vars
+            grad_var(list): param grad vars
+            strategy(dict): the datanorm config dict
+            sparse_table_names(list): sparse table names
         Returns:
             return None
         """
@@ -296,43 +313,63 @@ class DownpourWorker(Worker):
         self.window = window
         self._worker = pslib.DownpourTrainerParameter()
 
-    def add_sparse_table(self, table_id, slot_key_vars, slot_value_vars):
+    def add_sparse_table(self,
+                         table_id,
+                         slot_key_vars,
+                         slot_value_vars,
+                         slot_value_grads=None):
         """
         Args:
             table_id(int): id of sparse params table
-            slot_key_vars(string): slot key id
-            slot_value_var(string): slot key value after embedding
+            slot_key_vars(list): slot key id
+            slot_value_vars(list): slot key value after embedding
+            slot_value_grads(list): grad of all params, default is None
+
         Returns:
             return None
         """
+        if slot_value_grads is None:
+            slot_value_grad_names = \
+                [var.name + "@GRAD" for var in slot_value_vars]
+            # keep the given ordering when no grad list is passed
+            sorted_slot_value_vars = slot_value_vars
+            sorted_slot_key_vars = slot_key_vars
+        else:
+            value_to_key = {}
+            for i in range(len(slot_key_vars)):
+                value_to_key[slot_value_vars[i].name] = slot_key_vars[i]
+            slot_value_grad_names = []
+            all_grad_names = [var.name for var in slot_value_grads]
+            for var in slot_value_vars:
+                if var.name + "@GRAD" in all_grad_names:
+                    slot_value_grad_names.append(var.name + "@GRAD")
+            sorted_slot_value_vars = [i for i in slot_value_vars if \
+                i.name + "@GRAD" in slot_value_grad_names]
+            sorted_slot_value_vars += [i for i in slot_value_vars if \
+                i.name + "@GRAD" not in slot_value_grad_names]
+            sorted_slot_key_vars = \
+                [value_to_key[v.name] for v in sorted_slot_value_vars]
+
+        target_table = None
         for table in self._worker.sparse_table:
             if table.table_id == table_id:
-                if [var.name for var in slot_key_vars
-                    ] == self._worker.sparse_table[table_id].slot_key:
-                    if [var.name for var in slot_value_vars
-                        ] == self._worker.sparse_table[table_id].slot_value:
-                        if [
-                                var.name + "@GRAD" for var in slot_value_vars
-                        ] == self._worker.sparse_table[table_id].slot_gradient:
-                            return
-                        else:
-                            raise ValueError(
-                                "sparse table %s slot_gradient error" %
-                                table_id)
-
-                else:
-                    raise ValueError("sparse table %s slot_value error" %
-                                     table_id)
-            else:
-                raise ValueError("sparse table %s slot_key error" %
-                                 table_id)
+                keys = table.slot_key
+                key_names = [var.name for var in sorted_slot_key_vars]
+                for key_name in key_names:
+                    if key_name not in keys:
+                        raise ValueError("sparse table %s slot_key error" %
+                                         table_id)
+                target_table = table
+                break
+        table = target_table
+        if table is not None:
+            self._worker.sparse_table.remove(table)
         table = self._worker.sparse_table.add()
         table.table_id = table_id
-        table.slot_key.extend([var.name for var in slot_key_vars])
-        table.slot_value.extend([var.name for var in slot_value_vars])
-        table.slot_gradient.extend(
-            [var.name + "@GRAD" for var in slot_value_vars])
+        table.slot_key.extend([var.name for var in sorted_slot_key_vars])
+        table.slot_value.extend([var.name for var in sorted_slot_value_vars])
+        table.slot_gradient.extend(slot_value_grad_names)
 
     def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars,
                         dense_start_table_id, sparse_table_names):
@@ -341,8 +375,10 @@ class DownpourWorker(Worker):
             table_id(int): id of sparse params table
             learning_rate(float): the learning rate used to update parameters. \
                 Can be a float value
-            param_var(list): all dense param. it is a list.
-            grad_var(list): all dense grad parm it is a list.
+            param_vars(list): all dense params. it is a list.
+            grad_vars(list): all dense grad params. it is a list.
+            dense_start_table_id(int): dense table start index
+            sparse_table_names(list): sparse table names
         Returns:
             return None
         """
@@ -365,21 +401,19 @@ class DownpourWorker(Worker):
 
         for table in self._worker.dense_table:
             if table.table_id == table_id:
-                desc_dense_param_name = list(self._worker.dense_table[
-                    table_id - dense_start_table_id].dense_variable_name)
+                desc_dense_param_name = list(table.dense_variable_name)
                 desc_dense_param_name.sort()
 
                 if dense_param_name == desc_dense_param_name:
-                    desc_dense_grad_name = list(self._worker.dense_table[
-                        table_id - dense_start_table_id]
-                                                .dense_gradient_variable_name)
+                    desc_dense_grad_name = list(
+                        table.dense_gradient_variable_name)
                     desc_dense_grad_name.sort()
 
                     if dense_grad_name == desc_dense_grad_name:
                         return
                     else:
                         raise ValueError(
-                            "dense table %s dense_gradient_variable_name error"
-                            % table_id)
+                            "dense table %s dense_gradient_variable_name "
+                            "error" % table_id)
                 else:
                     raise ValueError(
                         "dense table %s dense_variable_name error" % table_id)
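[Editor's note] The reworked `add_sparse_table` orders slots so that those whose embedding actually has a `@GRAD` variable come first and only their gradients are registered. A standalone sketch of that matching rule (plain Python, not patch code):

```python
def order_slots(slot_value_names, grad_names):
    with_grad = [n for n in slot_value_names if n + "@GRAD" in grad_names]
    no_grad = [n for n in slot_value_names if n + "@GRAD" not in grad_names]
    return with_grad + no_grad, [n + "@GRAD" for n in with_grad]

# emb_b has no gradient (e.g. stop_gradient), so it sorts last and
# contributes no slot_gradient entry
values, grads = order_slots(["emb_a", "emb_b"], {"emb_a@GRAD"})
assert values == ["emb_a", "emb_b"]
assert grads == ["emb_a@GRAD"]
```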
+"""Optimizer Factory.""" __all__ = ["DistributedAdam"] import paddle.fluid as fluid @@ -18,11 +19,17 @@ from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs from google.protobuf import text_format +from collections import OrderedDict from .node import DownpourWorker, DownpourServer from . import ps_pb2 as pslib class DistributedOptimizerImplBase(object): + """ + DistributedOptimizerImplBase + base class of optimizers + """ + def __init__(self, optimizer): self._optimizer = optimizer self._learning_rate = optimizer._learning_rate @@ -33,10 +40,23 @@ class DistributedOptimizerImplBase(object): startup_program=None, parameter_list=None, no_grad_set=None): + """ + Args: + losses(Variable): loss variable defined by user + startup_program(Program): startup program that defined by user + parameter_list(str list): parameter names defined by users + no_grad_set(set): a set of variables that is defined by users + so that these variables do not need gradient computation + """ pass class DistributedAdam(DistributedOptimizerImplBase): + """ + DistributedAdam + adam optimizer in distributed training + """ + def __init__(self, optimizer): # todo(guru4elephant): add more optimizers here as argument # todo(guru4elephant): make learning_rate as a variable @@ -53,10 +73,10 @@ class DistributedAdam(DistributedOptimizerImplBase): Find input variable of distribute lookup table in program. We could support multi-distribute table now. Args: - program(Program): given program, locate distributed lookup table - table_name(str): given table names that is found beforehand + program(Program): given program, locate distributed lookup table + table_name(str): given table names that is found beforehand Returns: - inputs + inputs """ local_vars = program.current_block().vars inputs_dict = dict() @@ -75,10 +95,10 @@ class DistributedAdam(DistributedOptimizerImplBase): Find output variable of distribute lookup table in program. We could support multi-distribute table now. 
         Args:
-          program(Program): given program, locate distributed lookup table
-          table_name(str): given table name that is found beforehand
+            program(Program): given program, locate distributed lookup table
+            table_name(str): given table name that is found beforehand
         Returns:
-          outputs
+            outputs
         """
         local_vars = program.current_block().vars
         outputs_dict = dict()
@@ -92,6 +112,19 @@ class DistributedAdam(DistributedOptimizerImplBase):
                     [local_vars[name] for name in op.output("Out")])
         return outputs_dict
 
+    def _find_distributed_lookup_table_grads(self, program, table_names):
+        local_vars = program.current_block().vars
+        grads_dict = dict()
+        for table_name in table_names:
+            grads_dict[table_name] = []
+
+        for op in program.global_block().ops:
+            if op.type == "lookup_table_grad" and op.input("W")[
+                    0] in table_names:
+                grads_dict[op.input("W")[0]].extend(
+                    [local_vars[name] for name in op.input("Out@GRAD")])
+        return grads_dict
+
     def _find_multi_distributed_lookup_table(self, losses):
         """
         find multi-sparse-table
@@ -125,17 +158,57 @@ class DistributedAdam(DistributedOptimizerImplBase):
         Returns:
             [optimize_ops, grads_and_weights]
         """
+        # sparse table names of each program
+        prog_id_to_sparse_table = OrderedDict()
+        # inputs_dict and outputs_dict of sparse tables of each program
+        prog_id_to_inputs_dict = OrderedDict()
+        prog_id_to_outputs_dict = OrderedDict()
+        # related to PSParameter
+        ps_param = pslib.PSParameter()
+        # related to ServerParameter
+        server = DownpourServer()
+        # program to worker (related to DownpourTrainerParameter)
+        prog_id_to_worker = OrderedDict()
+        # param_grads of each program
+        prog_id_to_param_grads = OrderedDict()
+        # sparse_grads of each program
+        prog_id_to_sparse_grads = OrderedDict()
 
-        sparse_table_names = self._find_multi_distributed_lookup_table(losses)
-        inputs_dict = self._find_distributed_lookup_table_inputs(
-            losses[0].block.program, sparse_table_names)
+        sparse_table_to_index = OrderedDict()
+        sparse_table_index = 0
+        for loss in losses:
+            sparse_table = self._find_multi_distributed_lookup_table([loss])
+            prog_id = str(id(loss.block.program))
+            prog_id_to_sparse_table[prog_id] = sparse_table
 
-        outputs_dict = self._find_distributed_lookup_table_outputs(
-            losses[0].block.program, sparse_table_names)
+            # get sparse_table_to_index
+            for tn in sparse_table:
+                if sparse_table_to_index.get(tn) is None:
+                    sparse_table_to_index[tn] = sparse_table_index
+                    sparse_table_index += 1
+
+            # get inputs_dict
+            inputs_dict = self._find_distributed_lookup_table_inputs(
+                loss.block.program, sparse_table)
+            prog_id_to_inputs_dict[prog_id] = inputs_dict
+            # get outputs_dict
+            outputs_dict = self._find_distributed_lookup_table_outputs(
+                loss.block.program, sparse_table)
+            prog_id_to_outputs_dict[prog_id] = outputs_dict
+
+            prog_id_to_worker[prog_id] = DownpourWorker(self._window)
+
+            # param_grads of program
+            params_grads = sorted(
+                fluid.backward.append_backward(loss, parameter_list,
+                                               no_grad_set),
+                key=lambda x: x[0].name)
+            prog_id_to_param_grads[prog_id] = params_grads
+
+            grads_dict = self._find_distributed_lookup_table_grads(
+                loss.block.program, sparse_table)
+            prog_id_to_sparse_grads[prog_id] = grads_dict
 
-        ps_param = pslib.PSParameter()
-        server = DownpourServer()
-        worker = DownpourWorker(self._window)
         # if user specify a fleet_desc.prototxt file, then load the file
         # instead of creating default fleet_desc.prototxt.
         # user can specify server_param or trainer_param or fs_client_param.
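[Editor's note] `_minimize` now keys all of this bookkeeping by `str(id(program))` in insertion-ordered dicts, and sparse tables found across programs share one global index space. A standalone sketch of the indexing step (plain Python, not patch code):

```python
from collections import OrderedDict

def index_sparse_tables(per_program_tables):
    """per_program_tables: list of sparse table name lists, one per loss."""
    table_to_index = OrderedDict()
    for tables in per_program_tables:
        for tn in tables:
            if tn not in table_to_index:
                table_to_index[tn] = len(table_to_index)
    return table_to_index

# a table used by two programs gets a single shared index
assert index_sparse_tables([["emb1", "emb2"], ["emb2", "emb3"]]) == \
    OrderedDict([("emb1", 0), ("emb2", 1), ("emb3", 2)])
```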
@@ -144,37 +217,60 @@
             with open(fleet_desc_file) as f:
                 text_format.Merge(f.read(), ps_param)
             server.get_desc().CopyFrom(ps_param.server_param)
-            worker.get_desc().CopyFrom(ps_param.trainer_param)
+            if len(ps_param.trainer_param) == 1:
+                for k in prog_id_to_worker:
+                    prog_id_to_worker[k].get_desc().CopyFrom(
+                        ps_param.trainer_param[0])
+            else:
+                if len(ps_param.trainer_param) != len(prog_id_to_worker):
+                    raise ValueError(
+                        "trainer param size != program size, %s vs %s" %
+                        (len(ps_param.trainer_param), len(prog_id_to_worker)))
+                idx = 0
+                # prog_id_to_worker is OrderedDict
+                for k in prog_id_to_worker:
+                    prog_id_to_worker[k].get_desc().CopyFrom(
+                        ps_param.trainer_param[idx])
+                    idx += 1
 
-        sparse_table_index = 0
-        for tn in sparse_table_names:
+        # ServerParameter add all sparse tables
+        for tn in sparse_table_to_index:
+            sparse_table_index = sparse_table_to_index[tn]
             if strategy.get(tn) is not None:
                 server.add_sparse_table(sparse_table_index, strategy[tn])
             else:
                 server.add_sparse_table(sparse_table_index, None)
-            worker.add_sparse_table(sparse_table_index, inputs_dict[tn],
-                                    outputs_dict[tn])
-            sparse_table_index += 1
 
-        dense_start_table_id = sparse_table_index
-        dense_table_index = sparse_table_index
-        program_configs = {}
-        param_grads_list = []
+        # each DownpourTrainerParameter add its own sparse tables
+        for loss in losses:
+            prog_id = str(id(loss.block.program))
+            worker = prog_id_to_worker[prog_id]
+            inputs_dict = prog_id_to_inputs_dict[prog_id]
+            outputs_dict = prog_id_to_outputs_dict[prog_id]
+            for tn in prog_id_to_sparse_table[prog_id]:
+                sparse_table_index = sparse_table_to_index[tn]
+                grads_dict = prog_id_to_sparse_grads[prog_id]
+                worker.add_sparse_table(sparse_table_index, inputs_dict[tn],
+                                        outputs_dict[tn], grads_dict[tn])
+
+        dense_start_table_id = len(sparse_table_to_index)
+        dense_table_index = len(sparse_table_to_index)
+        program_configs = {}
+        # ServerParameter add all dense tables
+        # each DownpourTrainerParameter add its own dense tables
         for loss_index in range(len(losses)):
             program_id = str(id(losses[loss_index].block.program))
+            worker = prog_id_to_worker[program_id]
+            sparse_table_names = prog_id_to_sparse_table[program_id]
+            sparse_table_index = \
+                [sparse_table_to_index[i] for i in sparse_table_names]
+
             program_configs[program_id] = {
-                "pull_sparse":
-                [t_index for t_index in range(sparse_table_index)],
-                "push_sparse":
-                [t_index for t_index in range(sparse_table_index)]
+                "pull_sparse": [t_index for t_index in sparse_table_index],
+                "push_sparse": [t_index for t_index in sparse_table_index]
             }
 
-            params_grads = sorted(
-                fluid.backward.append_backward(losses[loss_index],
-                                               parameter_list, no_grad_set),
-                key=lambda x: x[0].name)
-            param_grads_list.append(params_grads)
+            params_grads = prog_id_to_param_grads[program_id]
             params = []
             grads = []
             data_norm_params = []
@@ -230,15 +326,39 @@ class DistributedAdam(DistributedOptimizerImplBase):
                 program_configs[program_id]["push_dense"].extend(
                     [dense_table_index])
                 dense_table_index += 1
+
+            # Todo(guru4elephant): figure out how to support more sparse parameters
+            # currently only support lookup_table
+            worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
+            if len(worker.get_desc().skip_op) == 0:
+                worker.get_desc().skip_op.extend(worker_skipped_ops)
+
         ps_param.server_param.CopyFrom(server.get_desc())
-        ps_param.trainer_param.CopyFrom(worker.get_desc())
-        # Todo(guru4elephant): figure out how to support more sparse parameters
-        # currently only support
lookup_table - worker_skipped_ops = ["lookup_table", "lookup_table_grad"] - if len(ps_param.trainer_param.skip_op) == 0: - ps_param.trainer_param.skip_op.extend(worker_skipped_ops) + # prog_id_to_worker is OrderedDict + if len(ps_param.trainer_param) == 0: + for k in prog_id_to_worker: + tp = ps_param.trainer_param.add() + tp.CopyFrom(prog_id_to_worker[k].get_desc()) + + if strategy.get("fs_uri") is not None: + ps_param.fs_client_param.uri = strategy["fs_uri"] + elif ps_param.fs_client_param.uri == "": + ps_param.fs_client_param.uri = "hdfs://your_hdfs_uri" + if strategy.get("fs_user") is not None: + ps_param.fs_client_param.user = strategy["fs_user"] + elif ps_param.fs_client_param.user == "": + ps_param.fs_client_param.user = "your_hdfs_user" + if strategy.get("fs_passwd") is not None: + ps_param.fs_client_param.passwd = strategy["fs_passwd"] + elif ps_param.fs_client_param.passwd == "": + ps_param.fs_client_param.passwd = "your_hdfs_passwd" + if strategy.get("fs_hadoop_bin") is not None: + ps_param.fs_client_param.hadoop_bin = strategy["fs_hadoop_bin"] + elif ps_param.fs_client_param.hadoop_bin == "": + ps_param.fs_client_param.hadoop_bin = "$HADOOP_HOME/bin/hadoop" opt_info = {} + opt_info["program_id_to_worker"] = prog_id_to_worker opt_info["program_configs"] = program_configs opt_info["trainer"] = "DistMultiTrainer" opt_info["device_worker"] = "DownpourSGD" @@ -259,8 +379,13 @@ class DistributedAdam(DistributedOptimizerImplBase): 0].accessor.accessor_class == "DownpourCtrAccessor": opt_info["dump_slot"] = True opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {}) + opt_info["copy_table"] = strategy.get("copy_table", {}) for loss in losses: loss.block.program._fleet_opt = opt_info - return None, param_grads_list[0], opt_info + param_grads_list = [] + for loss in losses: + prog_id = str(id(loss.block.program)) + param_grads_list.append(prog_id_to_param_grads[prog_id]) + return None, param_grads_list, opt_info diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/ps_pb2.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/ps_pb2.py index 6a241f37214d2687b735548609bce5127603a7c6..0021b610941a367860ba261aaaccfdd5b286fd8c 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/ps_pb2.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/ps_pb2.py @@ -32,7 +32,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( package='paddle', syntax='proto2', serialized_pb=_b( - '\n\x08ps.proto\x12\x06paddle\"\x9e\x02\n\x0bPSParameter\x12\x14\n\x0cworker_class\x18\x01 \x01(\t\x12\x14\n\x0cserver_class\x18\x02 \x01(\t\x12\x16\n\x0einstance_class\x18\x03 \x01(\t\x12-\n\x0cworker_param\x18\x65 \x01(\x0b\x32\x17.paddle.WorkerParameter\x12-\n\x0cserver_param\x18\x66 \x01(\x0b\x32\x17.paddle.ServerParameter\x12\x38\n\rtrainer_param\x18\xad\x02 \x01(\x0b\x32 .paddle.DownpourTrainerParameter\x12\x33\n\x0f\x66s_client_param\x18\xf5\x03 \x01(\x0b\x32\x19.paddle.FsClientParameter\"Q\n\x0fWorkerParameter\x12>\n\x15\x64ownpour_worker_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourWorkerParameter\"Q\n\x0fServerParameter\x12>\n\x15\x64ownpour_server_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourServerParameter\"O\n\x17\x44ownpourWorkerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\"\xfd\x01\n\x18\x44ownpourTrainerParameter\x12\x30\n\x0b\x64\x65nse_table\x18\x01 \x03(\x0b\x32\x1b.paddle.DenseTableParameter\x12\x32\n\x0csparse_table\x18\x02 
\x03(\x0b\x32\x1c.paddle.SparseTableParameter\x12\x1d\n\x15push_sparse_per_batch\x18\x03 \x01(\x05\x12\x1c\n\x14push_dense_per_batch\x18\x04 \x01(\x05\x12\x0f\n\x07skip_op\x18\x05 \x03(\t\x12-\n\x0eprogram_config\x18\x06 \x03(\x0b\x32\x15.paddle.ProgramConfig\"\x99\x01\n\rProgramConfig\x12\x12\n\nprogram_id\x18\x01 \x02(\t\x12\x1c\n\x14push_sparse_table_id\x18\x02 \x03(\x05\x12\x1b\n\x13push_dense_table_id\x18\x03 \x03(\x05\x12\x1c\n\x14pull_sparse_table_id\x18\x04 \x03(\x05\x12\x1b\n\x13pull_dense_table_id\x18\x05 \x03(\x05\"{\n\x13\x44\x65nseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x1b\n\x13\x64\x65nse_variable_name\x18\x02 \x03(\t\x12$\n\x1c\x64\x65nse_gradient_variable_name\x18\x03 \x03(\t\x12\x0f\n\x07\x66\x65\x61_dim\x18\x04 \x01(\x05\"z\n\x14SparseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x66\x65\x61ture_dim\x18\x02 \x01(\x05\x12\x10\n\x08slot_key\x18\x03 \x03(\t\x12\x12\n\nslot_value\x18\x04 \x03(\t\x12\x15\n\rslot_gradient\x18\x05 \x03(\t\"\x86\x01\n\x17\x44ownpourServerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\x12\x35\n\rservice_param\x18\x02 \x01(\x0b\x32\x1e.paddle.ServerServiceParameter\"\xd7\x01\n\x16ServerServiceParameter\x12*\n\x0cserver_class\x18\x01 \x01(\t:\x14\x44ownpourBrpcPsServer\x12*\n\x0c\x63lient_class\x18\x02 \x01(\t:\x14\x44ownpourBrpcPsClient\x12(\n\rservice_class\x18\x03 \x01(\t:\x11\x44ownpourPsService\x12\x1c\n\x11start_server_port\x18\x04 \x01(\r:\x01\x30\x12\x1d\n\x11server_thread_num\x18\x05 \x01(\r:\x02\x31\x32\"\xc0\x02\n\x0eTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x04\x12\x13\n\x0btable_class\x18\x02 \x01(\t\x12\x17\n\tshard_num\x18\x03 \x01(\x04:\x04\x31\x30\x30\x30\x12\x30\n\x08\x61\x63\x63\x65ssor\x18\x04 \x01(\x0b\x32\x1e.paddle.TableAccessorParameter\x12\x1f\n\x04type\x18\x05 \x01(\x0e\x32\x11.paddle.TableType\x12\x1f\n\x10\x63ompress_in_save\x18\x06 \x01(\x08:\x05\x66\x61lse\x12\'\n\x19\x65nable_sparse_table_cache\x18\x07 \x01(\x08:\x04true\x12(\n\x17sparse_table_cache_rate\x18\x08 \x01(\x01:\x07\x30.00055\x12\'\n\x1bsparse_table_cache_file_num\x18\t \x01(\r:\x02\x31\x36\"\xfc\x02\n\x16TableAccessorParameter\x12\x16\n\x0e\x61\x63\x63\x65ssor_class\x18\x01 \x01(\t\x12\x38\n\x10sparse_sgd_param\x18\x02 \x01(\x0b\x32\x1e.paddle.SparseSGDRuleParameter\x12\x36\n\x0f\x64\x65nse_sgd_param\x18\x03 \x01(\x0b\x32\x1d.paddle.DenseSGDRuleParameter\x12\x13\n\x07\x66\x65\x61_dim\x18\x04 \x01(\r:\x02\x31\x31\x12\x15\n\nembedx_dim\x18\x05 \x01(\r:\x01\x38\x12\x1c\n\x10\x65mbedx_threshold\x18\x06 \x01(\r:\x02\x31\x30\x12G\n\x17\x64ownpour_accessor_param\x18\x07 \x01(\x0b\x32&.paddle.DownpourTableAccessorParameter\x12\x45\n\x19table_accessor_save_param\x18\x08 \x03(\x0b\x32\".paddle.TableAccessorSaveParameter\"\x96\x02\n\x1e\x44ownpourTableAccessorParameter\x12\x19\n\x0cnonclk_coeff\x18\x01 \x01(\x02:\x03\x30.1\x12\x16\n\x0b\x63lick_coeff\x18\x02 \x01(\x02:\x01\x31\x12\x1b\n\x0e\x62\x61se_threshold\x18\x03 \x01(\x02:\x03\x31.5\x12\x1d\n\x0f\x64\x65lta_threshold\x18\x04 \x01(\x02:\x04\x30.25\x12\x1b\n\x0f\x64\x65lta_keep_days\x18\x05 \x01(\x02:\x02\x31\x36\x12#\n\x15show_click_decay_rate\x18\x06 \x01(\x02:\x04\x30.98\x12\x1d\n\x10\x64\x65lete_threshold\x18\x07 \x01(\x02:\x03\x30.8\x12$\n\x18\x64\x65lete_after_unseen_days\x18\x08 \x01(\x02:\x02\x33\x30\"S\n\x1aTableAccessorSaveParameter\x12\r\n\x05param\x18\x01 \x01(\r\x12\x11\n\tconverter\x18\x02 \x01(\t\x12\x13\n\x0b\x64\x65\x63onverter\x18\x03 \x01(\t\"e\n\x10PsRequestMessage\x12\x0e\n\x06\x63md_id\x18\x01 
\x02(\r\x12\x10\n\x08table_id\x18\x02 \x01(\r\x12\x0e\n\x06params\x18\x03 \x03(\x0c\x12\x11\n\tclient_id\x18\x04 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\"\x85\x01\n\x16SparseSGDRuleParameter\x12\x1b\n\rlearning_rate\x18\x01 \x01(\x01:\x04\x30.05\x12\x18\n\rinitial_g2sum\x18\x02 \x01(\x01:\x01\x33\x12\x1d\n\rinitial_range\x18\x03 \x01(\x01:\x06\x30.0001\x12\x15\n\rweight_bounds\x18\x04 \x03(\x02\"\xe1\x01\n\x15\x44\x65nseSGDRuleParameter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12&\n\x04\x61\x64\x61m\x18\x02 \x01(\x0b\x32\x18.paddle.AdamSGDParameter\x12(\n\x05naive\x18\x03 \x01(\x0b\x32\x19.paddle.NaiveSGDParameter\x12,\n\x07summary\x18\x04 \x01(\x0b\x32\x1b.paddle.SummarySGDParameter\x12:\n\x0emoving_average\x18\x05 \x01(\x0b\x32\".paddle.MovingAverageRuleParameter\"\xac\x01\n\x10\x41\x64\x61mSGDParameter\x12\x1c\n\rlearning_rate\x18\x01 \x01(\x01:\x05\x35\x65-06\x12 \n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01:\x08\x30.999993\x12\x1e\n\x0e\x61\x64\x61_decay_rate\x18\x03 \x01(\x01:\x06\x30.9999\x12\x1a\n\x0b\x61\x64\x61_epsilon\x18\x04 \x01(\x01:\x05\x31\x65-08\x12\x1c\n\x0emom_decay_rate\x18\x05 \x01(\x01:\x04\x30.99\"J\n\x11NaiveSGDParameter\x12\x1d\n\rlearning_rate\x18\x01 \x01(\x01:\x06\x30.0002\x12\x16\n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01\";\n\x13SummarySGDParameter\x12$\n\x12summary_decay_rate\x18\x01 \x01(\x01:\x08\x30.999999\".\n\x1aMovingAverageRuleParameter\x12\x10\n\x08momentum\x18\x01 \x01(\x01\"I\n\x11PsResponseMessage\x12\x13\n\x08\x65rr_code\x18\x01 \x02(\x05:\x01\x30\x12\x11\n\x07\x65rr_msg\x18\x02 \x02(\t:\x00\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\xd5\x01\n\x11\x46sClientParameter\x12:\n\x07\x66s_type\x18\x01 \x01(\x0e\x32#.paddle.FsClientParameter.FsApiType:\x04HDFS\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x0e\n\x06passwd\x18\x04 \x01(\t\x12\x13\n\x0b\x62uffer_size\x18\x05 \x01(\x05\x12\x12\n\nhadoop_bin\x18\x33 \x01(\t\x12\x10\n\x08\x61\x66s_conf\x18\x65 \x01(\t\"\x1e\n\tFsApiType\x12\x08\n\x04HDFS\x10\x00\x12\x07\n\x03\x41\x46S\x10\x01*4\n\tTableType\x12\x13\n\x0fPS_SPARSE_TABLE\x10\x00\x12\x12\n\x0ePS_DENSE_TABLE\x10\x01*\x9c\x03\n\x07PsCmdID\x12\x17\n\x13PS_PULL_DENSE_TABLE\x10\x00\x12\x17\n\x13PS_PUSH_DENSE_TABLE\x10\x01\x12\x18\n\x14PS_PULL_SPARSE_TABLE\x10\x02\x12\x18\n\x14PS_PUSH_SPARSE_TABLE\x10\x03\x12\x13\n\x0fPS_SHRINK_TABLE\x10\x04\x12\x15\n\x11PS_SAVE_ONE_TABLE\x10\x05\x12\x15\n\x11PS_SAVE_ALL_TABLE\x10\x06\x12\x15\n\x11PS_LOAD_ONE_TABLE\x10\x07\x12\x15\n\x11PS_LOAD_ALL_TABLE\x10\x08\x12\x16\n\x12PS_CLEAR_ONE_TABLE\x10\t\x12\x16\n\x12PS_CLEAR_ALL_TABLE\x10\n\x12\x17\n\x13PS_PUSH_DENSE_PARAM\x10\x0b\x12\x12\n\x0ePS_STOP_SERVER\x10\x0c\x12\x1b\n\x17PS_SAVE_ONE_CACHE_TABLE\x10\r\x12\x1a\n\x16PS_GET_CACHE_THRESHOLD\x10\x0e\x12\x14\n\x10PS_CACHE_SHUFFLE\x10\x0f\x12\x0e\n\nPS_S2S_MSG\x10\x65\x32K\n\tPsService\x12>\n\x07service\x12\x18.paddle.PsRequestMessage\x1a\x19.paddle.PsResponseMessageB\x03\x80\x01\x01' + '\n\x08ps.proto\x12\x06paddle\"\x9e\x02\n\x0bPSParameter\x12\x14\n\x0cworker_class\x18\x01 \x01(\t\x12\x14\n\x0cserver_class\x18\x02 \x01(\t\x12\x16\n\x0einstance_class\x18\x03 \x01(\t\x12-\n\x0cworker_param\x18\x65 \x01(\x0b\x32\x17.paddle.WorkerParameter\x12-\n\x0cserver_param\x18\x66 \x01(\x0b\x32\x17.paddle.ServerParameter\x12\x38\n\rtrainer_param\x18\xad\x02 \x03(\x0b\x32 .paddle.DownpourTrainerParameter\x12\x33\n\x0f\x66s_client_param\x18\xf5\x03 \x01(\x0b\x32\x19.paddle.FsClientParameter\"Q\n\x0fWorkerParameter\x12>\n\x15\x64ownpour_worker_param\x18\x01 
\x01(\x0b\x32\x1f.paddle.DownpourWorkerParameter\"Q\n\x0fServerParameter\x12>\n\x15\x64ownpour_server_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourServerParameter\"O\n\x17\x44ownpourWorkerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\"\xfd\x01\n\x18\x44ownpourTrainerParameter\x12\x30\n\x0b\x64\x65nse_table\x18\x01 \x03(\x0b\x32\x1b.paddle.DenseTableParameter\x12\x32\n\x0csparse_table\x18\x02 \x03(\x0b\x32\x1c.paddle.SparseTableParameter\x12\x1d\n\x15push_sparse_per_batch\x18\x03 \x01(\x05\x12\x1c\n\x14push_dense_per_batch\x18\x04 \x01(\x05\x12\x0f\n\x07skip_op\x18\x05 \x03(\t\x12-\n\x0eprogram_config\x18\x06 \x03(\x0b\x32\x15.paddle.ProgramConfig\"\x99\x01\n\rProgramConfig\x12\x12\n\nprogram_id\x18\x01 \x02(\t\x12\x1c\n\x14push_sparse_table_id\x18\x02 \x03(\x05\x12\x1b\n\x13push_dense_table_id\x18\x03 \x03(\x05\x12\x1c\n\x14pull_sparse_table_id\x18\x04 \x03(\x05\x12\x1b\n\x13pull_dense_table_id\x18\x05 \x03(\x05\"{\n\x13\x44\x65nseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x1b\n\x13\x64\x65nse_variable_name\x18\x02 \x03(\t\x12$\n\x1c\x64\x65nse_gradient_variable_name\x18\x03 \x03(\t\x12\x0f\n\x07\x66\x65\x61_dim\x18\x04 \x01(\x05\"z\n\x14SparseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x66\x65\x61ture_dim\x18\x02 \x01(\x05\x12\x10\n\x08slot_key\x18\x03 \x03(\t\x12\x12\n\nslot_value\x18\x04 \x03(\t\x12\x15\n\rslot_gradient\x18\x05 \x03(\t\"\x86\x01\n\x17\x44ownpourServerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\x12\x35\n\rservice_param\x18\x02 \x01(\x0b\x32\x1e.paddle.ServerServiceParameter\"\xd7\x01\n\x16ServerServiceParameter\x12*\n\x0cserver_class\x18\x01 \x01(\t:\x14\x44ownpourBrpcPsServer\x12*\n\x0c\x63lient_class\x18\x02 \x01(\t:\x14\x44ownpourBrpcPsClient\x12(\n\rservice_class\x18\x03 \x01(\t:\x11\x44ownpourPsService\x12\x1c\n\x11start_server_port\x18\x04 \x01(\r:\x01\x30\x12\x1d\n\x11server_thread_num\x18\x05 \x01(\r:\x02\x31\x32\"\xc0\x02\n\x0eTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x04\x12\x13\n\x0btable_class\x18\x02 \x01(\t\x12\x17\n\tshard_num\x18\x03 \x01(\x04:\x04\x31\x30\x30\x30\x12\x30\n\x08\x61\x63\x63\x65ssor\x18\x04 \x01(\x0b\x32\x1e.paddle.TableAccessorParameter\x12\x1f\n\x04type\x18\x05 \x01(\x0e\x32\x11.paddle.TableType\x12\x1f\n\x10\x63ompress_in_save\x18\x06 \x01(\x08:\x05\x66\x61lse\x12\'\n\x19\x65nable_sparse_table_cache\x18\x07 \x01(\x08:\x04true\x12(\n\x17sparse_table_cache_rate\x18\x08 \x01(\x01:\x07\x30.00055\x12\'\n\x1bsparse_table_cache_file_num\x18\t \x01(\r:\x02\x31\x36\"\xfc\x02\n\x16TableAccessorParameter\x12\x16\n\x0e\x61\x63\x63\x65ssor_class\x18\x01 \x01(\t\x12\x38\n\x10sparse_sgd_param\x18\x02 \x01(\x0b\x32\x1e.paddle.SparseSGDRuleParameter\x12\x36\n\x0f\x64\x65nse_sgd_param\x18\x03 \x01(\x0b\x32\x1d.paddle.DenseSGDRuleParameter\x12\x13\n\x07\x66\x65\x61_dim\x18\x04 \x01(\r:\x02\x31\x31\x12\x15\n\nembedx_dim\x18\x05 \x01(\r:\x01\x38\x12\x1c\n\x10\x65mbedx_threshold\x18\x06 \x01(\r:\x02\x31\x30\x12G\n\x17\x64ownpour_accessor_param\x18\x07 \x01(\x0b\x32&.paddle.DownpourTableAccessorParameter\x12\x45\n\x19table_accessor_save_param\x18\x08 \x03(\x0b\x32\".paddle.TableAccessorSaveParameter\"\x96\x02\n\x1e\x44ownpourTableAccessorParameter\x12\x19\n\x0cnonclk_coeff\x18\x01 \x01(\x02:\x03\x30.1\x12\x16\n\x0b\x63lick_coeff\x18\x02 \x01(\x02:\x01\x31\x12\x1b\n\x0e\x62\x61se_threshold\x18\x03 \x01(\x02:\x03\x31.5\x12\x1d\n\x0f\x64\x65lta_threshold\x18\x04 
\x01(\x02:\x04\x30.25\x12\x1b\n\x0f\x64\x65lta_keep_days\x18\x05 \x01(\x02:\x02\x31\x36\x12#\n\x15show_click_decay_rate\x18\x06 \x01(\x02:\x04\x30.98\x12\x1d\n\x10\x64\x65lete_threshold\x18\x07 \x01(\x02:\x03\x30.8\x12$\n\x18\x64\x65lete_after_unseen_days\x18\x08 \x01(\x02:\x02\x33\x30\"S\n\x1aTableAccessorSaveParameter\x12\r\n\x05param\x18\x01 \x01(\r\x12\x11\n\tconverter\x18\x02 \x01(\t\x12\x13\n\x0b\x64\x65\x63onverter\x18\x03 \x01(\t\"e\n\x10PsRequestMessage\x12\x0e\n\x06\x63md_id\x18\x01 \x02(\r\x12\x10\n\x08table_id\x18\x02 \x01(\r\x12\x0e\n\x06params\x18\x03 \x03(\x0c\x12\x11\n\tclient_id\x18\x04 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\"\x85\x01\n\x16SparseSGDRuleParameter\x12\x1b\n\rlearning_rate\x18\x01 \x01(\x01:\x04\x30.05\x12\x18\n\rinitial_g2sum\x18\x02 \x01(\x01:\x01\x33\x12\x1d\n\rinitial_range\x18\x03 \x01(\x01:\x06\x30.0001\x12\x15\n\rweight_bounds\x18\x04 \x03(\x02\"\xe1\x01\n\x15\x44\x65nseSGDRuleParameter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12&\n\x04\x61\x64\x61m\x18\x02 \x01(\x0b\x32\x18.paddle.AdamSGDParameter\x12(\n\x05naive\x18\x03 \x01(\x0b\x32\x19.paddle.NaiveSGDParameter\x12,\n\x07summary\x18\x04 \x01(\x0b\x32\x1b.paddle.SummarySGDParameter\x12:\n\x0emoving_average\x18\x05 \x01(\x0b\x32\".paddle.MovingAverageRuleParameter\"\xac\x01\n\x10\x41\x64\x61mSGDParameter\x12\x1c\n\rlearning_rate\x18\x01 \x01(\x01:\x05\x35\x65-06\x12 \n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01:\x08\x30.999993\x12\x1e\n\x0e\x61\x64\x61_decay_rate\x18\x03 \x01(\x01:\x06\x30.9999\x12\x1a\n\x0b\x61\x64\x61_epsilon\x18\x04 \x01(\x01:\x05\x31\x65-08\x12\x1c\n\x0emom_decay_rate\x18\x05 \x01(\x01:\x04\x30.99\"J\n\x11NaiveSGDParameter\x12\x1d\n\rlearning_rate\x18\x01 \x01(\x01:\x06\x30.0002\x12\x16\n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01\";\n\x13SummarySGDParameter\x12$\n\x12summary_decay_rate\x18\x01 \x01(\x01:\x08\x30.999999\".\n\x1aMovingAverageRuleParameter\x12\x10\n\x08momentum\x18\x01 \x01(\x01\"I\n\x11PsResponseMessage\x12\x13\n\x08\x65rr_code\x18\x01 \x02(\x05:\x01\x30\x12\x11\n\x07\x65rr_msg\x18\x02 \x02(\t:\x00\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\xd5\x01\n\x11\x46sClientParameter\x12:\n\x07\x66s_type\x18\x01 \x01(\x0e\x32#.paddle.FsClientParameter.FsApiType:\x04HDFS\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x0e\n\x06passwd\x18\x04 \x01(\t\x12\x13\n\x0b\x62uffer_size\x18\x05 \x01(\x05\x12\x12\n\nhadoop_bin\x18\x33 \x01(\t\x12\x10\n\x08\x61\x66s_conf\x18\x65 \x01(\t\"\x1e\n\tFsApiType\x12\x08\n\x04HDFS\x10\x00\x12\x07\n\x03\x41\x46S\x10\x01*4\n\tTableType\x12\x13\n\x0fPS_SPARSE_TABLE\x10\x00\x12\x12\n\x0ePS_DENSE_TABLE\x10\x01*\x9c\x03\n\x07PsCmdID\x12\x17\n\x13PS_PULL_DENSE_TABLE\x10\x00\x12\x17\n\x13PS_PUSH_DENSE_TABLE\x10\x01\x12\x18\n\x14PS_PULL_SPARSE_TABLE\x10\x02\x12\x18\n\x14PS_PUSH_SPARSE_TABLE\x10\x03\x12\x13\n\x0fPS_SHRINK_TABLE\x10\x04\x12\x15\n\x11PS_SAVE_ONE_TABLE\x10\x05\x12\x15\n\x11PS_SAVE_ALL_TABLE\x10\x06\x12\x15\n\x11PS_LOAD_ONE_TABLE\x10\x07\x12\x15\n\x11PS_LOAD_ALL_TABLE\x10\x08\x12\x16\n\x12PS_CLEAR_ONE_TABLE\x10\t\x12\x16\n\x12PS_CLEAR_ALL_TABLE\x10\n\x12\x17\n\x13PS_PUSH_DENSE_PARAM\x10\x0b\x12\x12\n\x0ePS_STOP_SERVER\x10\x0c\x12\x1b\n\x17PS_SAVE_ONE_CACHE_TABLE\x10\r\x12\x1a\n\x16PS_GET_CACHE_THRESHOLD\x10\x0e\x12\x14\n\x10PS_CACHE_SHUFFLE\x10\x0f\x12\x0e\n\nPS_S2S_MSG\x10\x65\x32K\n\tPsService\x12>\n\x07service\x12\x18.paddle.PsRequestMessage\x1a\x19.paddle.PsResponseMessageB\x03\x80\x01\x01' )) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -290,9 +290,9 @@ _PSPARAMETER = _descriptor.Descriptor( number=301, type=11, 
cpp_type=10, - label=1, + label=3, has_default_value=False, - default_value=None, + default_value=[], message_type=None, enum_type=None, containing_type=None, diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 6813b76789b899b886a88fb3c6a9b3551976ee89..305043c58b6b25dad3653d326947532868367d94 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -559,7 +559,8 @@ class FleetUtil(object): hadoop_fs_name, hadoop_fs_ugi, hadoop_home="$HADOOP_HOME", - donefile_name="sparse_cache.meta"): + donefile_name="sparse_cache.meta", + **kwargs): """ write cache donefile @@ -572,6 +573,9 @@ class FleetUtil(object): hadoop_fs_ugi(str): hdfs/afs fs ugi hadoop_home(str): hadoop home, default is "$HADOOP_HOME" donefile_name(str): donefile name, default is "sparse_cache.meta" + kwargs(dict): user defined properties + file_num(int): cache file num + table_id(int): cache table id Examples: .. code-block:: python @@ -591,12 +595,14 @@ class FleetUtil(object): day = str(day) pass_id = str(pass_id) key_num = int(key_num) + file_num = kwargs.get("file_num", 16) + table_id = kwargs.get("table_id", 0) if pass_id != "-1": - suffix_name = "/%s/delta-%s/000_cache" % (day, pass_id) + suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id) model_path = output_path.rstrip("/") + suffix_name else: - suffix_name = "/%s/base/000_cache" % day + suffix_name = "/%s/base/%03d_cache" % (day, table_id) model_path = output_path.rstrip("/") + suffix_name if fleet.worker_index() == 0: @@ -610,8 +616,8 @@ class FleetUtil(object): self.rank0_error( \ "not write because %s already exists" % donefile_path) else: - meta_str = \ - "file_prefix:part\npart_num:16\nkey_num:%d\n" % key_num + meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \ + % (file_num, key_num) with open(donefile_name, "w") as f: f.write(meta_str) client.upload( @@ -743,7 +749,7 @@ class FleetUtil(object): fleet.save_persistables(None, model_path, mode=2) self.rank0_print("save_xbox_base_model done") - def save_cache_model(self, output_path, day, pass_id, mode=1): + def save_cache_model(self, output_path, day, pass_id, mode=1, **kwargs): """ save cache model @@ -752,6 +758,8 @@ class FleetUtil(object): day(str|int): training day pass_id(str|int): training pass id mode(str|int): save mode + kwargs(dict): user defined properties + table_id(int): table id to save cache Returns: key_num(int): cache key num @@ -767,14 +775,16 @@ class FleetUtil(object): day = str(day) pass_id = str(pass_id) mode = int(mode) + table_id = kwargs.get("table_id", 0) suffix_name = "/%s/delta-%s" % (day, pass_id) model_path = output_path.rstrip("/") + suffix_name self.rank0_print("going to save_cache_model %s" % model_path) - key_num = fleet.save_cache_model(None, model_path, mode=mode) + key_num = fleet.save_cache_model( + None, model_path, mode=mode, table_id=table_id) self.rank0_print("save_cache_model done") return key_num - def save_cache_base_model(self, output_path, day): + def save_cache_base_model(self, output_path, day, **kwargs): """ save cache model @@ -782,6 +792,8 @@ class FleetUtil(object): output_path(str): output path day(str|int): training day pass_id(str|int): training pass id + kwargs(dict): user defined properties + table_id(int): table id to save cache Returns: key_num(int): cache key num @@ -795,10 +807,12 @@ class FleetUtil(object): """ day = str(day) + table_id = kwargs.get("table_id", 0) suffix_name = 
"/%s/base" % day model_path = output_path.rstrip("/") + suffix_name self.rank0_print("going to save_cache_base_model %s" % model_path) - key_num = fleet.save_cache_model(None, model_path, mode=2) + key_num = fleet.save_cache_model( + None, model_path, mode=2, table_id=table_id) self.rank0_print("save_cache_base_model done") return key_num @@ -820,8 +834,9 @@ class FleetUtil(object): """ fleet._role_maker._barrier_worker() if fleet._role_maker.is_first_worker(): - tables = fleet._dist_desc.trainer_param.dense_table prog_id = str(id(program)) + tables = fleet._opt_info["program_id_to_worker"][prog_id].\ + get_desc().dense_table prog_conf = fleet._opt_info['program_configs'][prog_id] prog_tables = {} for key in prog_conf: @@ -844,6 +859,95 @@ class FleetUtil(object): int(table.table_id), var_name_list) fleet._role_maker._barrier_worker() + def save_paddle_inference_model(self, + executor, + scope, + program, + feeded_vars, + target_vars, + output_path, + day, + pass_id, + hadoop_fs_name, + hadoop_fs_ugi, + hadoop_home="$HADOOP_HOME", + save_combine=True): + """ + save paddle inference model, and upload to hdfs dnn_plugin path + + Args: + executor(Executor): fluid Executor + scope(Scope): fluid Scope + program(Program): fluid Program + feeded_vars(list[Variable]): feed vars + target_vars(list[variable]): fetch vars + output_path(str): hdfs/afs output path + day(str|int): training day + pass_id(str|int): training pass + hadoop_fs_name(str): hadoop fs name + hadoop_fs_ugi(str): hadoop fs ugi + hadoop_home(str): hadoop home, default is "$HADOOP_HOME" + save_combine(bool): whether to save in a file or seperate files, + default is True + + Examples: + .. code-block:: python + + from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil + fleet_util = FleetUtil() + fleet_util.save_paddle_inference_model(exe, + join_scope, + join_program, + feeded_vars, + target_vars, + "hdfs:/my/output/path/", + day=20190727, + pass_id=6, + hadoop_fs_name="xxx", + hadoop_fs_ugi="xxx,xxx") + """ + day = str(day) + pass_id = str(pass_id) + feeded_var_names = [i.name for i in feeded_vars] + model_name = "inference_model" + # pull dense before save + self.pull_all_dense_params(scope, program) + if fleet.worker_index() == 0: + with fluid.scope_guard(scope): + if save_combine: + fluid.io.save_inference_model( + dirname=model_name, + feeded_var_names=feeded_var_names, + target_vars=target_vars, + executor=executor, + main_program=program, + params_filename="params") + else: + fluid.io.save_inference_model( + dirname=model_name, + feeded_var_names=feeded_var_names, + target_vars=target_vars, + executor=executor, + main_program=program) + + configs = { + "fs.default.name": hadoop_fs_name, + "hadoop.job.ugi": hadoop_fs_ugi + } + client = HDFSClient(hadoop_home, configs) + + if pass_id == "-1": + dest = "%s/%s/base/dnn_plugin/" % (output_path, day) + else: + dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day, + pass_id) + if not client.is_exist(dest): + client.makedirs(dest) + + client.upload(dest, model_name) + + fleet._role_maker._barrier_worker() + def save_paddle_params(self, executor, scope, diff --git a/python/paddle/fluid/incubate/fleet/utils/hdfs.py b/python/paddle/fluid/incubate/fleet/utils/hdfs.py index 1d1714bf72d306680d0b92d2b618f66f0f62a2be..d5b550b493583336d0d1479adaa5fcec136e3faf 100644 --- a/python/paddle/fluid/incubate/fleet/utils/hdfs.py +++ b/python/paddle/fluid/incubate/fleet/utils/hdfs.py @@ -595,8 +595,7 @@ class HDFSClient(object): if not self.is_exist(dest_dir): self.makedirs(dest_dir) 
put_command = ["-put", local_dir, dest_dir] - returncode, output, errors = self.__run_hdfs_cmd(put_command, - retry_times) + returncode, output, errors = self.__run_hdfs_cmd(put_command) if returncode != 0: _logger.error("Put local dir: {} to HDFS dir: {} failed".format( local_dir, dest_dir)) diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index a9b46273bcbe61e0537df324e94ceaf23b447474..795fc58bdad0b170fcb623fe8ec29d3cb3c9a73f 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -282,6 +282,7 @@ class TestDataset(unittest.TestCase): dataset.wait_preload_done() fleet_ptr = fluid.core.Fleet() fleet_ptr.set_client2client_config(1, 1, 1) + fleet_ptr.get_cache_threshold(0) os.remove("./test_in_memory_dataset_run_a.txt") os.remove("./test_in_memory_dataset_run_b.txt") @@ -406,14 +407,28 @@ class TestDataset(unittest.TestCase): class TestDatasetWithDataLoader(TestDataset): + """ + Test Dataset With Data Loader class. TestCases. + """ + def setUp(self): + """ + Test Dataset With Data Loader, setUp. + """ self.use_data_loader = True self.epoch_num = 10 self.drop_last = False class TestDatasetWithFetchHandler(unittest.TestCase): + """ + Test Dataset With Fetch Handler. TestCases. + """ + def net(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ slots = ["slot1", "slot2", "slot3", "slot4"] slots_vars = [] poolings = [] @@ -431,6 +446,13 @@ class TestDatasetWithFetchHandler(unittest.TestCase): return slots_vars, fc def get_dataset(self, inputs, files): + """ + Test Dataset With Fetch Handler. TestCases. + + Args: + inputs(list): inputs of get_dataset + files(list): files of get_dataset + """ dataset = fluid.DatasetFactory().create_dataset("QueueDataset") dataset.set_batch_size(32) dataset.set_thread(3) @@ -440,6 +462,9 @@ class TestDatasetWithFetchHandler(unittest.TestCase): return dataset def setUp(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ with open("test_queue_dataset_run_a.txt", "w") as f: data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n" @@ -453,10 +478,16 @@ class TestDatasetWithFetchHandler(unittest.TestCase): f.write(data) def tearDown(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ os.remove("./test_queue_dataset_run_a.txt") os.remove("./test_queue_dataset_run_b.txt") def test_dataset_none(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ slots_vars, out = self.net() files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] dataset = self.get_dataset(slots_vars, files) @@ -476,6 +507,9 @@ class TestDatasetWithFetchHandler(unittest.TestCase): self.assertTrue(False) def test_infer_from_dataset(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ slots_vars, out = self.net() files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] dataset = self.get_dataset(slots_vars, files) @@ -491,6 +525,9 @@ class TestDatasetWithFetchHandler(unittest.TestCase): self.assertTrue(False) def test_fetch_handler(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ slots_vars, out = self.net() files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] dataset = self.get_dataset(slots_vars, files) @@ -515,5 +552,146 @@ class TestDatasetWithFetchHandler(unittest.TestCase): self.assertTrue(False) +class TestDataset2(unittest.TestCase): + """ TestCases for Dataset. """ + + def setUp(self): + """ TestCases for Dataset. 
""" + self.use_data_loader = False + self.epoch_num = 10 + self.drop_last = False + + def test_dataset_fleet(self): + """ + Testcase for InMemoryDataset from create to run. + """ + with open("test_in_memory_dataset2_run_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_in_memory_dataset2_run_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + with fluid.program_guard(train_program, startup_program): + slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data(\ + name=slot, shape=[1], dtype="float32", lod_level=1) + slots_vars.append(var) + fake_cost = \ + fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1]) + fake_cost = fluid.layers.mean(fake_cost) + with fluid.scope_guard(scope): + place = fluid.CPUPlace() + exe = fluid.Executor(place) + try: + fleet.init(exe) + except ImportError as e: + print("warning: no mpi4py") + adam = fluid.optimizer.Adam(learning_rate=0.000005) + try: + adam = fleet.distributed_optimizer(adam) + adam.minimize([fake_cost], [scope]) + except AttributeError as e: + print("warning: no mpi") + except ImportError as e: + print("warning: no mpi4py") + exe.run(startup_program) + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist([ + "test_in_memory_dataset2_run_a.txt", + "test_in_memory_dataset2_run_b.txt" + ]) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + dataset.load_into_memory() + fleet._opt_info = None + fleet._fleet_ptr = None + + os.remove("./test_in_memory_dataset2_run_a.txt") + os.remove("./test_in_memory_dataset2_run_b.txt") + + def test_dataset_fleet2(self): + """ + Testcase for InMemoryDataset from create to run. 
+ """ + with open("test_in_memory_dataset2_run2_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_in_memory_dataset2_run2_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + with fluid.program_guard(train_program, startup_program): + slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data(\ + name=slot, shape=[1], dtype="float32", lod_level=1) + slots_vars.append(var) + fake_cost = \ + fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1]) + fake_cost = fluid.layers.mean(fake_cost) + with fluid.scope_guard(scope): + place = fluid.CPUPlace() + exe = fluid.Executor(place) + try: + fleet.init(exe) + except ImportError as e: + print("warning: no mpi4py") + adam = fluid.optimizer.Adam(learning_rate=0.000005) + try: + adam = fleet.distributed_optimizer( + adam, + strategy={ + "fs_uri": "fs_uri_xxx", + "fs_user": "fs_user_xxx", + "fs_passwd": "fs_passwd_xxx", + "fs_hadoop_bin": "fs_hadoop_bin_xxx" + }) + adam.minimize([fake_cost], [scope]) + except AttributeError as e: + print("warning: no mpi") + except ImportError as e: + print("warning: no mpi4py") + exe.run(startup_program) + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_batch_size(32) + dataset.set_thread(3) + dataset.set_filelist([ + "test_in_memory_dataset2_run2_a.txt", + "test_in_memory_dataset2_run2_b.txt" + ]) + dataset.set_pipe_command("cat") + dataset.set_use_var(slots_vars) + dataset.load_into_memory() + fleet._opt_info = None + fleet._fleet_ptr = None + + os.remove("./test_in_memory_dataset2_run2_a.txt") + os.remove("./test_in_memory_dataset2_run2_b.txt") + + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_downpoursgd.py b/python/paddle/fluid/tests/unittests/test_downpoursgd.py index 51b0b8d4dda40409f637e709cc0b85a33b8ce882..9dbea19ca556460da7fedb13217f8b20d58a15c4 100644 --- a/python/paddle/fluid/tests/unittests/test_downpoursgd.py +++ b/python/paddle/fluid/tests/unittests/test_downpoursgd.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+"""Testcases for Downpour.""" from __future__ import print_function @@ -25,15 +26,19 @@ import sys from op_test import OpTest from paddle.fluid.trainer_desc import DistMultiTrainer from paddle.fluid.device_worker import DownpourSGD +from paddle.fluid.incubate.fleet.parameter_server.pslib.node import DownpourWorker from google.protobuf import text_format import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib class TestListenAndServOp(OpTest): + """TestListenAndServOp.""" + def setUp(self): pass def test_device_work_use_cvm(self): + """test device work use_cvm.""" if sys.platform == 'win32' or sys.platform == 'sys.platform': pass else: @@ -77,6 +82,9 @@ class TestListenAndServOp(OpTest): opt_info["scale_datanorm"] = -1 opt_info["dump_slot"] = False opt_info["stat_var_names"] = [] + worker = DownpourWorker(None) + worker.get_desc().CopyFrom(ps_param.trainer_param[0]) + opt_info["program_id_to_worker"] = {program_id: worker} main_program._fleet_opt = opt_info trainer = DistMultiTrainer() @@ -90,6 +98,7 @@ class TestListenAndServOp(OpTest): os.system(cmd) def test_device_work(self): + """test devicve worker.""" if sys.platform == 'win32' or sys.platform == 'sys.platform': pass else: @@ -133,6 +142,9 @@ class TestListenAndServOp(OpTest): opt_info["scale_datanorm"] = -1 opt_info["dump_slot"] = False opt_info["stat_var_names"] = [] + worker = DownpourWorker(None) + worker.get_desc().CopyFrom(ps_param.trainer_param[0]) + opt_info["program_id_to_worker"] = {program_id: worker} main_program._fleet_opt = opt_info trainer = DistMultiTrainer() diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py index 4ee98d8b85ad8d58e354eb1d25a0c70e7d66be1d..47475896df3bb72fe075821ebede3d8d4c81e0c1 100644 --- a/python/paddle/fluid/trainer_desc.py +++ b/python/paddle/fluid/trainer_desc.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+"""Defination of trainers.""" import sys from os import path @@ -116,6 +117,78 @@ class TrainerDesc(object): self.proto_desc.adjust_ins_weight_config.ins_weight_slot = \ config_dict.get("ins_weight_slot", "") + def _set_copy_table_config(self, config_dict): + config = self.proto_desc.copy_table_config + config.need_copy = config_dict.get("need_copy", False) + config.batch_num = config_dict.get("batch_num", 100) + + src_sparse_tables = config_dict.get("src_sparse_tables", []) + if not isinstance(src_sparse_tables, list): + src_sparse_tables = [src_sparse_tables] + dest_sparse_tables = config_dict.get("dest_sparse_tables", []) + if not isinstance(dest_sparse_tables, list): + dest_sparse_tables = [dest_sparse_tables] + if len(src_sparse_tables) != len(dest_sparse_tables): + raise ValueError( + "len(src_sparse_tables) != len(dest_sparse_tables)," \ + " %s vs %s" % (len(src_sparse_tables), \ + len(dest_sparse_tables))) + for i in src_sparse_tables: + config.src_sparse_tables.append(i) + for i in dest_sparse_tables: + config.dest_sparse_tables.append(i) + + src_dense_tables = config_dict.get("src_dense_tables", []) + if not isinstance(src_dense_tables, list): + src_dense_tables = [src_dense_tables] + dest_dense_tables = config_dict.get("dest_dense_tables", []) + if not isinstance(dest_dense_tables, list): + dest_dense_tables = [dest_dense_tables] + if len(src_dense_tables) != len(dest_dense_tables): + raise ValueError( + "len(src_dense_tables) != len(dest_dense_tables)," \ + " %s vs %s" % (len(src_dense_tables), \ + len(dest_dense_tables))) + for i in src_dense_tables: + config.src_dense_tables.append(i) + for i in dest_dense_tables: + config.dest_dense_tables.append(i) + + # user can also specify dense variables to copy, + # instead of copy dense table + src_var_list = config_dict.get("src_var_list", []) + if not isinstance(src_var_list, list): + src_var_list = [src_var_list] + dest_var_list = config_dict.get("dest_var_list", []) + if not isinstance(dest_var_list, list): + dest_var_list = [dest_var_list] + if len(src_var_list) != len(dest_var_list): + raise ValueError( + "len(src_var_list) != len(dest_var_list), %s vs" \ + " %s" % (len(src_var_list), len(dest_var_list))) + for i in src_var_list: + config.src_var_list.append(i) + for i in dest_var_list: + config.dest_var_list.append(i) + + dependency_map = config_dict.get("dependency_map", {}) + for key in dependency_map: + m = config.table_denpendency_map.add() + m.key = key + values = dependency_map[key] + if not isinstance(values, list): + values = [values] + if len(values) != 1: + raise ValueError("dependency len %s != 1" % len(values)) + for value in values: + m.values.append(value) + config.dense_pull_after_copy = \ + config_dict.get("dense_pull_after_copy", True) + config.enable_dependency = \ + config_dict.get("enable_dependency", False) + config.sparse_copy_by_feasign = \ + config_dict.get("sparse_copy_by_feasign", True) + def _desc(self): from google.protobuf import text_format return self.proto_desc.SerializeToString() @@ -147,6 +220,11 @@ class MultiTrainer(TrainerDesc): class DistMultiTrainer(TrainerDesc): + """ + Implement of DistMultiTrainer. + It's for Distributed training. + """ + def __init__(self): super(DistMultiTrainer, self).__init__() pass @@ -166,6 +244,11 @@ class DistMultiTrainer(TrainerDesc): class PipelineTrainer(TrainerDesc): + """ + Implement of PipelineTrainer. + It's for Pipeline. 
+ """ + def __init__(self): super(PipelineTrainer, self).__init__() pass diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index f6f794a2428a6fc13c0a1a6a1dbb640c77390eb3..70154e383a9ed0ecff7ada928992ec1f36a65ff1 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Defination of TrainerFactory.""" import threading import time @@ -24,6 +25,12 @@ __all__ = ["TrainerFactory", "FetchHandler", "FetchHandlerMonitor"] class TrainerFactory(object): + """ + Create trainer and device worker. + If opt_info is not None, it will get configs from opt_info, + otherwise create MultiTrainer and Hogwild. + """ + def __init__(self): pass @@ -43,23 +50,44 @@ class TrainerFactory(object): if "fleet_desc" in opt_info: device_worker._set_fleet_desc(opt_info["fleet_desc"]) trainer._set_fleet_desc(opt_info["fleet_desc"]) - trainer._set_use_cvm(opt_info["use_cvm"]) - trainer._set_scale_datanorm(opt_info["scale_datanorm"]) - trainer._set_dump_slot(opt_info["dump_slot"]) - trainer._set_mpi_rank(opt_info["mpi_rank"]) - trainer._set_mpi_size(opt_info["mpi_size"]) - trainer._set_dump_fields(opt_info["dump_fields"]) - trainer._set_dump_fields_path(opt_info["dump_fields_path"]) - trainer._set_dump_file_num(opt_info["dump_file_num"]) - trainer._set_dump_converter(opt_info["dump_converter"]) - trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"]) - trainer._set_check_nan_var_names(opt_info[ - "check_nan_var_names"]) + if opt_info.get("use_cvm") is not None: + trainer._set_use_cvm(opt_info["use_cvm"]) + if opt_info.get("scale_datanorm") is not None: + trainer._set_scale_datanorm(opt_info["scale_datanorm"]) + if opt_info.get("dump_slot") is not None: + trainer._set_dump_slot(opt_info["dump_slot"]) + if opt_info.get("mpi_rank") is not None: + trainer._set_mpi_rank(opt_info["mpi_rank"]) + if opt_info.get("mpi_size") is not None: + trainer._set_mpi_size(opt_info["mpi_size"]) + if opt_info.get("dump_fields") is not None: + trainer._set_dump_fields(opt_info["dump_fields"]) + if opt_info.get("dump_fields_path") is not None: + trainer._set_dump_fields_path(opt_info["dump_fields_path"]) + if opt_info.get("dump_file_num") is not None: + trainer._set_dump_file_num(opt_info["dump_file_num"]) + if opt_info.get("dump_converter") is not None: + trainer._set_dump_converter(opt_info["dump_converter"]) + if opt_info.get("adjust_ins_weight") is not None: + trainer._set_adjust_ins_weight(opt_info[ + "adjust_ins_weight"]) + if opt_info.get("copy_table") is not None: + trainer._set_copy_table_config(opt_info["copy_table"]) + if opt_info.get("check_nan_var_names") is not None: + trainer._set_check_nan_var_names(opt_info[ + "check_nan_var_names"]) + if opt_info.get("dump_param") is not None: + trainer._set_dump_param(opt_info["dump_param"]) trainer._set_device_worker(device_worker) return trainer class FetchHandlerMonitor(object): + """ + Defination of FetchHandlerMonitor class, + it's for fetch handler. + """ + def __init__(self, scope, handler): self.fetch_instance = handler self.fetch_thread = threading.Thread( @@ -68,11 +96,21 @@ class FetchHandlerMonitor(object): self.running = False def start(self): + """ + start monitor, + it will start a monitor thread. 
+ """ self.running = True self.fetch_thread.setDaemon(True) self.fetch_thread.start() def handler_decorator(self, fetch_scope, fetch_handler): + """ + decorator of handler, + Args: + fetch_scope(Scope): fetch scope + fetch_handler(Handler): fetch handler + """ fetch_target_names = self.fetch_instance.fetch_target_names period_secs = self.fetch_instance.period_secs