diff --git a/cmake/external/libmct.cmake b/cmake/external/libmct.cmake
index 351806f6e1a733292d07e83d18c7f3f7889f3b3e..239183cb6d008be61d1b6fbd1c508e89de53c8f2 100644
--- a/cmake/external/libmct.cmake
+++ b/cmake/external/libmct.cmake
@@ -40,9 +40,6 @@ SET(LIBMCT_INSTALL_ROOT "${THIRD_PARTY_PATH}/install")
 SET(LIBMCT_INSTALL_DIR ${LIBMCT_INSTALL_ROOT}/${LIBMCT_DST_DIR})
 SET(LIBMCT_ROOT ${LIBMCT_INSTALL_DIR})
 SET(LIBMCT_INC_DIR ${LIBMCT_ROOT}/include)
-SET(LIBMCT_LIB_DIR ${LIBMCT_ROOT}/lib)
-SET(LIBMCT_LIB ${LIBMCT_LIB_DIR}/libps.so)
-SET(LIBMCT_IOMP_LIB ${LIBMCT_LIB_DIR}/libiomp5.so) #todo what is this
 SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${LIBMCT_ROOT}/lib")
 
 INCLUDE_DIRECTORIES(${LIBMCT_INC_DIR})
@@ -66,11 +63,18 @@ ExternalProject_Add(
     CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${LIBMCT_INSTALL_ROOT}
 )
 
-ADD_LIBRARY(libmct SHARED IMPORTED GLOBAL)
-SET_PROPERTY(TARGET libmct PROPERTY IMPORTED_LOCATION ${LIBMCT_LIB})
+# No libmct library file is referenced any more (only its include directory),
+# so expose a placeholder target that other targets can depend on.
+if (${CMAKE_VERSION} VERSION_LESS "3.3.0" OR NOT WIN32)
+    # The generated file and its symbol are named after this target rather
+    # than reusing the boost_dummy.c name.
+    set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/libmct_dummy.c)
+    file(WRITE ${dummyfile} "const char *libmct_dummy = \"${dummyfile}\";")
+    add_library(libmct STATIC ${dummyfile})
+else()
+    add_library(libmct INTERFACE)
+endif()
+
 ADD_DEPENDENCIES(libmct ${LIBMCT_PROJECT})
 LIST(APPEND external_project_dependencies libmct)
 
-IF(WITH_C_API)
-  INSTALL(FILES ${LIBMCT_LIB} ${LIBMCT_IOMP_LIB} DESTINATION lib)
-ENDIF()
diff --git a/cmake/external/pslib.cmake b/cmake/external/pslib.cmake
index 812af5efa20395061773d0135c0d1db32286bc99..586f66d6fdbc8581c52b83f43416d6014d99b29d 100644
--- a/cmake/external/pslib.cmake
+++ b/cmake/external/pslib.cmake
@@ -66,7 +66,7 @@ ExternalProject_Add(
     CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${PSLIB_INSTALL_ROOT}
 )
 
