From a288fcabdccb7d7da817060ebe0bd6bd0e2e8f9b Mon Sep 17 00:00:00 2001 From: zhaocaibei123 <48509226+zhaocaibei123@users.noreply.github.com> Date: Tue, 5 Apr 2022 21:38:21 +0800 Subject: [PATCH] Table refine: remove table/accessor unuseful (#41400) * update name * update name * fix test * fix fleet bind * update name * update name * fix test * fix gpups wrapper * remove Push/Pull/Load/Save with context in client and wrapper base class * fix * fix * remove some interface * fix * remove * code style * recover * fix * remove code unused * remove some unused table & accessor & CommonDenseTable => MemoryDenseTable * fix * fix * fix * recover * remove unused code Co-authored-by: esythan --- .../fluid/distributed/ps/table/CMakeLists.txt | 11 +- .../ps/table/common_sparse_table.cc | 605 ------------------ .../distributed/ps/table/ctr_accessor.cc | 5 +- .../distributed/ps/table/depends/dense.h | 4 +- .../distributed/ps/table/depends/sparse.h | 220 ------- .../ps/table/downpour_ctr_accessor.cc | 435 ------------- .../ps/table/downpour_ctr_accessor.h | 231 ------- ...n_dense_table.cc => memory_dense_table.cc} | 40 +- ...mon_dense_table.h => memory_dense_table.h} | 6 +- .../distributed/ps/table/sparse_geo_table.cc | 91 --- .../distributed/ps/table/sparse_geo_table.h | 68 -- .../distributed/ps/table/ssd_sparse_table.cc | 376 ----------- .../distributed/ps/table/ssd_sparse_table.h | 64 -- paddle/fluid/distributed/ps/table/table.cc | 15 +- .../test/brpc_service_dense_sgd_test.cc | 2 +- .../distributed/test/ctr_accessor_test.cc | 6 +- .../distributed/test/dense_table_test.cc | 18 +- .../fluid/distributed/test/geo_table_test.cc | 124 ---- .../distributed/test/large_scale_test.cc | 71 -- .../distributed/test/sparse_table_test.cc | 223 ------- paddle/fluid/distributed/test/table_test.cc | 8 +- paddle/fluid/operators/pscore/send_op.cc | 2 +- .../distributed/fleet/runtime/the_one_ps.py | 2 +- python/paddle/distributed/ps/the_one_ps.py | 2 +- 24 files changed, 55 insertions(+), 2574 deletions(-) delete mode 100644 paddle/fluid/distributed/ps/table/common_sparse_table.cc delete mode 100644 paddle/fluid/distributed/ps/table/depends/sparse.h delete mode 100644 paddle/fluid/distributed/ps/table/downpour_ctr_accessor.cc delete mode 100644 paddle/fluid/distributed/ps/table/downpour_ctr_accessor.h rename paddle/fluid/distributed/ps/table/{common_dense_table.cc => memory_dense_table.cc} (92%) rename paddle/fluid/distributed/ps/table/{common_dense_table.h => memory_dense_table.h} (96%) delete mode 100644 paddle/fluid/distributed/ps/table/sparse_geo_table.cc delete mode 100644 paddle/fluid/distributed/ps/table/sparse_geo_table.h delete mode 100644 paddle/fluid/distributed/ps/table/ssd_sparse_table.cc delete mode 100644 paddle/fluid/distributed/ps/table/ssd_sparse_table.h delete mode 100644 paddle/fluid/distributed/test/geo_table_test.cc delete mode 100644 paddle/fluid/distributed/test/large_scale_test.cc delete mode 100644 paddle/fluid/distributed/test/sparse_table_test.cc diff --git a/paddle/fluid/distributed/ps/table/CMakeLists.txt b/paddle/fluid/distributed/ps/table/CMakeLists.txt index 227d0a9f1c..aebe36b5e0 100644 --- a/paddle/fluid/distributed/ps/table/CMakeLists.txt +++ b/paddle/fluid/distributed/ps/table/CMakeLists.txt @@ -7,10 +7,7 @@ set_source_files_properties(${graphDir}/graph_weighted_sampler.cc PROPERTIES COM cc_library(WeightedSampler SRCS ${graphDir}/graph_weighted_sampler.cc DEPS graph_edge) set_source_files_properties(${graphDir}/graph_node.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_library(graph_node SRCS ${graphDir}/graph_node.cc DEPS WeightedSampler) -set_source_files_properties(common_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -set_source_files_properties(common_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -set_source_files_properties(ssd_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -set_source_files_properties(sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +set_source_files_properties(memory_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(barrier_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(common_graph_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) @@ -23,10 +20,10 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp") set(EXTERN_DEP "") if(WITH_HETERPS) - set(TABLE_SRC common_sparse_table.cc ssd_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc common_graph_table.cc) + set(TABLE_SRC memory_dense_table.cc barrier_table.cc common_graph_table.cc) set(EXTERN_DEP rocksdb) else() - set(TABLE_SRC common_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc common_graph_table.cc) + set(TABLE_SRC memory_dense_table.cc barrier_table.cc common_graph_table.cc) endif() cc_library(common_table SRCS ${TABLE_SRC} DEPS ${TABLE_DEPS} @@ -43,12 +40,10 @@ set_source_files_properties(sparse_sgd_rule.cc PROPERTIES COMPILE_FLAGS ${DISTRI set_source_files_properties(ctr_double_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(ctr_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(sparse_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -set_source_files_properties(downpour_ctr_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(memory_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_library(sparse_sgd_rule SRCS sparse_sgd_rule.cc DEPS ${TABLE_DEPS} ps_framework_proto) cc_library(ctr_double_accessor SRCS ctr_double_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto sparse_sgd_rule) cc_library(ctr_accessor SRCS ctr_accessor.cc sparse_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto sparse_sgd_rule) -cc_library(downpour_ctr_accessor SRCS downpour_ctr_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto sparse_sgd_rule) cc_library(memory_sparse_table SRCS memory_sparse_table.cc DEPS ps_framework_proto ${TABLE_DEPS} fs afs_wrapper ctr_accessor common_table) set_source_files_properties(memory_sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) diff --git a/paddle/fluid/distributed/ps/table/common_sparse_table.cc b/paddle/fluid/distributed/ps/table/common_sparse_table.cc deleted file mode 100644 index 6b3d3a6ea1..0000000000 --- a/paddle/fluid/distributed/ps/table/common_sparse_table.cc +++ /dev/null @@ -1,605 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include - -#include "glog/logging.h" -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace distributed { -class ValueBlock; -} // namespace distributed -} // namespace paddle - -namespace paddle { -namespace distributed { - -void CommonSparseTable::ProcessALine(const std::vector& columns, - const Meta& meta, const int64_t id, - std::vector>* values) { - auto colunmn_size = columns.size(); - auto load_values = - paddle::string::split_string(columns[colunmn_size - 1], ","); - values->reserve(meta.names.size()); - - int offset = 0; - for (int x = 0; x < meta.names.size(); ++x) { - std::vector val; - auto start = load_values.begin() + offset; - auto end = load_values.begin() + offset + meta.dims[x]; - PADDLE_ENFORCE_LE(offset + meta.dims[x], load_values.size(), - paddle::platform::errors::InvalidArgument( - "The data format in txt does not meet the field " - "requirements defined in meta")); - - std::transform(start, end, std::back_inserter(val), [id](std::string va) { - float v = 0.0; - - try { - v = std::stof(va); - } catch (std::invalid_argument& e) { - VLOG(0) << "id: " << id << " get unexpected value: " << va - << " and be reset to: 0.0"; - } catch (std::out_of_range& e) { - VLOG(0) << "id: " << id << " get unexpected value: " << va - << " and be reset to: 0.0"; - } - return v; - }); - - values->push_back(val); - offset += meta.dims[x]; - } -} - -void CommonSparseTable::SaveMetaToText(std::ostream* os, - const CommonAccessorParameter& common, - const size_t shard_idx, - const int64_t total) { - // save meta - std::stringstream stream; - stream << "param=" << common.table_name() << "\n"; - stream << "shard_id=" << shard_idx << "\n"; - stream << "row_names=" << paddle::string::join_strings(common.params(), ',') - << "\n"; - stream << "row_dims=" << paddle::string::join_strings(common.dims(), ',') - << "\n"; - stream << "count=" << total << "\n"; - os->write(stream.str().c_str(), sizeof(char) * stream.str().size()); -} - -int64_t CommonSparseTable::SaveValueToText(std::ostream* os, - std::shared_ptr block, - std::shared_ptr<::ThreadPool> pool, - const int mode, int shard_id) { - int64_t save_num = 0; - for (auto& table : block->values_) { - for (auto& value : table) { - if (mode == SaveMode::delta && !value.second->need_save_) { - continue; - } - - ++save_num; - - std::stringstream ss; - auto* vs = value.second->data_.data(); - - auto id = value.first; - - ss << id << "\t" << value.second->count_ << "\t" - << value.second->unseen_days_ << "\t" << value.second->is_entry_ - << "\t"; - - for (int i = 0; i < block->value_length_ - 1; i++) { - ss << std::to_string(vs[i]) << ","; - } - - ss << std::to_string(vs[block->value_length_ - 1]); - ss << "\n"; - - os->write(ss.str().c_str(), sizeof(char) * ss.str().size()); - - if (mode == SaveMode::base || mode == SaveMode::delta) { - value.second->need_save_ = false; - } - } - } - - return save_num; -} - -int64_t CommonSparseTable::LoadFromText( - const std::string& valuepath, const std::string& metapath, - const int pserver_id, const int pserver_num, const int local_shard_num, - std::vector>* blocks) { - Meta meta = Meta(metapath); - - int num_lines = 0; - std::ifstream file(valuepath); - std::string line; - - while (std::getline(file, line)) { - auto values = paddle::string::split_string(line, "\t"); - auto id = std::stoull(values[0]); - - if (id % pserver_num != pserver_id) { - VLOG(3) << "will not load " << values[0] << " from " << valuepath - << ", please check id distribution"; - continue; - } - - auto shard_id = id % local_shard_num; - auto block = blocks->at(shard_id); - - std::vector> kvalues; - ProcessALine(values, meta, id, &kvalues); - - block->Init(id, false); - - VALUE* value_instant = block->GetValue(id); - - if (values.size() == 5) { - value_instant->count_ = std::stoi(values[1]); - value_instant->unseen_days_ = std::stoi(values[2]); - value_instant->is_entry_ = static_cast(std::stoi(values[3])); - } - - std::vector block_values = block->Get(id, meta.names, meta.dims); - auto blas = GetBlas(); - for (int x = 0; x < meta.names.size(); ++x) { - blas.VCOPY(meta.dims[x], kvalues[x].data(), block_values[x]); - } - } - - return 0; -} - -int32_t CommonSparseTable::Initialize() { - _shards_task_pool.resize(task_pool_size_); - for (int i = 0; i < _shards_task_pool.size(); ++i) { - _shards_task_pool[i].reset(new ::ThreadPool(1)); - } - - sync = _config.common().sync(); - VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; - - _global_lr = new float(1.0); - - auto common = _config.common(); - int size = static_cast(common.params().size()); - - size_t offset = 0; - for (int x = 0; x < size; ++x) { - auto& varname = common.params()[x]; - auto& dim = common.dims()[x]; - - value_idx_[varname] = x; - value_names_.push_back(varname); - value_dims_.push_back(dim); - value_offsets_.push_back(offset); - initializer_attrs_.push_back(common.initializers()[x]); - - if (varname == "Param") { - param_dim_ = dim; - param_offset_ = offset; - } - - offset += dim; - } - - InitializeValue(); - InitializeOptimizer(); - InitializeRecorder(); - return 0; -} - -int32_t CommonSparseTable::InitializeRecorder() { return 0; } - -int32_t CommonSparseTable::InitializeValue() { - auto common = _config.common(); - shard_values_.reserve(task_pool_size_); - - for (int x = 0; x < task_pool_size_; ++x) { - auto shard = std::make_shared( - value_names_, value_dims_, value_offsets_, value_idx_, - initializer_attrs_, common.entry()); - - shard_values_.emplace_back(shard); - } - - return 0; -} - -int32_t CommonSparseTable::InitializeOptimizer() { - auto common = _config.common(); - auto name = common.name(); - - if (name == "sgd") { - optimizer_ = std::make_shared(value_names_, value_dims_, - value_offsets_, value_idx_); - optimizer_->SetGlobalLR(_global_lr); - } else if (name == "adam") { - optimizer_ = std::make_shared(value_names_, value_dims_, - value_offsets_, value_idx_); - optimizer_->SetGlobalLR(_global_lr); - } else if (name == "sum") { - optimizer_ = std::make_shared(value_names_, value_dims_, - value_offsets_, value_idx_); - } else { - VLOG(3) << "init optimizer failed"; - } - - VLOG(3) << "init optimizer " << name << " done"; - return 0; -} - -int32_t CommonSparseTable::SetGlobalLR(float* lr) { - _global_lr = lr; - optimizer_->SetGlobalLR(_global_lr); - return 0; -} - -int32_t CommonSparseTable::Load(const std::string& dirname, - const std::string& param) { - auto begin = GetCurrentUS(); - rwlock_->WRLock(); - auto varname = _config.common().table_name(); - std::string var_store = - string::Sprintf("%s/%s%s", dirname, varname, PSERVER_SAVE_SUFFIX); - std::string shard_var_pre = - string::Sprintf("%s.block%d", varname, _shard_idx); - std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre); - std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre); - - LoadFromText(value_, meta_, _shard_idx, _shard_num, task_pool_size_, - &shard_values_); - rwlock_->UNLock(); - auto end = GetCurrentUS(); - - VLOG(0) << "load " << varname << " with value: " << value_ - << " , meta: " << meta_ - << " using: " << std::to_string((end - begin) / 1e+6) << " seconds"; - - return 0; -} - -int32_t CommonSparseTable::Save(const std::string& dirname, - const std::string& param) { - auto begin = GetCurrentUS(); - rwlock_->WRLock(); - int mode = std::stoi(param); - VLOG(3) << "sparse table save: " << dirname << " mode: " << mode; - - auto varname = _config.common().table_name(); - std::string var_store = - string::Sprintf("%s/%s%s", dirname, varname, PSERVER_SAVE_SUFFIX); - MkDirRecursively(var_store.c_str()); - - VLOG(3) << "save " << varname << " in dir: " << var_store << " begin"; - std::vector params(_config.common().params().begin(), - _config.common().params().end()); - - std::string shard_var_pre = - string::Sprintf("%s.block%d", varname, _shard_idx); - - std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre); - - std::unique_ptr vs(new std::ofstream(value_)); - - int64_t total_ins = 0; - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { - // save values - auto shard_save_num = - SaveValueToText(vs.get(), shard_values_[shard_id], - _shards_task_pool[shard_id], mode, shard_id); - total_ins += shard_save_num; - } - vs->close(); - - std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre); - std::unique_ptr ms(new std::ofstream(meta_)); - SaveMetaToText(ms.get(), _config.common(), _shard_idx, total_ins); - ms->close(); - - auto end = GetCurrentUS(); - rwlock_->UNLock(); - VLOG(0) << "save " << varname << " with path: " << value_ - << " using: " << std::to_string((end - begin) / 1e+6) << " seconds"; - - return 0; -} - -std::pair CommonSparseTable::PrintTableStat() { - int64_t feasign_size = 0; - int64_t mf_size = 0; - - for (auto& shard : shard_values_) { - for (auto& table : shard->values_) { - feasign_size += table.size(); - } - } - - return {feasign_size, mf_size}; -} - -int32_t CommonSparseTable::Pour() { - std::vector values; - std::vector keys; - - keys.reserve(pull_reservoir_.size()); - values.reserve(pull_reservoir_.size() * param_dim_); - - for (auto& val : pull_reservoir_) { - keys.push_back(val.first); - auto& reservoir = val.second; - reservoir.avg(); - std::copy(reservoir.values.begin(), reservoir.values.end(), - std::back_inserter(values)); - } - _PushSparse(keys.data(), values.data(), pull_reservoir_.size()); - - pull_reservoir_.clear(); - return 0; -} - -int32_t CommonSparseTable::Pull(TableContext& context) { - CHECK(context.value_type == Sparse); - if (context.use_ptr) { - char** pull_values = context.pull_context.ptr_values; - const uint64_t* keys = context.pull_context.keys; - return PullSparsePtr(pull_values, keys, context.num); - } else { - float* pull_values = context.pull_context.values; - const PullSparseValue& pull_value = context.pull_context.pull_value; - return PullSparse(pull_values, pull_value); - } -} - -int32_t CommonSparseTable::Push(TableContext& context) { - CHECK(context.value_type == Sparse); - if (context.push_context.values != nullptr) { - const float* values = context.push_context.values; - const uint64_t* keys = context.push_context.keys; - return PushSparse(keys, values, context.num); - } else { - const float** values = context.push_context.ptr_values; - const uint64_t* keys = context.push_context.keys; - return PushSparse(keys, values, context.num); - } -} - -int32_t CommonSparseTable::PullSparse(float* pull_values, - const PullSparseValue& pull_value) { - auto shard_num = task_pool_size_; - std::vector> tasks(shard_num); - - for (int shard_id = 0; shard_id < shard_num; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, shard_num, &pull_value, &pull_values]() -> int { - auto& block = shard_values_[shard_id]; - - std::vector offsets; - pull_value.Fission(shard_id, shard_num, &offsets); - - if (pull_value.is_training_) { - for (auto& offset : offsets) { - auto feasign = pull_value.feasigns_[offset]; - auto frequencie = pull_value.frequencies_[offset]; - auto* value = block->Init(feasign, true, frequencie); - std::copy_n(value + param_offset_, param_dim_, - pull_values + param_dim_ * offset); - } - } else { - for (auto& offset : offsets) { - auto feasign = pull_value.feasigns_[offset]; - auto* value = block->Init(feasign, false); - std::copy_n(value + param_offset_, param_dim_, - pull_values + param_dim_ * offset); - } - } - - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t CommonSparseTable::PullSparsePtr(char** pull_values, - const uint64_t* keys, size_t num) { - std::vector> offset_bucket; - offset_bucket.resize(task_pool_size_); - - for (int x = 0; x < num; ++x) { - auto y = keys[x] % task_pool_size_; - offset_bucket[y].push_back(x); - } - - std::vector> tasks(task_pool_size_); - - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, &keys, &offset_bucket, &pull_values]() -> int { - auto& block = shard_values_[shard_id]; - auto& offsets = offset_bucket[shard_id]; - - for (int i = 0; i < offsets.size(); ++i) { - auto offset = offsets[i]; - auto id = keys[offset]; - auto* value = block->InitGet(id); - // std::copy_n(value + param_offset_, param_dim_, - // pull_values + param_dim_ * offset); - pull_values[offset] = reinterpret_cast(value); - } - - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t CommonSparseTable::_PushSparse(const uint64_t* keys, - const float* values, size_t num) { - std::vector> offset_bucket; - offset_bucket.resize(task_pool_size_); - - for (int x = 0; x < num; ++x) { - auto y = keys[x] % task_pool_size_; - offset_bucket[y].push_back(x); - } - - std::vector> tasks(task_pool_size_); - - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, &keys, &values, num, &offset_bucket]() -> int { - auto& offsets = offset_bucket[shard_id]; - optimizer_->Update(keys, values, num, offsets, - shard_values_[shard_id].get()); - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t CommonSparseTable::PushSparse(const uint64_t* keys, const float* values, - size_t num) { - if (sync) { - std::future task = - _shards_task_pool[0]->enqueue([this, &keys, &values, num]() -> int { - for (int x = 0; x < num; ++x) { - auto id = keys[x]; - auto has = pull_reservoir_.find(id); - - if (has == pull_reservoir_.end()) { - pull_reservoir_[id] = ReservoirValue(param_dim_); - } - - auto& reservoir = pull_reservoir_[id]; - reservoir.add(values + x * param_dim_, param_dim_); - } - return 0; - }); - task.wait(); - } else { - _PushSparse(keys, values, num); - } - - return 0; -} - -int32_t CommonSparseTable::PushSparse(const uint64_t* keys, - const float** values, size_t num) { - _PushSparse(keys, values, num); - return 0; -} - -int32_t CommonSparseTable::_PushSparse(const uint64_t* keys, - const float** values, size_t num) { - std::vector> offset_bucket; - offset_bucket.resize(task_pool_size_); - - for (int x = 0; x < num; ++x) { - auto y = keys[x] % task_pool_size_; - offset_bucket[y].push_back(x); - } - - std::vector> tasks(task_pool_size_); - - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, &keys, &values, num, &offset_bucket]() -> int { - auto& offsets = offset_bucket[shard_id]; - for (size_t i = 0; i < offsets.size(); ++i) { - std::vector tmp_off = {0}; - optimizer_->Update(keys + offsets[i], values[offsets[i]], num, - tmp_off, shard_values_[shard_id].get()); - } - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t CommonSparseTable::PushSparseParam(const uint64_t* keys, - const float* values, size_t num) { - std::vector> offset_bucket; - offset_bucket.resize(task_pool_size_); - - for (int x = 0; x < num; ++x) { - auto y = keys[x] % task_pool_size_; - offset_bucket[y].push_back(x); - } - - std::vector> tasks(task_pool_size_); - - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, &keys, &offset_bucket, &values]() -> int { - auto& block = shard_values_[shard_id]; - auto& offsets = offset_bucket[shard_id]; - - for (int i = 0; i < offsets.size(); ++i) { - auto offset = offsets[i]; - auto id = keys[offset]; - auto* value = block->Init(id, false); - std::copy_n(values + param_dim_ * offset, param_dim_, - value + param_offset_); - block->SetEntry(id, true); - } - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t CommonSparseTable::Flush() { return 0; } - -int32_t CommonSparseTable::Shrink(const std::string& param) { - int threshold = std::stoi(param); - VLOG(3) << "sparse table Shrink: " << threshold; - - for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { - // Shrink - VLOG(4) << shard_id << " " << task_pool_size_ << " begin Shrink"; - shard_values_[shard_id]->Shrink(threshold); - } - return 0; -} - -void CommonSparseTable::Clear() { VLOG(0) << "clear coming soon"; } - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/ctr_accessor.cc b/paddle/fluid/distributed/ps/table/ctr_accessor.cc index 2eda47ccaa..4446c8297c 100644 --- a/paddle/fluid/distributed/ps/table/ctr_accessor.cc +++ b/paddle/fluid/distributed/ps/table/ctr_accessor.cc @@ -232,14 +232,15 @@ int32_t CtrCommonAccessor::Update(float** update_values, (push_show - push_click) * _config.ctr_accessor_param().nonclk_coeff() + push_click * _config.ctr_accessor_param().click_coeff(); update_value[common_feature_value.UnseenDaysIndex()] = 0; + // TODO(zhaocaibei123): add configure show_scale _embed_sgd_rule->UpdateValue( update_value + common_feature_value.EmbedWIndex(), update_value + common_feature_value.EmbedG2SumIndex(), - push_value + CtrCommonPushValue::EmbedGIndex()); + push_value + CtrCommonPushValue::EmbedGIndex(), push_show); _embedx_sgd_rule->UpdateValue( update_value + common_feature_value.EmbedxWIndex(), update_value + common_feature_value.EmbedxG2SumIndex(), - push_value + CtrCommonPushValue::EmbedxGIndex()); + push_value + CtrCommonPushValue::EmbedxGIndex(), push_show); } return 0; } diff --git a/paddle/fluid/distributed/ps/table/depends/dense.h b/paddle/fluid/distributed/ps/table/depends/dense.h index 258c0f4b6a..aea757e8d5 100644 --- a/paddle/fluid/distributed/ps/table/depends/dense.h +++ b/paddle/fluid/distributed/ps/table/depends/dense.h @@ -99,7 +99,7 @@ class DSGD : public DenseOptimizer { }; // adam optimizer for dense tensor -// TODO(zhaocaibei123): add CHECK(common_dense_table.task_pool_size_) == 1 +// TODO(zhaocaibei123): add CHECK(memory_dense_table.task_pool_size_) == 1 class DAdam : public DenseOptimizer { public: explicit DAdam(const CommonAccessorParameter& accessor, @@ -132,7 +132,7 @@ class DAdam : public DenseOptimizer { epsilon = 1.0e-8; } - // make sure common_dense_table.task_pool_size_ == 1; + // make sure memory_dense_table.task_pool_size_ == 1; // otherwise, task_pool_size_ times beta1_pow/beta2_pow multiplication void Update(const float* update_values, size_t num, int begin, int end) override { diff --git a/paddle/fluid/distributed/ps/table/depends/sparse.h b/paddle/fluid/distributed/ps/table/depends/sparse.h deleted file mode 100644 index 7eed5ab6c7..0000000000 --- a/paddle/fluid/distributed/ps/table/depends/sparse.h +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include // for sqrt in CPU and CUDA -#include -#include -#include -#include -#include -#include -#include "gflags/gflags.h" - -#include "paddle/fluid/distributed/common/utils.h" -#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h" - -namespace paddle { -namespace distributed { - -class SparseOptimizer { - public: - explicit SparseOptimizer( - const std::vector& value_names, - const std::vector& value_dims, const std::vector& value_offsets, - const std::unordered_map& value_idx) - : value_names_(value_names), - value_dims_(value_dims), - value_offsets_(value_offsets), - value_idx_(value_idx) {} - - virtual void Update(const uint64_t* keys, const float* update_values, - size_t num, const std::vector& offsets, - ValueBlock* block) = 0; - - virtual void SetGlobalLR(float* lr) { global_learning_rate_ = lr; } - - const std::vector& value_names_; - const std::vector& value_dims_; - const std::vector& value_offsets_; - const std::unordered_map& value_idx_; - int param_offset = 0; - int update_numel = 0; - - protected: - float* global_learning_rate_; -}; - -// sum calc for sparse tensor -class SSUM : public SparseOptimizer { - public: - explicit SSUM(const std::vector& value_names, - const std::vector& value_dims, - const std::vector& value_offsets, - const std::unordered_map& value_idx) - : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) { - auto idx = value_idx.at("Param"); - param_offset = value_offsets.at(idx); - update_numel = value_dims.at(idx); - } - - void Update(const uint64_t* keys, const float* update_values, size_t num, - const std::vector& offsets, - ValueBlock* block) override { - auto blas = GetBlas(); - for (auto x : offsets) { - auto id = keys[x]; - if (!block->GetEntry(id)) continue; - auto* value = block->Get(id); - float* param = value + param_offset; - blas.VADD(update_numel, update_values + x * update_numel, param, param); - } - } -}; - -// sgd optimzer for sparse tensor -class SSGD : public SparseOptimizer { - public: - explicit SSGD(const std::vector& value_names, - const std::vector& value_dims, - const std::vector& value_offsets, - const std::unordered_map& value_idx) - : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) { - auto idx = value_idx.at("Param"); - param_offset = value_offsets.at(idx); - update_numel = value_dims.at(idx); - - idx = value_idx.at("LearningRate"); - lr_offset = value_offsets.at(idx); - } - - void Update(const uint64_t* keys, const float* update_values, size_t num, - const std::vector& offsets, - ValueBlock* block) override { - auto blas = GetBlas(); - for (auto x : offsets) { - auto id = keys[x]; - if (!block->GetEntry(id)) continue; - auto* value = block->Get(id); - - float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0]; - float* param = value + param_offset; - - std::vector grads; - grads.resize(update_numel); - blas.VCOPY(update_numel, update_values + x * update_numel, grads.data()); - blas.SCAL(update_numel, learning_rate, grads.data()); - blas.VSUB(update_numel, param, grads.data(), param); - } - } - - int lr_offset; -}; - -// adam optimzer for sparse tensor -class SAdam : public SparseOptimizer { - public: - explicit SAdam(const std::vector& value_names, - const std::vector& value_dims, - const std::vector& value_offsets, - const std::unordered_map& value_idx) - : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) { - auto idx = value_idx.at("Param"); - param_offset = value_offsets.at(idx); - update_numel = value_dims.at(idx); - - idx = value_idx.at("LearningRate"); - lr_offset = value_offsets.at(idx); - - idx = value_idx.at("Moment1"); - m1_offset = value_offsets.at(idx); - - idx = value_idx.at("Moment2"); - m2_offset = value_offsets.at(idx); - - idx = value_idx.at("Beta1Pow"); - beta1_pow_offset = value_offsets.at(idx); - - idx = value_idx.at("Beta2Pow"); - beta2_pow_offset = value_offsets.at(idx); - - // add attr later - beta1 = 0.9; - beta2 = 0.999; - epsilon = 1.0e-8; - } - - void Update(const uint64_t* keys, const float* update_values, size_t num, - const std::vector& offsets, - ValueBlock* block) override { - auto blas = GetBlas(); - for (auto x : offsets) { - auto id = keys[x]; - if (!block->GetEntry(id)) continue; - auto* values = block->Get(id); - float lr_ = *(global_learning_rate_) * (values + lr_offset)[0]; - float* param = values + param_offset; - float* moment1 = values + m1_offset; - float* moment2 = values + m2_offset; - float* beta1_pow = values + beta1_pow_offset; - float* beta2_pow = values + beta2_pow_offset; - - beta1_pow[0] = beta1_pow[0] * beta1; - beta2_pow[0] = beta2_pow[0] * beta2; - - lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); - - std::vector grad, grad2, tmp; - grad.resize(update_numel); - grad2.resize(update_numel); - tmp.resize(update_numel); - - blas.VCOPY(update_numel, update_values + x * update_numel, grad.data()); - blas.VCOPY(update_numel, update_values + x * update_numel, grad2.data()); - - blas.SCAL(update_numel, 1 - beta1, grad.data()); - blas.VSQUARE(update_numel, grad2.data(), grad2.data()); - blas.SCAL(update_numel, 1 - beta2, grad2.data()); - - blas.SCAL(update_numel, beta1, moment1); - blas.VADD(update_numel, moment1, grad.data(), moment1); - blas.SCAL(update_numel, beta2, moment2); - blas.VADD(update_numel, moment2, grad2.data(), moment2); - - float* tmp_ = tmp.data(); - float eps_ = epsilon * sqrt(1 - beta2_pow[0]); - - SQRT(update_numel, moment2, tmp_); - ADD(update_numel, tmp_, eps_, tmp_); - - blas.VDIV(update_numel, moment1, tmp_, tmp_); - blas.SCAL(update_numel, lr_, tmp_); - blas.VSUB(update_numel, param, tmp_, param); - } - } - - int lr_offset; - int m1_offset; - int m2_offset; - int beta1_pow_offset; - int beta2_pow_offset; - - float beta1; - float beta2; - float epsilon; -}; - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/downpour_ctr_accessor.cc b/paddle/fluid/distributed/ps/table/downpour_ctr_accessor.cc deleted file mode 100644 index bad75d2de1..0000000000 --- a/paddle/fluid/distributed/ps/table/downpour_ctr_accessor.cc +++ /dev/null @@ -1,435 +0,0 @@ -// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/distributed/ps/table/downpour_ctr_accessor.h" -#include -#include "glog/logging.h" -#include "paddle/fluid/string/string_helper.h" - -namespace paddle { -namespace distributed { - -int DownpourCtrAccessor::Initialize() { - auto name = _config.embed_sgd_param().name(); - _embed_sgd_rule = CREATE_PSCORE_CLASS(SparseValueSGDRule, name); - _embed_sgd_rule->LoadConfig(_config.embed_sgd_param(), 1); - - name = _config.embedx_sgd_param().name(); - _embedx_sgd_rule = CREATE_PSCORE_CLASS(SparseValueSGDRule, name); - _embedx_sgd_rule->LoadConfig(_config.embedx_sgd_param(), - _config.embedx_dim()); - - _show_click_decay_rate = _config.ctr_accessor_param().show_click_decay_rate(); - _ssd_unseenday_threshold = - _config.ctr_accessor_param().ssd_unseenday_threshold(); - set_time_decay_rates(); - InitAccessorInfo(); - return 0; -} - -void DownpourCtrAccessor::InitAccessorInfo() { - auto embedx_dim = _config.embedx_dim(); - _accessor_info.dim = DownpourCtrFeatureValue::Dim(embedx_dim); - _accessor_info.size = DownpourCtrFeatureValue::Size(embedx_dim); - _accessor_info.select_dim = 3 + embedx_dim; - _accessor_info.select_size = _accessor_info.select_dim * sizeof(float); - _accessor_info.update_dim = 4 + embedx_dim; - _accessor_info.update_size = _accessor_info.update_dim * sizeof(float); - _accessor_info.mf_size = (embedx_dim + 1) * sizeof(float); -} - -bool DownpourCtrAccessor::Shrink(float* value) { - // auto base_threshold = _config.ctr_accessor_param().base_threshold(); - // auto delta_threshold = _config.ctr_accessor_param().delta_threshold(); - // auto delete_threshold = _config.ctr_accessor_param().delete_threshold(); - auto base_threshold = _config.ctr_accessor_param().base_threshold(); - auto delta_threshold = _config.ctr_accessor_param().delta_threshold(); - auto delete_after_unseen_days = - _config.ctr_accessor_param().delete_after_unseen_days(); - auto delete_threshold = _config.ctr_accessor_param().delete_threshold(); - - // time_decay first - auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); - int16_t day_diff = _day_id - unseen_days; - if (day_diff < 0 || day_diff > delete_after_unseen_days) { - return true; - } - auto show_right = - DownpourCtrFeatureValue::Show(value) * _time_decay_rates[day_diff]; - auto click_right = - DownpourCtrFeatureValue::Click(value) * _time_decay_rates[day_diff]; - - // shrink after - auto score = ShowClickScore(show_right, click_right); - if (score < delete_threshold) { - return true; - } - return false; -} - -void DownpourCtrAccessor::set_day_id(int day_id) { _day_id = day_id; } - -int DownpourCtrAccessor::get_day_id() { return _day_id; } - -bool DownpourCtrAccessor::save_ssd(float* value) { - if (_day_id == 0) { - return true; - } - auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); - if (unseen_days == 0) { - return false; - } - // for the origin load (eg. unseen_days = 0-15) - if (unseen_days < _config.ctr_accessor_param().delta_keep_days()) { - unseen_days = _day_id - unseen_days; - } - int16_t day_diff = _day_id - unseen_days; - if (day_diff > _ssd_unseenday_threshold) { - return true; - } - return false; -} - -// bool DownpourCtrAccessor::save_cache( -// float* value, int param, double global_cache_threshold) { -// auto base_threshold = _config.ctr_accessor_param().base_threshold(); -// auto delta_keep_days = _config.ctr_accessor_param().delta_keep_days(); -// auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); -// int16_t day_diff = _day_id - unseen_days; -// if (ShowClickScore(DownpourCtrFeatureValue::Show(value), -// DownpourCtrFeatureValue::Click(value)) >= base_threshold -// && day_diff <= delta_keep_days) { -// return DownpourCtrFeatureValue::Show(value) > global_cache_threshold; -// } -// return false; -// } - -bool DownpourCtrAccessor::Save(float* value, int param) { - // auto base_threshold = _config.ctr_accessor_param().base_threshold(); - // auto delta_threshold = _config.ctr_accessor_param().delta_threshold(); - // auto delta_keep_days = _config.ctr_accessor_param().delta_keep_days(); - auto base_threshold = _config.ctr_accessor_param().base_threshold(); - auto delta_threshold = _config.ctr_accessor_param().delta_threshold(); - auto delta_keep_days = _config.ctr_accessor_param().delta_keep_days(); - if (param == 2) { - delta_threshold = 0; - } - switch (param) { - // save all - case 0: { - return true; - } - // save xbox delta - case 1: - // save xbox base - case 2: { - auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); - int16_t day_diff = _day_id - unseen_days; - - auto show_right = - DownpourCtrFeatureValue::Show(value) * _time_decay_rates[day_diff]; - auto click_right = - DownpourCtrFeatureValue::Click(value) * _time_decay_rates[day_diff]; - - if (ShowClickScore(show_right, click_right) >= base_threshold && - DownpourCtrFeatureValue::DeltaScore(value) >= delta_threshold && - day_diff <= delta_keep_days) { - // do this after save, because it must not be modified when retry - if (param == 2) { - DownpourCtrFeatureValue::DeltaScore(value) = 0; - } - return true; - } else { - return false; - } - } - // already decayed in shrink - case 3: { - // DownpourCtrFeatureValue::Show(value) *= _show_click_decay_rate; - // DownpourCtrFeatureValue::Click(value) *= _show_click_decay_rate; - // do this after save, because it must not be modified when retry - // DownpourCtrFeatureValue::UnseenDays(value)++; - return true; - } - default: - return true; - }; -} - -void DownpourCtrAccessor::UpdateStatAfterSave(float* value, int param) { - auto base_threshold = _config.ctr_accessor_param().base_threshold(); - auto delta_threshold = _config.ctr_accessor_param().delta_threshold(); - auto delta_keep_days = _config.ctr_accessor_param().delta_keep_days(); - if (param == 2) { - delta_threshold = 0; - } - switch (param) { - case 1: { - auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); - int16_t day_diff = _day_id - unseen_days; - auto show_right = - DownpourCtrFeatureValue::Show(value) * _time_decay_rates[day_diff]; - auto click_right = - DownpourCtrFeatureValue::Click(value) * _time_decay_rates[day_diff]; - - if (ShowClickScore(show_right, click_right) >= base_threshold && - DownpourCtrFeatureValue::DeltaScore(value) >= delta_threshold && - day_diff <= delta_keep_days) { - DownpourCtrFeatureValue::DeltaScore(value) = 0; - } - } - return; - // case 3: - // { - // DownpourCtrFeatureValue::UnseenDays(value)++; - // } - // return; - default: - return; - }; -} - -int32_t DownpourCtrAccessor::Create(float** values, size_t num) { - auto embedx_dim = _config.embedx_dim(); - for (size_t value_item = 0; value_item < num; ++value_item) { - float* value = values[value_item]; - value[DownpourCtrFeatureValue::UnseenDaysIndex()] = 0; - value[DownpourCtrFeatureValue::DeltaScoreIndex()] = 0; - value[DownpourCtrFeatureValue::ShowIndex()] = 0; - value[DownpourCtrFeatureValue::ClickIndex()] = 0; - value[DownpourCtrFeatureValue::SlotIndex()] = -1; - _embed_sgd_rule->InitValue( - value + DownpourCtrFeatureValue::EmbedWIndex(), - value + DownpourCtrFeatureValue::EmbedG2SumIndex(), true); - _embedx_sgd_rule->InitValue( - value + DownpourCtrFeatureValue::EmbedxWIndex(), - value + DownpourCtrFeatureValue::EmbedxG2SumIndex()); - } - return 0; -} - -bool DownpourCtrAccessor::NeedExtendMF(float* value) { - float show = value[DownpourCtrFeatureValue::ShowIndex()]; - float click = value[DownpourCtrFeatureValue::ClickIndex()]; - // float score = (show - click) * _config.ctr_accessor_param().nonclk_coeff() - float score = (show - click) * _config.ctr_accessor_param().nonclk_coeff() + - click * _config.ctr_accessor_param().click_coeff(); - //+ click * _config.ctr_accessor_param().click_coeff(); - return score >= _config.embedx_threshold(); -} - -bool DownpourCtrAccessor::HasMF(size_t size) { - return size > DownpourCtrFeatureValue::EmbedxG2SumIndex(); -} - -// from DownpourCtrFeatureValue to DownpourCtrPullValue -int32_t DownpourCtrAccessor::Select(float** select_values, const float** values, - size_t num) { - auto embedx_dim = _config.embedx_dim(); - for (size_t value_item = 0; value_item < num; ++value_item) { - float* select_value = select_values[value_item]; - float* value = const_cast(values[value_item]); - select_value[DownpourCtrPullValue::ShowIndex()] = - value[DownpourCtrFeatureValue::ShowIndex()]; - select_value[DownpourCtrPullValue::ClickIndex()] = - value[DownpourCtrFeatureValue::ClickIndex()]; - select_value[DownpourCtrPullValue::EmbedWIndex()] = - value[DownpourCtrFeatureValue::EmbedWIndex()]; - memcpy(select_value + DownpourCtrPullValue::EmbedxWIndex(), - value + DownpourCtrFeatureValue::EmbedxWIndex(), - embedx_dim * sizeof(float)); - } - return 0; -} - -// from DownpourCtrPushValue to DownpourCtrPushValue -// first dim: item -// second dim: field num -int32_t DownpourCtrAccessor::Merge(float** update_values, - const float** other_update_values, - size_t num) { - auto embedx_dim = _config.embedx_dim(); - size_t total_dim = DownpourCtrPushValue::Dim(embedx_dim); - for (size_t value_item = 0; value_item < num; ++value_item) { - float* update_value = update_values[value_item]; - const float* other_update_value = other_update_values[value_item]; - for (auto i = 0u; i < total_dim; ++i) { - if (i != DownpourCtrPushValue::SlotIndex()) { - update_value[i] += other_update_value[i]; - } - } - } - return 0; -} - -// from DownpourCtrPushValue to DownpourCtrFeatureValue -// first dim: item -// second dim: field num -int32_t DownpourCtrAccessor::Update(float** update_values, - const float** push_values, size_t num) { - auto embedx_dim = _config.embedx_dim(); - for (size_t value_item = 0; value_item < num; ++value_item) { - float* update_value = update_values[value_item]; - const float* push_value = push_values[value_item]; - float push_show = push_value[DownpourCtrPushValue::ShowIndex()]; - float push_click = push_value[DownpourCtrPushValue::ClickIndex()]; - float slot = push_value[DownpourCtrPushValue::SlotIndex()]; - update_value[DownpourCtrFeatureValue::ShowIndex()] += push_show; - update_value[DownpourCtrFeatureValue::ClickIndex()] += push_click; - update_value[DownpourCtrFeatureValue::SlotIndex()] = slot; - update_value[DownpourCtrFeatureValue::DeltaScoreIndex()] += - (push_show - push_click) * _config.ctr_accessor_param().nonclk_coeff() + - push_click * _config.ctr_accessor_param().click_coeff(); - //(push_show - push_click) * _config.ctr_accessor_param().nonclk_coeff() + - // push_click * _config.ctr_accessor_param().click_coeff(); - update_value[DownpourCtrFeatureValue::UnseenDaysIndex()] = 0; - _embed_sgd_rule->UpdateValue( - update_value + DownpourCtrFeatureValue::EmbedWIndex(), - update_value + DownpourCtrFeatureValue::EmbedG2SumIndex(), - push_value + DownpourCtrPushValue::EmbedGIndex(), push_show); - _embedx_sgd_rule->UpdateValue( - update_value + DownpourCtrFeatureValue::EmbedxWIndex(), - update_value + DownpourCtrFeatureValue::EmbedxG2SumIndex(), - push_value + DownpourCtrPushValue::EmbedxGIndex(), push_show); - } - return 0; -} - -bool DownpourCtrAccessor::CreateValue(int stage, const float* value) { - // stage == 0, pull - // stage == 1, push - if (stage == 0) { - return true; - } else if (stage == 1) { - auto show = DownpourCtrPushValue::Show(const_cast(value)); - auto click = DownpourCtrPushValue::Click(const_cast(value)); - auto score = ShowClickScore(show, click); - if (score <= 0) { - return false; - } - if (score >= 1) { - return true; - } - return local_uniform_real_distribution()(local_random_engine()) < - score; - } else { - return true; - } -} - -float DownpourCtrAccessor::ShowClickScore(float show, float click) { - // auto nonclk_coeff = _config.ctr_accessor_param().nonclk_coeff(); - // auto click_coeff = _config.ctr_accessor_param().click_coeff(); - auto nonclk_coeff = _config.ctr_accessor_param().nonclk_coeff(); - auto click_coeff = _config.ctr_accessor_param().click_coeff(); - return (show - click) * nonclk_coeff + click * click_coeff; -} - -std::string DownpourCtrAccessor::ParseToString(const float* v, int param_size) { - thread_local std::ostringstream os; - os.clear(); - os.str(""); - os << v[0] << " " << v[1] << " " << v[2] << " " << v[3] << " " << v[4] << " " - << v[5] << " " << v[6]; - auto show = DownpourCtrFeatureValue::Show(const_cast(v)); - auto click = DownpourCtrFeatureValue::Click(const_cast(v)); - auto score = ShowClickScore(show, click); - if (score >= _config.embedx_threshold() && param_size > 7) { - os << " " << v[7]; - for (auto i = 0; i < _config.embedx_dim(); ++i) { - os << " " << v[8 + i]; - } - } - return os.str(); -} - -int DownpourCtrAccessor::ParseFromString(const std::string& str, float* value) { - int embedx_dim = _config.embedx_dim(); - float data_buff[_accessor_info.dim]; - float* data_buff_ptr = data_buff; - - _embedx_sgd_rule->InitValue( - data_buff_ptr + DownpourCtrFeatureValue::EmbedxWIndex(), - data_buff_ptr + DownpourCtrFeatureValue::EmbedxG2SumIndex()); - - auto str_len = paddle::string::str_to_float(str.data(), data_buff_ptr); - CHECK(str_len >= 6) << "expect more than 6 real:" << str_len; - // no slot, embedx - int value_dim = _accessor_info.dim; - int embedx_g2sum_index = DownpourCtrFeatureValue::EmbedxG2SumIndex(); - value[DownpourCtrFeatureValue::SlotIndex()] = -1; - // other case - if (str_len == (value_dim - 1)) { - memcpy(value, data_buff_ptr, (embedx_g2sum_index - 1) * sizeof(float)); - memcpy(value + embedx_g2sum_index, data_buff_ptr + embedx_g2sum_index - 1, - (embedx_dim + 1) * sizeof(float)); - } else { - memcpy(value, data_buff_ptr, str_len * sizeof(float)); - } - if (str_len == (value_dim - 1) || str_len == 6) { - str_len += 1; - } - return str_len; -} - -void DownpourCtrAccessor::set_time_decay_rates() { - //根据unseen_days的天数来初始化_time_decay_rates大小和对应的衰减率 - auto delete_after_unseen_days = - _config.ctr_accessor_param().delete_after_unseen_days(); - _time_decay_rates.assign(delete_after_unseen_days + 1, 0.0); - for (int i = 0; i <= delete_after_unseen_days; ++i) { - _time_decay_rates[i] = pow(_show_click_decay_rate, i); - } -} - -void DownpourCtrAccessor::update_time_decay(float* value, - bool is_update_seen_day) { - // 根据day_id 来进行show click 衰减和unseen_day 更新;unseen_day - // 为上次出现的dayid - if (_day_id == 0) { - return; - } - auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); - if (unseen_days == 0) { - DownpourCtrFeatureValue::UnseenDays(value) = _day_id; - return; - } - // for the origin load (unseenday = 0 -15) - if (unseen_days < _config.ctr_accessor_param().delete_after_unseen_days()) { - // pull - if (is_update_seen_day) { - DownpourCtrFeatureValue::UnseenDays(value) = _day_id; - return; - // save 舍弃原始的unseenday,都变为上一天出现,保证show/click不被重复decay - } else { - DownpourCtrFeatureValue::UnseenDays(value) = _day_id - 1; - } - } - int16_t day_diff = _day_id - unseen_days; - if (day_diff < 0) { - DownpourCtrFeatureValue::UnseenDays(value) = _day_id; - return; - } - if (day_diff >= _config.ctr_accessor_param().delete_after_unseen_days()) { - return; - } - DownpourCtrFeatureValue::Show(value) *= _time_decay_rates[day_diff]; - DownpourCtrFeatureValue::Click(value) *= _time_decay_rates[day_diff]; - if (is_update_seen_day) { - DownpourCtrFeatureValue::UnseenDays(value) = _day_id; - } -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/downpour_ctr_accessor.h b/paddle/fluid/distributed/ps/table/downpour_ctr_accessor.h deleted file mode 100644 index 785acaf8ea..0000000000 --- a/paddle/fluid/distributed/ps/table/downpour_ctr_accessor.h +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include -#include -#include -#include "paddle/fluid/distributed/common/registerer.h" -#include "paddle/fluid/distributed/ps.pb.h" -#include "paddle/fluid/distributed/ps/table/accessor.h" -#include "paddle/fluid/distributed/ps/table/sparse_sgd_rule.h" - -namespace paddle { -namespace distributed { - -/** - * @brief Accessor for unit - **/ -class DownpourCtrAccessor : public ValueAccessor { - public: - struct DownpourCtrFeatureValue { - /* - float unseen_days; - float delta_score; - float show; - float click; - float embed_w; - float embed_g2sum; - float slot; - float embedx_g2sum; - std::vector embedx_w; - */ - - static int Dim(int embedx_dim) { return 8 + embedx_dim; } - static int DimSize(size_t dim, int embedx_dim) { return sizeof(float); } - static int Size(int embedx_dim) { return Dim(embedx_dim) * sizeof(float); } - static int UnseenDaysIndex() { return 0; } - static int DeltaScoreIndex() { - return DownpourCtrFeatureValue::UnseenDaysIndex() + 1; - } - static int ShowIndex() { - return DownpourCtrFeatureValue::DeltaScoreIndex() + 1; - } - static int ClickIndex() { return DownpourCtrFeatureValue::ShowIndex() + 1; } - static int EmbedWIndex() { - return DownpourCtrFeatureValue::ClickIndex() + 1; - } - static int EmbedG2SumIndex() { - return DownpourCtrFeatureValue::EmbedWIndex() + 1; - } - static int SlotIndex() { - return DownpourCtrFeatureValue::EmbedG2SumIndex() + 1; - } - static int EmbedxG2SumIndex() { - return DownpourCtrFeatureValue::SlotIndex() + 1; - } - static int EmbedxWIndex() { - return DownpourCtrFeatureValue::EmbedxG2SumIndex() + 1; - } - static float& UnseenDays(float* val) { - return val[DownpourCtrFeatureValue::UnseenDaysIndex()]; - } - static float& DeltaScore(float* val) { - return val[DownpourCtrFeatureValue::DeltaScoreIndex()]; - } - static float& Show(float* val) { - return val[DownpourCtrFeatureValue::ShowIndex()]; - } - static float& Click(float* val) { - return val[DownpourCtrFeatureValue::ClickIndex()]; - } - static float& Slot(float* val) { - return val[DownpourCtrFeatureValue::SlotIndex()]; - } - static float& EmbedW(float* val) { - return val[DownpourCtrFeatureValue::EmbedWIndex()]; - } - static float& EmbedG2Sum(float* val) { - return val[DownpourCtrFeatureValue::EmbedG2SumIndex()]; - } - static float& EmbedxG2Sum(float* val) { - return val[DownpourCtrFeatureValue::EmbedxG2SumIndex()]; - } - static float* EmbedxW(float* val) { - return (val + DownpourCtrFeatureValue::EmbedxWIndex()); - } - }; - - struct DownpourCtrPushValue { - /* - float slot; - float show; - float click; - float embed_g; - std::vector embedx_g; - */ - - static int Dim(int embedx_dim) { return 4 + embedx_dim; } - - static int DimSize(int dim, int embedx_dim) { return sizeof(float); } - static int Size(int embedx_dim) { return Dim(embedx_dim) * sizeof(float); } - static int SlotIndex() { return 0; } - static int ShowIndex() { return DownpourCtrPushValue::SlotIndex() + 1; } - static int ClickIndex() { return DownpourCtrPushValue::ShowIndex() + 1; } - static int EmbedGIndex() { return DownpourCtrPushValue::ClickIndex() + 1; } - static int EmbedxGIndex() { - return DownpourCtrPushValue::EmbedGIndex() + 1; - } - static float& Slot(float* val) { return val[0]; } - static float& Show(float* val) { return val[1]; } - static float& Click(float* val) { return val[2]; } - static float& EmbedG(float* val) { return val[3]; } - static float* EmbedxG(float* val) { return val + 4; } - }; - - struct DownpourCtrPullValue { - /* - float show; - float click; - float embed_w; - std::vector embedx_w; - */ - - static int Dim(int embedx_dim) { return 3 + embedx_dim; } - static int DimSize(size_t dim) { return sizeof(float); } - static int Size(int embedx_dim) { return Dim(embedx_dim) * sizeof(float); } - static int ShowIndex() { return 0; } - static int ClickIndex() { return 1; } - static int EmbedWIndex() { return 2; } - static int EmbedxWIndex() { return 3; } - static float& Show(float* val) { - return val[DownpourCtrPullValue::ShowIndex()]; - } - static float& Click(float* val) { - return val[DownpourCtrPullValue::ClickIndex()]; - } - static float& EmbedW(float* val) { - return val[DownpourCtrPullValue::EmbedWIndex()]; - } - static float* EmbedxW(float* val) { - return val + DownpourCtrPullValue::EmbedxWIndex(); - } - }; - DownpourCtrAccessor() {} - virtual ~DownpourCtrAccessor() {} - - virtual int Initialize(); - // 初始化AccessorInfo - virtual void InitAccessorInfo(); - // 判断该value是否进行shrink - virtual bool Shrink(float* value); - // 判断该value是否保存到ssd - virtual bool save_ssd(float* value); - virtual bool NeedExtendMF(float* value); - virtual bool HasMF(size_t size); - // 判断该value是否在save阶段dump, - // param作为参数用于标识save阶段,如downpour的xbox与batch_model - // param = 0, save all feature - // param = 1, save delta feature - // param = 3, save all feature with time decay - virtual bool Save(float* value, int param) override; - // update delta_score and unseen_days after save - virtual void UpdateStatAfterSave(float* value, int param) override; - // virtual bool save_cache(float* value, int param, double - // global_cache_threshold) override; - // keys不存在时,为values生成随机值 - // 要求value的内存由外部调用者分配完毕 - virtual int32_t Create(float** value, size_t num); - // 从values中选取到select_values中 - virtual int32_t Select(float** select_values, const float** values, - size_t num); - // 将update_values聚合到一起 - virtual int32_t Merge(float** update_values, - const float** other_update_values, size_t num); - // 将update_values聚合到一起,通过it.next判定是否进入下一个key - // virtual int32_t Merge(float** update_values, iterator it); - // 将update_values更新应用到values中 - virtual int32_t Update(float** values, const float** update_values, - size_t num); - - virtual std::string ParseToString(const float* value, int param) override; - virtual int32_t ParseFromString(const std::string& str, float* v) override; - virtual bool CreateValue(int type, const float* value); - - //这个接口目前只用来取show - virtual float GetField(float* value, const std::string& name) override { - CHECK(name == "show"); - if (name == "show") { - auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value); - int16_t day_diff = _day_id - unseen_days; - auto show_right = - DownpourCtrFeatureValue::Show(value) * _time_decay_rates[day_diff]; - return (float)show_right; - } - return 0.0; - } - // DEFINE_GET_INDEX(DownpourCtrFeatureValue, show) - // DEFINE_GET_INDEX(DownpourCtrFeatureValue, click) - // DEFINE_GET_INDEX(DownpourCtrFeatureValue, embed_w) - // DEFINE_GET_INDEX(DownpourCtrFeatureValue, embedx_w) - - virtual void update_time_decay(float* value, bool is_update_seen_day); - virtual void set_day_id(int day_id); - virtual int get_day_id(); - bool test_func() { return false; } - - private: - float ShowClickScore(float show, float click); - void set_time_decay_rates(); - - private: - SparseValueSGDRule* _embed_sgd_rule; - SparseValueSGDRule* _embedx_sgd_rule; - float _show_click_decay_rate; - int32_t _ssd_unseenday_threshold; - std::vector _time_decay_rates; - int _day_id; -}; -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/common_dense_table.cc b/paddle/fluid/distributed/ps/table/memory_dense_table.cc similarity index 92% rename from paddle/fluid/distributed/ps/table/common_dense_table.cc rename to paddle/fluid/distributed/ps/table/memory_dense_table.cc index 45208670f9..58ec8503c8 100644 --- a/paddle/fluid/distributed/ps/table/common_dense_table.cc +++ b/paddle/fluid/distributed/ps/table/memory_dense_table.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/distributed/ps/table/common_dense_table.h" +#include "paddle/fluid/distributed/ps/table/memory_dense_table.h" #include "paddle/fluid/platform/enforce.h" @@ -21,7 +21,7 @@ namespace distributed { int FLAGS_pslib_table_save_max_retry_dense = 3; -void CommonDenseTable::CreateInitializer(const std::string& attr, +void MemoryDenseTable::CreateInitializer(const std::string& attr, const std::string& name) { auto slices = string::split_string(attr, "&"); @@ -39,7 +39,7 @@ void CommonDenseTable::CreateInitializer(const std::string& attr, } } -int32_t CommonDenseTable::Initialize() { +int32_t MemoryDenseTable::Initialize() { _shards_task_pool.resize(task_pool_size_); for (int i = 0; i < _shards_task_pool.size(); ++i) { _shards_task_pool[i].reset(new ::ThreadPool(1)); @@ -54,7 +54,7 @@ int32_t CommonDenseTable::Initialize() { return 0; } -int32_t CommonDenseTable::InitializeValue() { +int32_t MemoryDenseTable::InitializeValue() { auto common = _config.common(); int size = static_cast(common.params().size()); values_.resize(size); @@ -92,14 +92,14 @@ int32_t CommonDenseTable::InitializeValue() { param_col_ids_.insert(param_col_ids_.begin() + 1, -1); } - VLOG(1) << "CommonDenseTable::InitializeValue total dim: " << total_dim_ + VLOG(1) << "MemoryDenseTable::InitializeValue total dim: " << total_dim_ << " fixed_len_params_dim: " << fixed_len_params_dim_; pull_reservoir_ = ReservoirValue(param_dim_); return 0; } -int32_t CommonDenseTable::InitializeOptimizer() { +int32_t MemoryDenseTable::InitializeOptimizer() { auto common = _config.common(); auto name = common.name(); auto attrs = common.attributes(); @@ -124,19 +124,19 @@ int32_t CommonDenseTable::InitializeOptimizer() { return 0; } -int32_t CommonDenseTable::SetGlobalLR(float* lr) { +int32_t MemoryDenseTable::SetGlobalLR(float* lr) { _global_lr = lr; optimizer_->SetGlobalLR(_global_lr); return 0; } -int32_t CommonDenseTable::Pull(TableContext& context) { +int32_t MemoryDenseTable::Pull(TableContext& context) { CHECK(context.value_type == Dense); float* pull_values = context.pull_context.values; return PullDense(pull_values, context.num); } -int32_t CommonDenseTable::Push(TableContext& context) { +int32_t MemoryDenseTable::Push(TableContext& context) { CHECK(context.value_type == Dense); if (context.push_context.values != nullptr) { if (!context.push_context.is_param) { @@ -148,13 +148,13 @@ int32_t CommonDenseTable::Push(TableContext& context) { return 0; } -int32_t CommonDenseTable::PullDense(float* pull_values, size_t num) { +int32_t MemoryDenseTable::PullDense(float* pull_values, size_t num) { std::copy(values_[param_idx_].begin(), values_[param_idx_].end(), pull_values); return 0; } -int32_t CommonDenseTable::PushDenseParam(const float* values, size_t num) { +int32_t MemoryDenseTable::PushDenseParam(const float* values, size_t num) { PADDLE_ENFORCE_GE( num, param_dim_, paddle::platform::errors::InvalidArgument( @@ -163,14 +163,14 @@ int32_t CommonDenseTable::PushDenseParam(const float* values, size_t num) { return 0; } -int32_t CommonDenseTable::Pour() { +int32_t MemoryDenseTable::Pour() { pull_reservoir_.avg(); _PushDense(pull_reservoir_.values.data(), pull_reservoir_.values.size()); pull_reservoir_.reset(); return 0; } -int32_t CommonDenseTable::PushDense(const float* values, size_t num) { +int32_t MemoryDenseTable::PushDense(const float* values, size_t num) { if (sync) { std::future task = _shards_task_pool[0]->enqueue([this, &values]() -> int { @@ -184,7 +184,7 @@ int32_t CommonDenseTable::PushDense(const float* values, size_t num) { return 0; } -int32_t CommonDenseTable::_PushDense(const float* values, size_t num) { +int32_t MemoryDenseTable::_PushDense(const float* values, size_t num) { PADDLE_ENFORCE_GE( num, param_dim_, paddle::platform::errors::InvalidArgument( @@ -206,11 +206,11 @@ int32_t CommonDenseTable::_PushDense(const float* values, size_t num) { for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { tasks[shard_id].wait(); } - VLOG(2) << "debug CommonDenseTable::_push_dense done"; + VLOG(2) << "debug MemoryDenseTable::_push_dense done"; return 0; } -int32_t CommonDenseTable::Load(const std::string& path, +int32_t MemoryDenseTable::Load(const std::string& path, const std::string& param) { if (param_dim_ <= 0) { return 0; @@ -281,7 +281,7 @@ int32_t CommonDenseTable::Load(const std::string& path, continue; } values_[param_col_ids_[col_idx]][dim_idx] = data_buffer[col_idx]; - VLOG(2) << "CommonDenseTable::load param x: " + VLOG(2) << "MemoryDenseTable::load param x: " << param_col_ids_[col_idx] << " y: " << dim_idx << " value: " << values_[param_col_ids_[col_idx]][dim_idx] << " line " << file_dim_idx; @@ -318,11 +318,11 @@ int32_t CommonDenseTable::Load(const std::string& path, return 0; } -int32_t CommonDenseTable::Save(const std::string& path, +int32_t MemoryDenseTable::Save(const std::string& path, const std::string& param) { int save_param = atoi(param.c_str()); uint32_t feasign_size; - VLOG(0) << "CommonDenseTable::save path " << path; + VLOG(0) << "MemoryDenseTable::save path " << path; FsChannelConfig channel_config; if (_config.compress_in_save()) { @@ -356,7 +356,7 @@ int32_t CommonDenseTable::Save(const std::string& path, for (int x = 0; x < size; ++x) { auto& varname = common.params()[x]; auto& dim = common.dims()[x]; - VLOG(3) << "CommonDenseTable::save dim " << x << " size: " << dim; + VLOG(3) << "MemoryDenseTable::save dim " << x << " size: " << dim; for (int y = 0; y < dim; ++y) { os.clear(); os.str(""); diff --git a/paddle/fluid/distributed/ps/table/common_dense_table.h b/paddle/fluid/distributed/ps/table/memory_dense_table.h similarity index 96% rename from paddle/fluid/distributed/ps/table/common_dense_table.h rename to paddle/fluid/distributed/ps/table/memory_dense_table.h index acda009d02..73653fbc2e 100644 --- a/paddle/fluid/distributed/ps/table/common_dense_table.h +++ b/paddle/fluid/distributed/ps/table/memory_dense_table.h @@ -30,10 +30,10 @@ namespace distributed { class DenseOptimizer; -class CommonDenseTable : public Table { +class MemoryDenseTable : public Table { public: - CommonDenseTable() {} - virtual ~CommonDenseTable() {} + MemoryDenseTable() {} + virtual ~MemoryDenseTable() {} int32_t Initialize() override; int32_t InitializeShard() override { return 0; } void CreateInitializer(const std::string& attr, const std::string& name); diff --git a/paddle/fluid/distributed/ps/table/sparse_geo_table.cc b/paddle/fluid/distributed/ps/table/sparse_geo_table.cc deleted file mode 100644 index de9628a5b5..0000000000 --- a/paddle/fluid/distributed/ps/table/sparse_geo_table.cc +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h" - -namespace paddle { -namespace distributed { - -int32_t SparseGeoTable::PullGeoParam(const uint32_t trainer_id, - std::vector* values, - std::vector* ids) { - geo_recorder->GetAndClear(trainer_id, ids); - auto dim = _config.common().dims()[0]; - - std::vector frequencies; - frequencies.resize(ids->size(), 1); - - auto pull_value = PullSparseValue(ids->size(), dim); - pull_value.is_training_ = true; - pull_value.feasigns_ = ids->data(); - pull_value.frequencies_ = frequencies.data(); - - values->resize(ids->size() * dim); - CommonSparseTable::PullSparse(values->data(), pull_value); - return 0; -} - -int32_t SparseGeoTable::PushSparse(const uint64_t* keys, const float* values, - size_t num) { - std::vector ids; - ids.resize(num); - std::copy_n(keys, num, ids.begin()); - geo_recorder->Update(ids); - CommonSparseTable::PushSparse(keys, values, num); - return 0; -} - -int32_t SparseGeoTable::InitializeValue() { - auto common = _config.common(); - shard_values_.reserve(task_pool_size_); - - for (int x = 0; x < task_pool_size_; ++x) { - auto shard = std::make_shared( - value_names_, value_dims_, value_offsets_, value_idx_, - initializer_attrs_, common.entry()); - - shard_values_.emplace_back(shard); - } - - auto accessor = _config.accessor(); - std::vector feasigns; - - for (size_t x = 0; x < accessor.fea_dim(); ++x) { - if (x % _shard_num == _shard_idx) { - feasigns.push_back(x); - } - } - - VLOG(3) << "has " << feasigns.size() << " ids need to be pre inited"; - - auto buckets = bucket(feasigns.size(), 10); - for (int x = 0; x < 10; ++x) { - auto bucket_feasigns = buckets[x + 1] - buckets[x]; - std::vector ids(bucket_feasigns); - std::copy(feasigns.begin() + buckets[x], feasigns.begin() + buckets[x + 1], - ids.begin()); - - std::vector fres; - fres.resize(ids.size(), 1); - - auto pull_value = PullSparseValue(ids, fres, param_dim_); - std::vector pulls; - pulls.resize(bucket_feasigns * param_dim_); - PullSparse(pulls.data(), pull_value); - } - return 0; -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/sparse_geo_table.h b/paddle/fluid/distributed/ps/table/sparse_geo_table.h deleted file mode 100644 index 261338c2ba..0000000000 --- a/paddle/fluid/distributed/ps/table/sparse_geo_table.h +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include // NOLINT -#include -#include -#include - -#include "Eigen/Dense" -#include "paddle/fluid/distributed/ps/table/accessor.h" -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/common_table.h" -#include "paddle/fluid/distributed/ps/table/depends/geo_recorder.h" -#include "paddle/fluid/distributed/ps/table/depends/initializers.h" -#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h" -#include "paddle/fluid/distributed/ps/table/depends/sparse.h" -#include "paddle/fluid/string/string_helper.h" -#include "paddle/phi/core/utils/rw_lock.h" - -namespace paddle { -namespace distributed { - -class GeoRecorder; - -class SparseGeoTable : public CommonSparseTable { - public: - explicit SparseGeoTable() : CommonSparseTable() { geo_recorder = nullptr; } - virtual ~SparseGeoTable() {} - - virtual int32_t InitializeValue(); - - int32_t PullGeoParam(const uint32_t trainer_id, std::vector* values, - std::vector* keys); - - int32_t PushSparse(const uint64_t* keys, const float* values, - size_t num) override; - - virtual int32_t InitializeRecorder() { - if (!geo_recorder) { - auto trainers = _config.common().trainer_num(); - geo_recorder = std::make_shared(trainers); - } - return 0; - } - - private: - std::shared_ptr geo_recorder; -}; - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc b/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc deleted file mode 100644 index 484fa9e1c6..0000000000 --- a/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifdef PADDLE_WITH_HETERPS -#include "paddle/fluid/distributed/ps/table/ssd_sparse_table.h" - -DEFINE_string(rocksdb_path, "database", "path of sparse table rocksdb file"); - -namespace paddle { -namespace distributed { - -int32_t SSDSparseTable::Initialize() { - _shards_task_pool.resize(task_pool_size_); - for (int i = 0; i < _shards_task_pool.size(); ++i) { - _shards_task_pool[i].reset(new ::ThreadPool(1)); - } - - sync = _config.common().sync(); - VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; - - _global_lr = new float(1.0); - - auto common = _config.common(); - int size = static_cast(common.params().size()); - - size_t offset = 0; - for (int x = 0; x < size; ++x) { - auto& varname = common.params()[x]; - auto& dim = common.dims()[x]; - - value_idx_[varname] = x; - value_names_.push_back(varname); - value_dims_.push_back(dim); - value_offsets_.push_back(offset); - initializer_attrs_.push_back(common.initializers()[x]); - - if (varname == "Param") { - param_dim_ = dim; - param_offset_ = offset; - } - - offset += dim; - } - - InitializeValue(); - InitializeOptimizer(); - InitializeRecorder(); - _db = paddle::distributed::RocksDBHandler::GetInstance(); - _db->initialize(FLAGS_rocksdb_path, task_pool_size_); - return 0; -} - -int32_t SSDSparseTable::Pull(TableContext& context) { - CHECK(context.value_type == Sparse); - if (context.use_ptr) { - char** pull_values = context.pull_context.ptr_values; - const uint64_t* keys = context.pull_context.keys; - return PullSparsePtr(pull_values, keys, context.num); - } else { - float* pull_values = context.pull_context.values; - const PullSparseValue& pull_value = context.pull_context.pull_value; - return PullSparse(pull_values, pull_value); - } -} - -int32_t SSDSparseTable::Push(TableContext& context) { return 0; } - -int32_t SSDSparseTable::PullSparse(float* pull_values, - const PullSparseValue& pull_value) { - auto shard_num = task_pool_size_; - std::vector> tasks(shard_num); - - for (int shard_id = 0; shard_id < shard_num; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, shard_num, &pull_value, &pull_values]() -> int { - auto& block = shard_values_[shard_id]; - - std::vector offsets; - pull_value.Fission(shard_id, shard_num, &offsets); - - for (auto& offset : offsets) { - auto feasign = pull_value.feasigns_[offset]; - auto frequencie = pull_value.frequencies_[offset]; - float* embedding = nullptr; - auto iter = block->Find(feasign); - // in mem - if (iter == block->end()) { - embedding = iter->second->data_.data(); - if (pull_value.is_training_) { - block->AttrUpdate(iter->second, frequencie); - } - } else { - // need create - std::string tmp_str(""); - if (_db->get(shard_id, (char*)&feasign, sizeof(uint64_t), - tmp_str) > 0) { - embedding = block->Init(feasign, true, frequencie); - } else { - // in db - int data_size = tmp_str.size() / sizeof(float); - int value_size = block->value_length_; - float* db_value = (float*)const_cast(tmp_str.c_str()); - VALUE* value = block->InitGet(feasign); - - // copy to mem - memcpy(value->data_.data(), db_value, - value_size * sizeof(float)); - embedding = db_value; - - // param, count, unseen_day - value->count_ = db_value[value_size]; - value->unseen_days_ = db_value[value_size + 1]; - value->is_entry_ = db_value[value_size + 2]; - if (pull_value.is_training_) { - block->AttrUpdate(value, frequencie); - } - } - } - std::copy_n(embedding + param_offset_, param_dim_, - pull_values + param_dim_ * offset); - } - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t SSDSparseTable::PullSparsePtr(char** pull_values, const uint64_t* keys, - size_t num) { - auto shard_num = task_pool_size_; - std::vector> tasks(shard_num); - - std::vector> offset_bucket; - offset_bucket.resize(task_pool_size_); - - for (int x = 0; x < num; ++x) { - auto y = keys[x] % task_pool_size_; - offset_bucket[y].push_back(x); - } - - for (int shard_id = 0; shard_id < shard_num; ++shard_id) { - tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( - [this, shard_id, &keys, &pull_values, &offset_bucket]() -> int { - auto& block = shard_values_[shard_id]; - auto& offsets = offset_bucket[shard_id]; - - for (auto& offset : offsets) { - auto feasign = keys[offset]; - auto iter = block->Find(feasign); - VALUE* value = nullptr; - // in mem - if (iter != block->end()) { - value = iter->second; - } else { - // need create - std::string tmp_str(""); - if (_db->get(shard_id, (char*)&feasign, sizeof(uint64_t), - tmp_str) > 0) { - value = block->InitGet(feasign); - } else { - // in db - int data_size = tmp_str.size() / sizeof(float); - int value_size = block->value_length_; - float* db_value = (float*)const_cast(tmp_str.c_str()); - value = block->InitGet(feasign); - - // copy to mem - memcpy(value->data_.data(), db_value, - value_size * sizeof(float)); - - // param, count, unseen_day - value->count_ = db_value[value_size]; - value->unseen_days_ = db_value[value_size + 1]; - value->is_entry_ = db_value[value_size + 2]; - } - } - pull_values[offset] = (char*)value; - } - return 0; - }); - } - - for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { - tasks[shard_id].wait(); - } - return 0; -} - -int32_t SSDSparseTable::Shrink(const std::string& param) { return 0; } - -int32_t SSDSparseTable::UpdateTable() { - int count = 0; - int value_size = shard_values_[0]->value_length_; - int db_size = 3 + value_size; - float tmp_value[db_size]; - - for (size_t i = 0; i < task_pool_size_; ++i) { - auto& block = shard_values_[i]; - - for (auto& table : block->values_) { - for (auto iter = table.begin(); iter != table.end();) { - VALUE* value = iter->second; - if (value->unseen_days_ >= 1) { - tmp_value[value_size] = value->count_; - tmp_value[value_size + 1] = value->unseen_days_; - tmp_value[value_size + 2] = value->is_entry_; - memcpy(tmp_value, value->data_.data(), sizeof(float) * value_size); - _db->put(i, (char*)&(iter->first), sizeof(uint64_t), (char*)tmp_value, - db_size * sizeof(float)); - count++; - - butil::return_object(iter->second); - iter = table.erase(iter); - } else { - ++iter; - } - } - } - _db->flush(i); - } - VLOG(1) << "Table>> update count: " << count; - return 0; -} - -int64_t SSDSparseTable::SaveValueToText(std::ostream* os, - std::shared_ptr block, - std::shared_ptr<::ThreadPool> pool, - const int mode, int shard_id) { - int64_t save_num = 0; - - for (auto& table : block->values_) { - for (auto& value : table) { - if (mode == SaveMode::delta && !value.second->need_save_) { - continue; - } - - ++save_num; - - std::stringstream ss; - auto* vs = value.second->data_.data(); - - auto id = value.first; - - ss << id << "\t" << value.second->count_ << "\t" - << value.second->unseen_days_ << "\t" << value.second->is_entry_ - << "\t"; - - for (int i = 0; i < block->value_length_ - 1; i++) { - ss << std::to_string(vs[i]) << ","; - } - - ss << std::to_string(vs[block->value_length_ - 1]); - ss << "\n"; - - os->write(ss.str().c_str(), sizeof(char) * ss.str().size()); - - if (mode == SaveMode::base || mode == SaveMode::delta) { - value.second->need_save_ = false; - } - } - } - - if (mode != 1) { - int value_size = block->value_length_; - auto* it = _db->get_iterator(shard_id); - - for (it->SeekToFirst(); it->Valid(); it->Next()) { - float* value = (float*)const_cast(it->value().data()); - std::stringstream ss; - ss << *((uint64_t*)const_cast(it->key().data())) << "\t" - << value[value_size] << "\t" << value[value_size + 1] << "\t" - << value[value_size + 2] << "\t"; - for (int i = 0; i < block->value_length_ - 1; i++) { - ss << std::to_string(value[i]) << ","; - } - - ss << std::to_string(value[block->value_length_ - 1]); - ss << "\n"; - - os->write(ss.str().c_str(), sizeof(char) * ss.str().size()); - } - } - - return save_num; -} - -int32_t SSDSparseTable::Load(const std::string& path, - const std::string& param) { - rwlock_->WRLock(); - VLOG(3) << "ssd sparse table load with " << path << " with meta " << param; - LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_, - &shard_values_); - rwlock_->UNLock(); - return 0; -} - -int64_t SSDSparseTable::LoadFromText( - const std::string& valuepath, const std::string& metapath, - const int pserver_id, const int pserver_num, const int local_shard_num, - std::vector>* blocks) { - Meta meta = Meta(metapath); - - int num_lines = 0; - std::ifstream file(valuepath); - std::string line; - - int value_size = shard_values_[0]->value_length_; - int db_size = 3 + value_size; - float tmp_value[db_size]; - - while (std::getline(file, line)) { - auto values = paddle::string::split_string(line, "\t"); - auto id = std::stoull(values[0]); - - if (id % pserver_num != pserver_id) { - VLOG(3) << "will not load " << values[0] << " from " << valuepath - << ", please check id distribution"; - continue; - } - - auto shard_id = id % local_shard_num; - auto block = blocks->at(shard_id); - - std::vector> kvalues; - ProcessALine(values, meta, id, &kvalues); - - block->Init(id, false); - - VALUE* value_instant = block->GetValue(id); - - if (values.size() == 5) { - value_instant->count_ = std::stoi(values[1]); - value_instant->unseen_days_ = std::stoi(values[2]); - value_instant->is_entry_ = static_cast(std::stoi(values[3])); - } - - std::vector block_values = block->Get(id, meta.names, meta.dims); - auto blas = GetBlas(); - for (int x = 0; x < meta.names.size(); ++x) { - blas.VCOPY(meta.dims[x], kvalues[x].data(), block_values[x]); - } - VLOG(3) << "loading: " << id - << "unseen day: " << value_instant->unseen_days_; - if (value_instant->unseen_days_ >= 1) { - tmp_value[value_size] = value_instant->count_; - tmp_value[value_size + 1] = value_instant->unseen_days_; - tmp_value[value_size + 2] = value_instant->is_entry_; - memcpy(tmp_value, value_instant->data_.data(), - sizeof(float) * value_size); - _db->put(shard_id, (char*)&(id), sizeof(uint64_t), (char*)tmp_value, - db_size * sizeof(float)); - block->erase(id); - } - } - - return 0; -} - -} // namespace ps -} // namespace paddle -#endif diff --git a/paddle/fluid/distributed/ps/table/ssd_sparse_table.h b/paddle/fluid/distributed/ps/table/ssd_sparse_table.h deleted file mode 100644 index 11a776bd9e..0000000000 --- a/paddle/fluid/distributed/ps/table/ssd_sparse_table.h +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/depends/rocksdb_warpper.h" -#ifdef PADDLE_WITH_HETERPS -namespace paddle { -namespace distributed { -class SSDSparseTable : public CommonSparseTable { - public: - SSDSparseTable() {} - virtual ~SSDSparseTable() {} - - virtual int32_t Initialize() override; - - void SaveMetaToText(std::ostream* os, const CommonAccessorParameter& common, - const size_t shard_idx, const int64_t total); - - int64_t SaveValueToText(std::ostream* os, std::shared_ptr block, - std::shared_ptr<::ThreadPool> pool, const int mode, - int shard_id); - - virtual int64_t LoadFromText( - const std::string& valuepath, const std::string& metapath, - const int pserver_id, const int pserver_num, const int local_shard_num, - std::vector>* blocks); - - virtual int32_t Load(const std::string& path, const std::string& param); - - // exchange data - virtual int32_t UpdateTable(); - - virtual int32_t Pull(TableContext& context); - virtual int32_t Push(TableContext& context); - - virtual int32_t PullSparse(float* values, const PullSparseValue& pull_value); - - virtual int32_t PullSparsePtr(char** pull_values, const uint64_t* keys, - size_t num); - - virtual int32_t Flush() override { return 0; } - virtual int32_t Shrink(const std::string& param) override; - virtual void Clear() override {} - - private: - RocksDBHandler* _db; - int64_t _cache_tk_size; -}; - -} // namespace ps -} // namespace paddle -#endif diff --git a/paddle/fluid/distributed/ps/table/table.cc b/paddle/fluid/distributed/ps/table/table.cc index 0a7352c977..0fbdfb6fcc 100644 --- a/paddle/fluid/distributed/ps/table/table.cc +++ b/paddle/fluid/distributed/ps/table/table.cc @@ -17,15 +17,11 @@ #include "glog/logging.h" #include "paddle/fluid/distributed/common/registerer.h" -#include "paddle/fluid/distributed/ps/table/common_dense_table.h" #include "paddle/fluid/distributed/ps/table/common_graph_table.h" -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/memory_sparse_geo_table.h" -#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h" -#ifdef PADDLE_WITH_HETERPS -#include "paddle/fluid/distributed/ps/table/ssd_sparse_table.h" -#endif +#include "paddle/fluid/distributed/ps/table/memory_dense_table.h" + #include "paddle/fluid/distributed/ps/table/ctr_accessor.h" +#include "paddle/fluid/distributed/ps/table/memory_sparse_geo_table.h" #include "paddle/fluid/distributed/ps/table/memory_sparse_table.h" #include "paddle/fluid/distributed/ps/table/sparse_accessor.h" #include "paddle/fluid/distributed/ps/table/tensor_accessor.h" @@ -34,14 +30,11 @@ namespace paddle { namespace distributed { REGISTER_PSCORE_CLASS(Table, GraphTable); -REGISTER_PSCORE_CLASS(Table, CommonDenseTable); -REGISTER_PSCORE_CLASS(Table, CommonSparseTable); +REGISTER_PSCORE_CLASS(Table, MemoryDenseTable); #ifdef PADDLE_WITH_HETERPS -REGISTER_PSCORE_CLASS(Table, SSDSparseTable); REGISTER_PSCORE_CLASS(GraphSampler, CompleteGraphSampler); REGISTER_PSCORE_CLASS(GraphSampler, BasicBfsGraphSampler); #endif -REGISTER_PSCORE_CLASS(Table, SparseGeoTable); REGISTER_PSCORE_CLASS(Table, BarrierTable); REGISTER_PSCORE_CLASS(Table, TensorTable); REGISTER_PSCORE_CLASS(Table, DenseTensorTable); diff --git a/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc b/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc index d5e196ff32..f9d57be95a 100644 --- a/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc +++ b/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc @@ -63,7 +63,7 @@ void InitTensorsOnClient(framework::Scope* scope, platform::CPUPlace* place, void GetDownpourDenseTableProto( ::paddle::distributed::TableParameter* dense_table_proto) { dense_table_proto->set_table_id(0); - dense_table_proto->set_table_class("CommonDenseTable"); + dense_table_proto->set_table_class("MemoryDenseTable"); dense_table_proto->set_shard_num(256); dense_table_proto->set_type(::paddle::distributed::PS_DENSE_TABLE); ::paddle::distributed::TableAccessorParameter* accessor_proto = diff --git a/paddle/fluid/distributed/test/ctr_accessor_test.cc b/paddle/fluid/distributed/test/ctr_accessor_test.cc index 844aa54946..258b4d3326 100644 --- a/paddle/fluid/distributed/test/ctr_accessor_test.cc +++ b/paddle/fluid/distributed/test/ctr_accessor_test.cc @@ -164,7 +164,7 @@ TEST(downpour_feature_value_accessor_test, test_update) { for (auto i = 0u; i < item_size; ++i) { float* p = new float[acc->GetAccessorInfo().update_dim]; for (auto j = 0u; j < acc->GetAccessorInfo().update_dim; ++j) { - p[j] = i; + p[j] = i + 1; } grad[i] = p; } @@ -247,9 +247,9 @@ TEST(downpour_feature_value_accessor_test, test_update) { v.delta_score += acc->ShowClickScore(push_v.show, push_v.click); acc->_embed_sgd_rule->UpdateValue(&v.embed_w, &v.embed_g2sum[0], - &push_v.embed_g); + &push_v.embed_g, push_v.show); acc->_embedx_sgd_rule->UpdateValue(&v.embedx_w[0], &v.embedx_g2sum[0], - &push_v.embedx_g[0]); + &push_v.embedx_g[0], push_v.show); float* ptr = new float[acc->GetAccessorInfo().dim]; v.to_array(ptr, parameter.embedx_dim()); diff --git a/paddle/fluid/distributed/test/dense_table_test.cc b/paddle/fluid/distributed/test/dense_table_test.cc index 40992b1b53..9529c776c1 100644 --- a/paddle/fluid/distributed/test/dense_table_test.cc +++ b/paddle/fluid/distributed/test/dense_table_test.cc @@ -16,22 +16,22 @@ limitations under the License. */ #include #include "gtest/gtest.h" #include "paddle/fluid/distributed/ps.pb.h" -#include "paddle/fluid/distributed/ps/table/common_dense_table.h" +#include "paddle/fluid/distributed/ps/table/memory_dense_table.h" namespace paddle { namespace distributed { -// CommonDenseTable + Adam +// MemoryDenseTable + Adam class Table; -TEST(CommonDenseTable, Adam) { +TEST(MemoryDenseTable, Adam) { int fea_dim = 10; int trainers = 2; TableParameter table_config; - table_config.set_table_class("CommonDenseTable"); + table_config.set_table_class("MemoryDenseTable"); FsClientParameter fs_config; - Table *table = new CommonDenseTable(); + Table *table = new MemoryDenseTable(); TableAccessorParameter *accessor_config = table_config.mutable_accessor(); accessor_config->set_accessor_class("CommMergeAccessor"); CommonAccessorParameter *common_config = table_config.mutable_common(); @@ -141,15 +141,15 @@ TEST(CommonDenseTable, Adam) { } } -// CommonDenseTable + Adam -TEST(CommonDenseTable, SGD) { +// MemoryDenseTable + Adam +TEST(MemoryDenseTable, SGD) { int fea_dim = 10; int trainers = 2; TableParameter table_config; - table_config.set_table_class("CommonDenseTable"); + table_config.set_table_class("MemoryDenseTable"); FsClientParameter fs_config; - Table *table = new CommonDenseTable(); + Table *table = new MemoryDenseTable(); TableAccessorParameter *accessor_config = table_config.mutable_accessor(); accessor_config->set_accessor_class("CommMergeAccessor"); CommonAccessorParameter *common_config = table_config.mutable_common(); diff --git a/paddle/fluid/distributed/test/geo_table_test.cc b/paddle/fluid/distributed/test/geo_table_test.cc deleted file mode 100644 index b148c32f49..0000000000 --- a/paddle/fluid/distributed/test/geo_table_test.cc +++ /dev/null @@ -1,124 +0,0 @@ -/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include - -#include -#include -#include // NOLINT - -#include "google/protobuf/text_format.h" -#include "gtest/gtest.h" -#include "paddle/fluid/distributed/ps.pb.h" -#include "paddle/fluid/distributed/ps/table/common_dense_table.h" -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/depends/sparse_utils.h" -#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h" -#include "paddle/fluid/distributed/ps/table/table.h" - -namespace paddle { -namespace distributed { - -// SparseGeoTable + SSUM -TEST(SparseGeoTable, SSUM) { - int emb_dim = 10; - int trainers = 2; - - TableParameter table_config; - table_config.set_table_class("SparseGeoTable"); - FsClientParameter fs_config; - Table *table = new SparseGeoTable(); - TableAccessorParameter *accessor_config = table_config.mutable_accessor(); - accessor_config->set_accessor_class("CommMergeAccessor"); - CommonAccessorParameter *common_config = table_config.mutable_common(); - common_config->set_name("sum"); - common_config->set_table_name("ssum_test_table"); - common_config->set_trainer_num(trainers); - common_config->add_params("Param"); - common_config->add_dims(emb_dim); - common_config->add_initializers("fill_constant&1.0"); - - auto ret = table->initialize(table_config, fs_config); - ASSERT_EQ(ret, 0); - - // test push_sparse_param, and create params - std::vector init_keys = {0, 1, 2, 3, 4}; - std::vector init_fres = {1, 1, 1, 1, 1}; - std::vector init_values; - for (size_t i = 0; i < init_keys.size() * emb_dim; i++) { - init_values.push_back(0.0); - } - table->push_sparse_param(init_keys.data(), init_values.data(), - init_keys.size()); - - std::vector pull_values(init_values.size()); - auto value = PullSparseValue(init_keys, init_fres, emb_dim); - table->pull_sparse(pull_values.data(), value); - - for (size_t i = 0; i < init_keys.size() * emb_dim; i++) { - ASSERT_TRUE(abs(pull_values[i] - init_values[i]) < 1e-5); - } - - std::vector> trainer_keys; - std::vector> trainer_values; - trainer_keys.resize(trainers); - trainer_values.resize(trainers); - float start = 0.0; - for (int i = 0; i < trainers; i++) { - trainer_keys[i] = init_keys; - for (size_t j = 0; j < trainer_keys[i].size(); j++) { - auto id = trainer_keys[i][j]; - for (int k = 0; k < emb_dim; k++) { - trainer_values[i].push_back(start); - pull_values[id * emb_dim + k] += start; - start += 0.1; - } - } - } - - std::shared_ptr<::ThreadPool> pool_ = - std::make_shared<::ThreadPool>(trainers); - std::vector> task_status; - for (int i = 0; i < trainers; i++) { - auto &push_keys = trainer_keys[i]; - auto &push_values = trainer_values[i]; - auto task = [table, &push_keys, &push_values] { - table->push_sparse(push_keys.data(), push_values.data(), - push_keys.size()); - }; - task_status.push_back(pool_->enqueue(std::move(task))); - } - for (auto &status : task_status) { - status.wait(); - } - - std::vector> geo_pull_ids; - std::vector> geo_pull_values; - geo_pull_ids.resize(trainers); - geo_pull_values.resize(trainers); - for (int i = 0; i < trainers; i++) { - table->pull_geo_param(i, &geo_pull_values[i], &geo_pull_ids[i]); - ASSERT_EQ(geo_pull_values[i].size(), geo_pull_ids[i].size() * emb_dim); - for (size_t j = 0; j < geo_pull_ids[i].size(); ++j) { - auto id = geo_pull_ids[i][j]; - for (int k = 0; k < emb_dim; k++) { - ASSERT_TRUE(abs(geo_pull_values[i][j * emb_dim + k] - - pull_values[id * emb_dim + k]) < 1e-5); - } - } - } -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/test/large_scale_test.cc b/paddle/fluid/distributed/test/large_scale_test.cc deleted file mode 100644 index 13c1d13212..0000000000 --- a/paddle/fluid/distributed/test/large_scale_test.cc +++ /dev/null @@ -1,71 +0,0 @@ -/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include - -#include -#include -#include // NOLINT - -#include "google/protobuf/text_format.h" -#include "gtest/gtest.h" -#include "paddle/fluid/distributed/ps.pb.h" -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h" -#include "paddle/fluid/distributed/ps/table/table.h" - -namespace paddle { -namespace distributed { - -TEST(BENCHMARK, LargeScaleKV) { - int emb_dim = 10; - int trainers = 2; - float beta1 = 0.9; - float beta2 = 0.999; - float epsilon = 1.0e-8; - - TableParameter table_config; - table_config.set_table_class("CommonSparseTable"); - FsClientParameter fs_config; - Table *table = new CommonSparseTable(); - TableAccessorParameter *accessor_config = table_config.mutable_accessor(); - accessor_config->set_accessor_class("CommMergeAccessor"); - CommonAccessorParameter *common_config = table_config.mutable_common(); - common_config->set_name("adam"); - common_config->set_table_name("adam_test_table"); - common_config->set_trainer_num(trainers); - common_config->add_params("Param"); - common_config->add_dims(emb_dim); - common_config->add_initializers("uniform_random&0&-1.0&1.0"); - common_config->add_params("LearningRate"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); - common_config->add_params("Moment1"); - common_config->add_dims(emb_dim); - common_config->add_initializers("fill_constant&0.0"); - common_config->add_params("Moment2"); - common_config->add_dims(emb_dim); - common_config->add_initializers("fill_constant&0.0"); - common_config->add_params("Beta1Pow"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); - common_config->add_params("Beta2Pow"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); - auto ret = table->initialize(table_config, fs_config); - ASSERT_EQ(ret, 0); -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/test/sparse_table_test.cc b/paddle/fluid/distributed/test/sparse_table_test.cc deleted file mode 100644 index f13bab078a..0000000000 --- a/paddle/fluid/distributed/test/sparse_table_test.cc +++ /dev/null @@ -1,223 +0,0 @@ -/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include - -#include -#include -#include // NOLINT - -#include "google/protobuf/text_format.h" -#include "gtest/gtest.h" -#include "paddle/fluid/distributed/ps.pb.h" -#include "paddle/fluid/distributed/ps/table/common_dense_table.h" -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h" -#include "paddle/fluid/distributed/ps/table/table.h" - -namespace paddle { -namespace distributed { - -// CommonSparseTable + SSGD -TEST(CommonSparseTable, SGD) { - int emb_dim = 10; - int trainers = 2; - - TableParameter table_config; - table_config.set_table_class("CommonSparseTable"); - FsClientParameter fs_config; - Table *table = new CommonSparseTable(); - TableAccessorParameter *accessor_config = table_config.mutable_accessor(); - accessor_config->set_accessor_class("CommMergeAccessor"); - CommonAccessorParameter *common_config = table_config.mutable_common(); - common_config->set_name("sgd"); - common_config->set_table_name("sgd_test_table"); - common_config->set_trainer_num(trainers); - common_config->add_params("Param"); - common_config->add_dims(emb_dim); - common_config->add_initializers("uniform_random&0&-1.0&1.0"); // param - common_config->add_params("LearningRate"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); // learning_rate - auto ret = table->initialize(table_config, fs_config); - ASSERT_EQ(ret, 0); - - // pull parameters for create and check - std::vector init_keys = {0, 1, 2, 3, 4}; - std::vector init_fres = {1, 1, 1, 1, 1}; - - std::vector init_values; - init_values.resize(init_keys.size() * emb_dim); - - std::vector pull_values(init_values.size()); - auto value = PullSparseValue(init_keys, init_fres, emb_dim); - table->pull_sparse(init_values.data(), value); - - // for check - std::vector total_gradients; - total_gradients.resize(init_keys.size() * emb_dim); - memset(total_gradients.data(), 0, sizeof(float) * total_gradients.size()); - - // push gradient - std::vector> trainer_keys; - std::vector> trainer_gradient_values; - trainer_keys.resize(trainers); - trainer_gradient_values.resize(trainers); - float start = 0.0; - for (int i = 0; i < trainers; i++) { - trainer_keys[i] = init_keys; - for (size_t j = 0; j < trainer_keys[i].size(); j++) { - auto id = trainer_keys[i][j]; - for (int k = 0; k < emb_dim; k++) { - trainer_gradient_values[i].push_back(start); - total_gradients[id * emb_dim + k] += start; - start += 0.1; - } - } - } - - std::shared_ptr<::ThreadPool> pool_ = - std::make_shared<::ThreadPool>(trainers); - std::vector> task_status; - for (int i = 0; i < trainers; i++) { - auto &push_keys = trainer_keys[i]; - auto &push_values = trainer_gradient_values[i]; - auto task = [table, &push_keys, &push_values] { - table->push_sparse(push_keys.data(), push_values.data(), - push_keys.size()); - }; - task_status.push_back(pool_->enqueue(std::move(task))); - } - for (auto &status : task_status) { - status.wait(); - } - - std::vector pull_values; - pull_values.resize(init_keys.size() * emb_dim); - table->pull_sparse(init_values.data(), value); - - for (size_t i = 0; i < init_values.size(); ++i) { - auto update_val = init_values[i] - 1.0 * total_gradients[i]; - ASSERT_TRUE(abs(update_val - pull_values[i]) < 1e-5); - } -} - -// CommonSparseTable + Adam -TEST(CommonSparseTable, Adam) { - int emb_dim = 10; - int trainers = 2; - float beta1 = 0.9; - float beta2 = 0.999; - float epsilon = 1.0e-8; - - TableParameter table_config; - table_config.set_table_class("CommonSparseTable"); - FsClientParameter fs_config; - Table *table = new CommonSparseTable(); - TableAccessorParameter *accessor_config = table_config.mutable_accessor(); - accessor_config->set_accessor_class("CommMergeAccessor"); - CommonAccessorParameter *common_config = table_config.mutable_common(); - common_config->set_name("adam"); - common_config->set_table_name("adam_test_table"); - common_config->set_trainer_num(trainers); - common_config->add_params("Param"); - common_config->add_dims(emb_dim); - common_config->add_initializers("uniform_random&0&-1.0&1.0"); - common_config->add_params("LearningRate"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); - common_config->add_params("Moment1"); - common_config->add_dims(emb_dim); - common_config->add_initializers("fill_constant&0.0"); - common_config->add_params("Moment2"); - common_config->add_dims(emb_dim); - common_config->add_initializers("fill_constant&0.0"); - common_config->add_params("Beta1Pow"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); - common_config->add_params("Beta2Pow"); - common_config->add_dims(1); - common_config->add_initializers("fill_constant&1.0"); - auto ret = table->initialize(table_config, fs_config); - ASSERT_EQ(ret, 0); - - // pull parameters for create and check - std::vector init_keys = {0, 1, 2, 3, 4}; - std::vector init_fres = {1, 1, 1, 1, 1}; - - std::vector init_values; - init_values.resize(init_keys.size() * emb_dim); - - auto value = PullSparseValue(init_keys, init_fres, emb_dim); - table->pull_sparse(init_values.data(), value); - - // push gradient - std::vector> trainer_keys; - std::vector> trainer_gradient_values; - trainer_keys.resize(trainers); - trainer_gradient_values.resize(trainers); - float start = 0.0; - for (int i = 0; i < trainers; i++) { - trainer_keys[i] = init_keys; - for (size_t j = 0; j < trainer_keys[i].size(); j++) { - for (int k = 0; k < emb_dim; k++) { - trainer_gradient_values[i].push_back(start); - start += 0.1; - } - } - } - - for (int i = 0; i < trainers; i++) { - auto &push_keys = trainer_keys[i]; - auto &push_values = trainer_gradient_values[i]; - table->push_sparse(push_keys.data(), push_values.data(), push_keys.size()); - } - - std::vector pull_values; - pull_values.resize(init_keys.size() * emb_dim); - table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size()); - - for (size_t idx = 0; idx < init_keys.size(); idx += emb_dim) { - std::vector beta1_pow, beta2_pow, lr, mom1, mom2, param; - beta1_pow.push_back(beta1); - beta2_pow.push_back(beta2); - lr.push_back(1.0); - for (int i = 0; i < emb_dim; i++) { - mom1.push_back(0.0); - mom2.push_back(0.0); - param.push_back(init_values[idx + i]); - } - for (int i = 0; i < trainers; i++) { - auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); - for (int j = 0; j < emb_dim; j++) { - mom1[j] = - beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][idx + j]; - mom2[j] = beta2 * mom2[j] + - (1 - beta2) * trainer_gradient_values[i][idx + j] * - trainer_gradient_values[i][idx + j]; - param[j] = param[j] - - lr_ * (mom1[j] / - (sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0]))); - } - beta1_pow[0] *= beta1; - beta2_pow[0] *= beta2; - } - for (int i = 0; i < emb_dim; i++) { - ASSERT_TRUE(abs(param[i] - pull_values[idx + i]) < 1e-5); - } - } -} - -} // namespace distributed -} // namespace paddle diff --git a/paddle/fluid/distributed/test/table_test.cc b/paddle/fluid/distributed/test/table_test.cc index 8690aee39f..4f73519ef5 100644 --- a/paddle/fluid/distributed/test/table_test.cc +++ b/paddle/fluid/distributed/test/table_test.cc @@ -14,18 +14,18 @@ limitations under the License. */ #include "gtest/gtest.h" #include "paddle/fluid/distributed/ps.pb.h" -#include "paddle/fluid/distributed/ps/table/common_sparse_table.h" -#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h" +#include "paddle/fluid/distributed/ps/table/memory_dense_table.h" +//#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h" namespace paddle { namespace distributed { TEST(Table, Initialize) { TableParameter table_config; - table_config.set_table_class("SparseGeoTable"); + table_config.set_table_class("MemoryDenseTable"); FsClientParameter fs_config; // case 1. no accessor - Table *table = new SparseGeoTable(); + Table *table = new MemoryDenseTable(); auto ret = table->Initialize(table_config, fs_config); ASSERT_EQ(ret, -1); } diff --git a/paddle/fluid/operators/pscore/send_op.cc b/paddle/fluid/operators/pscore/send_op.cc index 5b4a641f29..4ca99115be 100644 --- a/paddle/fluid/operators/pscore/send_op.cc +++ b/paddle/fluid/operators/pscore/send_op.cc @@ -47,7 +47,7 @@ class SendOp : public framework::OperatorBase { auto send_varnames = Attr>("send_varnames"); - // for common_dense_table, distributed_push_sparse op for push sparse in + // for memory_dense_table, distributed_push_sparse op for push sparse in // async if (is_sparse == 0 && send_varnames.size() >= 1 && send_varnames[0] != "@PS_STEP_COUNTER@") { diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index 47e1c64f99..c90fab6af5 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -984,7 +984,7 @@ class TheOnePSRuntime(RuntimeBase): table_proto.accessor) else: table.type = "PS_DENSE_TABLE" - table.table_class = "CommonDenseTable" + table.table_class = "MemoryDenseTable" table.shard_num = 256 common.table_name = "MergedDense" diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index 1fd435cca1..1d23567b72 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -665,7 +665,7 @@ class DenseTable(Table): table_proto.table_id = ctx.table_id() table_proto.type = the_one_ps_pb2.PS_DENSE_TABLE - table_proto.table_class = "CommonDenseTable" + table_proto.table_class = "MemoryDenseTable" table_proto.shard_num = 256 table_proto.accessor.accessor_class = 'CommMergeAccessor' -- GitLab