/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/fleet/fleet_wrapper.h" namespace paddle { namespace framework { const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100; std::shared_ptr FleetWrapper::s_instance_ = NULL; void FleetWrapper::InitServer(const std::string& dist_desc, int index) { #ifdef PADDLE_WITH_PSLIB if (!is_initialized_) { pslib_ptr_ = std::shared_ptr( new paddle::distributed::PSlib()); pslib_ptr_->init_server(dist_desc, index); is_initialized_ = true; } else { LOG(WARNING) << "Server can be initialized only once"; } #endif } void FleetWrapper::InitWorker(const std::string& dist_desc, const std::vector& host_sign_list, int node_num, int index) { #ifdef PADDLE_WITH_PSLIB if (!is_initialized_) { pslib_ptr_ = std::shared_ptr( new paddle::distributed::PSlib()); pslib_ptr_->init_worker(dist_desc, const_cast(host_sign_list.data()), node_num, index); is_initialized_ = true; } else { LOG(WARNING) << "Worker can be initialized only once"; } #endif } void FleetWrapper::StopServer() { #ifdef PADDLE_WITH_PSLIB pslib_ptr_->stop_server(); #endif } uint64_t FleetWrapper::RunServer() { #ifdef PADDLE_WITH_PSLIB return pslib_ptr_->run_server(); #else return 0; #endif } void FleetWrapper::GatherServers(const std::vector& host_sign_list, int node_num) { #ifdef PADDLE_WITH_PSLIB pslib_ptr_->gather_servers(const_cast(host_sign_list.data()), node_num); #endif } void FleetWrapper::PullSparseVarsSync( const Scope& scope, const uint64_t table_id, const std::vector& var_names, std::vector* fea_keys, std::vector>* fea_values, int fea_value_dim) { #ifdef PADDLE_WITH_PSLIB std::vector<::std::future> pull_sparse_status; pull_sparse_status.resize(0); fea_keys->clear(); fea_keys->resize(0); fea_keys->reserve(MAX_FEASIGN_NUM); for (auto name : var_names) { Variable* var = scope.FindVar(name); LoDTensor* tensor = var->GetMutable(); int64_t* ids = tensor->data(); int len = tensor->numel(); for (auto i = 0u; i < len; ++i) { if (ids[i] == 0u) { continue; } fea_keys->push_back(static_cast(ids[i])); } fea_values->resize(fea_keys->size() + 1); for (auto& t : *fea_values) { t.resize(fea_value_dim); } std::vector pull_result_ptr; for (auto& t : *fea_values) { pull_result_ptr.push_back(t.data()); } auto status = pslib_ptr_->_worker_ptr->pull_sparse( pull_result_ptr.data(), table_id, fea_keys->data(), fea_keys->size()); pull_sparse_status.push_back(std::move(status)); } for (auto& t : pull_sparse_status) { t.wait(); auto status = t.get(); if (status != 0) { LOG(ERROR) << "fleet pull sparse failed, status[" << status << "]"; exit(-1); } } #endif } void FleetWrapper::PullDenseVarsAsync( const Scope& scope, const uint64_t tid, const std::vector& var_names, std::vector<::std::future>* pull_dense_status) { #ifdef PADDLE_WITH_PSLIB std::vector regions; regions.reserve(var_names.size()); for (auto& t : var_names) { Variable* var = scope.FindVar(t); LoDTensor* tensor = var->GetMutable(); float* w = tensor->data(); paddle::ps::Region reg(w, tensor->numel()); regions.emplace_back(std::move(reg)); } auto status = pslib_ptr_->_worker_ptr->pull_dense(regions.data(), regions.size(), tid); pull_dense_status->push_back(std::move(status)); #endif } void FleetWrapper::PullDenseVarsSync( const Scope& scope, const uint64_t tid, const std::vector& var_names) { #ifdef PADDLE_WITH_PSLIB std::vector regions; regions.reserve(var_names.size()); for (auto& t : var_names) { Variable* var = scope.FindVar(t); LoDTensor* tensor = var->GetMutable(); float* w = tensor->data(); paddle::ps::Region reg(w, tensor->numel()); regions.emplace_back(std::move(reg)); } auto status = pslib_ptr_->_worker_ptr->pull_dense(regions.data(), regions.size(), tid); status.wait(); #endif } void FleetWrapper::PushDenseVarsAsync( const Scope& scope, const uint64_t table_id, const std::vector& var_names, std::vector<::std::future>* push_sparse_status) { #ifdef PADDLE_WITH_PSLIB std::vector regions; for (auto& t : var_names) { Variable* var = scope.FindVar(t); LoDTensor* tensor = var->GetMutable(); int count = tensor->numel(); float* g = tensor->data(); paddle::ps::Region reg(g, count); regions.emplace_back(std::move(reg)); } auto status = pslib_ptr_->_worker_ptr->push_dense(regions.data(), regions.size(), table_id); push_sparse_status->push_back(std::move(status)); #endif } void FleetWrapper::PushSparseVarsWithLabelAsync( const Scope& scope, const uint64_t table_id, const std::vector& fea_keys, const std::vector& fea_labels, const std::vector& sparse_key_names, const std::vector& sparse_grad_names, const int emb_dim, std::vector>* push_values, std::vector<::std::future>* push_sparse_status) { #ifdef PADDLE_WITH_PSLIB int offset = 2; uint64_t fea_idx = 0u; for (size_t i = 0; i < sparse_key_names.size(); ++i) { Variable* g_var = scope.FindVar(sparse_key_names[i]); LoDTensor* g_tensor = g_var->GetMutable(); if (g_tensor == NULL) { LOG(ERROR) << "var[" << sparse_key_names[i] << "] not found"; exit(-1); } float* g = g_tensor->data(); Variable* var = scope.FindVar(sparse_key_names[i]); CHECK(var != nullptr) << "var[" << sparse_key_names[i] << "] not found"; LoDTensor* tensor = var->GetMutable(); if (tensor == NULL) { LOG(ERROR) << "var[" << sparse_key_names[i] << "] not found"; exit(-1); } int len = tensor->numel(); int64_t* ids = tensor->data(); for (auto id_idx = 0u; id_idx < len; ++id_idx) { if (ids[id_idx] == 0) { g += emb_dim; continue; } memcpy((*push_values)[fea_idx].data() + offset, g, sizeof(float) * emb_dim); (*push_values)[fea_idx][0] = 1.0f; (*push_values)[fea_idx][1] = static_cast(fea_labels[fea_idx]); g += emb_dim; fea_idx++; } } CHECK(fea_idx == fea_keys.size()) << "fea_idx: " << fea_idx << "features size: " << fea_keys.size(); std::vector push_g_vec; for (auto i = 0u; i < fea_keys.size(); ++i) { push_g_vec.push_back((*push_values)[i].data()); } auto status = pslib_ptr_->_worker_ptr->push_sparse( table_id, fea_keys.data(), (const float**)push_g_vec.data(), fea_keys.size()); push_sparse_status->push_back(std::move(status)); #endif } } // end namespace framework } // end namespace paddle