diff --git a/CMakeLists.txt b/CMakeLists.txt index f30671bd3a87e87732b3a047e91811452370e06e..2f16c390d8bc7fb02fd2e23d7ccf68a38ac406ac 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -353,11 +353,6 @@ if (WITH_MIPS) add_definitions(-DPADDLE_WITH_MIPS) endif() -if (WITH_HETERPS) - if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -faligned-new") - endif() -endif() set(PADDLE_PYTHON_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/python/build") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g -DNDEBUG") diff --git a/paddle/fluid/distributed/service/brpc_ps_server.cc b/paddle/fluid/distributed/service/brpc_ps_server.cc index a1440260bf2e77093bb937e62b13b54ad06a3e64..a9370561a540bea3416508b45d8cbf8cb997ed33 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.cc +++ b/paddle/fluid/distributed/service/brpc_ps_server.cc @@ -14,7 +14,6 @@ #include "paddle/fluid/distributed/service/brpc_ps_server.h" #include // NOLINT -#include "butil/object_pool.h" #include "paddle/fluid/distributed/table/depends/sparse_utils.h" #include "paddle/fluid/distributed/table/table.h" #include "paddle/fluid/framework/archive.h" @@ -197,13 +196,12 @@ int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request, return 0; } - auto res_data = butil::get_object>(); - res_data->resize(num * table->value_accesor()->select_size() / sizeof(float)); - table->pull_dense(res_data->data(), num); + std::vector res_data; + res_data.resize(num * table->value_accesor()->select_size() / sizeof(float)); + table->pull_dense(res_data.data(), num); - cntl->response_attachment().append((char *)(res_data->data()), - res_data->size() * sizeof(float)); - butil::return_object(res_data); + cntl->response_attachment().append((char *)res_data.data(), + res_data.size() * sizeof(float)); return 0; } @@ -369,13 +367,12 @@ int32_t BrpcPsService::pull_sparse(Table *table, value.DeserializeFromBytes(const_cast(data)); - auto res_data = butil::get_object>(); - res_data->resize(num * dim); - table->pull_sparse(res_data->data(), value); + std::vector res_data; + res_data.resize(num * dim); + table->pull_sparse(res_data.data(), value); - cntl->response_attachment().append((char *)(res_data->data()), - res_data->size() * sizeof(float)); - butil::return_object(res_data); + cntl->response_attachment().append((char *)res_data.data(), + res_data.size() * sizeof(float)); return 0; } diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc index 718fce9950719fb99e9831bad9490610ec3834cf..1c315d34abcb6ef73d898da4f71e0659842e5588 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.cc +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -125,37 +125,34 @@ void ProcessALine(const std::vector& columns, const Meta& meta, int64_t SaveToText(std::ostream* os, std::shared_ptr block, const int mode) { - int64_t save_num = 0; - for (auto& table : block->values_) { - for (auto& value : table) { - if (mode == SaveMode::delta && !value.second->need_save_) { - continue; - } - save_num += 1; - - auto* vs = value.second->data_.data(); - std::stringstream ss; - auto id = value.first; - ss << id << "\t" << value.second->count_ << "\t" - << value.second->unseen_days_ << "\t" << value.second->is_entry_ - << "\t"; - - for (int i = 0; i < block->value_length_; i++) { - ss << vs[i]; - ss << ","; - } + int64_t not_save_num = 0; + for (auto& value : block->values_) { + if (mode == SaveMode::delta && !value.second.need_save_) { + not_save_num++; + continue; + } - ss << "\n"; + auto* vs = value.second.data_; + std::stringstream ss; + auto id = value.first; + ss << id << "\t" << value.second.count_ << "\t" << value.second.unseen_days_ + << "\t" << value.second.is_entry_ << "\t"; - os->write(ss.str().c_str(), sizeof(char) * ss.str().size()); + for (int i = 0; i < block->value_length_; i++) { + ss << vs[i]; + ss << ","; + } - if (mode == SaveMode::base || mode == SaveMode::delta) { - value.second->need_save_ = false; - } + ss << "\n"; + + os->write(ss.str().c_str(), sizeof(char) * ss.str().size()); + + if (mode == SaveMode::base || mode == SaveMode::delta) { + value.second.need_save_ = false; } } - return save_num; + return block->values_.size() - not_save_num; } int64_t LoadFromText(const std::string& valuepath, const std::string& metapath, @@ -186,7 +183,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath, block->Init(id, false); - VALUE* value_instant = block->GetValue(id); + auto value_instant = block->GetValue(id); if (values.size() == 5) { value_instant->count_ = std::stoi(values[1]); value_instant->unseen_days_ = std::stoi(values[2]); @@ -376,10 +373,8 @@ std::pair CommonSparseTable::print_table_stat() { int64_t feasign_size = 0; int64_t mf_size = 0; - for (auto& shard : shard_values_) { - for (auto& table : shard->values_) { - feasign_size += table.size(); - } + for (auto& value : shard_values_) { + feasign_size += value->values_.size(); } return {feasign_size, mf_size}; diff --git a/paddle/fluid/distributed/table/depends/large_scale_kv.h b/paddle/fluid/distributed/table/depends/large_scale_kv.h index 5c10fca98cda4d6cbdcb430ab5f2b8016a6ff7f2..bb4174bd2c579699e0afbf896a17bcdd42d1ee36 100644 --- a/paddle/fluid/distributed/table/depends/large_scale_kv.h +++ b/paddle/fluid/distributed/table/depends/large_scale_kv.h @@ -26,7 +26,6 @@ #include #include "gflags/gflags.h" -#include "butil/object_pool.h" #include "paddle/fluid/distributed/common/utils.h" #include "paddle/fluid/distributed/table/depends/initializers.h" #include "paddle/fluid/distributed/thirdparty/round_robin.h" @@ -49,10 +48,6 @@ namespace distributed { enum Mode { training, infer }; -static const int SPARSE_SHARD_BUCKET_NUM_BITS = 6; -static const size_t SPARSE_SHARD_BUCKET_NUM = (size_t)1 - << SPARSE_SHARD_BUCKET_NUM_BITS; - struct VALUE { explicit VALUE(size_t length) : length_(length), @@ -60,16 +55,46 @@ struct VALUE { unseen_days_(0), need_save_(false), is_entry_(false) { - data_.resize(length); - memset(data_.data(), 0, sizeof(float) * length); + data_ = new float[length]; + memset(data_, 0, sizeof(float) * length); + } + + VALUE(const VALUE &value) { + length_ = value.length_; + count_ = value.count_; + unseen_days_ = value.unseen_days_; + need_save_ = value.need_save_; + is_entry_ = value.is_entry_; + data_ = new float[length_]; + memcpy(data_, value.data_, sizeof(float) * length_); + } + + VALUE &operator=(const VALUE &value) { + if (this != &value) { + delete[] data_; + length_ = value.length_; + count_ = value.count_; + unseen_days_ = value.unseen_days_; + need_save_ = value.need_save_; + is_entry_ = value.is_entry_; + + data_ = new float[length_]; + memcpy(data_, value.data_, sizeof(float) * length_); + } + return *this; + } + + ~VALUE() { + delete[] data_; + data_ = nullptr; } size_t length_; - std::vector data_; int count_; int unseen_days_; // use to check knock-out bool need_save_; // whether need to save bool is_entry_; // whether knock-in + float *data_; }; inline bool count_entry(VALUE *value, int threshold) { @@ -151,12 +176,12 @@ class ValueBlock { const std::vector &value_dims) { auto pts = std::vector(); pts.reserve(value_names.size()); - auto values = GetValue(id); + auto &values = values_.at(id); for (int i = 0; i < static_cast(value_names.size()); i++) { PADDLE_ENFORCE_EQ( value_dims[i], value_dims_[i], platform::errors::InvalidArgument("value dims is not match")); - pts.push_back(values->data_.data() + + pts.push_back(values.data_ + value_offsets_.at(value_idx_.at(value_names[i]))); } return pts; @@ -165,45 +190,33 @@ class ValueBlock { // pull float *Init(const uint64_t &id, const bool with_update = true, const int counter = 1) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - - auto &table = values_[bucket]; - auto res = table.find(id); - - VALUE *value = nullptr; - if (res == table.end()) { - value = butil::get_object(value_length_); - - table[id] = value; - - } else { - value = res->second; + if (!Has(id)) { + values_.emplace(std::make_pair(id, VALUE(value_length_))); } + auto &value = values_.at(id); + if (with_update) { - AttrUpdate(value, counter); + AttrUpdate(&value, counter); } - return value->data_.data(); + + return value.data_; } + VALUE *InitGet(const uint64_t &id, const bool with_update = true, const int counter = 1) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); + if (!Has(id)) { + values_.emplace(std::make_pair(id, VALUE(value_length_))); + } - auto &table = values_[bucket]; - auto res = table.find(id); + auto &value = values_.at(id); - VALUE *value = nullptr; - if (res == table.end()) { - value = butil::get_object(value_length_); - // value = _alloc.acquire(value_length_); - table[id] = value; - } else { - value = (VALUE *)(void *)(res->second); + if (with_update) { + AttrUpdate(&value, counter); } - return value; + + return &value; } void AttrUpdate(VALUE *value, const int counter) { @@ -216,7 +229,7 @@ class ValueBlock { if (value->is_entry_) { // initialize for (size_t x = 0; x < value_names_.size(); ++x) { - initializers_[x]->GetValue(value->data_.data() + value_offsets_[x], + initializers_[x]->GetValue(value->data_ + value_offsets_[x], value_dims_[x]); } value->need_save_ = true; @@ -230,73 +243,42 @@ class ValueBlock { // dont jude if (has(id)) float *Get(const uint64_t &id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - auto &table = values_[bucket]; - - // auto &value = table.at(id); - // return value->data_.data(); - auto res = table.find(id); - VALUE *value = res->second; - return value->data_.data(); + auto &value = values_.at(id); + return value.data_; } // for load, to reset count, unseen_days - VALUE *GetValue(const uint64_t &id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - - auto &table = values_[bucket]; - auto res = table.find(id); - return res->second; - } + VALUE *GetValue(const uint64_t &id) { return &values_.at(id); } bool GetEntry(const uint64_t &id) { - auto value = GetValue(id); - return value->is_entry_; + auto &value = values_.at(id); + return value.is_entry_; } void SetEntry(const uint64_t &id, const bool state) { - auto value = GetValue(id); - value->is_entry_ = state; + auto &value = values_.at(id); + value.is_entry_ = state; } void Shrink(const int threshold) { - for (auto &table : values_) { - for (auto iter = table.begin(); iter != table.end();) { - // VALUE* value = (VALUE*)(void*)(iter->second); - VALUE *value = iter->second; - value->unseen_days_++; - if (value->unseen_days_ >= threshold) { - butil::return_object(iter->second); - //_alloc.release(iter->second); - //_alloc.release(value); - iter = table.erase(iter); - } else { - ++iter; - } + for (auto iter = values_.begin(); iter != values_.end();) { + auto &value = iter->second; + value.unseen_days_++; + if (value.unseen_days_ >= threshold) { + iter = values_.erase(iter); + } else { + ++iter; } } return; } float GetThreshold() { return threshold_; } - size_t compute_bucket(size_t hash) { - if (SPARSE_SHARD_BUCKET_NUM == 1) { - return 0; - } else { - return hash >> (sizeof(size_t) * 8 - SPARSE_SHARD_BUCKET_NUM_BITS); - } - } private: bool Has(const uint64_t id) { - size_t hash = _hasher(id); - size_t bucket = compute_bucket(hash); - auto &table = values_[bucket]; - - auto got = table.find(id); - if (got == table.end()) { + auto got = values_.find(id); + if (got == values_.end()) { return false; } else { return true; @@ -304,9 +286,8 @@ class ValueBlock { } public: - robin_hood::unordered_map values_[SPARSE_SHARD_BUCKET_NUM]; + robin_hood::unordered_map values_; size_t value_length_ = 0; - std::hash _hasher; private: const std::vector &value_names_; @@ -321,3 +302,4 @@ class ValueBlock { } // namespace distributed } // namespace paddle + diff --git a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt index db562045dcc41fe73439e27bf320e290fa278f97..6df2cd52bb401d3cc378c2776073471070f1e411 100644 --- a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt @@ -1,10 +1,5 @@ IF(WITH_GPU) - SET(HETERPS_DEPS device_context) - if (${CMAKE_CUDA_COMPILER_VERSION} LESS 11.0) - SET(HETERPS_DEPS ${HETERPS_DEPS} cub) - endif() - - nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS ${HETERPS_DEPS}) + nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context) nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm) nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm) ENDIF() diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index 24b83662c9dbf9be8832e34967256013e836349b..df07a7a6e778356803b985e37ade5de37621a4ab 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -77,13 +77,10 @@ class CommonAccessor: ("Moment2", None), ("Beta1Pow", 1), ("Beta2Pow", 1), ("LearningRate", 1)] opt_input_map["sum"] = [("Param", None)] - opt_input_map["naive_adagrad"] = [("Param", None), ("G2Sum", 1), - ("LearningRate", 1)] opt_attr_map = {} opt_attr_map["sgd"] = [] opt_attr_map["sum"] = [] - opt_attr_map["naive_adagrad"] = [] opt_attr_map["adam"] = [("beta1", "f"), ("beta2", "f"), ("epsilon", "f")] @@ -172,10 +169,6 @@ class CommonAccessor: param_varnames = self.opt_input_map["sum"] attr_varnames = self.opt_attr_map["sum"] self.accessor_class = "sum" - elif compiled_strategy.use_ps_gpu and is_sparse: - param_varnames = self.opt_input_map["naive_adagrad"] - attr_varnames = self.opt_attr_map["naive_adagrad"] - self.accessor_class = "sgd" else: param_varnames = self.opt_input_map[oop.type] attr_varnames = self.opt_attr_map[oop.type] @@ -183,28 +176,20 @@ class CommonAccessor: for (formal_name, shape) in param_varnames: params.append(formal_name) - if formal_name == "G2Sum": - dims.append(1) - initializer = "fill_constant&0" - initializers.append(initializer) - else: - param = main_program.global_block().vars[oop.input(formal_name)[ - 0]] - if formal_name == "LearningRate" and param.name != "learning_rate_0": - warnings.warn("will support decay soon") - param = main_program.global_block().vars["learning_rate_0"] - - if shape is None: - if is_sparse: - shape = total_dims - else: - shape = self.get_shard(total_dims, pserver_num, - pserver_id) - dims.append(shape) + param = main_program.global_block().vars[oop.input(formal_name)[0]] + if formal_name == "LearningRate" and param.name != "learning_rate_0": + warnings.warn("will support decay soon") + param = main_program.global_block().vars["learning_rate_0"] + + if shape is None: + if is_sparse: + shape = total_dims + else: + shape = self.get_shard(total_dims, pserver_num, pserver_id) + dims.append(shape) - initializer = self.get_initializer_attr(param.name, - startup_program) - initializers.append(initializer) + initializer = self.get_initializer_attr(param.name, startup_program) + initializers.append(initializer) for (attr_varname, type_) in attr_varnames: value = oop.attr(attr_varname) @@ -450,8 +435,6 @@ class TheOnePSRuntime(RuntimeBase): if not strategy: raise ValueError("k_steps must be invalid value, please check") - if dist_strategy.a_sync_configs["use_ps_gpu"]: - strategy.use_ps_gpu = True return strategy def build_compiled_startegy(self): @@ -460,8 +443,6 @@ class TheOnePSRuntime(RuntimeBase): compiled_config = CompileTimeStrategy( self.origin_main_program, self.origin_main_program, self.async_strategy, self.role_maker) - if self.async_strategy.use_ps_gpu: - compiled_config.use_ps_gpu = True return compiled_config def _init_worker(self): diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py index 2a9d26daaed90120c782ace98a09b2aaee1a1c68..35029a3dfc7e70575f66e49d845ec7b51b65f470 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py @@ -149,7 +149,6 @@ class DistributedStrategy(object): if num_threads > 1: self._build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.debug_opt = None - self.use_ps_gpu = False def set_debug_opt(self, opt_info): self.debug_opt = opt_info diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py index b2735727f6755b635a6b55fb54c7f0a739ed79be..baf8add04caad086fd2de26e3dd7f3dd55d4feba 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py @@ -138,7 +138,6 @@ class CompileTimeStrategy(object): self.strategy = strategy self.role_maker = role_maker - self.use_ps_gpu = False try: self.is_heter_ps_mode = role_maker._is_heter_parameter_server_mode except: