Unverified commit 072eb5b6, authored by xujiaqi01, committed by GitHub

cherry-pick to 1.6: fix cache table bug, add save_paddle_inference_model, fix hdfs util bug (#21339)

* fix cache table bug, add save_paddle_inference_model, fix hdfs util bug (#21052)

* fix cache table bug (a usage sketch follows this list)
* add save_paddle_inference_model
* fix hdfs util bug
* test=develop
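
For reference, a minimal hedged usage sketch of the cache-table fix (the hard-coded table 0 in get_cache_threshold/cache_shuffle/save_cache is replaced by a table_id argument); it assumes an already initialized pslib fleet instance and a placeholder output path:

    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    # flush + cache_shuffle + save_cache are now routed to the given table_id
    # instead of always using table 0; mode is forwarded to the server
    feasign_num = fleet.save_cache_model(
        None, dirname="hdfs:/your/path/to/model", mode=0, table_id=1)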

* fix several sparse table issuses (#20686)

* each program no longer needs to define the embedding layers of all slots (previously none could be left out); make trainer_param repeated in ps.proto.
* add find_distributed_lookup_table_grads instead of hard-coding GRAD
* support stop_gradient on embeddings; push sparse raised an error before this fix.
* fix fill sparse: skip slots that have no embedding. Before this fix, each slot's embedding in a sparse table had to be used in every training program.
* fix pull sparse: skip slots that have no embedding.
* fix collect feasign label info: skip slots that have no embedding.
* support multiple sparse tables across one or more training programs; each program now pulls/pushes only its own related sparse tables instead of all sparse tables (a hedged sketch follows this list).
* test=develop
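
For illustration only, a hedged sketch of what the fixes above enable; the slot names, embedding sizes and network are invented, and it assumes the usual pslib fleet API, with minimize taking a list of losses as in the _minimize(self, losses, ...) code further down this diff:

    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    def slot_emb(name):
        # each slot gets its own sparse table here; programs that do not use a
        # slot's embedding no longer have to declare it
        slot = fluid.layers.data(
            name=name, shape=[1], dtype="int64", lod_level=1)
        emb = fluid.layers.embedding(
            input=slot, size=[10, 11], is_sparse=True, is_distributed=True,
            param_attr=fluid.ParamAttr(name=name + "_emb"))
        return emb, fluid.layers.sequence_pool(input=emb, pool_type="sum")

    emb_a, pool_a = slot_emb("slot_a")
    emb_b, pool_b = slot_emb("slot_b")
    emb_b.stop_gradient = True  # this slot's gradient is now skipped on push sparse
    label = fluid.layers.data(name="click", shape=[1], dtype="int64")
    pred = fluid.layers.fc(input=[pool_a, pool_b], size=2, act="softmax")
    loss = fluid.layers.mean(fluid.layers.cross_entropy(input=pred, label=label))

    adam = fleet.distributed_optimizer(
        fluid.optimizer.Adam(learning_rate=5e-6), strategy={})
    # with several losses/programs, each program now pulls/pushes
    # only its own related sparse tables
    adam.minimize([loss])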

* add copy table (#21086)

* copy some feasigns and corresponding embeddings from one sparse table to another
* copy all feasigns and corresponding embeddings from one sparse table to another
* copy all dense params from one table to another
* copy some local vars to other local vars (a hedged config sketch follows this list)
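
A hedged configuration sketch for the copy-table feature: the worker reads a CopyTableConfig message from TrainerDesc (see the proto changes below), and the optimizer factory forwards strategy.get("copy_table", {}) into opt_info. The dict keys here are assumed to mirror the proto field names; the table ids and var names are placeholders:

    strategy = {
        "copy_table": {
            "need_copy": True,
            "batch_num": 100,                # trigger a copy every 100 batches
            "src_sparse_tables": [0],        # feasigns + embeddings: table 0 -> 1
            "dest_sparse_tables": [1],
            "sparse_copy_by_feasign": True,  # copy only feasigns pushed so far
            "src_dense_tables": [2],         # dense params: table 2 -> 3
            "dest_dense_tables": [3],
            "dense_pull_after_copy": False,  # pull dest table if it has no grad
            "src_var_list": ["src_local_var"],    # placeholder local var names
            "dest_var_list": ["dest_local_var"],
        },
    }
    adam = fleet.distributed_optimizer(
        fluid.optimizer.Adam(learning_rate=5e-6), strategy=strategy)

A one-off copy can also be issued through the new pybind hooks, e.g. fleet._fleet_ptr.copy_table(0, 1) or fleet._fleet_ptr.copy_table_by_feasign(0, 1, feasign_list), as bound in fleet_wrapper below.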

* fix fs_client_param bug (#21212)

* fix fs_client_param bug; the fs client config can now be set through a fleet_desc file or the fleet strategy config (see the example below)
* test=develop
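
With this fix the HDFS/AFS client no longer requires a fleet_desc file; the strategy keys below (and their placeholder defaults) are the ones handled in the optimizer_factory changes at the end of this diff:

    strategy = {
        "fs_uri": "hdfs://your_hdfs_uri",
        "fs_user": "your_hdfs_user",
        "fs_passwd": "your_hdfs_passwd",
        "fs_hadoop_bin": "$HADOOP_HOME/bin/hadoop",
    }
    adam = fleet.distributed_optimizer(
        fluid.optimizer.Adam(learning_rate=5e-6), strategy=strategy)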

* fix fleet util bug (#21254)

* fix fleet util bug in save paddle inference model
* test=develop
Parent 97bbab47
......@@ -21,6 +21,9 @@ limitations under the License. */
#include <mutex> // NOLINT
#include <string>
#include <thread> // NOLINT
#include <unordered_map> // NOLINT
#include <unordered_set> // NOLINT
#include <utility> // NOLINT
#include <vector>
#include "paddle/fluid/framework/data_feed.h"
......@@ -194,6 +197,10 @@ class DownpourWorker : public HogwildWorker {
void PushGradients();
void CollectLabelInfo(size_t table_id);
void AdjustInsWeight();
void DumpParam();
void CopySparseTable();
void CopyDenseTable();
void CopyDenseVars();
private:
bool need_to_push_dense_;
......@@ -211,6 +218,8 @@ class DownpourWorker : public HogwildWorker {
std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
// actually pushed feasign of each table
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
// feasign
std::map<uint64_t, std::vector<uint64_t>> features_;
......@@ -232,6 +241,12 @@ class DownpourWorker : public HogwildWorker {
std::vector<float> nid_show_;
// check nan and inf during training
std::vector<std::string> check_nan_var_names_;
// copy table
CopyTableConfig copy_table_config_;
std::map<uint64_t, uint64_t> table_dependency_;
std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
};
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
......
......@@ -44,6 +44,7 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
sparse_grad_names_[table_id][j] = table.sparse_grad_name(j);
}
label_var_name_[table_id] = table.label_var_name();
sparse_push_keys_[table_id] = std::vector<uint64_t>();
}
for (int i = 0; i < param_.dense_table_size(); ++i) {
......@@ -84,6 +85,29 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
for (int i = 0; i < desc.check_nan_var_names_size(); ++i) {
check_nan_var_names_.push_back(desc.check_nan_var_names(i));
}
copy_table_config_ = desc.copy_table_config();
for (int i = 0; i < copy_table_config_.src_sparse_tables_size(); ++i) {
uint64_t src_table = copy_table_config_.src_sparse_tables(i);
uint64_t dest_table = copy_table_config_.dest_sparse_tables(i);
VLOG(3) << "copy_sparse_tables_ push back " << src_table << "->"
<< dest_table;
copy_sparse_tables_.push_back(std::make_pair(src_table, dest_table));
}
for (int i = 0; i < copy_table_config_.src_dense_tables_size(); ++i) {
uint64_t src_table = copy_table_config_.src_dense_tables(i);
uint64_t dest_table = copy_table_config_.dest_dense_tables(i);
VLOG(3) << "copy_dense_tables_ push back " << src_table << "->"
<< dest_table;
copy_dense_tables_.push_back(std::make_pair(src_table, dest_table));
}
for (auto& m : copy_table_config_.table_denpendency_map()) {
if (sparse_key_names_.find(m.key()) != sparse_key_names_.end()) {
// currently only support one dependency
for (auto& value : m.values()) {
table_dependency_[m.key()] = value;
}
}
}
}
void DownpourWorker::SetChannelWriter(ChannelObject<std::string>* queue) {
......@@ -191,6 +215,14 @@ void DownpourWorker::CollectLabelInfo(size_t table_idx) {
LoDTensor* tensor = fea_var->GetMutable<LoDTensor>();
CHECK(tensor != nullptr) << "tensor of var "
<< sparse_key_names_[table_id][i] << " is null";
// skip slots which do not have embedding
Variable* emb_var =
thread_scope_->FindVar(sparse_value_names_[table_id][i]);
if (emb_var == nullptr) {
continue;
}
int64_t* ids = tensor->data<int64_t>();
size_t fea_idx = 0;
// tensor->lod()[0].size() == batch_size + 1
......@@ -237,6 +269,9 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
int64_t* ids = tensor->data<int64_t>();
int len = tensor->numel();
Variable* var_emb = thread_scope_->FindVar(emb_slot_name);
if (var_emb == nullptr) {
continue;
}
LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
float* ptr = tensor_emb->mutable_data<float>({len, table.emb_dim()},
platform::CPUPlace());
......@@ -368,6 +403,102 @@ void DownpourWorker::AdjustInsWeight() {
#endif
}
void DownpourWorker::CopySparseTable() {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
int64_t src_table = copy_sparse_tables_[i].first;
int64_t dest_table = copy_sparse_tables_[i].second;
int32_t feanum = 0;
if (src_table == dest_table) {
continue;
} else if (!copy_table_config_.sparse_copy_by_feasign()) {
if (feasign_set_.find(src_table) == feasign_set_.end()) {
continue;
} else if (feasign_set_[src_table].size() == 0) {
continue;
}
feanum = fleet_ptr_->CopyTable(src_table, dest_table);
} else {
std::vector<uint64_t> fea_vec(feasign_set_[src_table].begin(),
feasign_set_[src_table].end());
feanum = fleet_ptr_->CopyTableByFeasign(src_table, dest_table, fea_vec);
fea_vec.clear();
std::vector<uint64_t>().swap(fea_vec);
}
VLOG(3) << "copy feasign from table " << src_table << " to table "
<< dest_table << ", feasign num=" << feanum;
feasign_set_[src_table].clear();
std::unordered_set<uint64_t>().swap(feasign_set_[src_table]);
}
feasign_set_.clear();
}
void DownpourWorker::CopyDenseTable() {
if (thread_id_ != 0) {
return;
}
thread_local std::vector<std::future<int32_t>> pull_dense_status;
for (size_t i = 0; i < copy_dense_tables_.size(); ++i) {
uint64_t src_table = copy_dense_tables_[i].first;
uint64_t dest_table = copy_dense_tables_[i].second;
if (src_table == dest_table) {
continue;
}
int32_t dim = fleet_ptr_->CopyTable(src_table, dest_table);
VLOG(3) << "copy param from table " << src_table << " to table "
<< dest_table << ", dim=" << dim;
if (copy_table_config_.dense_pull_after_copy()) {
VLOG(3) << "dense pull after copy, table=" << dest_table;
pull_dense_status.resize(0);
fleet_ptr_->PullDenseVarsAsync(*root_scope_, dest_table,
dense_value_names_[dest_table],
&pull_dense_status);
for (auto& t : pull_dense_status) {
t.wait();
auto status = t.get();
if (status != 0) {
LOG(WARNING) << "pull dense after copy table failed,"
<< " table=" << dest_table;
}
}
}
}
}
void DownpourWorker::CopyDenseVars() {
if (thread_id_ != 0) {
return;
}
for (int i = 0; i < copy_table_config_.src_var_list_size(); ++i) {
auto& src_var_name = copy_table_config_.src_var_list(i);
auto& dest_var_name = copy_table_config_.dest_var_list(i);
if (src_var_name == dest_var_name) {
continue;
}
VLOG(3) << "copy dense var from " << src_var_name << " to "
<< dest_var_name;
Variable* src_var = thread_scope_->FindVar(src_var_name);
CHECK(src_var != nullptr) << src_var_name << " not found"; // NOLINT
LoDTensor* src_tensor = src_var->GetMutable<LoDTensor>();
CHECK(src_tensor != nullptr) << src_var_name
<< " tensor is null"; // NOLINT
float* src_data = src_tensor->data<float>();
Variable* dest_var = thread_scope_->FindVar(dest_var_name);
CHECK(dest_var != nullptr) << dest_var_name << " not found"; // NOLINT
LoDTensor* dest_tensor = dest_var->GetMutable<LoDTensor>();
CHECK(dest_tensor != nullptr) << dest_var_name
<< " tensor is null"; // NOLINT
float* dest_data = dest_tensor->data<float>();
CHECK(src_tensor->numel() == dest_tensor->numel())
<< "tensor numel not equal," << src_tensor->numel() << " vs "
<< dest_tensor->numel();
for (int i = 0; i < src_tensor->numel(); i++) {
dest_data[i] = src_data[i];
}
}
}
void DownpourWorker::TrainFilesWithProfiler() {
VLOG(3) << "Begin to train files with profiler";
platform::SetNumThreads(1);
......@@ -401,6 +532,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
double fill_sparse_time = 0.0;
double push_sparse_time = 0.0;
double push_dense_time = 0.0;
double copy_table_time = 0.0;
int cur_batch;
int batch_cnt = 0;
uint64_t total_inst = 0;
......@@ -409,6 +541,27 @@ void DownpourWorker::TrainFilesWithProfiler() {
timeline.Pause();
read_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
timeline.Start();
if (copy_table_config_.need_copy()) {
VLOG(3) << "copy_sparse_tables_.size " << copy_sparse_tables_.size();
if (copy_table_config_.sparse_copy_by_feasign()) {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
uint64_t tid = copy_sparse_tables_[i].first;
feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
sparse_push_keys_[tid].end());
}
}
if (batch_cnt % copy_table_config_.batch_num() == 0) {
CopySparseTable();
CopyDenseTable();
CopyDenseVars();
}
}
timeline.Pause();
copy_table_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
VLOG(3) << "program config size: " << param_.program_config_size();
for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
++i) {
......@@ -422,9 +575,9 @@ void DownpourWorker::TrainFilesWithProfiler() {
}
}
timeline.Start();
fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid,
sparse_key_names_[tid], &features_[tid],
&feature_values_[tid], table.fea_dim());
fleet_ptr_->PullSparseVarsSync(
*thread_scope_, tid, sparse_key_names_[tid], &features_[tid],
&feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]);
timeline.Pause();
pull_sparse_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
......@@ -504,7 +657,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
*thread_scope_, tid, features_[tid], feature_labels_[tid],
sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
dump_slot_);
dump_slot_, &sparse_push_keys_[tid]);
timeline.Pause();
push_sparse_time += timeline.ElapsedSec();
total_time += timeline.ElapsedSec();
......@@ -605,6 +758,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
collect_label_time / batch_cnt);
fprintf(stderr, "adjust ins weight time: %fs\n",
adjust_ins_weight_time / batch_cnt);
fprintf(stderr, "copy table time: %fs\n", copy_table_time / batch_cnt);
fprintf(stderr, "mean read time: %fs\n", read_time / batch_cnt);
fprintf(stderr, "IO percent: %f\n", read_time / total_time * 100);
fprintf(stderr, "op run percent: %f\n", op_sum_time / total_time * 100);
......@@ -612,6 +766,8 @@ void DownpourWorker::TrainFilesWithProfiler() {
pull_sparse_time / total_time * 100);
fprintf(stderr, "adjust ins weight time percent: %f\n",
adjust_ins_weight_time / total_time * 100);
fprintf(stderr, "copy table time percent: %f\n",
copy_table_time / total_time * 100);
fprintf(stderr, "collect label time percent: %f\n",
collect_label_time / total_time * 100);
fprintf(stderr, "fill sparse time percent: %f\n",
......@@ -625,6 +781,11 @@ void DownpourWorker::TrainFilesWithProfiler() {
}
timeline.Start();
}
if (copy_table_config_.need_copy()) {
CopySparseTable();
CopyDenseTable();
CopyDenseVars();
}
}
void DownpourWorker::TrainFiles() {
......@@ -634,6 +795,20 @@ void DownpourWorker::TrainFiles() {
int batch_cnt = 0;
int cur_batch;
while ((cur_batch = device_reader_->Next()) > 0) {
if (copy_table_config_.need_copy()) {
if (copy_table_config_.sparse_copy_by_feasign()) {
for (size_t i = 0; i < copy_sparse_tables_.size(); ++i) {
uint64_t tid = copy_sparse_tables_[i].first;
feasign_set_[tid].insert(sparse_push_keys_[tid].begin(),
sparse_push_keys_[tid].end());
}
}
if (batch_cnt % copy_table_config_.batch_num() == 0) {
CopySparseTable();
CopyDenseTable();
CopyDenseVars();
}
}
// pull sparse here
for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size();
++i) {
......@@ -646,9 +821,9 @@ void DownpourWorker::TrainFiles() {
break;
}
}
fleet_ptr_->PullSparseVarsSync(*thread_scope_, tid,
sparse_key_names_[tid], &features_[tid],
&feature_values_[tid], table.fea_dim());
fleet_ptr_->PullSparseVarsSync(
*thread_scope_, tid, sparse_key_names_[tid], &features_[tid],
&feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]);
CollectLabelInfo(i);
FillSparseValue(i);
auto nid_iter = std::find(sparse_value_names_[tid].begin(),
......@@ -707,7 +882,7 @@ void DownpourWorker::TrainFiles() {
*thread_scope_, tid, features_[tid], feature_labels_[tid],
sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
dump_slot_);
dump_slot_, &sparse_push_keys_[tid]);
}
}
......@@ -811,6 +986,11 @@ void DownpourWorker::TrainFiles() {
if (need_dump_field_) {
writer_.Flush();
}
if (copy_table_config_.need_copy()) {
CopySparseTable();
CopyDenseTable();
CopyDenseVars();
}
}
} // end namespace framework
......
......@@ -40,28 +40,6 @@ const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100;
std::shared_ptr<FleetWrapper> FleetWrapper::s_instance_ = NULL;
bool FleetWrapper::is_initialized_ = false;
#ifdef PADDLE_WITH_PSLIB
template <class AR>
paddle::ps::Archive<AR>& operator<<(paddle::ps::Archive<AR>& ar,
const MultiSlotType& ins) {
ar << ins.GetType();
ar << ins.GetOffset();
ar << ins.GetFloatData();
ar << ins.GetUint64Data();
return ar;
}
template <class AR>
paddle::ps::Archive<AR>& operator>>(paddle::ps::Archive<AR>& ar,
MultiSlotType& ins) {
ar >> ins.MutableType();
ar >> ins.MutableOffset();
ar >> ins.MutableFloatData();
ar >> ins.MutableUint64Data();
return ar;
}
#endif
#ifdef PADDLE_WITH_PSLIB
std::shared_ptr<paddle::distributed::PSlib> FleetWrapper::pslib_ptr_ = NULL;
#endif
......@@ -159,14 +137,16 @@ void FleetWrapper::CreateClient2ClientConnection() {
void FleetWrapper::PullSparseVarsSync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names, std::vector<uint64_t>* fea_keys,
std::vector<std::vector<float>>* fea_values, int fea_value_dim) {
std::vector<std::vector<float>>* fea_values, int fea_value_dim,
const std::vector<std::string>& var_emb_names) {
#ifdef PADDLE_WITH_PSLIB
std::vector<::std::future<int32_t>> pull_sparse_status;
pull_sparse_status.resize(0);
fea_keys->clear();
fea_keys->resize(0);
fea_keys->reserve(MAX_FEASIGN_NUM);
for (auto name : var_names) {
for (size_t var_index = 0; var_index < var_names.size(); ++var_index) {
const std::string& name = var_names[var_index];
Variable* var = scope.FindVar(name);
if (var == nullptr) {
continue;
......@@ -175,6 +155,14 @@ void FleetWrapper::PullSparseVarsSync(
CHECK(tensor != nullptr) << "tensor of var " << name << " is null";
int64_t* ids = tensor->data<int64_t>();
int len = tensor->numel();
// skip slots which do not have embedding
const std::string& emb_name = var_emb_names[var_index];
Variable* emb_var = scope.FindVar(emb_name);
if (emb_var == nullptr) {
continue;
}
for (auto i = 0u; i < len; ++i) {
if (ids[i] == 0u) {
continue;
......@@ -314,7 +302,8 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<std::vector<float>>* push_values,
std::vector<::std::future<int32_t>>* push_sparse_status,
const int batch_size, const bool use_cvm, const bool dump_slot) {
const int batch_size, const bool use_cvm, const bool dump_slot,
std::vector<uint64_t>* sparse_push_keys) {
#ifdef PADDLE_WITH_PSLIB
int offset = 2;
int slot_offset = 0;
......@@ -332,12 +321,15 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
}
CHECK_GE(grad_dim, 0);
sparse_push_keys->clear();
sparse_push_keys->reserve(fea_keys.size() + 1);
push_values->resize(fea_keys.size() + 1);
for (auto& t : *push_values) {
t.resize(emb_dim + offset + slot_offset);
}
uint64_t fea_idx = 0u;
for (size_t i = 0; i < sparse_key_names.size(); ++i) {
for (size_t i = 0;
i < sparse_key_names.size() && i < sparse_grad_names.size(); ++i) {
Variable* var = scope.FindVar(sparse_key_names[i]);
if (var == nullptr) {
continue;
......@@ -376,6 +368,7 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
g += emb_dim;
continue;
}
sparse_push_keys->push_back(ids[id_idx]);
CHECK(fea_idx < (*push_values).size());
CHECK(fea_idx < fea_labels.size());
......@@ -396,17 +389,43 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
fea_idx++;
}
}
CHECK(fea_idx == fea_keys.size()) << "fea_idx: " << fea_idx
<< "features size: " << fea_keys.size();
// slots whose embedding has been stop gradient or
// not involved in forward-backward
uint64_t no_grad_fea_num = 0u;
for (size_t i = sparse_grad_names.size(); i < sparse_key_names.size(); ++i) {
Variable* var = scope.FindVar(sparse_key_names[i]);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr) {
LOG(ERROR) << "tensor of var[" << sparse_key_names[i] << "] is null";
exit(-1);
}
int len = tensor->numel();
int64_t* ids = tensor->data<int64_t>();
for (auto id_idx = 0u; id_idx < len; ++id_idx) {
if (ids[id_idx] == 0) {
continue;
}
++no_grad_fea_num;
}
}
CHECK(fea_idx + no_grad_fea_num == fea_keys.size())
<< "fea_idx: " << fea_idx << " no_grad_fea_num: " << no_grad_fea_num
<< " features size: " << fea_keys.size();
CHECK(fea_idx == sparse_push_keys->size());
if (fea_idx == 0) {
return;
}
std::vector<float*> push_g_vec;
for (auto i = 0u; i < fea_keys.size(); ++i) {
for (auto i = 0u; i < sparse_push_keys->size(); ++i) {
push_g_vec.push_back((*push_values)[i].data());
}
auto status = pslib_ptr_->_worker_ptr->push_sparse(
table_id, fea_keys.data(), (const float**)push_g_vec.data(),
fea_keys.size());
table_id, sparse_push_keys->data(), (const float**)push_g_vec.data(),
sparse_push_keys->size());
push_sparse_status->push_back(std::move(status));
#endif
}
......@@ -530,12 +549,12 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) {
#endif
}
double FleetWrapper::GetCacheThreshold() {
double FleetWrapper::GetCacheThreshold(int table_id) {
#ifdef PADDLE_WITH_PSLIB
double cache_threshold = 0.0;
auto ret = pslib_ptr_->_worker_ptr->flush();
ret.wait();
ret = pslib_ptr_->_worker_ptr->get_cache_threshold(0, cache_threshold);
ret = pslib_ptr_->_worker_ptr->get_cache_threshold(table_id, cache_threshold);
ret.wait();
if (cache_threshold < 0) {
LOG(ERROR) << "get cache threshold failed";
......@@ -569,7 +588,8 @@ void FleetWrapper::CacheShuffle(int table_id, const std::string& path,
int32_t FleetWrapper::SaveCache(int table_id, const std::string& path,
const int mode) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->save_cache(0, path, std::to_string(mode));
auto ret =
pslib_ptr_->_worker_ptr->save_cache(table_id, path, std::to_string(mode));
ret.wait();
int32_t feasign_cnt = ret.get();
if (feasign_cnt == -1) {
......@@ -688,40 +708,6 @@ std::future<int32_t> FleetWrapper::SendClientToClientMsg(
return std::future<int32_t>();
}
template <typename T>
void FleetWrapper::Serialize(const std::vector<T*>& t, std::string* str) {
#ifdef PADDLE_WITH_PSLIB
paddle::ps::BinaryArchive ar;
for (size_t i = 0; i < t.size(); ++i) {
ar << *(t[i]);
}
*str = std::string(ar.buffer(), ar.length());
#else
VLOG(0) << "FleetWrapper::Serialize does nothing when no pslib";
#endif
}
template <typename T>
void FleetWrapper::Deserialize(std::vector<T>* t, const std::string& str) {
#ifdef PADDLE_WITH_PSLIB
if (str.length() == 0) {
return;
}
paddle::ps::BinaryArchive ar;
ar.set_read_buffer(const_cast<char*>(str.c_str()), str.length(), nullptr);
if (ar.cursor() == ar.finish()) {
return;
}
while (ar.cursor() < ar.finish()) {
t->push_back(ar.get<T>());
}
CHECK(ar.cursor() == ar.finish());
VLOG(3) << "Deserialize size " << t->size();
#else
VLOG(0) << "FleetWrapper::Deserialize does nothing when no pslib";
#endif
}
std::default_random_engine& FleetWrapper::LocalRandomEngine() {
struct engine_wrapper_t {
std::default_random_engine engine;
......@@ -740,10 +726,43 @@ std::default_random_engine& FleetWrapper::LocalRandomEngine() {
return r.engine;
}
template void FleetWrapper::Serialize<std::vector<MultiSlotType>>(
const std::vector<std::vector<MultiSlotType>*>&, std::string*);
template void FleetWrapper::Deserialize<std::vector<MultiSlotType>>(
std::vector<std::vector<MultiSlotType>>*, const std::string&);
int32_t FleetWrapper::CopyTable(const uint64_t src_table_id,
const uint64_t dest_table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->copy_table(src_table_id, dest_table_id);
ret.wait();
int32_t feasign_cnt = ret.get();
if (feasign_cnt == -1) {
LOG(ERROR) << "copy table failed";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
return feasign_cnt;
#else
VLOG(0) << "FleetWrapper::CopyTable does nothing when no pslib";
return 0;
#endif
}
int32_t FleetWrapper::CopyTableByFeasign(
const uint64_t src_table_id, const uint64_t dest_table_id,
const std::vector<uint64_t>& feasign_list) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->copy_table_by_feasign(
src_table_id, dest_table_id, feasign_list.data(), feasign_list.size());
ret.wait();
int32_t feasign_cnt = ret.get();
if (feasign_cnt == -1) {
LOG(ERROR) << "copy table by feasign failed";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
return feasign_cnt;
#else
VLOG(0) << "FleetWrapper::CopyTableByFeasign does nothing when no pslib";
return 0;
#endif
}
} // end namespace framework
} // end namespace paddle
......@@ -67,31 +67,38 @@ class FleetWrapper {
client2client_max_retry_ = 3;
}
// set client to client communication config
void SetClient2ClientConfig(int request_timeout_ms, int connect_timeout_ms,
int max_retry);
// Pull sparse variables from server in Sync mode
// Param<in>: scope, table_id, var_names, fea_keys
// Pull sparse variables from server in sync mode
// Param<in>: scope, table_id, var_names, fea_keys, fea_dim
// Param<out>: fea_values
void PullSparseVarsSync(const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<uint64_t>* fea_keys,
std::vector<std::vector<float>>* fea_values,
int fea_dim);
int fea_dim,
const std::vector<std::string>& var_emb_names);
// pull dense variables from server in sync mod
void PullDenseVarsSync(const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names);
// pull dense variables from server in async mod
// Param<in>: scope, table_id, var_names
// Param<out>: pull_dense_status
void PullDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names,
std::vector<::std::future<int32_t>>* pull_dense_status);
// push dense parameters(not gradients) to server in sync mode
void PushDenseParamSync(const Scope& scope, const uint64_t table_id,
const std::vector<std::string>& var_names);
// Push dense variables to server in async mode
// Param<in>: scope, table_id, var_names,
// Param<in>: scope, table_id, var_names, scale_datanorm, batch_size
// Param<out>: push_sparse_status
void PushDenseVarsAsync(
const Scope& scope, const uint64_t table_id,
......@@ -99,13 +106,14 @@ class FleetWrapper {
std::vector<::std::future<int32_t>>* push_sparse_status,
float scale_datanorm, int batch_size);
// push dense variables to server in sync mode
void PushDenseVarsSync(Scope* scope, const uint64_t table_id,
const std::vector<std::string>& var_names);
// Push sparse variables with labels to server in Async mode
// Push sparse variables with labels to server in async mode
// This is specially designed for click/show stats in server
// Param<in>: scope, table_id, var_grad_names,
// fea_keys, fea_labels, sparse_grad_names
// Param<in>: scope, table_id, fea_keys, fea_labels, sparse_key_names,
// sparse_grad_names, batch_size, use_cvm, dump_slot
// Param<out>: push_values, push_sparse_status
void PushSparseVarsWithLabelAsync(
const Scope& scope, const uint64_t table_id,
......@@ -115,7 +123,8 @@ class FleetWrapper {
const std::vector<std::string>& sparse_grad_names, const int emb_dim,
std::vector<std::vector<float>>* push_values,
std::vector<::std::future<int32_t>>* push_sparse_status,
const int batch_size, const bool use_cvm, const bool dump_slot);
const int batch_size, const bool use_cvm, const bool dump_slot,
std::vector<uint64_t>* sparse_push_keys);
// Push sparse variables to server in Async mode
// Param<In>: scope, table_id, fea_keys, sparse_grad_names
......@@ -130,12 +139,17 @@ class FleetWrapper {
std::vector<::std::future<int32_t>>* push_sparse_status);
*/
// init server
void InitServer(const std::string& dist_desc, int index);
// init trainer
void InitWorker(const std::string& dist_desc,
const std::vector<uint64_t>& host_sign_list, int node_num,
int index);
// stop server
void StopServer();
// run server
uint64_t RunServer();
// gather server ip
void GatherServers(const std::vector<uint64_t>& host_sign_list, int node_num);
// gather client ip
void GatherClients(const std::vector<uint64_t>& host_sign_list);
......@@ -143,7 +157,6 @@ class FleetWrapper {
std::vector<uint64_t> GetClientsInfo();
// create client to client connection
void CreateClient2ClientConnection();
// flush all push requests
void ClientFlush();
// load from paddle model
......@@ -162,37 +175,42 @@ class FleetWrapper {
// mode = 0, save all feature
// mode = 1, save delta feature, which means save diff
void SaveModel(const std::string& path, const int mode);
double GetCacheThreshold();
// get save cache threshold
double GetCacheThreshold(int table_id);
// shuffle cache model between servers
void CacheShuffle(int table_id, const std::string& path, const int mode,
const double cache_threshold);
// save cache model
// cache model can speed up online predict
int32_t SaveCache(int table_id, const std::string& path, const int mode);
// copy feasign key/value from src_table_id to dest_table_id
int32_t CopyTable(const uint64_t src_table_id, const uint64_t dest_table_id);
// copy feasign key/value from src_table_id to dest_table_id
int32_t CopyTableByFeasign(const uint64_t src_table_id,
const uint64_t dest_table_id,
const std::vector<uint64_t>& feasign_list);
// clear all models, release their memory
void ClearModel();
// shrink sparse table
void ShrinkSparseTable(int table_id);
// shrink dense table
void ShrinkDenseTable(int table_id, Scope* scope,
std::vector<std::string> var_list, float decay,
int emb_dim);
// register client to client communication
typedef std::function<int32_t(int, int, const std::string&)> MsgHandlerFunc;
// register client to client communication
int RegisterClientToClientMsgHandler(int msg_type, MsgHandlerFunc handler);
// send client to client message
std::future<int32_t> SendClientToClientMsg(int msg_type, int to_client_id,
const std::string& msg);
template <typename T>
void Serialize(const std::vector<T*>& t, std::string* str);
template <typename T>
void Deserialize(std::vector<T>* t, const std::string& str);
// FleetWrapper singleton
static std::shared_ptr<FleetWrapper> GetInstance() {
if (NULL == s_instance_) {
s_instance_.reset(new paddle::framework::FleetWrapper());
}
return s_instance_;
}
// this performs better than rand_r, especially large data
std::default_random_engine& LocalRandomEngine();
......
......@@ -39,10 +39,13 @@ message TrainerDesc {
optional string dump_fields_path = 12;
repeated string dump_fields = 13;
optional string dump_converter = 14;
repeated string dump_param = 15;
optional int32 mpi_size = 16 [ default = -1 ];
optional int32 dump_file_num = 17 [ default = 16 ];
repeated string check_nan_var_names = 18;
optional CopyTableConfig copy_table_config = 19;
// adjust ins weight
optional AdjustInsWeightConfig adjust_ins_weight_config = 20;
// device worker parameters
optional HogwildWorkerParameter hogwild_param = 101;
......@@ -51,8 +54,6 @@ message TrainerDesc {
optional SectionWorkerParameter section_param = 104;
// datafeed desc
optional DataFeedDesc data_desc = 201;
// adjust ins weight
optional AdjustInsWeightConfig adjust_ins_weight_config = 301;
}
message HogwildWorkerParameter { repeated string skip_ops = 1; }
......@@ -107,6 +108,29 @@ message AdjustInsWeightConfig {
optional string ins_weight_slot = 5 [ default = "" ];
}
message TableDependencyMap {
required int32 key = 1;
repeated int32 values = 2;
}
message CopyTableConfig {
optional bool need_copy = 1 [ default = false ];
optional int32 batch_num = 2 [ default = 100 ];
repeated int32 src_sparse_tables = 3;
repeated int32 dest_sparse_tables = 4;
repeated int32 src_dense_tables = 5;
repeated int32 dest_dense_tables = 6;
repeated string src_var_list = 7;
repeated string dest_var_list = 8;
// when dest dense table has no grad, should pull explicitly
optional bool dense_pull_after_copy = 9 [ default = false ];
// copy feasigns or copy the whole table
optional bool sparse_copy_by_feasign = 10 [ default = true ];
// table dependency for pull/push
optional bool enable_dependency = 11 [ default = false ];
repeated TableDependencyMap table_denpendency_map = 12;
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
......
......@@ -67,7 +67,10 @@ void BindFleetWrapper(py::module* m) {
&framework::FleetWrapper::LoadFromPaddleModel)
.def("load_model_one_table", &framework::FleetWrapper::LoadModelOneTable)
.def("set_client2client_config",
&framework::FleetWrapper::SetClient2ClientConfig);
&framework::FleetWrapper::SetClient2ClientConfig)
.def("copy_table", &framework::FleetWrapper::CopyTable)
.def("copy_table_by_feasign",
&framework::FleetWrapper::CopyTableByFeasign);
} // end FleetWrapper
} // end namespace pybind
} // end namespace paddle
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defination of device workers."""
__all__ = ['DeviceWorker', 'Hogwild', 'DownpourSGD', 'Section']
......@@ -23,9 +24,7 @@ class DeviceWorker(object):
"""
def __init__(self):
"""
Init.
"""
"""Init."""
self._program = None
self._infer = None
......@@ -75,9 +74,7 @@ class Hogwild(DeviceWorker):
"""
def __init__(self):
"""
Init.
"""
"""Init."""
super(Hogwild, self).__init__()
def _gen_worker_desc(self, trainer_desc):
......@@ -140,23 +137,29 @@ class DownpourSGD(DeviceWorker):
trainer_desc.device_worker_name = "DownpourWorker"
pull_thread = trainer_desc.pull_dense_param
pull_thread.device_num = trainer_desc.thread_num
for i in self._fleet_desc.trainer_param.dense_table:
if opt_info.get("program_id_to_worker") is None:
raise ValueError("opt_info must have program_id_to_worker")
prog_id_to_worker = opt_info["program_id_to_worker"]
if prog_id_to_worker.get(program_id) is None:
raise ValueError("%s not found in program_id_to_worker" %
program_id)
worker = opt_info["program_id_to_worker"][program_id]
for i in worker.get_desc().dense_table:
if i.table_id in dense_table_set:
dense_table = pull_thread.dense_table.add()
dense_table.dense_value_name.extend(i.dense_variable_name)
dense_table.table_id = \
i.table_id
sparse_len = len(self._fleet_desc.trainer_param.sparse_table)
sparse_len = len(worker.get_desc().sparse_table)
for i in range(sparse_len):
sparse_table = downpour.sparse_table.add()
sparse_table.table_id = \
self._fleet_desc.trainer_param.sparse_table[i].table_id
sparse_table.sparse_key_name.extend(
self._fleet_desc.trainer_param.sparse_table[i].slot_key)
sparse_table.sparse_value_name.extend(
self._fleet_desc.trainer_param.sparse_table[i].slot_value)
sparse_table.sparse_grad_name.extend(
self._fleet_desc.trainer_param.sparse_table[i].slot_gradient)
sparse_table.table_id = worker.get_desc().sparse_table[i].table_id
sparse_table.sparse_key_name.extend(worker.get_desc().sparse_table[
i].slot_key)
sparse_table.sparse_value_name.extend(worker.get_desc()
.sparse_table[i].slot_value)
sparse_table.sparse_grad_name.extend(worker.get_desc().sparse_table[
i].slot_gradient)
if opt_info["use_cvm"]:
sparse_table.emb_dim = \
self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
......@@ -173,28 +176,24 @@ class DownpourSGD(DeviceWorker):
for i in opt_info["stat_var_names"]:
downpour.stat_var_names.extend([i])
for i in self._fleet_desc.trainer_param.dense_table:
for i in worker.get_desc().dense_table:
if i.table_id in dense_table_set:
dense_table = downpour.dense_table.add()
dense_table.table_id = i.table_id
dense_table.dense_value_name.extend(i.dense_variable_name)
dense_table.dense_grad_name.extend(
i.dense_gradient_variable_name)
downpour.skip_ops.extend(self._fleet_desc.trainer_param.skip_op)
downpour.skip_ops.extend(worker.get_desc().skip_op)
if self._infer:
downpour.push_dense = False
downpour.push_sparse = False
class Section(DeviceWorker):
"""
SectionWorker
"""
"""SectionWorker."""
def __init__(self):
"""
Init.
"""
"""Init."""
super(Section, self).__init__()
def _gen_worker_desc(self, trainer_desc):
......
......@@ -10,6 +10,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
"""Defination of PSLib."""
import os
import sys
......@@ -25,6 +26,8 @@ from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
class PSLib(Fleet):
"""PSLib class."""
def __init__(self):
super(PSLib, self).__init__(Mode.PSLIB)
self._opt_info = None
......@@ -89,7 +92,10 @@ class PSLib(Fleet):
# barrier for init model
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
tables = self._dist_desc.trainer_param.dense_table
tables = []
for tp in self._dist_desc.trainer_param:
for i in tp.dense_table:
tables.append(i)
for prog, scope in zip(self._main_programs, self._scopes):
prog_id = str(id(prog))
prog_conf = self._opt_info['program_configs'][prog_id]
......@@ -244,7 +250,9 @@ class PSLib(Fleet):
3 means save batch model.
Example:
>>> fleet.save_persistables(dirname="/you/path/to/model", mode = 0)
.. code-block:: python
fleet.save_persistables(dirname="/you/path/to/model", mode = 0)
"""
mode = kwargs.get("mode", 0)
......@@ -260,35 +268,43 @@ class PSLib(Fleet):
when using fleet, it will save sparse cache table
Args:
executor(Executor): fluid executor
dirname(str): save path. It can be hdfs/afs path or local path
main_program(Program): fluid program, default None
kwargs: use define property, current support following
mode(int): define for feature extension in the future,
currently no use, will pass a default value 0
table_id(int): which table to save cache, default is 0
Returns:
feasign_num(int): cache feasign num
Example:
.. code-block:: python
>>> fleet.save_cache_model(None, dirname="/you/path/to/model", mode = 0)
fleet.save_cache_model(None, dirname="/you/path/to/model", mode = 0)
"""
mode = kwargs.get("mode", 0)
table_id = kwargs.get("table_id", 0)
self._fleet_ptr.client_flush()
self._role_maker._barrier_worker()
cache_threshold = 0.0
if self._role_maker.is_first_worker():
cache_threshold = self._fleet_ptr.get_cache_threshold()
cache_threshold = self._fleet_ptr.get_cache_threshold(table_id)
#check cache threshold right or not
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.cache_shuffle(0, dirname, mode, cache_threshold)
self._fleet_ptr.cache_shuffle(table_id, dirname, mode,
cache_threshold)
self._role_maker._barrier_worker()
feasign_num = -1
if self._role_maker.is_first_worker():
feasign_num = self._fleet_ptr.save_cache(0, dirname, mode)
feasign_num = self._fleet_ptr.save_cache(table_id, dirname, mode)
self._role_maker._barrier_worker()
return feasign_num
......@@ -304,8 +320,12 @@ class PSLib(Fleet):
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
for i in self._opt_info["fleet_desc"].trainer_param.sparse_table:
self._fleet_ptr.shrink_sparse_table(i.table_id)
tables = []
for tp in self._opt_info["fleet_desc"].trainer_param:
for i in tp.sparse_table:
tables.append(i.table_id)
for i in list(set(tables)):
self._fleet_ptr.shrink_sparse_table(i)
self._role_maker._barrier_worker()
def shrink_dense_table(self, decay, emb_dim=11, scope=None, table_id=None):
......@@ -330,7 +350,8 @@ class PSLib(Fleet):
scope = fluid.global_scope()
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
for tp in self._opt_info["fleet_desc"].trainer_param:
for i in tp.dense_table:
if table_id is not None and table_id != i.table_id:
continue
var_list = [var for var in i.dense_variable_name]
......@@ -341,8 +362,8 @@ class PSLib(Fleet):
break
if skip:
continue
self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
decay, emb_dim)
self._fleet_ptr.shrink_dense_table(i.table_id, scope,
var_list, decay, emb_dim)
self._role_maker._barrier_worker()
def clear_model(self):
......@@ -476,7 +497,8 @@ class PSLib(Fleet):
if ret != 0:
raise RuntimeError("download model proto file failed")
model_proto_file = dest
for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
for tp in self._opt_info["fleet_desc"].trainer_param:
for i in tp.dense_table:
if table_id is not None and table_id != i.table_id:
continue
table_var_names = [var for var in i.dense_variable_name]
......@@ -488,8 +510,8 @@ class PSLib(Fleet):
if skip:
continue
self._fleet_ptr.load_from_paddle_model(
scope, table_id, var_names, model_path, model_proto_file,
table_var_names, load_combine)
scope, table_id, var_names, model_path,
model_proto_file, table_var_names, load_combine)
self._role_maker._barrier_worker()
def _set_opt_info(self, opt_info):
......
......@@ -10,13 +10,15 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
"""Defination of Server and Worker."""
from . import ps_pb2 as pslib
class Server(object):
"""
A Server basic class.
A Server basic class
it's a base class, does not have implementation
"""
def __init__(self):
......@@ -26,6 +28,7 @@ class Server(object):
class Worker(object):
"""
A Worker basic class.
it's a base class, does not have implementation
"""
def __init__(self):
......@@ -76,7 +79,8 @@ class DownpourServer(Server):
'sparse_weight_bounds', 'sparse_embedx_dim', 'sparse_embedx_threshold', 'sparse_nonclk_coeff', \
'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \
'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
'sparse_converter', 'sparse_deconverter']
'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
'sparse_cache_file_num']
for key in strategy:
if key not in support_sparse_key_list:
......@@ -95,6 +99,12 @@ class DownpourServer(Server):
table.table_class = table_class
if table_class == 'DownpourSparseTable':
table.enable_sparse_table_cache = strategy.get(
'sparse_enable_cache', True)
table.sparse_table_cache_rate = strategy.get('sparse_cache_rate',
0.00055)
table.sparse_table_cache_file_num = strategy.get(
'sparse_cache_file_num', 16)
table.compress_in_save = strategy.get('sparse_compress_in_save',
True)
table.shard_num = strategy.get('sparse_shard_num', 1000)
......@@ -169,7 +179,10 @@ class DownpourServer(Server):
"""
Args:
table_id(int): id of sparse params table
strategy(dict): the dense config dict.
param_var(list): param vars
grad_var(list): param grad vars
strategy(dict): the dense config dict
sparse_table_names(list): sparse table names
Returns:
return None
"""
......@@ -230,7 +243,11 @@ class DownpourServer(Server):
"""
Args:
table_id(int): id of datanorm table
strategy(dict): the datanorm config dict.
learning_rate(float): the learning rate used to update parameters
param_var(list): param vars
grad_var(list): param grad vars
strategy(dict): the datanorm config dict
sparse_table_names(list): sparse table names
Returns:
return None
"""
......@@ -296,43 +313,60 @@ class DownpourWorker(Worker):
self.window = window
self._worker = pslib.DownpourTrainerParameter()
def add_sparse_table(self, table_id, slot_key_vars, slot_value_vars):
def add_sparse_table(self,
table_id,
slot_key_vars,
slot_value_vars,
slot_value_grads=None):
"""
Args:
table_id(int): id of sparse params table
slot_key_vars(string): slot key id
slot_value_var(string): slot key value after embedding
slot_key_vars(list): slot key id
slot_value_vars(list): slot key value after embedding
slot_value_grads(list): grad of all params, default is None
Returns:
return None
"""
if slot_value_grads is None:
slot_value_grad_names = \
[var.name + "@GRAD" for var in slot_value_vars]
else:
value_to_key = {}
for i in range(len(slot_key_vars)):
value_to_key[slot_value_vars[i].name] = slot_key_vars[i]
slot_value_grad_names = []
all_grad_names = [var.name for var in slot_value_grads]
for var in slot_value_vars:
if var.name + "@GRAD" in all_grad_names:
slot_value_grad_names.append(var.name + "@GRAD")
sorted_slot_value_vars = [i for i in slot_value_vars if \
i.name + "@GRAD" in slot_value_grad_names]
sorted_slot_value_vars += [i for i in slot_value_vars if \
i.name + "@GRAD" not in slot_value_grad_names]
sorted_slot_key_vars = \
[value_to_key[v.name] for v in sorted_slot_value_vars]
target_table = None
for table in self._worker.sparse_table:
if table.table_id == table_id:
if [var.name for var in slot_key_vars
] == self._worker.sparse_table[table_id].slot_key:
if [var.name for var in slot_value_vars
] == self._worker.sparse_table[table_id].slot_value:
if [
var.name + "@GRAD" for var in slot_value_vars
] == self._worker.sparse_table[table_id].slot_gradient:
return
else:
raise ValueError(
"sparse table %s slot_gradient error" %
table_id)
else:
raise ValueError("sparse table %s slot_value error" %
table_id)
else:
keys = table.slot_key
key_names = [var.name for var in sorted_slot_key_vars]
for key_name in key_names:
if key_name not in keys:
raise ValueError("sparse table %s slot_key error" %
table_id)
target_table = table
break
table = target_table
if table is not None:
self._worker.sparse_table.remove(table)
table = self._worker.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in slot_key_vars])
table.slot_value.extend([var.name for var in slot_value_vars])
table.slot_gradient.extend(
[var.name + "@GRAD" for var in slot_value_vars])
table.slot_key.extend([var.name for var in sorted_slot_key_vars])
table.slot_value.extend([var.name for var in sorted_slot_value_vars])
table.slot_gradient.extend(slot_value_grad_names)
def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars,
dense_start_table_id, sparse_table_names):
......@@ -341,8 +375,10 @@ class DownpourWorker(Worker):
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
param_var(list): all dense param. it is a list.
grad_var(list): all dense grad parm it is a list.
param_vars(list): all dense param. it is a list.
grad_vars(list): all dense grad parm it is a list.
dense_start_table_id(int): dense table start index
sparse_table_names(list): sparse table names
Returns:
return None
"""
......@@ -365,21 +401,19 @@ class DownpourWorker(Worker):
for table in self._worker.dense_table:
if table.table_id == table_id:
desc_dense_param_name = list(self._worker.dense_table[
table_id - dense_start_table_id].dense_variable_name)
desc_dense_param_name = list(table.dense_variable_name)
desc_dense_param_name.sort()
if dense_param_name == desc_dense_param_name:
desc_dense_grad_name = list(self._worker.dense_table[
table_id - dense_start_table_id]
.dense_gradient_variable_name)
desc_dense_grad_name = list(
table.dense_gradient_variable_name)
desc_dense_grad_name.sort()
if dense_grad_name == desc_dense_grad_name:
return
else:
raise ValueError(
"dense table %s dense_gradient_variable_name error"
% table_id)
"dense table %s dense_gradient_variable_name "
"error" % table_id)
else:
raise ValueError(
"dense table %s dense_variable_name error" % table_id)
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Optimizer Factory."""
__all__ = ["DistributedAdam"]
import paddle.fluid as fluid
......@@ -18,11 +19,17 @@ from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs
from google.protobuf import text_format
from collections import OrderedDict
from .node import DownpourWorker, DownpourServer
from . import ps_pb2 as pslib
class DistributedOptimizerImplBase(object):
"""
DistributedOptimizerImplBase
base class of optimizers
"""
def __init__(self, optimizer):
self._optimizer = optimizer
self._learning_rate = optimizer._learning_rate
......@@ -33,10 +40,23 @@ class DistributedOptimizerImplBase(object):
startup_program=None,
parameter_list=None,
no_grad_set=None):
"""
Args:
losses(Variable): loss variable defined by user
startup_program(Program): startup program that defined by user
parameter_list(str list): parameter names defined by users
no_grad_set(set): a set of variables that is defined by users
so that these variables do not need gradient computation
"""
pass
class DistributedAdam(DistributedOptimizerImplBase):
"""
DistributedAdam
adam optimizer in distributed training
"""
def __init__(self, optimizer):
# todo(guru4elephant): add more optimizers here as argument
# todo(guru4elephant): make learning_rate as a variable
......@@ -75,7 +95,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
Find output variable of distribute lookup table in program.
We could support multi-distribute table now.
Args:
program(Program): given program, locate distributed lookup table
programs(Program): given program, locate distributed lookup table
table_name(str): given table name that is found beforehand
Returns:
outputs
......@@ -92,6 +112,19 @@ class DistributedAdam(DistributedOptimizerImplBase):
[local_vars[name] for name in op.output("Out")])
return outputs_dict
def _find_distributed_lookup_table_grads(self, program, table_names):
local_vars = program.current_block().vars
grads_dict = dict()
for table_name in table_names:
grads_dict[table_name] = []
for op in program.global_block().ops:
if op.type == "lookup_table_grad" and op.input("W")[
0] in table_names:
grads_dict[op.input("W")[0]].extend(
[local_vars[name] for name in op.input("Out@GRAD")])
return grads_dict
def _find_multi_distributed_lookup_table(self, losses):
"""
find multi-sparse-table
......@@ -125,17 +158,57 @@ class DistributedAdam(DistributedOptimizerImplBase):
Returns:
[optimize_ops, grads_and_weights]
"""
# sparse table names of each program
prog_id_to_sparse_table = OrderedDict()
# inputs_dict and outputs_dict of sparse tables of each program
prog_id_to_inputs_dict = OrderedDict()
prog_id_to_outputs_dict = OrderedDict()
# related to PSParameter
ps_param = pslib.PSParameter()
# related to ServerParameter
server = DownpourServer()
# program to worker (related to DownpourTrainerParameter)
prog_id_to_worker = OrderedDict()
# param_grads of each program
prog_id_to_param_grads = OrderedDict()
# sparse_grads of each program
prog_id_to_sparse_grads = OrderedDict()
sparse_table_names = self._find_multi_distributed_lookup_table(losses)
inputs_dict = self._find_distributed_lookup_table_inputs(
losses[0].block.program, sparse_table_names)
sparse_table_to_index = OrderedDict()
sparse_table_index = 0
for loss in losses:
sparse_table = self._find_multi_distributed_lookup_table([loss])
prog_id = str(id(loss.block.program))
prog_id_to_sparse_table[prog_id] = sparse_table
# get sparse_table_to_index
for tn in sparse_table:
if sparse_table_to_index.get(tn) is None:
sparse_table_to_index[tn] = sparse_table_index
sparse_table_index += 1
# get inputs_dict
inputs_dict = self._find_distributed_lookup_table_inputs(
loss.block.program, sparse_table)
prog_id_to_inputs_dict[prog_id] = inputs_dict
# get outputs_dict
outputs_dict = self._find_distributed_lookup_table_outputs(
losses[0].block.program, sparse_table_names)
loss.block.program, sparse_table)
prog_id_to_outputs_dict[prog_id] = outputs_dict
prog_id_to_worker[prog_id] = DownpourWorker(self._window)
# param_grads of program
params_grads = sorted(
fluid.backward.append_backward(loss, parameter_list,
no_grad_set),
key=lambda x: x[0].name)
prog_id_to_param_grads[prog_id] = params_grads
grads_dict = self._find_distributed_lookup_table_grads(
loss.block.program, sparse_table)
prog_id_to_sparse_grads[prog_id] = grads_dict
ps_param = pslib.PSParameter()
server = DownpourServer()
worker = DownpourWorker(self._window)
# if user specify a fleet_desc.prototxt file, then load the file
# instead of creating default fleet_desc.prototxt.
# user can specify server_param or trainer_param or fs_client_param.
......@@ -144,37 +217,60 @@ class DistributedAdam(DistributedOptimizerImplBase):
with open(fleet_desc_file) as f:
text_format.Merge(f.read(), ps_param)
server.get_desc().CopyFrom(ps_param.server_param)
worker.get_desc().CopyFrom(ps_param.trainer_param)
if len(ps_param.trainer_param) == 1:
for k in prog_id_to_worker:
prog_id_to_worker[k].get_desc().CopyFrom(
ps_param.trainer_param[0])
else:
if len(ps_param.trainer_param) != len(prog_id_to_worker):
raise ValueError(
"trainer param size != program size, %s vs %s" %
(len(ps_param.trainer_param), len(prog_id_to_worker)))
idx = 0
# prog_id_to_worker is OrderedDict
for k in prog_id_to_worker:
prog_id_to_worker[k].get_desc().CopyFrom(
ps_param.trainer_param[idx])
idx += 1
sparse_table_index = 0
for tn in sparse_table_names:
# ServerParameter add all sparse tables
for tn in sparse_table_to_index:
sparse_table_index = sparse_table_to_index[tn]
if strategy.get(tn) is not None:
server.add_sparse_table(sparse_table_index, strategy[tn])
else:
server.add_sparse_table(sparse_table_index, None)
# each DownpourTrainerParameter add its own sparse tables
for loss in losses:
prog_id = str(id(loss.block.program))
worker = prog_id_to_worker[prog_id]
inputs_dict = prog_id_to_inputs_dict[prog_id]
outputs_dict = prog_id_to_outputs_dict[prog_id]
for tn in prog_id_to_sparse_table[prog_id]:
sparse_table_index = sparse_table_to_index[tn]
grads_dict = prog_id_to_sparse_grads[prog_id]
worker.add_sparse_table(sparse_table_index, inputs_dict[tn],
outputs_dict[tn])
sparse_table_index += 1
outputs_dict[tn], grads_dict[tn])
dense_start_table_id = sparse_table_index
dense_table_index = sparse_table_index
dense_start_table_id = len(sparse_table_to_index)
dense_table_index = len(sparse_table_to_index)
program_configs = {}
param_grads_list = []
# ServerParameter add all dense tables
# each DownpourTrainerParameter add its own dense tables
for loss_index in range(len(losses)):
program_id = str(id(losses[loss_index].block.program))
worker = prog_id_to_worker[program_id]
sparse_table_names = prog_id_to_sparse_table[program_id]
sparse_table_index = \
[sparse_table_to_index[i] for i in sparse_table_names]
program_configs[program_id] = {
"pull_sparse":
[t_index for t_index in range(sparse_table_index)],
"push_sparse":
[t_index for t_index in range(sparse_table_index)]
"pull_sparse": [t_index for t_index in sparse_table_index],
"push_sparse": [t_index for t_index in sparse_table_index]
}
params_grads = sorted(
fluid.backward.append_backward(losses[loss_index],
parameter_list, no_grad_set),
key=lambda x: x[0].name)
param_grads_list.append(params_grads)
params_grads = prog_id_to_param_grads[program_id]
params = []
grads = []
data_norm_params = []
......@@ -230,15 +326,39 @@ class DistributedAdam(DistributedOptimizerImplBase):
program_configs[program_id]["push_dense"].extend(
[dense_table_index])
dense_table_index += 1
ps_param.server_param.CopyFrom(server.get_desc())
ps_param.trainer_param.CopyFrom(worker.get_desc())
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
if len(ps_param.trainer_param.skip_op) == 0:
ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
if len(worker.get_desc().skip_op) == 0:
worker.get_desc().skip_op.extend(worker_skipped_ops)
ps_param.server_param.CopyFrom(server.get_desc())
# prog_id_to_worker is OrderedDict
if len(ps_param.trainer_param) == 0:
for k in prog_id_to_worker:
tp = ps_param.trainer_param.add()
tp.CopyFrom(prog_id_to_worker[k].get_desc())
if strategy.get("fs_uri") is not None:
ps_param.fs_client_param.uri = strategy["fs_uri"]
elif ps_param.fs_client_param.uri == "":
ps_param.fs_client_param.uri = "hdfs://your_hdfs_uri"
if strategy.get("fs_user") is not None:
ps_param.fs_client_param.user = strategy["fs_user"]
elif ps_param.fs_client_param.user == "":
ps_param.fs_client_param.user = "your_hdfs_user"
if strategy.get("fs_passwd") is not None:
ps_param.fs_client_param.passwd = strategy["fs_passwd"]
elif ps_param.fs_client_param.passwd == "":
ps_param.fs_client_param.passwd = "your_hdfs_passwd"
if strategy.get("fs_hadoop_bin") is not None:
ps_param.fs_client_param.hadoop_bin = strategy["fs_hadoop_bin"]
elif ps_param.fs_client_param.hadoop_bin == "":
ps_param.fs_client_param.hadoop_bin = "$HADOOP_HOME/bin/hadoop"
opt_info = {}
opt_info["program_id_to_worker"] = prog_id_to_worker
opt_info["program_configs"] = program_configs
opt_info["trainer"] = "DistMultiTrainer"
opt_info["device_worker"] = "DownpourSGD"
......@@ -259,8 +379,13 @@ class DistributedAdam(DistributedOptimizerImplBase):
0].accessor.accessor_class == "DownpourCtrAccessor":
opt_info["dump_slot"] = True
opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {})
opt_info["copy_table"] = strategy.get("copy_table", {})
for loss in losses:
loss.block.program._fleet_opt = opt_info
return None, param_grads_list[0], opt_info
param_grads_list = []
for loss in losses:
prog_id = str(id(loss.block.program))
param_grads_list.append(prog_id_to_param_grads[prog_id])
return None, param_grads_list, opt_info
......@@ -32,7 +32,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
package='paddle',
syntax='proto2',
serialized_pb=_b(
'\n\x08ps.proto\x12\x06paddle\"\x9e\x02\n\x0bPSParameter\x12\x14\n\x0cworker_class\x18\x01 \x01(\t\x12\x14\n\x0cserver_class\x18\x02 \x01(\t\x12\x16\n\x0einstance_class\x18\x03 \x01(\t\x12-\n\x0cworker_param\x18\x65 \x01(\x0b\x32\x17.paddle.WorkerParameter\x12-\n\x0cserver_param\x18\x66 \x01(\x0b\x32\x17.paddle.ServerParameter\x12\x38\n\rtrainer_param\x18\xad\x02 \x01(\x0b\x32 .paddle.DownpourTrainerParameter\x12\x33\n\x0f\x66s_client_param\x18\xf5\x03 \x01(\x0b\x32\x19.paddle.FsClientParameter\"Q\n\x0fWorkerParameter\x12>\n\x15\x64ownpour_worker_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourWorkerParameter\"Q\n\x0fServerParameter\x12>\n\x15\x64ownpour_server_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourServerParameter\"O\n\x17\x44ownpourWorkerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\"\xfd\x01\n\x18\x44ownpourTrainerParameter\x12\x30\n\x0b\x64\x65nse_table\x18\x01 \x03(\x0b\x32\x1b.paddle.DenseTableParameter\x12\x32\n\x0csparse_table\x18\x02 \x03(\x0b\x32\x1c.paddle.SparseTableParameter\x12\x1d\n\x15push_sparse_per_batch\x18\x03 \x01(\x05\x12\x1c\n\x14push_dense_per_batch\x18\x04 \x01(\x05\x12\x0f\n\x07skip_op\x18\x05 \x03(\t\x12-\n\x0eprogram_config\x18\x06 \x03(\x0b\x32\x15.paddle.ProgramConfig\"\x99\x01\n\rProgramConfig\x12\x12\n\nprogram_id\x18\x01 \x02(\t\x12\x1c\n\x14push_sparse_table_id\x18\x02 \x03(\x05\x12\x1b\n\x13push_dense_table_id\x18\x03 \x03(\x05\x12\x1c\n\x14pull_sparse_table_id\x18\x04 \x03(\x05\x12\x1b\n\x13pull_dense_table_id\x18\x05 \x03(\x05\"{\n\x13\x44\x65nseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x1b\n\x13\x64\x65nse_variable_name\x18\x02 \x03(\t\x12$\n\x1c\x64\x65nse_gradient_variable_name\x18\x03 \x03(\t\x12\x0f\n\x07\x66\x65\x61_dim\x18\x04 \x01(\x05\"z\n\x14SparseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x66\x65\x61ture_dim\x18\x02 \x01(\x05\x12\x10\n\x08slot_key\x18\x03 \x03(\t\x12\x12\n\nslot_value\x18\x04 \x03(\t\x12\x15\n\rslot_gradient\x18\x05 \x03(\t\"\x86\x01\n\x17\x44ownpourServerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\x12\x35\n\rservice_param\x18\x02 \x01(\x0b\x32\x1e.paddle.ServerServiceParameter\"\xd7\x01\n\x16ServerServiceParameter\x12*\n\x0cserver_class\x18\x01 \x01(\t:\x14\x44ownpourBrpcPsServer\x12*\n\x0c\x63lient_class\x18\x02 \x01(\t:\x14\x44ownpourBrpcPsClient\x12(\n\rservice_class\x18\x03 \x01(\t:\x11\x44ownpourPsService\x12\x1c\n\x11start_server_port\x18\x04 \x01(\r:\x01\x30\x12\x1d\n\x11server_thread_num\x18\x05 \x01(\r:\x02\x31\x32\"\xc0\x02\n\x0eTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x04\x12\x13\n\x0btable_class\x18\x02 \x01(\t\x12\x17\n\tshard_num\x18\x03 \x01(\x04:\x04\x31\x30\x30\x30\x12\x30\n\x08\x61\x63\x63\x65ssor\x18\x04 \x01(\x0b\x32\x1e.paddle.TableAccessorParameter\x12\x1f\n\x04type\x18\x05 \x01(\x0e\x32\x11.paddle.TableType\x12\x1f\n\x10\x63ompress_in_save\x18\x06 \x01(\x08:\x05\x66\x61lse\x12\'\n\x19\x65nable_sparse_table_cache\x18\x07 \x01(\x08:\x04true\x12(\n\x17sparse_table_cache_rate\x18\x08 \x01(\x01:\x07\x30.00055\x12\'\n\x1bsparse_table_cache_file_num\x18\t \x01(\r:\x02\x31\x36\"\xfc\x02\n\x16TableAccessorParameter\x12\x16\n\x0e\x61\x63\x63\x65ssor_class\x18\x01 \x01(\t\x12\x38\n\x10sparse_sgd_param\x18\x02 \x01(\x0b\x32\x1e.paddle.SparseSGDRuleParameter\x12\x36\n\x0f\x64\x65nse_sgd_param\x18\x03 \x01(\x0b\x32\x1d.paddle.DenseSGDRuleParameter\x12\x13\n\x07\x66\x65\x61_dim\x18\x04 \x01(\r:\x02\x31\x31\x12\x15\n\nembedx_dim\x18\x05 
\x01(\r:\x01\x38\x12\x1c\n\x10\x65mbedx_threshold\x18\x06 \x01(\r:\x02\x31\x30\x12G\n\x17\x64ownpour_accessor_param\x18\x07 \x01(\x0b\x32&.paddle.DownpourTableAccessorParameter\x12\x45\n\x19table_accessor_save_param\x18\x08 \x03(\x0b\x32\".paddle.TableAccessorSaveParameter\"\x96\x02\n\x1e\x44ownpourTableAccessorParameter\x12\x19\n\x0cnonclk_coeff\x18\x01 \x01(\x02:\x03\x30.1\x12\x16\n\x0b\x63lick_coeff\x18\x02 \x01(\x02:\x01\x31\x12\x1b\n\x0e\x62\x61se_threshold\x18\x03 \x01(\x02:\x03\x31.5\x12\x1d\n\x0f\x64\x65lta_threshold\x18\x04 \x01(\x02:\x04\x30.25\x12\x1b\n\x0f\x64\x65lta_keep_days\x18\x05 \x01(\x02:\x02\x31\x36\x12#\n\x15show_click_decay_rate\x18\x06 \x01(\x02:\x04\x30.98\x12\x1d\n\x10\x64\x65lete_threshold\x18\x07 \x01(\x02:\x03\x30.8\x12$\n\x18\x64\x65lete_after_unseen_days\x18\x08 \x01(\x02:\x02\x33\x30\"S\n\x1aTableAccessorSaveParameter\x12\r\n\x05param\x18\x01 \x01(\r\x12\x11\n\tconverter\x18\x02 \x01(\t\x12\x13\n\x0b\x64\x65\x63onverter\x18\x03 \x01(\t\"e\n\x10PsRequestMessage\x12\x0e\n\x06\x63md_id\x18\x01 \x02(\r\x12\x10\n\x08table_id\x18\x02 \x01(\r\x12\x0e\n\x06params\x18\x03 \x03(\x0c\x12\x11\n\tclient_id\x18\x04 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\"\x85\x01\n\x16SparseSGDRuleParameter\x12\x1b\n\rlearning_rate\x18\x01 \x01(\x01:\x04\x30.05\x12\x18\n\rinitial_g2sum\x18\x02 \x01(\x01:\x01\x33\x12\x1d\n\rinitial_range\x18\x03 \x01(\x01:\x06\x30.0001\x12\x15\n\rweight_bounds\x18\x04 \x03(\x02\"\xe1\x01\n\x15\x44\x65nseSGDRuleParameter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12&\n\x04\x61\x64\x61m\x18\x02 \x01(\x0b\x32\x18.paddle.AdamSGDParameter\x12(\n\x05naive\x18\x03 \x01(\x0b\x32\x19.paddle.NaiveSGDParameter\x12,\n\x07summary\x18\x04 \x01(\x0b\x32\x1b.paddle.SummarySGDParameter\x12:\n\x0emoving_average\x18\x05 \x01(\x0b\x32\".paddle.MovingAverageRuleParameter\"\xac\x01\n\x10\x41\x64\x61mSGDParameter\x12\x1c\n\rlearning_rate\x18\x01 \x01(\x01:\x05\x35\x65-06\x12 \n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01:\x08\x30.999993\x12\x1e\n\x0e\x61\x64\x61_decay_rate\x18\x03 \x01(\x01:\x06\x30.9999\x12\x1a\n\x0b\x61\x64\x61_epsilon\x18\x04 \x01(\x01:\x05\x31\x65-08\x12\x1c\n\x0emom_decay_rate\x18\x05 \x01(\x01:\x04\x30.99\"J\n\x11NaiveSGDParameter\x12\x1d\n\rlearning_rate\x18\x01 \x01(\x01:\x06\x30.0002\x12\x16\n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01\";\n\x13SummarySGDParameter\x12$\n\x12summary_decay_rate\x18\x01 \x01(\x01:\x08\x30.999999\".\n\x1aMovingAverageRuleParameter\x12\x10\n\x08momentum\x18\x01 \x01(\x01\"I\n\x11PsResponseMessage\x12\x13\n\x08\x65rr_code\x18\x01 \x02(\x05:\x01\x30\x12\x11\n\x07\x65rr_msg\x18\x02 \x02(\t:\x00\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\xd5\x01\n\x11\x46sClientParameter\x12:\n\x07\x66s_type\x18\x01 \x01(\x0e\x32#.paddle.FsClientParameter.FsApiType:\x04HDFS\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x0e\n\x06passwd\x18\x04 \x01(\t\x12\x13\n\x0b\x62uffer_size\x18\x05 \x01(\x05\x12\x12\n\nhadoop_bin\x18\x33 \x01(\t\x12\x10\n\x08\x61\x66s_conf\x18\x65 
\x01(\t\"\x1e\n\tFsApiType\x12\x08\n\x04HDFS\x10\x00\x12\x07\n\x03\x41\x46S\x10\x01*4\n\tTableType\x12\x13\n\x0fPS_SPARSE_TABLE\x10\x00\x12\x12\n\x0ePS_DENSE_TABLE\x10\x01*\x9c\x03\n\x07PsCmdID\x12\x17\n\x13PS_PULL_DENSE_TABLE\x10\x00\x12\x17\n\x13PS_PUSH_DENSE_TABLE\x10\x01\x12\x18\n\x14PS_PULL_SPARSE_TABLE\x10\x02\x12\x18\n\x14PS_PUSH_SPARSE_TABLE\x10\x03\x12\x13\n\x0fPS_SHRINK_TABLE\x10\x04\x12\x15\n\x11PS_SAVE_ONE_TABLE\x10\x05\x12\x15\n\x11PS_SAVE_ALL_TABLE\x10\x06\x12\x15\n\x11PS_LOAD_ONE_TABLE\x10\x07\x12\x15\n\x11PS_LOAD_ALL_TABLE\x10\x08\x12\x16\n\x12PS_CLEAR_ONE_TABLE\x10\t\x12\x16\n\x12PS_CLEAR_ALL_TABLE\x10\n\x12\x17\n\x13PS_PUSH_DENSE_PARAM\x10\x0b\x12\x12\n\x0ePS_STOP_SERVER\x10\x0c\x12\x1b\n\x17PS_SAVE_ONE_CACHE_TABLE\x10\r\x12\x1a\n\x16PS_GET_CACHE_THRESHOLD\x10\x0e\x12\x14\n\x10PS_CACHE_SHUFFLE\x10\x0f\x12\x0e\n\nPS_S2S_MSG\x10\x65\x32K\n\tPsService\x12>\n\x07service\x12\x18.paddle.PsRequestMessage\x1a\x19.paddle.PsResponseMessageB\x03\x80\x01\x01'
'\n\x08ps.proto\x12\x06paddle\"\x9e\x02\n\x0bPSParameter\x12\x14\n\x0cworker_class\x18\x01 \x01(\t\x12\x14\n\x0cserver_class\x18\x02 \x01(\t\x12\x16\n\x0einstance_class\x18\x03 \x01(\t\x12-\n\x0cworker_param\x18\x65 \x01(\x0b\x32\x17.paddle.WorkerParameter\x12-\n\x0cserver_param\x18\x66 \x01(\x0b\x32\x17.paddle.ServerParameter\x12\x38\n\rtrainer_param\x18\xad\x02 \x03(\x0b\x32 .paddle.DownpourTrainerParameter\x12\x33\n\x0f\x66s_client_param\x18\xf5\x03 \x01(\x0b\x32\x19.paddle.FsClientParameter\"Q\n\x0fWorkerParameter\x12>\n\x15\x64ownpour_worker_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourWorkerParameter\"Q\n\x0fServerParameter\x12>\n\x15\x64ownpour_server_param\x18\x01 \x01(\x0b\x32\x1f.paddle.DownpourServerParameter\"O\n\x17\x44ownpourWorkerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\"\xfd\x01\n\x18\x44ownpourTrainerParameter\x12\x30\n\x0b\x64\x65nse_table\x18\x01 \x03(\x0b\x32\x1b.paddle.DenseTableParameter\x12\x32\n\x0csparse_table\x18\x02 \x03(\x0b\x32\x1c.paddle.SparseTableParameter\x12\x1d\n\x15push_sparse_per_batch\x18\x03 \x01(\x05\x12\x1c\n\x14push_dense_per_batch\x18\x04 \x01(\x05\x12\x0f\n\x07skip_op\x18\x05 \x03(\t\x12-\n\x0eprogram_config\x18\x06 \x03(\x0b\x32\x15.paddle.ProgramConfig\"\x99\x01\n\rProgramConfig\x12\x12\n\nprogram_id\x18\x01 \x02(\t\x12\x1c\n\x14push_sparse_table_id\x18\x02 \x03(\x05\x12\x1b\n\x13push_dense_table_id\x18\x03 \x03(\x05\x12\x1c\n\x14pull_sparse_table_id\x18\x04 \x03(\x05\x12\x1b\n\x13pull_dense_table_id\x18\x05 \x03(\x05\"{\n\x13\x44\x65nseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x1b\n\x13\x64\x65nse_variable_name\x18\x02 \x03(\t\x12$\n\x1c\x64\x65nse_gradient_variable_name\x18\x03 \x03(\t\x12\x0f\n\x07\x66\x65\x61_dim\x18\x04 \x01(\x05\"z\n\x14SparseTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x05\x12\x13\n\x0b\x66\x65\x61ture_dim\x18\x02 \x01(\x05\x12\x10\n\x08slot_key\x18\x03 \x03(\t\x12\x12\n\nslot_value\x18\x04 \x03(\t\x12\x15\n\rslot_gradient\x18\x05 \x03(\t\"\x86\x01\n\x17\x44ownpourServerParameter\x12\x34\n\x14\x64ownpour_table_param\x18\x01 \x03(\x0b\x32\x16.paddle.TableParameter\x12\x35\n\rservice_param\x18\x02 \x01(\x0b\x32\x1e.paddle.ServerServiceParameter\"\xd7\x01\n\x16ServerServiceParameter\x12*\n\x0cserver_class\x18\x01 \x01(\t:\x14\x44ownpourBrpcPsServer\x12*\n\x0c\x63lient_class\x18\x02 \x01(\t:\x14\x44ownpourBrpcPsClient\x12(\n\rservice_class\x18\x03 \x01(\t:\x11\x44ownpourPsService\x12\x1c\n\x11start_server_port\x18\x04 \x01(\r:\x01\x30\x12\x1d\n\x11server_thread_num\x18\x05 \x01(\r:\x02\x31\x32\"\xc0\x02\n\x0eTableParameter\x12\x10\n\x08table_id\x18\x01 \x01(\x04\x12\x13\n\x0btable_class\x18\x02 \x01(\t\x12\x17\n\tshard_num\x18\x03 \x01(\x04:\x04\x31\x30\x30\x30\x12\x30\n\x08\x61\x63\x63\x65ssor\x18\x04 \x01(\x0b\x32\x1e.paddle.TableAccessorParameter\x12\x1f\n\x04type\x18\x05 \x01(\x0e\x32\x11.paddle.TableType\x12\x1f\n\x10\x63ompress_in_save\x18\x06 \x01(\x08:\x05\x66\x61lse\x12\'\n\x19\x65nable_sparse_table_cache\x18\x07 \x01(\x08:\x04true\x12(\n\x17sparse_table_cache_rate\x18\x08 \x01(\x01:\x07\x30.00055\x12\'\n\x1bsparse_table_cache_file_num\x18\t \x01(\r:\x02\x31\x36\"\xfc\x02\n\x16TableAccessorParameter\x12\x16\n\x0e\x61\x63\x63\x65ssor_class\x18\x01 \x01(\t\x12\x38\n\x10sparse_sgd_param\x18\x02 \x01(\x0b\x32\x1e.paddle.SparseSGDRuleParameter\x12\x36\n\x0f\x64\x65nse_sgd_param\x18\x03 \x01(\x0b\x32\x1d.paddle.DenseSGDRuleParameter\x12\x13\n\x07\x66\x65\x61_dim\x18\x04 \x01(\r:\x02\x31\x31\x12\x15\n\nembedx_dim\x18\x05 
\x01(\r:\x01\x38\x12\x1c\n\x10\x65mbedx_threshold\x18\x06 \x01(\r:\x02\x31\x30\x12G\n\x17\x64ownpour_accessor_param\x18\x07 \x01(\x0b\x32&.paddle.DownpourTableAccessorParameter\x12\x45\n\x19table_accessor_save_param\x18\x08 \x03(\x0b\x32\".paddle.TableAccessorSaveParameter\"\x96\x02\n\x1e\x44ownpourTableAccessorParameter\x12\x19\n\x0cnonclk_coeff\x18\x01 \x01(\x02:\x03\x30.1\x12\x16\n\x0b\x63lick_coeff\x18\x02 \x01(\x02:\x01\x31\x12\x1b\n\x0e\x62\x61se_threshold\x18\x03 \x01(\x02:\x03\x31.5\x12\x1d\n\x0f\x64\x65lta_threshold\x18\x04 \x01(\x02:\x04\x30.25\x12\x1b\n\x0f\x64\x65lta_keep_days\x18\x05 \x01(\x02:\x02\x31\x36\x12#\n\x15show_click_decay_rate\x18\x06 \x01(\x02:\x04\x30.98\x12\x1d\n\x10\x64\x65lete_threshold\x18\x07 \x01(\x02:\x03\x30.8\x12$\n\x18\x64\x65lete_after_unseen_days\x18\x08 \x01(\x02:\x02\x33\x30\"S\n\x1aTableAccessorSaveParameter\x12\r\n\x05param\x18\x01 \x01(\r\x12\x11\n\tconverter\x18\x02 \x01(\t\x12\x13\n\x0b\x64\x65\x63onverter\x18\x03 \x01(\t\"e\n\x10PsRequestMessage\x12\x0e\n\x06\x63md_id\x18\x01 \x02(\r\x12\x10\n\x08table_id\x18\x02 \x01(\r\x12\x0e\n\x06params\x18\x03 \x03(\x0c\x12\x11\n\tclient_id\x18\x04 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\"\x85\x01\n\x16SparseSGDRuleParameter\x12\x1b\n\rlearning_rate\x18\x01 \x01(\x01:\x04\x30.05\x12\x18\n\rinitial_g2sum\x18\x02 \x01(\x01:\x01\x33\x12\x1d\n\rinitial_range\x18\x03 \x01(\x01:\x06\x30.0001\x12\x15\n\rweight_bounds\x18\x04 \x03(\x02\"\xe1\x01\n\x15\x44\x65nseSGDRuleParameter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12&\n\x04\x61\x64\x61m\x18\x02 \x01(\x0b\x32\x18.paddle.AdamSGDParameter\x12(\n\x05naive\x18\x03 \x01(\x0b\x32\x19.paddle.NaiveSGDParameter\x12,\n\x07summary\x18\x04 \x01(\x0b\x32\x1b.paddle.SummarySGDParameter\x12:\n\x0emoving_average\x18\x05 \x01(\x0b\x32\".paddle.MovingAverageRuleParameter\"\xac\x01\n\x10\x41\x64\x61mSGDParameter\x12\x1c\n\rlearning_rate\x18\x01 \x01(\x01:\x05\x35\x65-06\x12 \n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01:\x08\x30.999993\x12\x1e\n\x0e\x61\x64\x61_decay_rate\x18\x03 \x01(\x01:\x06\x30.9999\x12\x1a\n\x0b\x61\x64\x61_epsilon\x18\x04 \x01(\x01:\x05\x31\x65-08\x12\x1c\n\x0emom_decay_rate\x18\x05 \x01(\x01:\x04\x30.99\"J\n\x11NaiveSGDParameter\x12\x1d\n\rlearning_rate\x18\x01 \x01(\x01:\x06\x30.0002\x12\x16\n\x0e\x61vg_decay_rate\x18\x02 \x01(\x01\";\n\x13SummarySGDParameter\x12$\n\x12summary_decay_rate\x18\x01 \x01(\x01:\x08\x30.999999\".\n\x1aMovingAverageRuleParameter\x12\x10\n\x08momentum\x18\x01 \x01(\x01\"I\n\x11PsResponseMessage\x12\x13\n\x08\x65rr_code\x18\x01 \x02(\x05:\x01\x30\x12\x11\n\x07\x65rr_msg\x18\x02 \x02(\t:\x00\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\xd5\x01\n\x11\x46sClientParameter\x12:\n\x07\x66s_type\x18\x01 \x01(\x0e\x32#.paddle.FsClientParameter.FsApiType:\x04HDFS\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x0c\n\x04user\x18\x03 \x01(\t\x12\x0e\n\x06passwd\x18\x04 \x01(\t\x12\x13\n\x0b\x62uffer_size\x18\x05 \x01(\x05\x12\x12\n\nhadoop_bin\x18\x33 \x01(\t\x12\x10\n\x08\x61\x66s_conf\x18\x65 
\x01(\t\"\x1e\n\tFsApiType\x12\x08\n\x04HDFS\x10\x00\x12\x07\n\x03\x41\x46S\x10\x01*4\n\tTableType\x12\x13\n\x0fPS_SPARSE_TABLE\x10\x00\x12\x12\n\x0ePS_DENSE_TABLE\x10\x01*\x9c\x03\n\x07PsCmdID\x12\x17\n\x13PS_PULL_DENSE_TABLE\x10\x00\x12\x17\n\x13PS_PUSH_DENSE_TABLE\x10\x01\x12\x18\n\x14PS_PULL_SPARSE_TABLE\x10\x02\x12\x18\n\x14PS_PUSH_SPARSE_TABLE\x10\x03\x12\x13\n\x0fPS_SHRINK_TABLE\x10\x04\x12\x15\n\x11PS_SAVE_ONE_TABLE\x10\x05\x12\x15\n\x11PS_SAVE_ALL_TABLE\x10\x06\x12\x15\n\x11PS_LOAD_ONE_TABLE\x10\x07\x12\x15\n\x11PS_LOAD_ALL_TABLE\x10\x08\x12\x16\n\x12PS_CLEAR_ONE_TABLE\x10\t\x12\x16\n\x12PS_CLEAR_ALL_TABLE\x10\n\x12\x17\n\x13PS_PUSH_DENSE_PARAM\x10\x0b\x12\x12\n\x0ePS_STOP_SERVER\x10\x0c\x12\x1b\n\x17PS_SAVE_ONE_CACHE_TABLE\x10\r\x12\x1a\n\x16PS_GET_CACHE_THRESHOLD\x10\x0e\x12\x14\n\x10PS_CACHE_SHUFFLE\x10\x0f\x12\x0e\n\nPS_S2S_MSG\x10\x65\x32K\n\tPsService\x12>\n\x07service\x12\x18.paddle.PsRequestMessage\x1a\x19.paddle.PsResponseMessageB\x03\x80\x01\x01'
))
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -290,9 +290,9 @@ _PSPARAMETER = _descriptor.Descriptor(
number=301,
type=11,
cpp_type=10,
label=1,
label=3,
has_default_value=False,
default_value=None,
default_value=[],
message_type=None,
enum_type=None,
containing_type=None,
......
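The descriptor edit above changes PSParameter.trainer_param from an optional field (label=1) to a repeated one (label=3), matching the ps.proto change that lets every training program carry its own DownpourTrainerParameter. A minimal sketch of reading the repeated field, assuming a fleet_desc prototxt file at a hypothetical path:

from google.protobuf import text_format
import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib

ps_param = pslib.PSParameter()
# "fleet_desc.prototxt" is a placeholder path for a text-format PSParameter.
with open("fleet_desc.prototxt") as f:
    text_format.Merge(f.read(), ps_param)

# trainer_param is now repeated, so it is indexed per training program
# instead of being read as a single sub-message.
first_trainer_param = ps_param.trainer_param[0]
print(len(ps_param.trainer_param))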
......@@ -559,7 +559,8 @@ class FleetUtil(object):
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME",
donefile_name="sparse_cache.meta"):
donefile_name="sparse_cache.meta",
**kwargs):
"""
write cache donefile
......@@ -572,6 +573,9 @@ class FleetUtil(object):
hadoop_fs_ugi(str): hdfs/afs fs ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
donefile_name(str): donefile name, default is "sparse_cache.meta"
kwargs(dict): user defined properties
file_num(int): cache file num
table_id(int): cache table id
Examples:
.. code-block:: python
......@@ -591,12 +595,14 @@ class FleetUtil(object):
day = str(day)
pass_id = str(pass_id)
key_num = int(key_num)
file_num = kwargs.get("file_num", 16)
table_id = kwargs.get("table_id", 0)
if pass_id != "-1":
suffix_name = "/%s/delta-%s/000_cache" % (day, pass_id)
suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id)
model_path = output_path.rstrip("/") + suffix_name
else:
suffix_name = "/%s/base/000_cache" % day
suffix_name = "/%s/base/%03d_cache" % (day, table_id)
model_path = output_path.rstrip("/") + suffix_name
if fleet.worker_index() == 0:
......@@ -610,8 +616,8 @@ class FleetUtil(object):
self.rank0_error( \
"not write because %s already exists" % donefile_path)
else:
meta_str = \
"file_prefix:part\npart_num:16\nkey_num:%d\n" % key_num
meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \
% (file_num, key_num)
with open(donefile_name, "w") as f:
f.write(meta_str)
client.upload(
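For reference, a hedged usage sketch of the donefile writer extended above; the enclosing method name is cut off by this hunk, so write_cache_donefile is assumed here, and the paths and credentials are placeholders:

from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil

fleet_util = FleetUtil()
# file_num and table_id are the new optional kwargs (defaults: 16 and 0);
# they select the part_num written into the meta and the "%03d_cache" dir.
fleet_util.write_cache_donefile(
    output_path="hdfs:/my/output/path/",
    day=20190722,
    pass_id=6,
    key_num=123456,
    hadoop_fs_name="hdfs://xxx",
    hadoop_fs_ugi="xxx,xxx",
    file_num=16,
    table_id=1)
# Resulting sparse_cache.meta content:
#   file_prefix:part
#   part_num:16
#   key_num:123456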
......@@ -743,7 +749,7 @@ class FleetUtil(object):
fleet.save_persistables(None, model_path, mode=2)
self.rank0_print("save_xbox_base_model done")
def save_cache_model(self, output_path, day, pass_id, mode=1):
def save_cache_model(self, output_path, day, pass_id, mode=1, **kwargs):
"""
save cache model
......@@ -752,6 +758,8 @@ class FleetUtil(object):
day(str|int): training day
pass_id(str|int): training pass id
mode(str|int): save mode
kwargs(dict): user defined properties
table_id(int): table id to save cache
Returns:
key_num(int): cache key num
......@@ -767,14 +775,16 @@ class FleetUtil(object):
day = str(day)
pass_id = str(pass_id)
mode = int(mode)
table_id = kwargs.get("table_id", 0)
suffix_name = "/%s/delta-%s" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name
self.rank0_print("going to save_cache_model %s" % model_path)
key_num = fleet.save_cache_model(None, model_path, mode=mode)
key_num = fleet.save_cache_model(
None, model_path, mode=mode, table_id=table_id)
self.rank0_print("save_cache_model done")
return key_num
def save_cache_base_model(self, output_path, day):
def save_cache_base_model(self, output_path, day, **kwargs):
"""
save cache base model
......@@ -782,6 +792,8 @@ class FleetUtil(object):
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
kwargs(dict): user defined properties
table_id(int): table id to save cache
Returns:
key_num(int): cache key num
......@@ -795,10 +807,12 @@ class FleetUtil(object):
"""
day = str(day)
table_id = kwargs.get("table_id", 0)
suffix_name = "/%s/base" % day
model_path = output_path.rstrip("/") + suffix_name
self.rank0_print("going to save_cache_base_model %s" % model_path)
key_num = fleet.save_cache_model(None, model_path, mode=2)
key_num = fleet.save_cache_model(
None, model_path, mode=2, table_id=table_id)
self.rank0_print("save_cache_base_model done")
return key_num
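A hedged usage sketch of the two cache-saving helpers with the new table_id kwarg, assuming a pslib fleet job is already initialized; paths, day, pass and table ids are placeholders:

from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil

fleet_util = FleetUtil()
# Save the delta cache of sparse table 1 for pass 6; table_id defaults to 0.
key_num = fleet_util.save_cache_model(
    "hdfs:/my/output/path/", 20190722, 6, mode=1, table_id=1)
# Save the base cache of the same table (writes under <day>/base).
base_key_num = fleet_util.save_cache_base_model(
    "hdfs:/my/output/path/", 20190722, table_id=1)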
......@@ -820,8 +834,9 @@ class FleetUtil(object):
"""
fleet._role_maker._barrier_worker()
if fleet._role_maker.is_first_worker():
tables = fleet._dist_desc.trainer_param.dense_table
prog_id = str(id(program))
tables = fleet._opt_info["program_id_to_worker"][prog_id].\
get_desc().dense_table
prog_conf = fleet._opt_info['program_configs'][prog_id]
prog_tables = {}
for key in prog_conf:
......@@ -844,6 +859,95 @@ class FleetUtil(object):
int(table.table_id), var_name_list)
fleet._role_maker._barrier_worker()
def save_paddle_inference_model(self,
executor,
scope,
program,
feeded_vars,
target_vars,
output_path,
day,
pass_id,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME",
save_combine=True):
"""
save paddle inference model and upload it to the hdfs dnn_plugin path
Args:
executor(Executor): fluid Executor
scope(Scope): fluid Scope
program(Program): fluid Program
feeded_vars(list[Variable]): feed vars
target_vars(list[Variable]): fetch vars
output_path(str): hdfs/afs output path
day(str|int): training day
pass_id(str|int): training pass
hadoop_fs_name(str): hadoop fs name
hadoop_fs_ugi(str): hadoop fs ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
save_combine(bool): whether to save params in one combined file or in separate files,
default is True
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_paddle_inference_model(exe,
join_scope,
join_program,
feeded_vars,
target_vars,
"hdfs:/my/output/path/",
day=20190727,
pass_id=6,
hadoop_fs_name="xxx",
hadoop_fs_ugi="xxx,xxx")
"""
day = str(day)
pass_id = str(pass_id)
feeded_var_names = [i.name for i in feeded_vars]
model_name = "inference_model"
# pull dense before save
self.pull_all_dense_params(scope, program)
if fleet.worker_index() == 0:
with fluid.scope_guard(scope):
if save_combine:
fluid.io.save_inference_model(
dirname=model_name,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
executor=executor,
main_program=program,
params_filename="params")
else:
fluid.io.save_inference_model(
dirname=model_name,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
executor=executor,
main_program=program)
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if pass_id == "-1":
dest = "%s/%s/base/dnn_plugin/" % (output_path, day)
else:
dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day,
pass_id)
if not client.is_exist(dest):
client.makedirs(dest)
client.upload(dest, model_name)
fleet._role_maker._barrier_worker()
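The upload destination mirrors the day/pass layout used by the other save helpers; a small illustrative sketch of the resulting dnn_plugin paths (the helper function below is made up for illustration only):

def dnn_plugin_dest(output_path, day, pass_id):
    # Illustrative re-statement of the destination logic above.
    if str(pass_id) == "-1":
        return "%s/%s/base/dnn_plugin/" % (output_path, day)
    return "%s/%s/delta-%s/dnn_plugin/" % (output_path, day, pass_id)

print(dnn_plugin_dest("hdfs:/my/output/path", 20190727, 6))
# hdfs:/my/output/path/20190727/delta-6/dnn_plugin/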
def save_paddle_params(self,
executor,
scope,
......
......@@ -595,8 +595,7 @@ class HDFSClient(object):
if not self.is_exist(dest_dir):
self.makedirs(dest_dir)
put_command = ["-put", local_dir, dest_dir]
returncode, output, errors = self.__run_hdfs_cmd(put_command,
retry_times)
returncode, output, errors = self.__run_hdfs_cmd(put_command)
if returncode != 0:
_logger.error("Put local dir: {} to HDFS dir: {} failed".format(
local_dir, dest_dir))
......
......@@ -282,6 +282,7 @@ class TestDataset(unittest.TestCase):
dataset.wait_preload_done()
fleet_ptr = fluid.core.Fleet()
fleet_ptr.set_client2client_config(1, 1, 1)
fleet_ptr.get_cache_threshold(0)
os.remove("./test_in_memory_dataset_run_a.txt")
os.remove("./test_in_memory_dataset_run_b.txt")
......@@ -406,14 +407,28 @@ class TestDataset(unittest.TestCase):
class TestDatasetWithDataLoader(TestDataset):
"""
Test Dataset With Data Loader class. TestCases.
"""
def setUp(self):
"""
Test Dataset With Data Loader, setUp.
"""
self.use_data_loader = True
self.epoch_num = 10
self.drop_last = False
class TestDatasetWithFetchHandler(unittest.TestCase):
"""
Test Dataset With Fetch Handler. TestCases.
"""
def net(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
poolings = []
......@@ -431,6 +446,13 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
return slots_vars, fc
def get_dataset(self, inputs, files):
"""
Test Dataset With Fetch Handler. TestCases.
Args:
inputs(list): inputs of get_dataset
files(list): files of get_dataset
"""
dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
dataset.set_batch_size(32)
dataset.set_thread(3)
......@@ -440,6 +462,9 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
return dataset
def setUp(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
with open("test_queue_dataset_run_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
......@@ -453,10 +478,16 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
f.write(data)
def tearDown(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt")
def test_dataset_none(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
......@@ -476,6 +507,9 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
self.assertTrue(False)
def test_infer_from_dataset(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
......@@ -491,6 +525,9 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
self.assertTrue(False)
def test_fetch_handler(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
......@@ -515,5 +552,146 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
self.assertTrue(False)
class TestDataset2(unittest.TestCase):
""" TestCases for Dataset. """
def setUp(self):
""" TestCases for Dataset. """
self.use_data_loader = False
self.epoch_num = 10
self.drop_last = False
def test_dataset_fleet(self):
"""
Testcase for InMemoryDataset from create to run.
"""
with open("test_in_memory_dataset2_run_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
f.write(data)
with open("test_in_memory_dataset2_run_b.txt", "w") as f:
data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
f.write(data)
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
with fluid.program_guard(train_program, startup_program):
slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
slots_vars = []
for slot in slots:
var = fluid.layers.data(\
name=slot, shape=[1], dtype="float32", lod_level=1)
slots_vars.append(var)
fake_cost = \
fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
fake_cost = fluid.layers.mean(fake_cost)
with fluid.scope_guard(scope):
place = fluid.CPUPlace()
exe = fluid.Executor(place)
try:
fleet.init(exe)
except ImportError as e:
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
adam = fleet.distributed_optimizer(adam)
adam.minimize([fake_cost], [scope])
except AttributeError as e:
print("warning: no mpi")
except ImportError as e:
print("warning: no mpi4py")
exe.run(startup_program)
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_batch_size(32)
dataset.set_thread(3)
dataset.set_filelist([
"test_in_memory_dataset2_run_a.txt",
"test_in_memory_dataset2_run_b.txt"
])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory()
fleet._opt_info = None
fleet._fleet_ptr = None
os.remove("./test_in_memory_dataset2_run_a.txt")
os.remove("./test_in_memory_dataset2_run_b.txt")
def test_dataset_fleet2(self):
"""
Testcase for InMemoryDataset from create to run.
"""
with open("test_in_memory_dataset2_run2_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
f.write(data)
with open("test_in_memory_dataset2_run2_b.txt", "w") as f:
data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
f.write(data)
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
with fluid.program_guard(train_program, startup_program):
slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
slots_vars = []
for slot in slots:
var = fluid.layers.data(\
name=slot, shape=[1], dtype="float32", lod_level=1)
slots_vars.append(var)
fake_cost = \
fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
fake_cost = fluid.layers.mean(fake_cost)
with fluid.scope_guard(scope):
place = fluid.CPUPlace()
exe = fluid.Executor(place)
try:
fleet.init(exe)
except ImportError as e:
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
adam = fleet.distributed_optimizer(
adam,
strategy={
"fs_uri": "fs_uri_xxx",
"fs_user": "fs_user_xxx",
"fs_passwd": "fs_passwd_xxx",
"fs_hadoop_bin": "fs_hadoop_bin_xxx"
})
adam.minimize([fake_cost], [scope])
except AttributeError as e:
print("warning: no mpi")
except ImportError as e:
print("warning: no mpi4py")
exe.run(startup_program)
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_batch_size(32)
dataset.set_thread(3)
dataset.set_filelist([
"test_in_memory_dataset2_run2_a.txt",
"test_in_memory_dataset2_run2_b.txt"
])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory()
fleet._opt_info = None
fleet._fleet_ptr = None
os.remove("./test_in_memory_dataset2_run2_a.txt")
os.remove("./test_in_memory_dataset2_run2_b.txt")
if __name__ == '__main__':
unittest.main()
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testcases for Downpour."""
from __future__ import print_function
......@@ -25,15 +26,19 @@ import sys
from op_test import OpTest
from paddle.fluid.trainer_desc import DistMultiTrainer
from paddle.fluid.device_worker import DownpourSGD
from paddle.fluid.incubate.fleet.parameter_server.pslib.node import DownpourWorker
from google.protobuf import text_format
import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib
class TestListenAndServOp(OpTest):
"""TestListenAndServOp."""
def setUp(self):
pass
def test_device_work_use_cvm(self):
"""test device work use_cvm."""
if sys.platform == 'win32' or sys.platform == 'sys.platform':
pass
else:
......@@ -77,6 +82,9 @@ class TestListenAndServOp(OpTest):
opt_info["scale_datanorm"] = -1
opt_info["dump_slot"] = False
opt_info["stat_var_names"] = []
worker = DownpourWorker(None)
worker.get_desc().CopyFrom(ps_param.trainer_param[0])
opt_info["program_id_to_worker"] = {program_id: worker}
main_program._fleet_opt = opt_info
trainer = DistMultiTrainer()
......@@ -90,6 +98,7 @@ class TestListenAndServOp(OpTest):
os.system(cmd)
def test_device_work(self):
"""test devicve worker."""
if sys.platform == 'win32' or sys.platform == 'sys.platform':
pass
else:
......@@ -133,6 +142,9 @@ class TestListenAndServOp(OpTest):
opt_info["scale_datanorm"] = -1
opt_info["dump_slot"] = False
opt_info["stat_var_names"] = []
worker = DownpourWorker(None)
worker.get_desc().CopyFrom(ps_param.trainer_param[0])
opt_info["program_id_to_worker"] = {program_id: worker}
main_program._fleet_opt = opt_info
trainer = DistMultiTrainer()
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defination of trainers."""
import sys
from os import path
......@@ -116,6 +117,78 @@ class TrainerDesc(object):
self.proto_desc.adjust_ins_weight_config.ins_weight_slot = \
config_dict.get("ins_weight_slot", "")
def _set_copy_table_config(self, config_dict):
config = self.proto_desc.copy_table_config
config.need_copy = config_dict.get("need_copy", False)
config.batch_num = config_dict.get("batch_num", 100)
src_sparse_tables = config_dict.get("src_sparse_tables", [])
if not isinstance(src_sparse_tables, list):
src_sparse_tables = [src_sparse_tables]
dest_sparse_tables = config_dict.get("dest_sparse_tables", [])
if not isinstance(dest_sparse_tables, list):
dest_sparse_tables = [dest_sparse_tables]
if len(src_sparse_tables) != len(dest_sparse_tables):
raise ValueError(
"len(src_sparse_tables) != len(dest_sparse_tables)," \
" %s vs %s" % (len(src_sparse_tables), \
len(dest_sparse_tables)))
for i in src_sparse_tables:
config.src_sparse_tables.append(i)
for i in dest_sparse_tables:
config.dest_sparse_tables.append(i)
src_dense_tables = config_dict.get("src_dense_tables", [])
if not isinstance(src_dense_tables, list):
src_dense_tables = [src_dense_tables]
dest_dense_tables = config_dict.get("dest_dense_tables", [])
if not isinstance(dest_dense_tables, list):
dest_dense_tables = [dest_dense_tables]
if len(src_dense_tables) != len(dest_dense_tables):
raise ValueError(
"len(src_dense_tables) != len(dest_dense_tables)," \
" %s vs %s" % (len(src_dense_tables), \
len(dest_dense_tables)))
for i in src_dense_tables:
config.src_dense_tables.append(i)
for i in dest_dense_tables:
config.dest_dense_tables.append(i)
# user can also specify dense variables to copy,
# instead of copying whole dense tables
src_var_list = config_dict.get("src_var_list", [])
if not isinstance(src_var_list, list):
src_var_list = [src_var_list]
dest_var_list = config_dict.get("dest_var_list", [])
if not isinstance(dest_var_list, list):
dest_var_list = [dest_var_list]
if len(src_var_list) != len(dest_var_list):
raise ValueError(
"len(src_var_list) != len(dest_var_list), %s vs" \
" %s" % (len(src_var_list), len(dest_var_list)))
for i in src_var_list:
config.src_var_list.append(i)
for i in dest_var_list:
config.dest_var_list.append(i)
dependency_map = config_dict.get("dependency_map", {})
for key in dependency_map:
m = config.table_denpendency_map.add()
m.key = key
values = dependency_map[key]
if not isinstance(values, list):
values = [values]
if len(values) != 1:
raise ValueError("dependency len %s != 1" % len(values))
for value in values:
m.values.append(value)
config.dense_pull_after_copy = \
config_dict.get("dense_pull_after_copy", True)
config.enable_dependency = \
config_dict.get("enable_dependency", False)
config.sparse_copy_by_feasign = \
config_dict.get("sparse_copy_by_feasign", True)
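A hedged sketch of a copy_table config dict accepted by _set_copy_table_config; the table ids and variable names are placeholders:

copy_table_config = {
    "need_copy": True,
    "batch_num": 100,                # copy every 100 batches
    "src_sparse_tables": [0],        # feasigns/embeddings copied from table 0
    "dest_sparse_tables": [1],       # ... into table 1
    "src_dense_tables": [2],
    "dest_dense_tables": [3],
    "src_var_list": ["fc_0.w_0"],    # or copy individual local vars
    "dest_var_list": ["fc_1.w_0"],
    "dependency_map": {0: 2},        # each key must map to exactly one value
    "dense_pull_after_copy": True,
    "enable_dependency": False,
    "sparse_copy_by_feasign": True,
}
# Typically forwarded as opt_info["copy_table"], which the TrainerFactory
# change further below passes to trainer._set_copy_table_config(...).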
def _desc(self):
from google.protobuf import text_format
return self.proto_desc.SerializeToString()
......@@ -147,6 +220,11 @@ class MultiTrainer(TrainerDesc):
class DistMultiTrainer(TrainerDesc):
"""
Implementation of DistMultiTrainer.
It is for distributed training.
"""
def __init__(self):
super(DistMultiTrainer, self).__init__()
pass
......@@ -166,6 +244,11 @@ class DistMultiTrainer(TrainerDesc):
class PipelineTrainer(TrainerDesc):
"""
Implementation of PipelineTrainer.
It is for pipeline training.
"""
def __init__(self):
super(PipelineTrainer, self).__init__()
pass
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defination of TrainerFactory."""
import threading
import time
......@@ -24,6 +25,12 @@ __all__ = ["TrainerFactory", "FetchHandler", "FetchHandlerMonitor"]
class TrainerFactory(object):
"""
Create trainer and device worker.
If opt_info is not None, it gets its configs from opt_info;
otherwise it creates MultiTrainer and Hogwild.
"""
def __init__(self):
pass
......@@ -43,23 +50,44 @@ class TrainerFactory(object):
if "fleet_desc" in opt_info:
device_worker._set_fleet_desc(opt_info["fleet_desc"])
trainer._set_fleet_desc(opt_info["fleet_desc"])
if opt_info.get("use_cvm") is not None:
trainer._set_use_cvm(opt_info["use_cvm"])
if opt_info.get("scale_datanorm") is not None:
trainer._set_scale_datanorm(opt_info["scale_datanorm"])
if opt_info.get("dump_slot") is not None:
trainer._set_dump_slot(opt_info["dump_slot"])
if opt_info.get("mpi_rank") is not None:
trainer._set_mpi_rank(opt_info["mpi_rank"])
if opt_info.get("mpi_size") is not None:
trainer._set_mpi_size(opt_info["mpi_size"])
if opt_info.get("dump_fields") is not None:
trainer._set_dump_fields(opt_info["dump_fields"])
if opt_info.get("dump_fields_path") is not None:
trainer._set_dump_fields_path(opt_info["dump_fields_path"])
if opt_info.get("dump_file_num") is not None:
trainer._set_dump_file_num(opt_info["dump_file_num"])
if opt_info.get("dump_converter") is not None:
trainer._set_dump_converter(opt_info["dump_converter"])
trainer._set_adjust_ins_weight(opt_info["adjust_ins_weight"])
if opt_info.get("adjust_ins_weight") is not None:
trainer._set_adjust_ins_weight(opt_info[
"adjust_ins_weight"])
if opt_info.get("copy_table") is not None:
trainer._set_copy_table_config(opt_info["copy_table"])
if opt_info.get("check_nan_var_names") is not None:
trainer._set_check_nan_var_names(opt_info[
"check_nan_var_names"])
if opt_info.get("dump_param") is not None:
trainer._set_dump_param(opt_info["dump_param"])
trainer._set_device_worker(device_worker)
return trainer
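A hedged sketch of the optional opt_info keys that the factory now guards with .get(); the values are illustrative only, and any key may be omitted, in which case the corresponding trainer setter is simply skipped:

opt_info = {
    "use_cvm": False,
    "scale_datanorm": -1,
    "dump_slot": False,
    "mpi_rank": 0,
    "mpi_size": 1,
    "dump_fields": [],
    "dump_fields_path": "",
    "dump_file_num": 16,
    "dump_converter": "",
    "adjust_ins_weight": {},          # previously accessed unconditionally, now optional
    "copy_table": {"need_copy": False},
    "check_nan_var_names": [],
    "dump_param": [],
}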
class FetchHandlerMonitor(object):
"""
Definition of FetchHandlerMonitor class,
which monitors a fetch handler.
"""
def __init__(self, scope, handler):
self.fetch_instance = handler
self.fetch_thread = threading.Thread(
......@@ -68,11 +96,21 @@ class FetchHandlerMonitor(object):
self.running = False
def start(self):
"""
start the monitor,
it starts the fetch thread as a daemon.
"""
self.running = True
self.fetch_thread.setDaemon(True)
self.fetch_thread.start()
def handler_decorator(self, fetch_scope, fetch_handler):
"""
decorator of handler.
Args:
fetch_scope(Scope): fetch scope
fetch_handler(Handler): fetch handler
"""
fetch_target_names = self.fetch_instance.fetch_target_names
period_secs = self.fetch_instance.period_secs
......