-ADD_LIBRARY(pslib SHARED IMPORTED GLOBAL)
+ADD_LIBRARY(pslib STATIC IMPORTED GLOBAL)
 SET_PROPERTY(TARGET pslib PROPERTY IMPORTED_LOCATION ${PSLIB_LIB})
 ADD_DEPENDENCIES(pslib ${PSLIB_PROJECT})
 LIST(APPEND
external_project_dependencies pslib)
diff --git a/paddle/fluid/framework/async_executor.cc b/paddle/fluid/framework/async_executor.cc
index aa76e03e838fe08a11e8d33f7d366ab801cbd236..94ed8c2fca42d1815e66b8ed6c57164547d68882 100644
--- a/paddle/fluid/framework/async_executor.cc
+++ b/paddle/fluid/framework/async_executor.cc
@@ -48,6 +48,10 @@ void AsyncExecutor::CreateThreads(
   worker->SetDataFeed(reader);
   worker->SetFetchVarNames(fetch_var_names);
   worker->BindingDataFeedMemory();
+  worker->SetPSlibPtr(_pslib_ptr);
+  worker->SetPullDenseThread(_pull_dense_thread);
+  worker->BindingSlotVariableMemory();
+  worker->SetParamConfig(&_param_config);
 }
 
 void PrepareReaders(std::vector>& readers,  // NOLINT
@@ -61,6 +65,109 @@ void PrepareReaders(std::vector>& readers, // NOLINT
   readers[0]->SetFileList(filelist);
 }
 
+// Creates the PSlib instance handed to every worker and configures it from
+// the distributed descriptor and the host signature list.
+void AsyncExecutor::ConfigPslib(const std::string& dist_desc,
+                                std::vector<uint64_t>& host_sign_list,
+                                int node_num, int index) {
+  _pslib_ptr = std::make_shared<paddle::distributed::PSlib>();
+  _pslib_ptr->init_and_config(dist_desc, host_sign_list, node_num,
+                              index);  // TODO
+}
+
+// Runs the parameter server. ConfigPslib() must have been called first.
+void AsyncExecutor::StartServer() {
+  CHECK(_pslib_ptr != nullptr) << "ConfigPslib() has not been called";
+  _pslib_ptr->run_server();
+}
+
+// Fills each dense variable with scaled normal noise and pushes the values
+// to the parameter server, one push per dense table.
+void AsyncExecutor::InitModel() {
+  CHECK(_pslib_ptr != nullptr) << "ConfigPslib() has not been called";
+  // TODO only rank = 0 do this
+  std::vector<int> all_dense_table_id;  // TODO
+  all_dense_table_id.push_back(0);
+  for (auto table_id : all_dense_table_id) {
+    std::vector<paddle::ps::Region> regions;
+    std::vector<std::string> variables;  // TODO
+    for (auto& t : variables) {
+      Variable* var = root_scope_->FindVar(t);
+      CHECK(var != nullptr) << "var[" << t << "] not found";
+      LoDTensor* tensor = var->GetMutable<LoDTensor>();
+
+      float* g = tensor->data<float>();
+      CHECK(g != nullptr) << "var[" << t << "] value not initialized";
+
+      // The noise is scaled by 0.2 / sqrt(rows); zero rows would divide by
+      // zero, so reject that case explicitly.
+      int rown = tensor->dims()[0];
+      CHECK(rown > 0) << "var[" << t << "] has no rows";
+      float init_range = 0.2;
+      init_range /= sqrt(rown);
+
+      std::normal_distribution<float> ndistr(0.0, 1.0);
+      // A 64-bit index: the original 32-bit unsigned counter could not
+      // reach the end of a tensor with more than 2^32 elements.
+      const int64_t numel = tensor->numel();
+      for (int64_t i = 0; i < numel; ++i) {
+        g[i] = ndistr(local_random_engine()) * init_range;
+      }
+
+      paddle::ps::Region reg(g, numel);
+      regions.emplace_back(std::move(reg));
+    }
+
+    auto push_status = _pslib_ptr->_worker_ptr->push_dense_param(
+        regions.data(), regions.size(), table_id);
+    push_status.wait();
+    auto status = push_status.get();
+    if (status != 0) {
+      // LOG(FATAL) aborts the process, so no exit() call follows it.
+      LOG(FATAL) << "push dense param failed, status[" << status << "]";
+    }
+  }
+}
+
+// Calls flush() on the PSlib worker, then asks it to save the model to
+// path. Aborts when the save reports failure.
+void AsyncExecutor::SaveModel(const std::string& path) {
+  CHECK(_pslib_ptr != nullptr) << "ConfigPslib() has not been called";
+  auto ret = _pslib_ptr->_worker_ptr->flush();
+  ret.wait();
+  ret = _pslib_ptr->_worker_ptr->save(path, 0);
+  ret.wait();
+  int32_t feasign_cnt = ret.get();
+  // TODO should be feasign_cnt < 0, because server bug
+  if (feasign_cnt == -1) {
+    LOG(FATAL) << "save model failed";
+  }
+}
+
+// Creates and starts the thread that pulls dense parameters in the
+// background. RunFromFile() stops it once the training threads have joined.
+void AsyncExecutor::PrepareDenseThread() {
+  CHECK(_pslib_ptr != nullptr) << "ConfigPslib() has not been called";
+
+  // No dense variables are configured yet (TODO). Pass an empty table so
+  // the DensePullThread constructor, which iterates *dense_params, does not
+  // dereference an uninitialised pointer. The constructor copies the
+  // entries, so a local map is sufficient.
+  std::map<uint64_t, std::vector<std::string>> dense_params;
+
+  DensePullThreadParam param;
+  param.ps_client = _pslib_ptr->_worker_ptr;
+  param.threshold = 1;  // GlobalConfig::instance().pull_dense_per_batch; TODO
+  param.training_thread_num = actual_thread_num;
+  param.root_scope = root_scope_;
+  param.dense_params = &dense_params;
+
+  _pull_dense_thread = std::make_shared<DensePullThread>(param);
+  // Without start() the thread never runs and the stop() issued by
+  // RunFromFile() has nothing to join.
+  _pull_dense_thread->start();
+}
+
 void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
                                 const std::string& data_feed_desc_str,
                                 const std::vector& filelist,
@@ -83,7 +190,7 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
   google::protobuf::TextFormat::ParseFromString(data_feed_desc_str,
                                                 &data_feed_desc);
 
-  int actual_thread_num = thread_num;
+  actual_thread_num = thread_num;
   int file_cnt = filelist.size();
   PADDLE_ENFORCE(file_cnt > 0, "File list cannot be empty");
 
@@ -107,11 +214,11 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
   // todo: should be factory method for creating datafeed
   std::vector> readers;
   PrepareReaders(readers, actual_thread_num, data_feed_desc, filelist);
-
+  PrepareDenseThread();
   std::vector> workers;
   workers.resize(actual_thread_num);
   for (auto& worker : workers) {
-    worker.reset(new ExecutorThreadWorker);
+    worker.reset(new AsyncExecutorThreadWorker);
   }
 
   // prepare thread resource
here
@@ -129,7 +204,7 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
   for (auto& th : threads) {
     th.join();
   }
-
+  _pull_dense_thread->stop();
   root_scope_->DropKids();
 
   return;
diff --git a/paddle/fluid/framework/async_executor.h b/paddle/fluid/framework/async_executor.h
index 6aa59c89dc4b4b00487e0adfe7adc8a2f9c11a74..67f4e5deeee4b0a9112fa222ec43f76bf9538e98 100644
--- a/paddle/fluid/framework/async_executor.h
+++ b/paddle/fluid/framework/async_executor.h
@@ -22,6 +22,9 @@ limitations under the License. */
 #include // NOLINT
 #include
 #include
+#include <time.h>  // clock_gettime
+#include <atomic>  // local_random_engine
+#include <random>  // local_random_engine
 #include "paddle/fluid/framework/data_feed.pb.h"
 #include "paddle/fluid/framework/executor.h"
 #include "paddle/fluid/framework/executor_thread_worker.h"
@@ -30,6 +33,31 @@ limitations under the License. */
 
 namespace paddle {
 namespace framework {
+
+// Wall-clock time (CLOCK_REALTIME) in seconds, as a double.
+inline double current_realtime() {
+  struct timespec tp;
+  clock_gettime(CLOCK_REALTIME, &tp);
+  return tp.tv_sec + tp.tv_nsec * 1e-9;
+}
+
+// Returns a random engine owned by the calling thread. The engine is seeded
+// once per thread from a process-wide counter and the current time.
+inline std::default_random_engine& local_random_engine() {
+  struct engine_wrapper_t {
+    std::default_random_engine engine;
+    engine_wrapper_t() {
+      static std::atomic<unsigned long> x(0);  // NOLINT
+      std::seed_seq sseq = {
+          x++, x++, x++,
+          static_cast<unsigned long>(current_realtime() * 1000)};  // NOLINT
+      engine.seed(sseq);
+    }
+  };
+  thread_local engine_wrapper_t r;
+  return r.engine;
+}
+
 class AsyncExecutor {
  public:
   AsyncExecutor(Scope* scope, const platform::Place& place);
@@ -40,9 +68,16 @@ class AsyncExecutor {
                    const int thread_num,
                    const std::vector& fetch_names,
                    const bool debug = false);
-  void ConfigServer() {}
-  void ConfigWorker() {}
-  void StartServer() {}
+  // Creates and configures the PSlib instance used by the calls below.
+  void ConfigPslib(const std::string& dist_desc,
+                   std::vector<uint64_t>& host_sign_list, int node_num,
+                   int index);
+  // Runs the parameter server.
+  void StartServer();
+  // Randomly initialises the dense parameters and pushes them to the server.
+  void InitModel();
+  // Flushes the PSlib worker and saves the model to path.
+  void SaveModel(const std::string& path);
 
  private:
   void
CreateThreads(ExecutorThreadWorker* worker,
@@ -51,11 +76,21 @@ class AsyncExecutor {
                      const std::vector& fetch_var_names,
                      Scope* root_scope, const int thread_index,
                      const bool debug);
-
+  // Creates the DensePullThread used while RunFromFile() is training.
+  void PrepareDenseThread();
+
  public:
+  std::shared_ptr<paddle::distributed::PSlib> _pslib_ptr;
+  std::shared_ptr<DensePullThread> _pull_dense_thread;
   Scope* root_scope_;
   platform::Place place_;
+
+  AsyncWorkerParamConfig _param_config;
+
+ private:
+  // Thread count of the current RunFromFile() call; zero until then.
+  int actual_thread_num = 0;
 };
 
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/executor_thread_worker.cc b/paddle/fluid/framework/executor_thread_worker.cc
index 4e4001e979fdd0774779fa288402c7847af90637..19d8818be7431d52a7272afa63d46451206b779f 100644
--- a/paddle/fluid/framework/executor_thread_worker.cc
+++ b/paddle/fluid/framework/executor_thread_worker.cc
@@ -31,6 +31,118 @@ limitations under the License. */
 namespace paddle {
 namespace framework {
 
+// Marks the thread as running and launches run() on it. Calling start()
+// while the thread is already running is a no-op: assigning to a joinable
+// std::thread would terminate the process.
+int DensePullThread::start() {
+  if (_running) {
+    return 0;
+  }
+  _running = true;
+  _t = std::thread(&DensePullThread::run, this);
+  return 0;
+}
+
+// Thread body: until stop() is requested, pulls every dense table whose
+// version advanced by at least the threshold, waits for the pulls, sleeps.
+void DensePullThread::run() {
+  while (_running) {
+    _pull_dense_status.resize(0);
+    for (auto& t : _dense_variable_name) {
+      if (check_update_param(t.first)) {
+        auto status = pull_dense(t.first);
+        _pull_dense_status.emplace_back(std::move(status));
+        reset_thread_version(t.first);
+      }
+    }
+    if (_pull_dense_status.size() != 0) {
+      wait_all();
+    }
+
+    usleep(_sleep_time_ms * 1000);
+  }
+}
+
+// Refreshes the current version of table_id (the minimum over all training
+// threads) and reports whether it is at least _threshold ahead of the
+// version recorded at the last pull.
+bool DensePullThread::check_update_param(uint64_t table_id) {
+  {
+    std::lock_guard<std::mutex> lock(_mutex_for_version);
+    auto& version = _training_versions[table_id];
+    if (version.empty()) {
+      // Nothing to take the minimum of; dereferencing end() is undefined.
+      return false;
+    }
+    _current_version[table_id] =
+        *(std::min_element(version.begin(), version.end()));
+  }
+  if (_current_version[table_id] - _last_versions[table_id] < _threshold) {
+    return false;
+  }
+  return true;
+}
+
+// Records the current version of table_id as the one last pulled.
+void DensePullThread::reset_thread_version(uint64_t table_id) {
+  std::lock_guard<std::mutex> lock(_mutex_for_version);
+  _last_versions[table_id] = _current_version[table_id];
+}
+
+// Requests a pull of every dense variable of table_id into the tensors of
+// the root scope and returns the future of that request.
+std::future<int32_t> DensePullThread::pull_dense(uint64_t table_id) {
+  auto& regions = _regions[table_id];
+  regions.clear();
+  auto& variables = _dense_variable_name[table_id];
+  regions.resize(variables.size());
+
+  for (auto i = 0u; i < variables.size(); ++i) {
+    auto& t = variables[i];
+    Variable* var = _root_scope->FindVar(t);
+    CHECK(var != nullptr) << "var[" << t << "] not found";
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+
+    float* w = tensor->data<float>();
+    paddle::ps::Region reg(w, tensor->numel());
+    regions[i] = std::move(reg);
+  }
+  return _ps_client->pull_dense(regions.data(), regions.size(), table_id);
+}
+
+// Waits for all outstanding pulls and counts the failed ones; aborts once
+// more than 20 pulls have failed in total.
+void DensePullThread::wait_all() {
+  for (auto& t : _pull_dense_status) {
+    t.wait();
+    auto status = t.get();
+    if (status != 0) {
+      LOG(WARNING) << "pull dense failed times:" << ++_pull_dense_fail_times;
+    }
+  }
+
+  if (_pull_dense_fail_times > 20) {
+    // LOG(FATAL) aborts the process, so no exit() call follows it.
+    LOG(FATAL) << "pull dense failed times more than 20 times";
+  }
+
+  _pull_dense_status.resize(0);
+}
+
+// Bumps the version counter of thread_id for table_id. Requests for a table
+// or thread index that was never configured are ignored: the original
+// unconditional indexing wrote past the end of an empty vector.
+void DensePullThread::increase_thread_version(int thread_id,
+                                              uint64_t table_id) {
+  std::lock_guard<std::mutex> lock(_mutex_for_version);
+  auto it = _training_versions.find(table_id);
+  if (it == _training_versions.end() || thread_id < 0 ||
+      static_cast<size_t>(thread_id) >= it->second.size()) {
+    return;
+  }
+  it->second[thread_id]++;
+}
+
 void ExecutorThreadWorker::CreateThreadOperators(const ProgramDesc& program) {
   auto& block = program.Block(0);
   op_names_.clear();
@@ -90,6 +202,11 @@ void ExecutorThreadWorker::SetFetchVarNames(
                          fetch_var_names.end());
 }
 
+// The base worker does not use the PSlib handle, so it is ignored;
+// AsyncExecutorThreadWorker overrides this to keep it.
+void ExecutorThreadWorker::SetPSlibPtr(
+    std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {}
+
 void ExecutorThreadWorker::SetDevice() {
 #if defined _WIN32 || defined __APPLE__
   return;
@@ -219,5 +336,385 @@ void ExecutorThreadWorker::SetRootScope(Scope* g_scope) {
   root_scope_ = g_scope;
 }
 
+// AsyncExecutor
+// Reads batches from the data feed and trains on each one; when debugging,
+// thread 0 prints the fetch variables after every batch.
+void AsyncExecutorThreadWorker::TrainFiles() {
+  SetDevice();
+
+  int fetch_var_num = fetch_var_names_.size();
+  fetch_values_.clear();
+  fetch_values_.resize(fetch_var_num);
+
+  thread_reader_->Start();
+
+  int cur_batch;
+  int batch_cnt = 0;
+  while ((cur_batch = thread_reader_->Next()) > 0) {
+    // executor run here
+    TrainOneNetwork();
+
+    ++batch_cnt;
+    thread_scope_->DropKids();
+
+    if (!debug_ || thread_id_ != 0) {
+      continue;
+    }
+
+    for (int i = 0; i < fetch_var_num; ++i) {
+      print_fetch_var(thread_scope_, fetch_var_names_[i]);
+    }
+  }
+}
+
+void AsyncExecutorThreadWorker::SetPSlibPtr(
+    std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {
+  _pslib_ptr = pslib_ptr;
+}
+
+void AsyncExecutorThreadWorker::SetPullDenseThread(
+    std::shared_ptr<DensePullThread> dpt) {
+  _pull_dense_thread = dpt;
+}
+
+// Runs one batch: pulls the sparse parameters, executes every operator
+// except those whose type contains "sgd", then pushes the updates.
+void AsyncExecutorThreadWorker::TrainOneNetwork() {
+  PrepareParams();
+
+  for (auto& op : ops_) {
+    if (op->Type().find("sgd") != std::string::npos) {
+      continue;
+    }
+    op->Run(*thread_scope_, place_);
+  }
+
+  UpdateParams();
+}
+
+void AsyncExecutorThreadWorker::BindingSlotVariableMemory() {
+  // TODO: bind the slot input tensors; not implemented yet.
+}
+
+void AsyncExecutorThreadWorker::SetParamConfig(AsyncWorkerParamConfig* pc) {
+  _param_config = pc;
+}
+
+// Pulls the sparse parameters of table 0, waits for the pull to finish and
+// copies the values into the slot input tensors.
+void AsyncExecutorThreadWorker::PrepareParams() {
+  int table_id = 0;  // TODO
+  PullSparse(table_id);
+  for (auto& t : _pull_sparse_status) {
+    t.wait();
+    auto status = t.get();
+    if (status != 0) {
+      LOG(ERROR) << "pull sparse failed, status[" << status << "]";
+      exit(-1);
+    }
+  }
+  _pull_sparse_status.resize(0);
+
+  FillSparse(table_id);
+}
+
+// Pushes the sparse and dense gradients of the batch, waits on the pending
+// pushes once enough of them have accumulated, and advances this thread's
+// dense version.
+void AsyncExecutorThreadWorker::UpdateParams() {
+  // TODO: iterate GlobalConfig::instance().dense_table_id instead of the
+  // hard-coded table ids below.
+  for (int i = 0; i < 1; ++i) {
+    PushSparse(i);
+  }
+  for (int i = 1; i < 2; ++i) {
+    PushDense(i);
+  }
+  int32_t tmp_push_dense_wait_times =
+      _param_config->tmp_push_dense_wait_times;  // TODO
+  int32_t tmp_push_sparse_wait_times =
+      _param_config->tmp_push_sparse_wait_times;  // TODO
+  // Deliberately not static: a function-local static would latch the
+  // configuration of whichever worker called first, for every worker.
+  const uint32_t push_dense_wait_times =
+      static_cast<uint32_t>(tmp_push_dense_wait_times);
+  const uint32_t push_sparse_wait_times =
+      static_cast<uint32_t>(tmp_push_sparse_wait_times);
+
+  if (_push_dense_status.size() >= push_dense_wait_times) {
+    for (auto& t : _push_dense_status) {
+      t.wait();
+    }
+    _push_dense_status.resize(0);
+  }
+  if (tmp_push_dense_wait_times == -1) {
+    _push_dense_status.resize(0);
+  }
+
+  if (_push_sparse_status.size() >= push_sparse_wait_times) {
+    for (auto& t : _push_sparse_status) {
+      t.wait();
+    }
+    _push_sparse_status.resize(0);
+  }
+  if (tmp_push_sparse_wait_times == -1) {
+    _push_sparse_status.resize(0);
+  }
+
+  // TODO: one call per configured dense table.
+  int dense_table_id = 1;
+  _pull_dense_thread->increase_thread_version(thread_id_, dense_table_id);
+}
+
+// Pushes the dense gradient tensors of table_id and queues the resulting
+// future on _push_dense_status.
+void AsyncExecutorThreadWorker::PushDense(int table_id) {
+  std::vector<paddle::ps::Region> regions;
+  // TODO: GlobalConfig::instance().dense_gradient_variable_name[table_id]
+  std::vector<std::string> variables;
+  for (auto& t : variables) {
+    Variable* var = thread_scope_->FindVar(t);
+    CHECK(var != nullptr) << "var[" << t << "] not found";
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int count = tensor->numel();
+    float* g = tensor->data<float>();
+    paddle::ps::Region reg(g, count);
+    regions.emplace_back(std::move(reg));
+  }
+
+  auto status = _pslib_ptr->_worker_ptr->push_dense(
+      regions.data(), regions.size(), table_id);
+  _push_dense_status.push_back(std::move(status));
+}
+
+// Collects the non-zero feature signs of every slot (slot 0 is the label)
+// and requests their values from table_id; the future is queued on
+// _pull_sparse_status.
+void AsyncExecutorThreadWorker::PullSparse(int table_id) {
+  auto& features = _features[table_id];
+  auto& feature_value = _feature_value[table_id];
+  auto fea_dim = _param_config->fea_dim;  // TODO
+  // slot id starts from 1
+  features.clear();
+  features.reserve(MAX_FEASIGN_NUM);
+
+  const auto& feed_vec = thread_reader_->GetUseSlotAlias();
+  // slot_idx = 0 is label TODO
+  for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
+    Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int64_t* ids = tensor->data<int64_t>();
+    int len = tensor->numel();
+    for (int i = 0; i < len; ++i) {
+      // todo: current trick - filter feasign=use_slot_mod (bug: datafeed
+      // fill use_slot_mod for empty slot)
+      if (ids[i] == 0u) {
+        continue;
+      }
+      features.push_back(static_cast<uint64_t>(ids[i]));
+    }
+  }
+
+  check_pull_push_memory(features, feature_value, fea_dim);
+
+  // NOTE(review): this pointer array is a local, yet the pull is only
+  // waited for after this function returns (in PrepareParams). Confirm that
+  // pull_sparse() does not keep the array itself beyond the call.
+  std::vector<float*> pull_feature_value;
+  pull_feature_value.reserve(features.size());
+  for (auto i = 0u; i < features.size(); ++i) {
+    pull_feature_value.push_back(feature_value[i].data());
+  }
+
+  auto status = _pslib_ptr->_worker_ptr->pull_sparse(
+      pull_feature_value.data(), table_id, features.data(), features.size());
+  _pull_sparse_status.push_back(std::move(status));
+
+  // to save time
+  auto& push_g = _feature_push_value[table_id];
+  check_pull_push_memory(features, push_g, fea_dim);
+
+  // binding_slot_embed_with_concat(); TODO
+  collect_feasign_info(table_id);  // TODO
+}
+
+// Copies the pulled values of table_id into the embedding tensor of each
+// slot, skipping the first two floats of every feature value. A zero id
+// receives zeros.
+void AsyncExecutorThreadWorker::FillSparse(int table_id) {
+  auto slot_dim = _param_config->slot_dim;  // TODO
+  auto fea_dim = _param_config->fea_dim;    // TODO
+  auto& features = _features[table_id];
+  auto& fea_value = _feature_value[table_id];
+
+  CHECK(features.size() > 0) << "feature size check failed";
+  // Every copy below reads slot_dim floats starting at offset 2 of a buffer
+  // holding fea_dim floats.
+  CHECK(fea_dim >= slot_dim + 2) << "fea_dim[" << fea_dim
+                                 << "] too small for slot_dim[" << slot_dim
+                                 << "]";
+
+  auto fea_idx = 0u;
+
+  std::vector<float> init_value(fea_dim);
+
+  const auto& feed_vec = thread_reader_->GetUseSlotAlias();
+  // slot_idx = 0 is label TODO
+  for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
+    Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int64_t* ids = tensor->data<int64_t>();
+    int len = tensor->numel();
+
+    const auto& emb_name = _param_config->slot_input_vec[slot_idx - 1];
+    Variable* var_emb = thread_scope_->FindVar(emb_name);
+    CHECK(var_emb != nullptr) << "var[" << emb_name << "] not found";
+    LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
+    float* ptr = tensor_emb->data<float>();
+
+    // TODO: support _current_train_job.use_cvm_feature()
+    for (int index = 0; index < len; ++index) {
+      if (ids[index] == 0u) {
+        memcpy(ptr + slot_dim * index, init_value.data() + 2,
+               sizeof(float) * slot_dim);
+        continue;
+      }
+      memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2,
+             sizeof(float) * slot_dim);
+      fea_idx++;
+    }
+  }
+}
+
+// Packs the slot gradients of table_id into one push value per feature
+// ([0] = 1.0, [1] = label, then slot_dim gradient floats) and queues the
+// push on _push_sparse_status.
+void AsyncExecutorThreadWorker::PushSparse(int table_id) {
+  auto slot_dim = _param_config->slot_dim;  // TODO
+  auto fea_dim = _param_config->fea_dim;    // TODO
+  auto& features = _features[table_id];
+  // TODO: GlobalConfig::instance().input_gradient_variable_name
+  auto& push_g = _feature_push_value[table_id];
+  check_pull_push_memory(features, push_g, fea_dim);
+  uint64_t fea_idx = 0u;
+  auto& fea_info = _fea_info[table_id];  // TODO
+  // TODO: the offset is 0 when _current_train_job.use_cvm_feature() is set.
+  const int offset = 2;
+
+  const auto& feed_vec = thread_reader_->GetUseSlotAlias();
+
+  // slot_idx = 0 is label TODO
+  for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
+    if (_slot_alias_to_table[feed_vec[slot_idx]] != table_id) {
+      continue;
+    }
+    const auto& grad_name = _param_config->gradient_var[slot_idx - 1];
+    Variable* g_var = thread_scope_->FindVar(grad_name);
+    CHECK(g_var != nullptr) << "var[" << grad_name << "] not found";
+    LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
+    float* g = g_tensor->data<float>();
+    // TODO: scale by batch size when
+    // FLAGS_scale_sparse_gradient_with_batch_size is set.
+
+    Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int len = tensor->lod()[0].back();
+    int64_t* ids = tensor->data<int64_t>();
+    for (int id_idx = 0; id_idx < len; ++id_idx) {
+      if (ids[id_idx] == 0) {
+        g += slot_dim;
+        continue;
+      }
+      memcpy(push_g[fea_idx].data() + offset, g, sizeof(float) * slot_dim);
+      push_g[fea_idx][0] = 1.0f;
+      push_g[fea_idx][1] = static_cast<float>(fea_info[fea_idx].label);
+      g += slot_dim;
+      fea_idx++;
+    }
+  }
+  // CHECK rather than assert(): the guard must also hold in builds that
+  // define NDEBUG.
+  CHECK(fea_idx == features.size())
+      << "expect feature size:" << features.size() << " real:" << fea_idx;
+  CHECK(features.size() > 0);
+
+  std::vector<float*> push_g_vec;
+  push_g_vec.reserve(features.size());
+  for (auto i = 0u; i < features.size(); ++i) {
+    push_g_vec.push_back(push_g[i].data());
+  }
+  auto status = _pslib_ptr->_worker_ptr->push_sparse(
+      table_id, features.data(),
+      const_cast<const float**>(push_g_vec.data()), features.size());
+  _push_sparse_status.push_back(std::move(status));
+}
+
+// Records, for every non-zero feature sign of table_id, the slot and
+// instance it came from together with that instance's label.
+void AsyncExecutorThreadWorker::collect_feasign_info(int table_id) {
+  auto& fea_info = _fea_info[table_id];
+  auto& feature = _features[table_id];
+  fea_info.resize(feature.size());
+
+  const auto& feed_vec = thread_reader_->GetUseSlotAlias();
+  Variable* label_var = thread_scope_->FindVar(feed_vec[0]);
+  CHECK(label_var != nullptr) << "var[" << feed_vec[0] << "] not found";
+  LoDTensor* label_tensor = label_var->GetMutable<LoDTensor>();
+  int64_t* label = label_tensor->data<int64_t>();
+
+  size_t global_index = 0;
+  for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
+    Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
+    CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
+    LoDTensor* tensor = var->GetMutable<LoDTensor>();
+    int64_t* ids = tensor->data<int64_t>();
+
+    size_t fea_idx = 0;
+    for (auto ins_idx = 1u; ins_idx < tensor->lod()[0].size(); ++ins_idx) {
+      for (; fea_idx < tensor->lod()[0][ins_idx]; ++fea_idx) {
+        if (ids[fea_idx] == 0u) {
+          continue;
+        }
+        // Checked before the store so that a mismatch cannot write past
+        // the end of fea_info.
+        CHECK(global_index < fea_info.size())
+            << "expect fea info size:" << fea_info.size();
+        FeasignInfo info{slot_idx, ins_idx, label[ins_idx - 1]};
+
+        fea_info[global_index++] = std::move(info);
+      }
+    }
+  }
+  CHECK(global_index == feature.size())
+      << "expect fea info size:" << feature.size() << " real:" << global_index;
+}
+
+// Sizes push_g to one dim-float row per feature, plus one spare row.
+void AsyncExecutorThreadWorker::check_pull_push_memory(
+    std::vector<uint64_t>& features,
+    std::vector<std::vector<float>>& push_g, int dim) {
+  push_g.resize(features.size() + 1);
+  for (auto& t : push_g) {
+    t.resize(dim);
+  }
+}
+
+// Grows push_g with freshly allocated dim-float buffers until it holds one
+// per feature, plus one spare.
+// NOTE(review): the buffers are allocated with new[] and nothing in this
+// file releases them; confirm who owns them.
+void AsyncExecutorThreadWorker::check_pull_push_memory(
+    std::vector<uint64_t>& features, std::vector<float*>& push_g, int dim) {
+  if (features.size() > push_g.size()) {
+    push_g.reserve(features.size() + 1);
+    auto size = features.size() - push_g.size() + 1;
+    for (auto i = 0u; i < size; ++i) {
+      float* ptr = new float[dim];
+      push_g.push_back(ptr);
+    }
+  }
+}
+
-}  // einit_modelnd namespace framework
+}  // end namespace framework
 }  // end namespace paddle
diff --git a/paddle/fluid/framework/executor_thread_worker.h b/paddle/fluid/framework/executor_thread_worker.h
index 13ec2442c46459116320236bf98f23c91340f389..63f383cd4799ddc2eb1afa63e67be47759f0f114 100644
--- a/paddle/fluid/framework/executor_thread_worker.h
+++ b/paddle/fluid/framework/executor_thread_worker.h
@@ -25,16 +25,128 @@ limitations under the License. */
 #include "paddle/fluid/framework/executor.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
+#include <atomic>  // NOLINT
+#include "pslib.h"
 
 namespace paddle {
 namespace framework {
+
+// Capacity reserved for the per-batch feature sign buffer.
+constexpr uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100;
+
 void CreateTensor(Variable* var, proto::VarType::Type var_type);
 
+// Settings shared by the async workers; AsyncExecutor owns the instance and
+// passes each worker a pointer to it.
+struct AsyncWorkerParamConfig {
+  int slot_dim = 0;
+  int fea_dim = 0;
+  int32_t tmp_push_dense_wait_times = 0;
+  int32_t tmp_push_sparse_wait_times = 0;
+
+  std::vector<std::string> slot_input_vec;  // 6048slot 6050slot //name
+  std::vector<std::string> gradient_var;    // 6048slot_embed
+};
+
+// Type of the PSlib worker handle, i.e. of PSlib::_worker_ptr.
+using PSWorkerPtr = decltype(paddle::distributed::PSlib::_worker_ptr);
+
+// Construction arguments of DensePullThread.
+struct DensePullThreadParam {
+  PSWorkerPtr ps_client;
+  int threshold = 0;
+  int training_thread_num = 0;
+  Scope* root_scope = nullptr;
+  // Table id -> names of the dense variables of that table; may be null.
+  std::map<uint64_t, std::vector<std::string>>* dense_params = nullptr;
+  int sleep_time_ms = 2;
+};
+
+// Background thread that keeps the dense parameters of the root scope up to
+// date: it pulls a table again once every training thread has advanced its
+// version by at least the threshold since the previous pull.
+class DensePullThread {
+ public:
+  explicit DensePullThread(const DensePullThreadParam& param)
+      : _ps_client(param.ps_client),
+        _thread_num(param.training_thread_num),
+        _threshold(param.threshold),
+        _sleep_time_ms(param.sleep_time_ms),
+        _root_scope(param.root_scope),
+        _running(false) {
+    // dense_params is optional: a null pointer means no dense tables.
+    if (param.dense_params == nullptr) {
+      return;
+    }
+    for (auto& t : *param.dense_params) {
+      _dense_variable_name[t.first].insert(
+          _dense_variable_name[t.first].end(), t.second.begin(),
+          t.second.end());
+      _training_versions[t.first].resize(_thread_num, 0);
+      _last_versions[t.first] = 0;
+      _current_version[t.first] = 0;
+    }
+  }
+
+  // Joins the worker thread if it is still running; destroying a joinable
+  // std::thread would terminate the process.
+  ~DensePullThread() { stop(); }
+
+  int start();
+
+  void stop() {
+    if (_running) {
+      _running = false;
+      _t.join();
+    }
+  }
+
+  void increase_thread_version(int thread_id, uint64_t table_id);
+  void reset_thread_version(uint64_t table_id);
+  std::future<int32_t> pull_dense(uint64_t table_id);
+  void pull_dense2(uint64_t table_id);
+  void wait_all();
+
+ private:
+  void run();
+  bool check_update_param(uint64_t table_id);
+
+ private:
+  PSWorkerPtr _ps_client;
+  int _thread_num;
+  int _threshold;
+  int _sleep_time_ms;
+  Scope* _root_scope;
+  // Written by stop() and read by the worker thread, hence atomic.
+  std::atomic<bool> _running;
+
+  std::map<uint64_t, uint64_t> _last_versions;
+  std::map<uint64_t, uint64_t> _current_version;
+  std::mutex _mutex_for_version;
+  std::map<uint64_t, std::vector<uint64_t>> _training_versions;
+  std::map<uint64_t, std::vector<std::string>> _dense_variable_name;
+
+  std::thread _t;
+
+  std::vector<::std::future<int32_t>> _pull_dense_status;
+
+  std::map<uint64_t, std::vector<paddle::ps::Region>> _regions;
+  uint32_t _pull_dense_fail_times = 0;
+
+  std::vector<float> _base_norm_param;
+  std::vector<float> _mean;
+  std::vector<float> _scale;
+  float _squared_sum_epsilon = 1e-4;
+  std::mutex _mutex_for_mean_scale;
+
+  float _total_batch_num = 0;
+};
+
 class ExecutorThreadWorker {
  public:
   ExecutorThreadWorker()
       : thread_id_(-1), root_scope_(NULL), thread_scope_(NULL), debug_(false) {}
-  ~ExecutorThreadWorker() {}
+  virtual ~ExecutorThreadWorker() {}
 
   void CreateThreadResource(const framework::ProgramDesc& program,
                             const paddle::platform::Place& place);
@@ -51,10 +163,16 @@ class ExecutorThreadWorker {
   // set data feed declared in executor
   void SetDataFeed(const std::shared_ptr& datafeed);
   // A multi-thread training function
-  void TrainFiles();
+  virtual void TrainFiles();
   // set fetch variable names from python interface assigned by users
   void SetFetchVarNames(const std::vector& fetch_var_names);
-
+  // Hooks for AsyncExecutorThreadWorker; the base worker ignores them.
+  virtual void SetPSlibPtr(
+      std::shared_ptr<paddle::distributed::PSlib> pslib_ptr);
+  virtual void SetPullDenseThread(std::shared_ptr<DensePullThread> dpt) {}
+  virtual void BindingSlotVariableMemory() {}
+  virtual void SetParamConfig(AsyncWorkerParamConfig* pc) {}
+
  private:
   void CreateThreadScope(const framework::ProgramDesc& program);
   void CreateThreadOperators(const framework::ProgramDesc& program);
@@ -77,12 +195,65 @@ class ExecutorThreadWorker {
   Scope* root_scope_;
   // a thread scope, father scope is global score which is shared
   Scope* thread_scope_;
 
- private:
+  // No longer private: AsyncExecutorThreadWorker reads the members below.
   std::vector fetch_var_names_;
   std::vector> fetch_values_;
   bool debug_;
 };
 
+// Worker that trains against a PSlib parameter server: before each batch it
+// pulls the sparse parameters, and after it pushes the gradients.
+class AsyncExecutorThreadWorker : public ExecutorThreadWorker {
+ public:
+  AsyncExecutorThreadWorker() {}
+  virtual ~AsyncExecutorThreadWorker() {}
+  void SetPSlibPtr(
+      std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) override;
+  void SetPullDenseThread(std::shared_ptr<DensePullThread> dpt) override;
+  void BindingSlotVariableMemory() override;
+  void SetParamConfig(AsyncWorkerParamConfig* pc) override;
+  void TrainFiles() override;
+  void TrainOneNetwork();
+  void PrepareParams();
+  void UpdateParams();
+  void PullSparse(int table_id);
+  void FillSparse(int table_id);
+  void PushSparse(int table_id);
+  void PushDense(int table_id);
+
+  void check_pull_push_memory(std::vector<uint64_t>& features,
+                              std::vector<float*>& push_g, int dim);
+  void check_pull_push_memory(std::vector<uint64_t>& features,
+                              std::vector<std::vector<float>>& push_g,
+                              int dim);
+  void collect_feasign_info(int table_id);
+
+ private:
+  struct FeasignInfo {
+    uint32_t slot;
+    uint32_t ins;
+    int64_t label;
+  };
+
+  std::map<uint64_t, std::vector<uint64_t>> _features;
+  std::map<uint64_t, std::vector<FeasignInfo>> _fea_info;
+  std::map<uint64_t, std::vector<std::vector<float>>> _feature_value;
+  std::map<uint64_t, std::vector<std::vector<float>>> _feature_push_value;
+
+  std::unordered_map<std::string, uint64_t> _slot_alias_to_table;  // TODO
+
+  std::shared_ptr<paddle::distributed::PSlib> _pslib_ptr;
+
+  std::shared_ptr<DensePullThread> _pull_dense_thread;
+
+  std::vector<::std::future<int32_t>> _pull_sparse_status;
+  std::vector<::std::future<int32_t>> _pull_dense_status;
+  std::vector<::std::future<int32_t>> _push_sparse_status;
+  std::vector<::std::future<int32_t>> _push_dense_status;
+
+  AsyncWorkerParamConfig* _param_config = nullptr;
+};
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/async_executor_py.cc b/paddle/fluid/pybind/async_executor_py.cc
index 470e8b050808295d49728bbdb757b6a612df9a01..63fd06224f06e93a697a517d3d1e1688532e0c45 100644
--- a/paddle/fluid/pybind/async_executor_py.cc
+++ b/paddle/fluid/pybind/async_executor_py.cc
@@ -47,7 +47,11 @@ void BindAsyncExecutor(py::module* m) {
         return std::unique_ptr(
             new framework::AsyncExecutor(scope, place));
       }))
-      .def("run_from_files", &framework::AsyncExecutor::RunFromFile);
+      .def("run_from_files", &framework::AsyncExecutor::RunFromFile)
+      .def("config_pslib", &framework::AsyncExecutor::ConfigPslib)
+      .def("start_server", &framework::AsyncExecutor::StartServer)
+      .def("init_model", &framework::AsyncExecutor::InitModel)
+      .def("save_model", &framework::AsyncExecutor::SaveModel);
 }  // end BindAsyncExecutor
 }  // end namespace pybind
 }  // end namespace paddle
diff --git a/python/paddle/fluid/async_executor.py b/python/paddle/fluid/async_executor.py
index
2664a7301db3bf471126ff26504e7042f02b7d84..2945e6e14363bfc16cf4128315657faa74ec0bb4 100644
--- a/python/paddle/fluid/async_executor.py
+++ b/python/paddle/fluid/async_executor.py
@@ -149,3 +149,19 @@ class AsyncExecutor(object):
         self.executor.run_from_files(program_desc,
                                      data_feed.desc(), filelist, thread_num,
                                      fetch_var_names, debug)
+
+    def config_ps(self, dist_desc, host_sign_list, node_num, index):
+        """Forward the parameter-server configuration to the C++ executor."""
+        self.executor.config_pslib(dist_desc, host_sign_list, node_num, index)
+
+    def start_server(self):
+        """Ask the C++ executor to start the parameter server."""
+        self.executor.start_server()
+
+    def init_model(self):
+        """Ask the C++ executor to initialise the model parameters."""
+        self.executor.init_model()
+
+    def save_model(self, save_path):
+        """Ask the C++ executor to save the model under ``save_path``."""
+        self.executor.save_model(save_path)