From f5af6905fc11f6df88cc48560d82e65dbf872b10 Mon Sep 17 00:00:00 2001
From: xiexionghang
Date: Tue, 17 Sep 2019 18:06:03 +0800
Subject: [PATCH] add sparse cache

---
 BCLOUD                                        |  1 +
 .../fluid/train/custom_trainer/feed/main.cc   |  1 +
 .../feed/process/learner_process.cc           | 83 +++++++++++++++++++
 .../feed/process/learner_process.h            |  6 +-
 .../custom_trainer/feed/trainer_context.h     | 55 +++++++++++-
 5 files changed, 141 insertions(+), 5 deletions(-)

diff --git a/BCLOUD b/BCLOUD
index 7fd4d470..a618c420 100755
--- a/BCLOUD
+++ b/BCLOUD
@@ -36,6 +36,7 @@ CONFIGS('baidu/third-party/pybind11@v2.2.4@git_branch')
 CONFIGS('baidu/third-party/python@gcc482output@git_branch')
 CONFIGS('baidu/third-party/yaml-cpp@yaml-cpp_0-6-2-0_GEN_PD_BL@git_tag')
 CONFIGS('baidu/third-party/openmpi@openmpi_1-4-5-0-feed_mlarch@git_branch')
+CONFIGS('baidu/feed-mlarch/hopscotch-map@stable')
 CONFIGS('baidu/paddlepaddle/pslib@stable')
 CONFIGS('third-64/gtest@gtest_1-7-0-100_PD_BL')
 HEADERS('paddle/fluid/memory/*.h', '$INC/paddle/fluid/memory/')
diff --git a/paddle/fluid/train/custom_trainer/feed/main.cc b/paddle/fluid/train/custom_trainer/feed/main.cc
index de71d9ae..df40840b 100644
--- a/paddle/fluid/train/custom_trainer/feed/main.cc
+++ b/paddle/fluid/train/custom_trainer/feed/main.cc
@@ -21,6 +21,7 @@ int main(int argc, char* argv[]) {
 
     //load trainer config
     auto trainer_context_ptr = std::make_shared<TrainerContext>();
+    trainer_context_ptr->cache_dict.reset(new SignCacheDict);
     trainer_context_ptr->trainer_config = YAML::LoadFile(FLAGS_feed_trainer_conf_path);
 
     //environment
diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc
index 954c1011..d004287f 100755
--- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc
+++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.cc
@@ -16,6 +16,8 @@ namespace feed {
 int LearnerProcess::initialize(std::shared_ptr<TrainerContext> context_ptr) {
     int ret = Process::initialize(context_ptr);
     auto& config = _context_ptr->trainer_config;
+    _is_dump_cache_model = config["dump_cache_model"].as<bool>(false);
+    _cache_load_converter = config["load_cache_converter"].as<std::string>("");
     _startup_dump_inference_base = config["startup_dump_inference_base"].as<bool>(false);
     if (config["executor"]) {
         _executors.resize(config["executor"].size());
@@ -27,6 +29,53 @@ int LearnerProcess::initialize(std::shared_ptr<TrainerContext> context_ptr) {
     return 0;
 }
+// Update the cache model held by each node
+int LearnerProcess::update_cache_model(uint64_t epoch_id, ModelSaveWay way) {
+    auto fs = _context_ptr->file_system;
+    auto* ps_client = _context_ptr->pslib->ps_client();
+    auto* environment = _context_ptr->environment.get();
+    auto* epoch_accessor = _context_ptr->epoch_accessor.get();
+    if (!epoch_accessor->need_save_model(epoch_id, way)) {
+        return 0;
+    }
+    auto* ps_param = _context_ptr->pslib->get_param();
+    if (_is_dump_cache_model && way == ModelSaveWay::ModelSaveInferenceBase) {
+        auto model_dir = epoch_accessor->model_save_path(epoch_id, way);
+        auto& table_param = ps_param->server_param().downpour_server_param().downpour_table_param();
+        for (auto& param : table_param) {
+            if (param.type() != paddle::PS_SPARSE_TABLE) {
+                continue;
+            }
+            auto cache_model_path = fs->path_join(
+                model_dir, string::format_string("%03d_cache/", param.table_id()));
+            if (!fs->exists(cache_model_path)) {
+                continue;
+            }
+            auto& cache_dict = *(_context_ptr->cache_dict.get());
+            cache_dict.clear();
+            cache_dict.reserve(_cache_sign_max_num);
+            auto cache_file_list = fs->list(fs->path_join(cache_model_path, "part*"));
+            for (auto& cache_path : cache_file_list) {
+                auto cache_file = fs->open_read(cache_path, _cache_load_converter);
+                char* buffer = nullptr;
+                size_t buffer_size = 0;
+                ssize_t line_len = 0;
+                while ((line_len = getline(&buffer, &buffer_size, cache_file.get())) != -1) {
+                    if (line_len <= 1) {
+                        continue;
+                    }
+                    char* data_ptr = nullptr;
+                    cache_dict.append(strtoul(buffer, &data_ptr, 10));
+                }
+                if (buffer != nullptr) {
+                    free(buffer);
+                }
+            }
+            break;
+        }
+    }
+    return 0;
+}
 
 int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way, bool is_force_dump) {
     auto fs = _context_ptr->file_system;
     auto* ps_client = _context_ptr->pslib->ps_client();
@@ -65,7 +114,38 @@ int LearnerProcess::wait_save_model(uint64_t epoch_id, ModelSaveWay way, bool is
     }
     timer.Pause();
     VLOG(2) << "Save Model Cost(s):" << timer.ElapsedSec();
+
+    // save cache model; only inference needs the cache model
+    auto* ps_param = _context_ptr->pslib->get_param();
+    if (_is_dump_cache_model && (way == ModelSaveWay::ModelSaveInferenceBase ||
+        way == ModelSaveWay::ModelSaveInferenceDelta)) {
+        auto& table_param = ps_param->server_param().downpour_server_param().downpour_table_param();
+        for (auto& param : table_param) {
+            if (param.type() != paddle::PS_SPARSE_TABLE) {
+                continue;
+            }
+            double cache_threshold = 0.0;
+            auto status = ps_client->get_cache_threshold(param.table_id(), cache_threshold);
+            CHECK(status.get() == 0) << "CacheThreshold Get failed!";
+            status = ps_client->cache_shuffle(param.table_id(), model_dir, std::to_string((int)way),
+                std::to_string(cache_threshold));
+            CHECK(status.get() == 0) << "Cache Shuffle Failed";
+            status = ps_client->save_cache(param.table_id(), model_dir, std::to_string((int)way));
+            auto feature_size = status.get();
+            CHECK(feature_size >= 0) << "Cache Save Failed";
+            auto cache_model_path = fs->path_join(
+                model_dir, string::format_string("%03d_cache/sparse_cache.meta", param.table_id()));
+            auto cache_meta_file = fs->open_write(cache_model_path, "");
+            auto meta = string::format_string("file_prefix:part\npart_num:%d\nkey_num:%d\n",
+                param.sparse_table_cache_file_num(), feature_size);
+            CHECK(fwrite(meta.c_str(), meta.size(), 1, cache_meta_file.get()) == 1) << "Cache Meta Failed";
+            if (feature_size > _cache_sign_max_num) {
+                _cache_sign_max_num = feature_size;
+            }
+        }
+    }
     _context_ptr->epoch_accessor->update_model_donefile(epoch_id, way);
+
     return all_ret;
 }
 
@@ -176,6 +256,9 @@ int LearnerProcess::run() {
         {
             wait_save_model(epoch_id, ModelSaveWay::ModelSaveInferenceBase);
             environment->barrier(EnvironmentRole::WORKER);
+            update_cache_model(epoch_id, ModelSaveWay::ModelSaveInferenceBase);
+            environment->barrier(EnvironmentRole::WORKER);
+
             if (epoch_accessor->is_last_epoch(epoch_id)) {
                 wait_save_model(epoch_id, ModelSaveWay::ModelSaveTrainCheckpointBase);
             } else {
diff --git a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h
index 2f0ccf4b..4daca3f6 100644
--- a/paddle/fluid/train/custom_trainer/feed/process/learner_process.h
+++ b/paddle/fluid/train/custom_trainer/feed/process/learner_process.h
@@ -22,9 +22,13 @@ protected:
     virtual int load_model(uint64_t epoch_id);
     // Synchronously save all models; is_force_dump: skip the dump-condition check and force a dump
     virtual int wait_save_model(uint64_t epoch_id, ModelSaveWay way, bool is_force_dump = false);
+    virtual int update_cache_model(uint64_t epoch_id, ModelSaveWay way);
 
 private:
-    bool _startup_dump_inference_base; // dump an inference base immediately at startup
+    bool _is_dump_cache_model;          // whether to dump the cache model
+    uint32_t _cache_sign_max_num = 0;   // maximum number of cache signs
+    std::string _cache_load_converter;  // converter script applied when loading the cache
+    bool _startup_dump_inference_base;  // dump an inference base immediately at startup
 
     std::vector<std::shared_ptr<MultiThreadExecutor>> _executors;
 };
diff --git a/paddle/fluid/train/custom_trainer/feed/trainer_context.h b/paddle/fluid/train/custom_trainer/feed/trainer_context.h
index 71b98be3..f0af4e93 100755
--- a/paddle/fluid/train/custom_trainer/feed/trainer_context.h
+++ b/paddle/fluid/train/custom_trainer/feed/trainer_context.h
@@ -3,6 +3,7 @@
 #include <string>
 #include <memory>
 #include <vector>
+#include <tsl/bhopscotch_map.h>
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/train/custom_trainer/feed/common/yaml_helper.h"
 #include "paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h"
@@ -35,15 +36,61 @@ enum class TrainerStatus {
     Saving = 1      // model saving state
 };
 
+const uint32_t SignCacheMaxValueNum = 13;
+struct SignCacheData {
+    SignCacheData() {
+        memset(cache_value, 0, sizeof(float) * SignCacheMaxValueNum);
+    }
+    uint32_t idx;
+    float cache_value[SignCacheMaxValueNum];
+};
 
 class SignCacheDict {
 public:
-    int32_t sign2index(uint64_t sign) {
-        return -1;
+    inline int32_t sign2index(uint64_t sign) {
+        auto itr = _sign2data_map.find(sign);
+        if (itr == _sign2data_map.end()) {
+            return -1;
+        }
+        return itr->second.idx;
+    }
+
+    inline uint64_t index2sign(int32_t index) {
+        if (index >= _sign_list.size()) {
+            return 0;
+        }
+        return _sign_list[index];
+    }
+
+    inline void reserve(uint32_t size) {
+        _sign_list.reserve(size);
+        _sign2data_map.reserve(size);
+    }
+
+    inline void clear() {
+        _sign_list.clear();
+        _sign2data_map.clear();
+    }
+
+    inline void append(uint64_t sign) {
+        if (_sign2data_map.find(sign) != _sign2data_map.end()) {
+            return;
+        }
+        SignCacheData data;
+        data.idx = _sign_list.size();
+        _sign_list.push_back(sign);
+        _sign2data_map.emplace(sign, std::move(data));
     }
-    uint64_t index2sign(int32_t index) {
-        return 0;
+    inline SignCacheData* data(uint64_t sign) {
+        tsl::bhopscotch_pg_map<uint64_t, SignCacheData>::iterator itr = _sign2data_map.find(sign);
+        if (itr == _sign2data_map.end()) {
+            return nullptr;
+        }
+        return const_cast<SignCacheData*>(&(itr->second));
     }
+private:
+    std::vector<uint64_t> _sign_list;
+    tsl::bhopscotch_pg_map<uint64_t, SignCacheData> _sign2data_map;
 };
 
 class TrainerContext {
-- 
GitLab
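
Notes on the formats and structures introduced above; the sketches below are illustrative and not part of the patch.

The cache part files consumed by update_cache_model carry one feature per line, and only the leading decimal 64-bit sign of each non-empty line is read; the rest of the line is ignored. A minimal standalone sketch of the same getline/strtoul loop against a plain FILE*, assuming a hypothetical local file name in place of fs->open_read():

    #include <sys/types.h>
    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>
    #include <vector>

    // Collect the leading decimal sign of every non-empty line,
    // mirroring the parsing loop in update_cache_model.
    std::vector<uint64_t> load_signs(FILE* file) {
        std::vector<uint64_t> signs;
        char* buffer = nullptr;
        size_t buffer_size = 0;
        ssize_t line_len = 0;
        // POSIX getline grows `buffer` as needed; free it once at the end.
        while ((line_len = getline(&buffer, &buffer_size, file)) != -1) {
            if (line_len <= 1) {
                continue;  // skip empty lines, as the patch does
            }
            char* end_ptr = nullptr;
            signs.push_back(strtoul(buffer, &end_ptr, 10));
        }
        free(buffer);
        return signs;
    }

    int main() {
        FILE* file = fopen("part-00000", "r");  // hypothetical part file
        if (file == nullptr) {
            return 1;
        }
        std::vector<uint64_t> signs = load_signs(file);
        fclose(file);
        printf("loaded %zu signs\n", signs.size());
        return 0;
    }

strtoul returns unsigned long, which is 64 bits wide on the LP64 targets this trainer builds for; the patch relies on the same assumption when it feeds the result to SignCacheDict::append.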
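
For each sparse table, wait_save_model now writes a sparse_cache.meta next to the cache parts, containing exactly the three key:value lines produced by the format_string call above, e.g.:

    file_prefix:part
    part_num:12
    key_num:987654

A minimal reader sketch for that format; SparseCacheMeta and parse_sparse_cache_meta are illustrative names, not part of the patch:

    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Mirrors the three fields dumped by wait_save_model.
    struct SparseCacheMeta {
        std::string file_prefix;
        uint32_t part_num = 0;
        uint64_t key_num = 0;
    };

    // Parses one "key:value" pair per line; unknown keys are ignored.
    bool parse_sparse_cache_meta(std::istream& in, SparseCacheMeta* meta) {
        std::string line;
        while (std::getline(in, line)) {
            std::string::size_type pos = line.find(':');
            if (pos == std::string::npos) {
                return false;  // malformed line
            }
            std::string key = line.substr(0, pos);
            std::string value = line.substr(pos + 1);
            if (key == "file_prefix") {
                meta->file_prefix = value;
            } else if (key == "part_num") {
                meta->part_num = static_cast<uint32_t>(std::stoul(value));
            } else if (key == "key_num") {
                meta->key_num = std::stoull(value);
            }
        }
        return true;
    }

    int main() {
        std::istringstream meta_text("file_prefix:part\npart_num:12\nkey_num:987654\n");
        SparseCacheMeta meta;
        if (parse_sparse_cache_meta(meta_text, &meta)) {
            std::cout << meta.file_prefix << " " << meta.part_num
                      << " " << meta.key_num << std::endl;
        }
        return 0;
    }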
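
SignCacheDict maps each 64-bit feature sign to a dense int32 index and back, so downstream inference can address cache slots by a small index rather than the raw sign; duplicate appends keep their first index, and lookups of unknown signs return -1. A behavioral sketch using std::unordered_map as a stand-in for tsl::bhopscotch_pg_map, keeping only the index part (the patch's SignCacheData additionally holds up to SignCacheMaxValueNum float values per sign):

    #include <cstdint>
    #include <iostream>
    #include <unordered_map>
    #include <vector>

    // Illustrative stand-in for the patch's SignCacheDict: append()
    // assigns the next dense index; sign2index()/index2sign() convert
    // between raw signs and dense indices.
    class SignIndexDict {
    public:
        void append(uint64_t sign) {
            if (_sign2idx.count(sign) != 0) {
                return;  // duplicates keep their first index
            }
            _sign2idx.emplace(sign, static_cast<uint32_t>(_signs.size()));
            _signs.push_back(sign);
        }
        int32_t sign2index(uint64_t sign) const {
            auto itr = _sign2idx.find(sign);
            return itr == _sign2idx.end() ? -1 : static_cast<int32_t>(itr->second);
        }
        uint64_t index2sign(int32_t index) const {
            if (index < 0 || static_cast<size_t>(index) >= _signs.size()) {
                return 0;  // same out-of-range convention as the patch
            }
            return _signs[index];
        }
    private:
        std::vector<uint64_t> _signs;
        std::unordered_map<uint64_t, uint32_t> _sign2idx;
    };

    int main() {
        SignIndexDict dict;
        dict.append(9000000001ULL);
        dict.append(9000000002ULL);
        dict.append(9000000001ULL);  // ignored: already present
        std::cout << dict.sign2index(9000000002ULL) << "\n";  // 1
        std::cout << dict.index2sign(0) << "\n";              // 9000000001
        std::cout << dict.sign2index(42) << "\n";             // -1 (miss)
        return 0;
    }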