未验证 提交 a288fcab 编写于 作者: Z zhaocaibei123 提交者: GitHub

Table refine: remove unused tables/accessors (#41400)

* update name

* update name

* fix test

* fix fleet bind

* update name

* update name

* fix test

* fix gpups wrapper

* remove Push/Pull/Load/Save with context in client and wrapper base class

* fix

* fix

* remove some interface

* fix

* remove

* code style

* recover

* fix

* remove code unused

* remove some unused table & accessor & CommonDenseTable => MemoryDenseTable

* fix

* fix

* fix

* recover

* remove unused code
Co-authored-by: Nesythan <esythan@126.com>
上级 1bd8125f
......@@ -7,10 +7,7 @@ set_source_files_properties(${graphDir}/graph_weighted_sampler.cc PROPERTIES COM
cc_library(WeightedSampler SRCS ${graphDir}/graph_weighted_sampler.cc DEPS graph_edge)
set_source_files_properties(${graphDir}/graph_node.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(graph_node SRCS ${graphDir}/graph_node.cc DEPS WeightedSampler)
set_source_files_properties(common_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(common_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(ssd_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(memory_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(barrier_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(common_graph_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
......@@ -23,10 +20,10 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp")
set(EXTERN_DEP "")
if(WITH_HETERPS)
set(TABLE_SRC common_sparse_table.cc ssd_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc common_graph_table.cc)
set(TABLE_SRC memory_dense_table.cc barrier_table.cc common_graph_table.cc)
set(EXTERN_DEP rocksdb)
else()
set(TABLE_SRC common_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc common_graph_table.cc)
set(TABLE_SRC memory_dense_table.cc barrier_table.cc common_graph_table.cc)
endif()
cc_library(common_table SRCS ${TABLE_SRC} DEPS ${TABLE_DEPS}
......@@ -43,12 +40,10 @@ set_source_files_properties(sparse_sgd_rule.cc PROPERTIES COMPILE_FLAGS ${DISTRI
set_source_files_properties(ctr_double_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(ctr_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(sparse_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(downpour_ctr_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(memory_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(sparse_sgd_rule SRCS sparse_sgd_rule.cc DEPS ${TABLE_DEPS} ps_framework_proto)
cc_library(ctr_double_accessor SRCS ctr_double_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto sparse_sgd_rule)
cc_library(ctr_accessor SRCS ctr_accessor.cc sparse_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto sparse_sgd_rule)
cc_library(downpour_ctr_accessor SRCS downpour_ctr_accessor.cc DEPS ${TABLE_DEPS} ps_framework_proto sparse_sgd_rule)
cc_library(memory_sparse_table SRCS memory_sparse_table.cc DEPS ps_framework_proto ${TABLE_DEPS} fs afs_wrapper ctr_accessor common_table)
set_source_files_properties(memory_sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include <sstream>
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace distributed {
class ValueBlock;
} // namespace distributed
} // namespace paddle
namespace paddle {
namespace distributed {
// Parses one text row of a saved sparse table into per-field float vectors.
//
// columns: the already-split fields of the row; only the last one (a
//          comma-separated list of numbers) is read here.
// meta:    provides the field names and, via meta.dims, how many numbers
//          belong to each field.
// id:      id of the row; used only in the log messages below.
// values:  output; one std::vector<float> is appended per entry of meta.names.
//
// Tokens that std::stof cannot parse, or that are out of range, are logged and
// replaced by 0.0. Enforces that the row is non-empty and carries at least as
// many numbers as meta requires.
void CommonSparseTable::ProcessALine(const std::vector<std::string>& columns,
                                     const Meta& meta, const int64_t id,
                                     std::vector<std::vector<float>>* values) {
  // Reading the last column of an empty row would index out of bounds.
  PADDLE_ENFORCE_LE(static_cast<size_t>(1), columns.size(),
                    paddle::platform::errors::InvalidArgument(
                        "The data format in txt does not meet the field "
                        "requirements defined in meta"));
  auto load_values =
      paddle::string::split_string<std::string>(columns.back(), ",");
  values->reserve(meta.names.size());

  int offset = 0;
  for (size_t x = 0; x < meta.names.size(); ++x) {
    // Validate the slice first: forming an iterator past end() is undefined
    // behaviour even if it is never dereferenced.
    PADDLE_ENFORCE_LE(offset + meta.dims[x], load_values.size(),
                      paddle::platform::errors::InvalidArgument(
                          "The data format in txt does not meet the field "
                          "requirements defined in meta"));
    auto start = load_values.begin() + offset;
    auto end = start + meta.dims[x];

    // Convert directly into the output slot to avoid copying a temporary.
    values->emplace_back();
    auto& val = values->back();
    std::transform(start, end, std::back_inserter(val),
                   [id](const std::string& va) {
                     float v = 0.0;
                     try {
                       v = std::stof(va);
                     } catch (const std::invalid_argument& e) {
                       VLOG(0) << "id: " << id
                               << " get unexpected value: " << va
                               << " and be reset to: 0.0";
                     } catch (const std::out_of_range& e) {
                       VLOG(0) << "id: " << id
                               << " get unexpected value: " << va
                               << " and be reset to: 0.0";
                     }
                     return v;
                   });
    offset += meta.dims[x];
  }
}
// Writes the meta header of one shard to `os` as "key=value" lines:
// param, shard_id, row_names, row_dims and count (in that order).
// `common` supplies the table name, field names and dims; `total` is written
// as the count line.
void CommonSparseTable::SaveMetaToText(std::ostream* os,
                                       const CommonAccessorParameter& common,
                                       const size_t shard_idx,
                                       const int64_t total) {
  std::stringstream header;
  header << "param=" << common.table_name() << "\n"
         << "shard_id=" << shard_idx << "\n"
         << "row_names=" << paddle::string::join_strings(common.params(), ',')
         << "\n"
         << "row_dims=" << paddle::string::join_strings(common.dims(), ',')
         << "\n"
         << "count=" << total << "\n";

  // Materialise the text once and emit it in a single write.
  const std::string text = header.str();
  os->write(text.c_str(), sizeof(char) * text.size());
}
// Serialises every entry of `block` to `os`, one text row per entry:
//   id \t count \t unseen_days \t is_entry \t v0,v1,...,v(value_length-1)
// In SaveMode::delta, entries whose need_save_ flag is false are skipped.
// In SaveMode::base and SaveMode::delta the flag is cleared after writing.
// `pool` and `shard_id` are not used by this implementation.
// Returns the number of rows written.
int64_t CommonSparseTable::SaveValueToText(std::ostream* os,
                                           std::shared_ptr<ValueBlock> block,
                                           std::shared_ptr<::ThreadPool> pool,
                                           const int mode, int shard_id) {
  const bool delta_only = (mode == SaveMode::delta);
  const bool clears_flag = (mode == SaveMode::base || mode == SaveMode::delta);
  const auto last = block->value_length_ - 1;

  int64_t written = 0;
  for (auto& bucket : block->values_) {
    for (auto& entry : bucket) {
      auto& item = entry.second;
      if (delta_only && !item->need_save_) {
        continue;
      }
      ++written;

      auto* data = item->data_.data();
      std::stringstream row;
      row << entry.first << "\t" << item->count_ << "\t" << item->unseen_days_
          << "\t" << item->is_entry_ << "\t";
      // All but the final number are followed by a comma.
      for (int i = 0; i < last; i++) {
        row << std::to_string(data[i]) << ",";
      }
      row << std::to_string(data[last]) << "\n";

      const std::string text = row.str();
      os->write(text.c_str(), sizeof(char) * text.size());

      if (clears_flag) {
        item->need_save_ = false;
      }
    }
  }
  return written;
}
int64_t CommonSparseTable::LoadFromText(
const std::string& valuepath, const std::string& metapath,
const int pserver_id, const int pserver_num, const int local_shard_num,
std::vector<std::shared_ptr<ValueBlock>>* blocks) {
Meta meta = Meta(metapath);
int num_lines = 0;
std::ifstream file(valuepath);
std::string line;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
auto id = std::stoull(values[0]);
if (id % pserver_num != pserver_id) {
VLOG(3) << "will not load " << values[0] << " from " << valuepath
<< ", please check id distribution";
continue;
}
auto shard_id = id % local_shard_num;
auto block = blocks->at(shard_id);
std::vector<std::vector<float>> kvalues;
ProcessALine(values, meta, id, &kvalues);
block->Init(id, false);
VALUE* value_instant = block->GetValue(id);
if (values.size() == 5) {
value_instant->count_ = std::stoi(values[1]);
value_instant->unseen_days_ = std::stoi(values[2]);
value_instant->is_entry_ = static_cast<bool>(std::stoi(values[3]));
}
std::vector<float*> block_values = block->Get(id, meta.names, meta.dims);
auto blas = GetBlas<float>();
for (int x = 0; x < meta.names.size(); ++x) {
blas.VCOPY(meta.dims[x], kvalues[x].data(), block_values[x]);
}
}
return 0;
}
// Sets up the table from _config: creates task_pool_size_ single-threaded
// pools, reads the sync flag, records every configured field (name, dim,
// running offset, initializer), remembers the dim/offset of the "Param"
// field, and then initialises values, optimizer and recorder.
// Always returns 0.
int32_t CommonSparseTable::Initialize() {
  _shards_task_pool.resize(task_pool_size_);
  for (auto& pool : _shards_task_pool) {
    pool.reset(new ::ThreadPool(1));
  }

  sync = _config.common().sync();
  VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;

  _global_lr = new float(1.0);

  auto common = _config.common();
  const int field_count = static_cast<int>(common.params().size());

  // Fields are laid out back to back; running_offset is where the next starts.
  size_t running_offset = 0;
  for (int idx = 0; idx < field_count; ++idx) {
    auto& field_name = common.params()[idx];
    auto& field_dim = common.dims()[idx];

    value_idx_[field_name] = idx;
    value_names_.push_back(field_name);
    value_dims_.push_back(field_dim);
    value_offsets_.push_back(running_offset);
    initializer_attrs_.push_back(common.initializers()[idx]);

    if (field_name == "Param") {
      param_dim_ = field_dim;
      param_offset_ = running_offset;
    }
    running_offset += field_dim;
  }

  InitializeValue();
  InitializeOptimizer();
  InitializeRecorder();
  return 0;
}
// Nothing is set up here; the function only reports success (0).
int32_t CommonSparseTable::InitializeRecorder() {
  return 0;
}
// Creates one ValueBlock per task-pool slot (task_pool_size_ in total), each
// built from the recorded field layout, the initializer attributes and the
// entry setting of the common config. Always returns 0.
int32_t CommonSparseTable::InitializeValue() {
  auto common = _config.common();

  shard_values_.reserve(task_pool_size_);
  for (int created = 0; created < task_pool_size_; ++created) {
    shard_values_.emplace_back(std::make_shared<ValueBlock>(
        value_names_, value_dims_, value_offsets_, value_idx_,
        initializer_attrs_, common.entry()));
  }
  return 0;
}
int32_t CommonSparseTable::InitializeOptimizer() {
auto common = _config.common();
auto name = common.name();
if (name == "sgd") {
optimizer_ = std::make_shared<SSGD>(value_names_, value_dims_,
value_offsets_, value_idx_);
optimizer_->SetGlobalLR(_global_lr);
} else if (name == "adam") {
optimizer_ = std::make_shared<SAdam>(value_names_, value_dims_,
value_offsets_, value_idx_);
optimizer_->SetGlobalLR(_global_lr);
} else if (name == "sum") {
optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
value_offsets_, value_idx_);
} else {
VLOG(3) << "init optimizer failed";
}
VLOG(3) << "init optimizer " << name << " done";
return 0;
}
int32_t CommonSparseTable::SetGlobalLR(float* lr) {
_global_lr = lr;
optimizer_->SetGlobalLR(_global_lr);
return 0;
}
// Loads this shard of the table from text files under
//   <dirname>/<table_name><PSERVER_SAVE_SUFFIX>/<table_name>.block<_shard_idx>
// with the ".txt" (values) and ".meta" (meta) extensions.
// `param` is not used. The write lock is held for the whole load and is
// released even when loading throws (e.g. std::stoull on a malformed row).
// Always returns 0 when no exception escapes.
int32_t CommonSparseTable::Load(const std::string& dirname,
                                const std::string& param) {
  auto begin = GetCurrentUS();
  rwlock_->WRLock();

  std::string varname;
  std::string value_;
  std::string meta_;
  try {
    varname = _config.common().table_name();
    std::string var_store =
        string::Sprintf("%s/%s%s", dirname, varname, PSERVER_SAVE_SUFFIX);
    std::string shard_var_pre =
        string::Sprintf("%s.block%d", varname, _shard_idx);
    value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre);
    meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre);

    LoadFromText(value_, meta_, _shard_idx, _shard_num, task_pool_size_,
                 &shard_values_);
  } catch (...) {
    // Do not leave the table write-locked forever; let the caller see the
    // original exception.
    rwlock_->UNLock();
    throw;
  }
  rwlock_->UNLock();

  auto end = GetCurrentUS();
  VLOG(0) << "load " << varname << " with value: " << value_
          << " , meta: " << meta_
          << " using: " << std::to_string((end - begin) / 1e+6) << " seconds";
  return 0;
}
// Saves this shard of the table as text under
//   <dirname>/<table_name><PSERVER_SAVE_SUFFIX>/
// writing "<table_name>.block<_shard_idx>.txt" (values of all local shards)
// followed by the matching ".meta" file.
//
// param: decimal save mode, parsed with std::stoi and passed on to
//        SaveValueToText. It is parsed before the lock is taken so that a
//        malformed value cannot throw while the write lock is held.
// The write lock is released even if saving throws. Returns 0.
int32_t CommonSparseTable::Save(const std::string& dirname,
                                const std::string& param) {
  auto begin = GetCurrentUS();
  const int mode = std::stoi(param);

  rwlock_->WRLock();

  std::string varname;
  std::string value_;
  try {
    VLOG(3) << "sparse table save: " << dirname << " mode: " << mode;

    varname = _config.common().table_name();
    std::string var_store =
        string::Sprintf("%s/%s%s", dirname, varname, PSERVER_SAVE_SUFFIX);
    MkDirRecursively(var_store.c_str());

    VLOG(3) << "save " << varname << " in dir: " << var_store << " begin";

    std::string shard_var_pre =
        string::Sprintf("%s.block%d", varname, _shard_idx);
    value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre);

    // Values of every local shard go into the same file, in shard order.
    std::ofstream vs(value_);
    int64_t total_ins = 0;
    for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
      total_ins += SaveValueToText(&vs, shard_values_[shard_id],
                                   _shards_task_pool[shard_id], mode, shard_id);
    }
    vs.close();

    // The meta file records how many rows were actually written.
    std::string meta_ =
        string::Sprintf("%s/%s.meta", var_store, shard_var_pre);
    std::ofstream ms(meta_);
    SaveMetaToText(&ms, _config.common(), _shard_idx, total_ins);
    ms.close();
  } catch (...) {
    rwlock_->UNLock();
    throw;
  }

  auto end = GetCurrentUS();
  rwlock_->UNLock();
  VLOG(0) << "save " << varname << " with path: " << value_
          << " using: " << std::to_string((end - begin) / 1e+6) << " seconds";
  return 0;
}
// Returns {number of entries summed over all buckets of all shards, 0}.
// The second component is never updated here and is always 0.
std::pair<int64_t, int64_t> CommonSparseTable::PrintTableStat() {
  int64_t entry_total = 0;
  int64_t mf_total = 0;

  for (auto& shard : shard_values_) {
    auto& buckets = shard->values_;
    for (auto& bucket : buckets) {
      entry_total += bucket.size();
    }
  }
  return std::pair<int64_t, int64_t>(entry_total, mf_total);
}
// Flushes pull_reservoir_: calls avg() on every reservoir, gathers the keys
// and the reservoirs' values into flat arrays, pushes them through
// _PushSparse and finally empties the reservoir map. Always returns 0.
int32_t CommonSparseTable::Pour() {
  const auto pending = pull_reservoir_.size();

  std::vector<uint64_t> keys;
  std::vector<float> values;
  keys.reserve(pending);
  values.reserve(pending * param_dim_);

  for (auto& slot : pull_reservoir_) {
    keys.push_back(slot.first);
    auto& reservoir = slot.second;
    reservoir.avg();
    values.insert(values.end(), reservoir.values.begin(),
                  reservoir.values.end());
  }

  _PushSparse(keys.data(), values.data(), pending);
  pull_reservoir_.clear();
  return 0;
}
// Context-based pull entry point. Requires a Sparse context and dispatches to
// PullSparsePtr when context.use_ptr is set, otherwise to PullSparse.
// Returns whatever the selected function returns.
int32_t CommonSparseTable::Pull(TableContext& context) {
  CHECK(context.value_type == Sparse);

  if (!context.use_ptr) {
    float* dst = context.pull_context.values;
    const PullSparseValue& request = context.pull_context.pull_value;
    return PullSparse(dst, request);
  }

  char** dst_ptrs = context.pull_context.ptr_values;
  const uint64_t* keys = context.pull_context.keys;
  return PullSparsePtr(dst_ptrs, keys, context.num);
}
// Context-based push entry point. Requires a Sparse context. When the flat
// value buffer is null the per-key pointer overload of PushSparse is used,
// otherwise the flat-buffer overload.
int32_t CommonSparseTable::Push(TableContext& context) {
  CHECK(context.value_type == Sparse);

  const uint64_t* keys = context.push_context.keys;
  if (context.push_context.values == nullptr) {
    const float** per_key_values = context.push_context.ptr_values;
    return PushSparse(keys, per_key_values, context.num);
  }

  const float* flat_values = context.push_context.values;
  return PushSparse(keys, flat_values, context.num);
}
// Copies the "Param" slice of every requested id into `pull_values`.
// One task per shard is queued on that shard's pool; each task asks
// pull_value.Fission for the request offsets belonging to its shard and writes
// param_dim_ floats at pull_values + param_dim_ * offset.
// When pull_value.is_training_ is set, block->Init is called with `true` and
// the frequency of the id; otherwise with `false`.
// Blocks until all tasks finish; always returns 0.
int32_t CommonSparseTable::PullSparse(float* pull_values,
                                      const PullSparseValue& pull_value) {
  auto shard_num = task_pool_size_;
  std::vector<std::future<int>> tasks(shard_num);

  for (int shard_id = 0; shard_id < shard_num; ++shard_id) {
    auto job = [this, shard_id, shard_num, &pull_value,
                &pull_values]() -> int {
      auto& block = shard_values_[shard_id];

      std::vector<int> offsets;
      pull_value.Fission(shard_id, shard_num, &offsets);

      const bool training = pull_value.is_training_;
      for (auto& offset : offsets) {
        auto feasign = pull_value.feasigns_[offset];
        auto* value =
            training
                ? block->Init(feasign, true, pull_value.frequencies_[offset])
                : block->Init(feasign, false);
        std::copy_n(value + param_offset_, param_dim_,
                    pull_values + param_dim_ * offset);
      }
      return 0;
    };
    tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(job);
  }

  for (auto& task : tasks) {
    task.wait();
  }
  return 0;
}
// For each of the `num` keys, stores in pull_values[i] the pointer returned by
// block->InitGet(key) (reinterpreted as char*). Keys are routed to shard
// key % task_pool_size_; one task per shard runs on that shard's pool.
// Blocks until all tasks finish; always returns 0.
int32_t CommonSparseTable::PullSparsePtr(char** pull_values,
                                         const uint64_t* keys, size_t num) {
  // offset_bucket[s] lists the positions in `keys` that belong to shard s.
  std::vector<std::vector<uint64_t>> offset_bucket(task_pool_size_);
  for (int x = 0; x < num; ++x) {
    offset_bucket[keys[x] % task_pool_size_].push_back(x);
  }

  std::vector<std::future<int>> tasks(task_pool_size_);
  for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
    auto job = [this, shard_id, &keys, &offset_bucket,
                &pull_values]() -> int {
      auto& block = shard_values_[shard_id];
      for (auto offset : offset_bucket[shard_id]) {
        auto* value = block->InitGet(keys[offset]);
        pull_values[offset] = reinterpret_cast<char*>(value);
      }
      return 0;
    };
    tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(job);
  }

  for (auto& task : tasks) {
    task.wait();
  }
  return 0;
}
// Applies the optimizer to `num` keys whose updates are stored contiguously in
// `values`. Keys are grouped by shard (key % task_pool_size_); each shard's
// task calls optimizer_->Update once with the positions of its keys.
// Blocks until all tasks finish; always returns 0.
int32_t CommonSparseTable::_PushSparse(const uint64_t* keys,
                                       const float* values, size_t num) {
  // offset_bucket[s] lists the positions in `keys` that belong to shard s.
  std::vector<std::vector<uint64_t>> offset_bucket(task_pool_size_);
  for (int x = 0; x < num; ++x) {
    offset_bucket[keys[x] % task_pool_size_].push_back(x);
  }

  std::vector<std::future<int>> tasks(task_pool_size_);
  for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
    auto job = [this, shard_id, &keys, &values, num, &offset_bucket]() -> int {
      optimizer_->Update(keys, values, num, offset_bucket[shard_id],
                         shard_values_[shard_id].get());
      return 0;
    };
    tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(job);
  }

  for (auto& task : tasks) {
    task.wait();
  }
  return 0;
}
// Pushes `num` updates stored contiguously in `values`.
// Without sync, the updates go straight to _PushSparse. With sync, they are
// only accumulated: a task on pool 0 adds each key's param_dim_ floats to that
// key's reservoir in pull_reservoir_ (creating it on first use).
// Always returns 0.
int32_t CommonSparseTable::PushSparse(const uint64_t* keys, const float* values,
                                      size_t num) {
  if (!sync) {
    _PushSparse(keys, values, num);
    return 0;
  }

  auto accumulate = [this, &keys, &values, num]() -> int {
    for (int x = 0; x < num; ++x) {
      auto id = keys[x];
      if (pull_reservoir_.find(id) == pull_reservoir_.end()) {
        pull_reservoir_[id] = ReservoirValue<float>(param_dim_);
      }
      pull_reservoir_[id].add(values + x * param_dim_, param_dim_);
    }
    return 0;
  };

  std::future<int> task = _shards_task_pool[0]->enqueue(accumulate);
  task.wait();
  return 0;
}
// Pointer-per-key overload: forwards directly to the matching _PushSparse
// overload (the sync flag is not consulted here). Always returns 0.
int32_t CommonSparseTable::PushSparse(const uint64_t* keys,
                                      const float** values, size_t num) {
  // The result of the helper is intentionally not propagated.
  static_cast<void>(_PushSparse(keys, values, num));
  return 0;
}
// Applies the optimizer to `num` keys where values[i] points at the update of
// keys[i]. Keys are grouped by shard (key % task_pool_size_); each shard's
// task calls optimizer_->Update once per key, with the key pointer and value
// pointer rebased to that key so that the single offset 0 addresses it.
// Blocks until all tasks finish; always returns 0.
int32_t CommonSparseTable::_PushSparse(const uint64_t* keys,
                                       const float** values, size_t num) {
  // offset_bucket[s] lists the positions in `keys` that belong to shard s.
  std::vector<std::vector<uint64_t>> offset_bucket;
  offset_bucket.resize(task_pool_size_);
  for (int x = 0; x < num; ++x) {
    auto y = keys[x] % task_pool_size_;
    offset_bucket[y].push_back(x);
  }

  std::vector<std::future<int>> tasks(task_pool_size_);
  for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
    tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
        [this, shard_id, &keys, &values, num, &offset_bucket]() -> int {
          auto& offsets = offset_bucket[shard_id];
          // The offset list is the same for every key, so build it once per
          // task instead of allocating a new vector for each key.
          const std::vector<uint64_t> tmp_off = {0};
          for (size_t i = 0; i < offsets.size(); ++i) {
            optimizer_->Update(keys + offsets[i], values[offsets[i]], num,
                               tmp_off, shard_values_[shard_id].get());
          }
          return 0;
        });
  }

  for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
    tasks[shard_id].wait();
  }
  return 0;
}
// Overwrites the "Param" slice of `num` keys with the values stored
// contiguously in `values` (param_dim_ floats per key) and marks each key's
// entry flag as true. Keys are routed to shard key % task_pool_size_, one task
// per shard. Blocks until all tasks finish; always returns 0.
int32_t CommonSparseTable::PushSparseParam(const uint64_t* keys,
                                           const float* values, size_t num) {
  // offset_bucket[s] lists the positions in `keys` that belong to shard s.
  std::vector<std::vector<uint64_t>> offset_bucket(task_pool_size_);
  for (int x = 0; x < num; ++x) {
    offset_bucket[keys[x] % task_pool_size_].push_back(x);
  }

  std::vector<std::future<int>> tasks(task_pool_size_);
  for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
    auto job = [this, shard_id, &keys, &offset_bucket, &values]() -> int {
      auto& block = shard_values_[shard_id];
      for (auto offset : offset_bucket[shard_id]) {
        auto id = keys[offset];
        auto* value = block->Init(id, false);
        std::copy_n(values + param_dim_ * offset, param_dim_,
                    value + param_offset_);
        block->SetEntry(id, true);
      }
      return 0;
    };
    tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(job);
  }

  for (auto& task : tasks) {
    task.wait();
  }
  return 0;
}
// No work is performed on flush; the function only reports success (0).
int32_t CommonSparseTable::Flush() {
  return 0;
}
// Parses `param` as an integer threshold (std::stoi) and calls
// Shrink(threshold) on every shard in order. Always returns 0.
int32_t CommonSparseTable::Shrink(const std::string& param) {
  const int threshold = std::stoi(param);
  VLOG(3) << "sparse table Shrink: " << threshold;

  int shard_id = 0;
  while (shard_id < task_pool_size_) {
    VLOG(4) << shard_id << " " << task_pool_size_ << " begin Shrink";
    shard_values_[shard_id]->Shrink(threshold);
    ++shard_id;
  }
  return 0;
}
// Not implemented: only emits a log line, no table state is touched.
void CommonSparseTable::Clear() {
  VLOG(0) << "clear coming soon";
}
} // namespace distributed
} // namespace paddle
......@@ -232,14 +232,15 @@ int32_t CtrCommonAccessor::Update(float** update_values,
(push_show - push_click) * _config.ctr_accessor_param().nonclk_coeff() +
push_click * _config.ctr_accessor_param().click_coeff();
update_value[common_feature_value.UnseenDaysIndex()] = 0;
// TODO(zhaocaibei123): add configure show_scale
_embed_sgd_rule->UpdateValue(
update_value + common_feature_value.EmbedWIndex(),
update_value + common_feature_value.EmbedG2SumIndex(),
push_value + CtrCommonPushValue::EmbedGIndex());
push_value + CtrCommonPushValue::EmbedGIndex(), push_show);
_embedx_sgd_rule->UpdateValue(
update_value + common_feature_value.EmbedxWIndex(),
update_value + common_feature_value.EmbedxG2SumIndex(),
push_value + CtrCommonPushValue::EmbedxGIndex());
push_value + CtrCommonPushValue::EmbedxGIndex(), push_show);
}
return 0;
}
......
......@@ -99,7 +99,7 @@ class DSGD : public DenseOptimizer {
};
// adam optimizer for dense tensor
// TODO(zhaocaibei123): add CHECK(common_dense_table.task_pool_size_) == 1
// TODO(zhaocaibei123): add CHECK(memory_dense_table.task_pool_size_) == 1
class DAdam : public DenseOptimizer {
public:
explicit DAdam(const CommonAccessorParameter& accessor,
......@@ -132,7 +132,7 @@ class DAdam : public DenseOptimizer {
epsilon = 1.0e-8;
}
// make sure common_dense_table.task_pool_size_ == 1;
// make sure memory_dense_table.task_pool_size_ == 1;
// otherwise, task_pool_size_ times beta1_pow/beta2_pow multiplication
void Update(const float* update_values, size_t num, int begin,
int end) override {
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <math.h> // for sqrt in CPU and CUDA
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "gflags/gflags.h"
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h"
namespace paddle {
namespace distributed {
// Abstract base of the sparse optimizers.
//
// The four layout containers are stored as references, not copies: whatever
// is passed to the constructor must outlive the optimizer.
class SparseOptimizer {
 public:
  explicit SparseOptimizer(
      const std::vector<std::string>& value_names,
      const std::vector<int>& value_dims, const std::vector<int>& value_offsets,
      const std::unordered_map<std::string, int>& value_idx)
      : value_names_(value_names),
        value_dims_(value_dims),
        value_offsets_(value_offsets),
        value_idx_(value_idx) {}

  // Polymorphic base: make destruction through a base pointer well defined.
  virtual ~SparseOptimizer() = default;

  // Applies the updates to `block`.
  // keys:          key array; keys[x] is read for every x in `offsets`.
  // update_values: update buffer indexed through the same offsets.
  // num:           number of keys passed by the caller.
  // offsets:       positions in keys/update_values that this call handles.
  virtual void Update(const uint64_t* keys, const float* update_values,
                      size_t num, const std::vector<uint64_t>& offsets,
                      ValueBlock* block) = 0;

  // Stores the pointer (not the value) of the global learning rate.
  virtual void SetGlobalLR(float* lr) { global_learning_rate_ = lr; }

  const std::vector<std::string>& value_names_;
  const std::vector<int>& value_dims_;
  const std::vector<int>& value_offsets_;
  const std::unordered_map<std::string, int>& value_idx_;
  int param_offset = 0;
  int update_numel = 0;

 protected:
  // Null until SetGlobalLR is called; previously left uninitialized.
  float* global_learning_rate_ = nullptr;
};
// Sum "optimizer" for sparse tensors: adds each update to the "Param" slice.
class SSUM : public SparseOptimizer {
 public:
  // Looks up the offset and length of the "Param" field in the layout.
  explicit SSUM(const std::vector<std::string>& value_names,
                const std::vector<int>& value_dims,
                const std::vector<int>& value_offsets,
                const std::unordered_map<std::string, int>& value_idx)
      : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
    const auto param_idx = value_idx.at("Param");
    param_offset = value_offsets.at(param_idx);
    update_numel = value_dims.at(param_idx);
  }

  // For every position in `offsets` whose key has its entry flag set in
  // `block`, performs param += update in place. `num` is not used.
  void Update(const uint64_t* keys, const float* update_values, size_t num,
              const std::vector<uint64_t>& offsets,
              ValueBlock* block) override {
    auto blas = GetBlas<float>();
    for (auto pos : offsets) {
      auto id = keys[pos];
      if (block->GetEntry(id)) {
        float* param = block->Get(id) + param_offset;
        blas.VADD(update_numel, update_values + pos * update_numel, param,
                  param);
      }
    }
  }
};
// SGD optimizer for sparse tensors.
class SSGD : public SparseOptimizer {
 public:
  // Looks up the "Param" field (offset and length) and the offset of the
  // "LearningRate" field in the layout.
  explicit SSGD(const std::vector<std::string>& value_names,
                const std::vector<int>& value_dims,
                const std::vector<int>& value_offsets,
                const std::unordered_map<std::string, int>& value_idx)
      : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
    auto idx = value_idx.at("Param");
    param_offset = value_offsets.at(idx);
    update_numel = value_dims.at(idx);

    idx = value_idx.at("LearningRate");
    lr_offset = value_offsets.at(idx);
  }

  // For every position in `offsets` whose key has its entry flag set in
  // `block`: param -= (global_lr * stored_lr) * grad. `num` is not used.
  // Dereferences the global learning-rate pointer, so SetGlobalLR must have
  // been called first.
  void Update(const uint64_t* keys, const float* update_values, size_t num,
              const std::vector<uint64_t>& offsets,
              ValueBlock* block) override {
    auto blas = GetBlas<float>();
    // Scratch buffer shared by all keys; VCOPY overwrites it completely on
    // each iteration, so it does not need to be reallocated per key.
    std::vector<float> grads(update_numel);
    for (auto x : offsets) {
      auto id = keys[x];
      if (!block->GetEntry(id)) continue;
      auto* value = block->Get(id);

      float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0];
      float* param = value + param_offset;

      blas.VCOPY(update_numel, update_values + x * update_numel, grads.data());
      blas.SCAL(update_numel, learning_rate, grads.data());
      blas.VSUB(update_numel, param, grads.data(), param);
    }
  }

  int lr_offset = 0;
};
// Adam optimizer for sparse tensors.
class SAdam : public SparseOptimizer {
 public:
  // Looks up the offsets of the "Param", "LearningRate", "Moment1",
  // "Moment2", "Beta1Pow" and "Beta2Pow" fields in the layout and sets the
  // hyper-parameters to fixed defaults (beta1 0.9, beta2 0.999, eps 1e-8).
  explicit SAdam(const std::vector<std::string>& value_names,
                 const std::vector<int>& value_dims,
                 const std::vector<int>& value_offsets,
                 const std::unordered_map<std::string, int>& value_idx)
      : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
    auto idx = value_idx.at("Param");
    param_offset = value_offsets.at(idx);
    update_numel = value_dims.at(idx);

    idx = value_idx.at("LearningRate");
    lr_offset = value_offsets.at(idx);

    idx = value_idx.at("Moment1");
    m1_offset = value_offsets.at(idx);

    idx = value_idx.at("Moment2");
    m2_offset = value_offsets.at(idx);

    idx = value_idx.at("Beta1Pow");
    beta1_pow_offset = value_offsets.at(idx);

    idx = value_idx.at("Beta2Pow");
    beta2_pow_offset = value_offsets.at(idx);

    // add attr later
    beta1 = 0.9;
    beta2 = 0.999;
    epsilon = 1.0e-8;
  }

  // For every position in `offsets` whose key has its entry flag set in
  // `block`, advances the per-key beta powers, updates both moments in place
  // and subtracts the scaled step from the parameter. `num` is not used.
  // Dereferences the global learning-rate pointer, so SetGlobalLR must have
  // been called first.
  void Update(const uint64_t* keys, const float* update_values, size_t num,
              const std::vector<uint64_t>& offsets,
              ValueBlock* block) override {
    auto blas = GetBlas<float>();
    // Scratch buffers are allocated once per call rather than once per key.
    // grad/grad2 are fully overwritten by VCOPY on every iteration.
    std::vector<float> grad(update_numel);
    std::vector<float> grad2(update_numel);
    std::vector<float> tmp;
    for (auto x : offsets) {
      auto id = keys[x];
      if (!block->GetEntry(id)) continue;
      auto* values = block->Get(id);
      float lr_ = *(global_learning_rate_) * (values + lr_offset)[0];
      float* param = values + param_offset;
      float* moment1 = values + m1_offset;
      float* moment2 = values + m2_offset;
      float* beta1_pow = values + beta1_pow_offset;
      float* beta2_pow = values + beta2_pow_offset;

      beta1_pow[0] = beta1_pow[0] * beta1;
      beta2_pow[0] = beta2_pow[0] * beta2;

      // Bias-corrected step size.
      lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);

      // Start from zeros on every key, as a freshly resized vector would;
      // assign() reuses the existing capacity.
      tmp.assign(update_numel, 0.0f);

      blas.VCOPY(update_numel, update_values + x * update_numel, grad.data());
      blas.VCOPY(update_numel, update_values + x * update_numel, grad2.data());

      // moment1 = beta1 * moment1 + (1 - beta1) * g
      // moment2 = beta2 * moment2 + (1 - beta2) * g * g
      blas.SCAL(update_numel, 1 - beta1, grad.data());
      blas.VSQUARE(update_numel, grad2.data(), grad2.data());
      blas.SCAL(update_numel, 1 - beta2, grad2.data());

      blas.SCAL(update_numel, beta1, moment1);
      blas.VADD(update_numel, moment1, grad.data(), moment1);
      blas.SCAL(update_numel, beta2, moment2);
      blas.VADD(update_numel, moment2, grad2.data(), moment2);

      // param -= lr_ * moment1 / (sqrt(moment2) + eps_)
      float* tmp_ = tmp.data();
      float eps_ = epsilon * sqrt(1 - beta2_pow[0]);

      SQRT<float>(update_numel, moment2, tmp_);
      ADD<float>(update_numel, tmp_, eps_, tmp_);

      blas.VDIV(update_numel, moment1, tmp_, tmp_);
      blas.SCAL(update_numel, lr_, tmp_);
      blas.VSUB(update_numel, param, tmp_, param);
    }
  }

  int lr_offset = 0;
  int m1_offset = 0;
  int m2_offset = 0;
  int beta1_pow_offset = 0;
  int beta2_pow_offset = 0;
  float beta1;
  float beta2;
  float epsilon;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/downpour_ctr_accessor.h"
#include <gflags/gflags.h>
#include "glog/logging.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
// Builds the two SGD rules named in the config (embed with dimension 1,
// embedx with the configured embedx_dim), caches the show/click decay rate and
// the SSD unseen-day threshold, then sets up the time-decay table and the
// accessor info. Always returns 0.
int DownpourCtrAccessor::Initialize() {
  auto embed_rule_name = _config.embed_sgd_param().name();
  _embed_sgd_rule = CREATE_PSCORE_CLASS(SparseValueSGDRule, embed_rule_name);
  _embed_sgd_rule->LoadConfig(_config.embed_sgd_param(), 1);

  auto embedx_rule_name = _config.embedx_sgd_param().name();
  _embedx_sgd_rule = CREATE_PSCORE_CLASS(SparseValueSGDRule, embedx_rule_name);
  _embedx_sgd_rule->LoadConfig(_config.embedx_sgd_param(),
                               _config.embedx_dim());

  const auto& ctr_param = _config.ctr_accessor_param();
  _show_click_decay_rate = ctr_param.show_click_decay_rate();
  _ssd_unseenday_threshold = ctr_param.ssd_unseenday_threshold();

  set_time_decay_rates();
  InitAccessorInfo();
  return 0;
}
// Fills _accessor_info from the configured embedx dimension:
//   dim/size      - as reported by DownpourCtrFeatureValue
//   select_dim    - 3 + embedx_dim floats
//   update_dim    - 4 + embedx_dim floats
//   mf_size       - (embedx_dim + 1) floats, in bytes
// The *_size fields are the matching *_dim multiplied by sizeof(float).
void DownpourCtrAccessor::InitAccessorInfo() {
  auto embedx_dim = _config.embedx_dim();
  auto& info = _accessor_info;

  info.dim = DownpourCtrFeatureValue::Dim(embedx_dim);
  info.size = DownpourCtrFeatureValue::Size(embedx_dim);

  info.select_dim = 3 + embedx_dim;
  info.select_size = info.select_dim * sizeof(float);

  info.update_dim = 4 + embedx_dim;
  info.update_size = info.update_dim * sizeof(float);

  info.mf_size = (embedx_dim + 1) * sizeof(float);
}
// Decides from the stored statistics of `value` whether it falls below the
// configured retention limits. Returns true when
//   - the day difference (_day_id - unseen_days) is negative or larger than
//     delete_after_unseen_days, or
//   - the show/click score, computed from show and click after applying the
//     time decay for that day difference, is below delete_threshold;
// otherwise false. `value` is only read.
// NOTE(review): true presumably tells the caller to drop the feature -
// confirm against the call site.
bool DownpourCtrAccessor::Shrink(float* value) {
  auto delete_after_unseen_days =
      _config.ctr_accessor_param().delete_after_unseen_days();
  auto delete_threshold = _config.ctr_accessor_param().delete_threshold();

  // time_decay first
  auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
  int16_t day_diff = _day_id - unseen_days;
  // The range check also keeps the decay-table index below non-negative.
  if (day_diff < 0 || day_diff > delete_after_unseen_days) {
    return true;
  }
  auto show_right =
      DownpourCtrFeatureValue::Show(value) * _time_decay_rates[day_diff];
  auto click_right =
      DownpourCtrFeatureValue::Click(value) * _time_decay_rates[day_diff];

  // shrink after
  auto score = ShowClickScore(show_right, click_right);
  return score < delete_threshold;
}
// Stores the current day id used by the day-difference computations.
void DownpourCtrAccessor::set_day_id(int day_id) {
  _day_id = day_id;
}
// Returns the day id last stored with set_day_id.
int DownpourCtrAccessor::get_day_id() {
  return _day_id;
}
// Returns true when _day_id is 0, false when the stored unseen-days field is
// 0, and otherwise whether the day difference exceeds
// _ssd_unseenday_threshold. A stored value below delta_keep_days is first
// replaced by (_day_id - value) before the difference is taken.
// `value` is only read.
bool DownpourCtrAccessor::save_ssd(float* value) {
  if (_day_id == 0) {
    return true;
  }

  auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
  if (unseen_days == 0) {
    return false;
  }

  // for the origin load (eg. unseen_days = 0-15)
  const bool is_origin_load =
      unseen_days < _config.ctr_accessor_param().delta_keep_days();
  if (is_origin_load) {
    unseen_days = _day_id - unseen_days;
  }

  int16_t day_diff = _day_id - unseen_days;
  return day_diff > _ssd_unseenday_threshold;
}
// bool DownpourCtrAccessor::save_cache(
// float* value, int param, double global_cache_threshold) {
// auto base_threshold = _config.ctr_accessor_param().base_threshold();
// auto delta_keep_days = _config.ctr_accessor_param().delta_keep_days();
// auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
// int16_t day_diff = _day_id - unseen_days;
// if (ShowClickScore(DownpourCtrFeatureValue::Show(value),
// DownpourCtrFeatureValue::Click(value)) >= base_threshold
// && day_diff <= delta_keep_days) {
// return DownpourCtrFeatureValue::Show(value) > global_cache_threshold;
// }
// return false;
// }
// Decides whether a feature value is dumped during a save.
//
// @param value  pointer to a DownpourCtrFeatureValue record.
// @param param  save mode:
//                 0 - save all features;
//                 1 - save xbox delta;
//                 2 - save xbox base (delta-score threshold forced to 0);
//                 3 - save all (values were already decayed in Shrink);
//                 any other value - save.
// @return true when the value must be written.
//
// For modes 1 and 2 the value is written when its time-decayed show/click
// score reaches `base_threshold`, its delta score reaches the delta
// threshold, and its day difference is within `delta_keep_days`.
// In mode 2 the delta score of a written value is reset to 0 here; for
// mode 1 the reset is done in UpdateStatAfterSave().
bool DownpourCtrAccessor::Save(float* value, int param) {
  switch (param) {
    case 1:    // save xbox delta
    case 2: {  // save xbox base
      const auto& accessor_param = _config.ctr_accessor_param();
      const auto base_threshold = accessor_param.base_threshold();
      const auto delta_keep_days = accessor_param.delta_keep_days();
      auto delta_threshold = accessor_param.delta_threshold();
      if (param == 2) {
        delta_threshold = 0;
      }

      const auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
      const int16_t day_diff = _day_id - unseen_days;

      // The decay table only covers [0, size - 1]. Unlike Shrink(), nothing
      // here rejects an out-of-range day difference first, so clamp the
      // index instead of reading outside the vector: a negative difference
      // uses the "no elapsed days" rate, an overly large one the last rate.
      double decay_rate = 1.0;
      if (!_time_decay_rates.empty()) {
        const int last_index = static_cast<int>(_time_decay_rates.size()) - 1;
        const int index =
            day_diff < 0 ? 0 : (day_diff > last_index ? last_index : day_diff);
        decay_rate = _time_decay_rates[index];
      }

      const auto show_right = DownpourCtrFeatureValue::Show(value) * decay_rate;
      const auto click_right =
          DownpourCtrFeatureValue::Click(value) * decay_rate;

      const bool should_save =
          ShowClickScore(show_right, click_right) >= base_threshold &&
          DownpourCtrFeatureValue::DeltaScore(value) >= delta_threshold &&
          day_diff <= delta_keep_days;
      if (should_save && param == 2) {
        DownpourCtrFeatureValue::DeltaScore(value) = 0;
      }
      return should_save;
    }
    case 0:  // save all
    case 3:  // already decayed in shrink
    default:
      return true;
  }
}
// Updates per-value statistics once a save has finished.
//
// @param value  pointer to a DownpourCtrFeatureValue record.
// @param param  save mode that was used. Only mode 1 (xbox delta) has any
//               effect: the delta score is reset to 0 for values that meet
//               the same base-threshold / delta-threshold / delta_keep_days
//               conditions that Save() applies for that mode. Every other
//               mode leaves the value untouched.
void DownpourCtrAccessor::UpdateStatAfterSave(float* value, int param) {
  if (param != 1) {
    return;
  }

  const auto& accessor_param = _config.ctr_accessor_param();
  const auto base_threshold = accessor_param.base_threshold();
  const auto delta_threshold = accessor_param.delta_threshold();
  const auto delta_keep_days = accessor_param.delta_keep_days();

  const auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
  const int16_t day_diff = _day_id - unseen_days;

  // Clamp the decay-table index so an out-of-range day difference cannot
  // read outside the vector: a negative difference uses the first rate,
  // an overly large one the last rate.
  double decay_rate = 1.0;
  if (!_time_decay_rates.empty()) {
    const int last_index = static_cast<int>(_time_decay_rates.size()) - 1;
    const int index =
        day_diff < 0 ? 0 : (day_diff > last_index ? last_index : day_diff);
    decay_rate = _time_decay_rates[index];
  }

  const auto show_right = DownpourCtrFeatureValue::Show(value) * decay_rate;
  const auto click_right = DownpourCtrFeatureValue::Click(value) * decay_rate;

  if (ShowClickScore(show_right, click_right) >= base_threshold &&
      DownpourCtrFeatureValue::DeltaScore(value) >= delta_threshold &&
      day_diff <= delta_keep_days) {
    DownpourCtrFeatureValue::DeltaScore(value) = 0;
  }
}
// Initializes freshly allocated feature values.
//
// @param values  array of `num` pointers, each to caller-allocated storage
//                for one DownpourCtrFeatureValue record.
// @param num     number of records to initialize.
// @return always 0.
//
// Statistics fields are zeroed, the slot is set to -1, and the embed /
// embedx weights and g2sum fields are filled in by their SGD rules.
int32_t DownpourCtrAccessor::Create(float** values, size_t num) {
  for (size_t value_item = 0; value_item < num; ++value_item) {
    float* value = values[value_item];
    value[DownpourCtrFeatureValue::UnseenDaysIndex()] = 0;
    value[DownpourCtrFeatureValue::DeltaScoreIndex()] = 0;
    value[DownpourCtrFeatureValue::ShowIndex()] = 0;
    value[DownpourCtrFeatureValue::ClickIndex()] = 0;
    value[DownpourCtrFeatureValue::SlotIndex()] = -1;
    _embed_sgd_rule->InitValue(
        value + DownpourCtrFeatureValue::EmbedWIndex(),
        value + DownpourCtrFeatureValue::EmbedG2SumIndex(), true);
    _embedx_sgd_rule->InitValue(
        value + DownpourCtrFeatureValue::EmbedxWIndex(),
        value + DownpourCtrFeatureValue::EmbedxG2SumIndex());
  }
  return 0;
}
// Reports whether a value's show/click score has reached the configured
// embedx threshold.
//
// @param value  pointer to a DownpourCtrFeatureValue record.
// @return true when (show - click) * nonclk_coeff + click * click_coeff
//         is at least `embedx_threshold`.
bool DownpourCtrAccessor::NeedExtendMF(float* value) {
  const auto& accessor_param = _config.ctr_accessor_param();
  const float show = value[DownpourCtrFeatureValue::ShowIndex()];
  const float click = value[DownpourCtrFeatureValue::ClickIndex()];
  const float score = (show - click) * accessor_param.nonclk_coeff() +
                      click * accessor_param.click_coeff();
  return score >= _config.embedx_threshold();
}
// Reports whether a record of `size` extends past the embedx_g2sum index
// of DownpourCtrFeatureValue.
//
// @param size  size of the stored record, compared directly against an
//              element index.
bool DownpourCtrAccessor::HasMF(size_t size) {
  const size_t embedx_start = DownpourCtrFeatureValue::EmbedxG2SumIndex();
  return embedx_start < size;
}
// Converts stored feature values (DownpourCtrFeatureValue) into pull values
// (DownpourCtrPullValue).
//
// @param select_values  array of `num` destination pull-value records.
// @param values         array of `num` source feature-value records.
// @param num            number of records to convert.
// @return always 0.
//
// Copies show, click, embed_w and the `embedx_dim` embedx weights.
int32_t DownpourCtrAccessor::Select(float** select_values, const float** values,
                                    size_t num) {
  const size_t embedx_bytes = _config.embedx_dim() * sizeof(float);
  for (size_t idx = 0; idx < num; ++idx) {
    float* dst = select_values[idx];
    const float* src = values[idx];

    dst[DownpourCtrPullValue::ShowIndex()] =
        src[DownpourCtrFeatureValue::ShowIndex()];
    dst[DownpourCtrPullValue::ClickIndex()] =
        src[DownpourCtrFeatureValue::ClickIndex()];
    dst[DownpourCtrPullValue::EmbedWIndex()] =
        src[DownpourCtrFeatureValue::EmbedWIndex()];
    memcpy(dst + DownpourCtrPullValue::EmbedxWIndex(),
           src + DownpourCtrFeatureValue::EmbedxWIndex(), embedx_bytes);
  }
  return 0;
}
// Accumulates one set of push values (DownpourCtrPushValue) into another.
//
// @param update_values        array of `num` push-value records, updated
//                             in place.
// @param other_update_values  array of `num` push-value records to add.
// @param num                  number of records.
// @return always 0.
//
// Every field is summed element-wise except the slot field, which keeps the
// value already present in `update_values`.
int32_t DownpourCtrAccessor::Merge(float** update_values,
                                   const float** other_update_values,
                                   size_t num) {
  const size_t field_count = DownpourCtrPushValue::Dim(_config.embedx_dim());
  const size_t slot_field =
      static_cast<size_t>(DownpourCtrPushValue::SlotIndex());

  for (size_t item = 0; item < num; ++item) {
    float* target = update_values[item];
    const float* addend = other_update_values[item];
    for (size_t field = 0; field < field_count; ++field) {
      if (field == slot_field) {
        continue;
      }
      target[field] += addend[field];
    }
  }
  return 0;
}
// Applies push values (DownpourCtrPushValue) to stored feature values
// (DownpourCtrFeatureValue).
//
// @param update_values  array of `num` feature-value records, updated in
//                       place.
// @param push_values    array of `num` push-value records.
// @param num            number of records.
// @return always 0.
//
// For each record: show and click are accumulated, the slot is overwritten,
// the delta score grows by the push's show/click score, the unseen-days
// field is reset to 0, and the embed / embedx weights are updated through
// their SGD rules.
int32_t DownpourCtrAccessor::Update(float** update_values,
                                    const float** push_values, size_t num) {
  // The coefficients do not change between items; read them once.
  const auto& accessor_param = _config.ctr_accessor_param();
  const auto nonclk_coeff = accessor_param.nonclk_coeff();
  const auto click_coeff = accessor_param.click_coeff();

  for (size_t value_item = 0; value_item < num; ++value_item) {
    float* update_value = update_values[value_item];
    const float* push_value = push_values[value_item];
    const float push_show = push_value[DownpourCtrPushValue::ShowIndex()];
    const float push_click = push_value[DownpourCtrPushValue::ClickIndex()];
    const float slot = push_value[DownpourCtrPushValue::SlotIndex()];

    update_value[DownpourCtrFeatureValue::ShowIndex()] += push_show;
    update_value[DownpourCtrFeatureValue::ClickIndex()] += push_click;
    update_value[DownpourCtrFeatureValue::SlotIndex()] = slot;
    update_value[DownpourCtrFeatureValue::DeltaScoreIndex()] +=
        (push_show - push_click) * nonclk_coeff + push_click * click_coeff;
    update_value[DownpourCtrFeatureValue::UnseenDaysIndex()] = 0;

    _embed_sgd_rule->UpdateValue(
        update_value + DownpourCtrFeatureValue::EmbedWIndex(),
        update_value + DownpourCtrFeatureValue::EmbedG2SumIndex(),
        push_value + DownpourCtrPushValue::EmbedGIndex(), push_show);
    _embedx_sgd_rule->UpdateValue(
        update_value + DownpourCtrFeatureValue::EmbedxWIndex(),
        update_value + DownpourCtrFeatureValue::EmbedxG2SumIndex(),
        push_value + DownpourCtrPushValue::EmbedxGIndex(), push_show);
  }
  return 0;
}
// Decides whether a missing key should get a new value.
//
// @param stage  0 for pull, 1 for push.
// @param value  for stage 1, pointer to a DownpourCtrPushValue record.
// @return true for every stage other than 1. For stage 1 the push's
//         show/click score decides: <= 0 never creates, >= 1 always creates,
//         and anything in between creates when a uniform random draw is
//         below the score.
bool DownpourCtrAccessor::CreateValue(int stage, const float* value) {
  if (stage != 1) {
    return true;
  }

  float* push_value = const_cast<float*>(value);
  const auto show = DownpourCtrPushValue::Show(push_value);
  const auto click = DownpourCtrPushValue::Click(push_value);
  const auto score = ShowClickScore(show, click);

  if (score <= 0) {
    return false;
  }
  if (score >= 1) {
    return true;
  }
  return local_uniform_real_distribution<float>()(local_random_engine()) <
         score;
}
// Combines show and click counts into a single score:
// (show - click) * nonclk_coeff + click * click_coeff, with both
// coefficients read from the accessor configuration.
float DownpourCtrAccessor::ShowClickScore(float show, float click) {
  const auto& accessor_param = _config.ctr_accessor_param();
  return (show - click) * accessor_param.nonclk_coeff() +
         click * accessor_param.click_coeff();
}
// Serializes a feature value into a space-separated string.
//
// @param v           pointer to a DownpourCtrFeatureValue record.
// @param param_size  caller-supplied size; the embedx part is only written
//                    when it is greater than 7.
// @return the first seven fields, followed by the embedx_g2sum field and the
//         `embedx_dim` embedx weights when the show/click score reaches
//         `embedx_threshold` and `param_size` > 7.
std::string DownpourCtrAccessor::ParseToString(const float* v, int param_size) {
  // Reused per thread; reset the state and contents before each use.
  thread_local std::ostringstream os;
  os.clear();
  os.str("");

  os << v[0];
  for (int field = 1; field < 7; ++field) {
    os << " " << v[field];
  }

  float* mutable_v = const_cast<float*>(v);
  const auto score =
      ShowClickScore(DownpourCtrFeatureValue::Show(mutable_v),
                     DownpourCtrFeatureValue::Click(mutable_v));
  const bool with_embedx =
      score >= _config.embedx_threshold() && param_size > 7;
  if (with_embedx) {
    os << " " << v[7];
    for (auto i = 0; i < _config.embedx_dim(); ++i) {
      os << " " << v[8 + i];
    }
  }
  return os.str();
}
// Parses a serialized feature value into `value`.
//
// @param str    space-separated floats, as parsed by
//               paddle::string::str_to_float.
// @param value  destination DownpourCtrFeatureValue record, allocated by the
//               caller.
// @return the number of floats parsed, plus one when the input had no slot
//         field (that is, when it held exactly 6 floats or exactly
//         `_accessor_info.dim - 1` floats).
//
// Aborts via CHECK when fewer than 6 floats were parsed.
int DownpourCtrAccessor::ParseFromString(const std::string& str, float* value) {
  const int embedx_dim = _config.embedx_dim();
  const int value_dim = _accessor_info.dim;
  const int embedx_g2sum_index = DownpourCtrFeatureValue::EmbedxG2SumIndex();

  // Heap-backed scratch buffer: a runtime-sized stack array is not standard
  // C++ and its size here is driven by configuration.
  std::vector<float> data_buff(_accessor_info.dim);
  float* data_buff_ptr = data_buff.data();

  // Pre-fill the embedx part of the scratch buffer with initial values
  // before the parsed floats are written over it.
  _embedx_sgd_rule->InitValue(
      data_buff_ptr + DownpourCtrFeatureValue::EmbedxWIndex(),
      data_buff_ptr + DownpourCtrFeatureValue::EmbedxG2SumIndex());

  auto str_len = paddle::string::str_to_float(str.data(), data_buff_ptr);
  CHECK(str_len >= 6) << "expect more than 6 real:" << str_len;

  // Default for inputs that carry no slot field.
  value[DownpourCtrFeatureValue::SlotIndex()] = -1;

  if (str_len == (value_dim - 1)) {
    // No slot in the input: copy the fields before the slot, then the
    // embedx part shifted by one position to leave room for the slot.
    memcpy(value, data_buff_ptr, (embedx_g2sum_index - 1) * sizeof(float));
    memcpy(value + embedx_g2sum_index, data_buff_ptr + embedx_g2sum_index - 1,
           (embedx_dim + 1) * sizeof(float));
  } else {
    memcpy(value, data_buff_ptr, str_len * sizeof(float));
  }

  // Account for the slot field that was filled in above.
  if (str_len == (value_dim - 1) || str_len == 6) {
    str_len += 1;
  }
  return str_len;
}
// Builds the decay-rate lookup table: entry `d` holds
// `_show_click_decay_rate` raised to the power `d`, for every `d` from 0 to
// `delete_after_unseen_days` inclusive.
void DownpourCtrAccessor::set_time_decay_rates() {
  const auto max_unseen_days =
      _config.ctr_accessor_param().delete_after_unseen_days();
  _time_decay_rates.assign(max_unseen_days + 1, 0.0);
  for (int day = 0; day <= max_unseen_days; ++day) {
    _time_decay_rates[day] = pow(_show_click_decay_rate, day);
  }
}
// Applies time decay to a value's show/click counts and refreshes its
// unseen-days field, based on the current day id.
//
// @param value               pointer to a DownpourCtrFeatureValue record,
//                            modified in place.
// @param is_update_seen_day  true on the pull path (the value is marked as
//                            seen on the current day); false on the save
//                            path.
//
// Does nothing when the current day id is 0.
void DownpourCtrAccessor::update_time_decay(float* value,
bool is_update_seen_day) {
// Decay show/click and update unseen_day according to day_id; unseen_day
// holds the day id on which the value last appeared.
if (_day_id == 0) {
return;
}
auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
// A value with unseen-days 0 is stamped with the current day id and is not
// decayed.
if (unseen_days == 0) {
DownpourCtrFeatureValue::UnseenDays(value) = _day_id;
return;
}
// for the origin load (unseenday = 0 -15)
if (unseen_days < _config.ctr_accessor_param().delete_after_unseen_days()) {
// pull
if (is_update_seen_day) {
DownpourCtrFeatureValue::UnseenDays(value) = _day_id;
return;
// save: discard the original unseen_day and mark the value as last seen on
// the previous day, so that show/click are not decayed repeatedly.
} else {
DownpourCtrFeatureValue::UnseenDays(value) = _day_id - 1;
}
}
// NOTE(review): `unseen_days` is a local copy taken before the branch above,
// so after the save-path rewrite it still holds the original field value and
// day_diff is computed from that original value - confirm this is intended.
int16_t day_diff = _day_id - unseen_days;
// A stored day later than the current day: re-stamp and skip the decay.
if (day_diff < 0) {
DownpourCtrFeatureValue::UnseenDays(value) = _day_id;
return;
}
// No decay once the difference reaches delete_after_unseen_days.
if (day_diff >= _config.ctr_accessor_param().delete_after_unseen_days()) {
return;
}
DownpourCtrFeatureValue::Show(value) *= _time_decay_rates[day_diff];
DownpourCtrFeatureValue::Click(value) *= _time_decay_rates[day_diff];
if (is_update_seen_day) {
DownpourCtrFeatureValue::UnseenDays(value) = _day_id;
}
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/sparse_sgd_rule.h"
namespace paddle {
namespace distributed {
/**
 * @brief Value accessor for CTR features.
 *
 * Describes the float layout of the stored feature value, the push value and
 * the pull value, and declares the operations that create, select, merge,
 * update, shrink, save and (de)serialize those values.
 **/
class DownpourCtrAccessor : public ValueAccessor {
 public:
  // Layout of a stored feature value, as consecutive floats.
  struct DownpourCtrFeatureValue {
    /*
    float unseen_days;
    float delta_score;
    float show;
    float click;
    float embed_w;
    float embed_g2sum;
    float slot;
    float embedx_g2sum;
    std::vector<float> embedx_w;
    */
    // Number of floats in a value with `embedx_dim` embedx weights.
    static int Dim(int embedx_dim) { return 8 + embedx_dim; }
    // Size in bytes of a single field; every field is one float.
    static int DimSize(size_t dim, int embedx_dim) { return sizeof(float); }
    // Size in bytes of a whole value.
    static int Size(int embedx_dim) { return Dim(embedx_dim) * sizeof(float); }

    // Field indices; each one follows the previous field.
    static int UnseenDaysIndex() { return 0; }
    static int DeltaScoreIndex() {
      return DownpourCtrFeatureValue::UnseenDaysIndex() + 1;
    }
    static int ShowIndex() {
      return DownpourCtrFeatureValue::DeltaScoreIndex() + 1;
    }
    static int ClickIndex() { return DownpourCtrFeatureValue::ShowIndex() + 1; }
    static int EmbedWIndex() {
      return DownpourCtrFeatureValue::ClickIndex() + 1;
    }
    static int EmbedG2SumIndex() {
      return DownpourCtrFeatureValue::EmbedWIndex() + 1;
    }
    static int SlotIndex() {
      return DownpourCtrFeatureValue::EmbedG2SumIndex() + 1;
    }
    static int EmbedxG2SumIndex() {
      return DownpourCtrFeatureValue::SlotIndex() + 1;
    }
    static int EmbedxWIndex() {
      return DownpourCtrFeatureValue::EmbedxG2SumIndex() + 1;
    }

    // Field accessors returning a reference (or pointer) into `val`.
    static float& UnseenDays(float* val) {
      return val[DownpourCtrFeatureValue::UnseenDaysIndex()];
    }
    static float& DeltaScore(float* val) {
      return val[DownpourCtrFeatureValue::DeltaScoreIndex()];
    }
    static float& Show(float* val) {
      return val[DownpourCtrFeatureValue::ShowIndex()];
    }
    static float& Click(float* val) {
      return val[DownpourCtrFeatureValue::ClickIndex()];
    }
    static float& Slot(float* val) {
      return val[DownpourCtrFeatureValue::SlotIndex()];
    }
    static float& EmbedW(float* val) {
      return val[DownpourCtrFeatureValue::EmbedWIndex()];
    }
    static float& EmbedG2Sum(float* val) {
      return val[DownpourCtrFeatureValue::EmbedG2SumIndex()];
    }
    static float& EmbedxG2Sum(float* val) {
      return val[DownpourCtrFeatureValue::EmbedxG2SumIndex()];
    }
    static float* EmbedxW(float* val) {
      return (val + DownpourCtrFeatureValue::EmbedxWIndex());
    }
  };

  // Layout of a pushed value, as consecutive floats.
  struct DownpourCtrPushValue {
    /*
    float slot;
    float show;
    float click;
    float embed_g;
    std::vector<float> embedx_g;
    */
    static int Dim(int embedx_dim) { return 4 + embedx_dim; }
    static int DimSize(int dim, int embedx_dim) { return sizeof(float); }
    static int Size(int embedx_dim) { return Dim(embedx_dim) * sizeof(float); }
    static int SlotIndex() { return 0; }
    static int ShowIndex() { return DownpourCtrPushValue::SlotIndex() + 1; }
    static int ClickIndex() { return DownpourCtrPushValue::ShowIndex() + 1; }
    static int EmbedGIndex() { return DownpourCtrPushValue::ClickIndex() + 1; }
    static int EmbedxGIndex() {
      return DownpourCtrPushValue::EmbedGIndex() + 1;
    }
    static float& Slot(float* val) { return val[0]; }
    static float& Show(float* val) { return val[1]; }
    static float& Click(float* val) { return val[2]; }
    static float& EmbedG(float* val) { return val[3]; }
    static float* EmbedxG(float* val) { return val + 4; }
  };

  // Layout of a pulled value, as consecutive floats.
  struct DownpourCtrPullValue {
    /*
    float show;
    float click;
    float embed_w;
    std::vector<float> embedx_w;
    */
    static int Dim(int embedx_dim) { return 3 + embedx_dim; }
    static int DimSize(size_t dim) { return sizeof(float); }
    static int Size(int embedx_dim) { return Dim(embedx_dim) * sizeof(float); }
    static int ShowIndex() { return 0; }
    static int ClickIndex() { return 1; }
    static int EmbedWIndex() { return 2; }
    static int EmbedxWIndex() { return 3; }
    static float& Show(float* val) {
      return val[DownpourCtrPullValue::ShowIndex()];
    }
    static float& Click(float* val) {
      return val[DownpourCtrPullValue::ClickIndex()];
    }
    static float& EmbedW(float* val) {
      return val[DownpourCtrPullValue::EmbedWIndex()];
    }
    static float* EmbedxW(float* val) {
      return val + DownpourCtrPullValue::EmbedxWIndex();
    }
  };

  DownpourCtrAccessor() {}
  virtual ~DownpourCtrAccessor() {}
  virtual int Initialize();
  // Initializes AccessorInfo.
  virtual void InitAccessorInfo();
  // Decides whether the value should be shrunk (removed).
  virtual bool Shrink(float* value);
  // Decides whether the value should be saved to SSD.
  virtual bool save_ssd(float* value);
  virtual bool NeedExtendMF(float* value);
  virtual bool HasMF(size_t size);
  // Decides whether the value is dumped during the save stage.
  // `param` identifies the save stage, e.g. downpour's xbox and batch_model:
  // param = 0, save all feature
  // param = 1, save delta feature
  // param = 3, save all feature with time decay
  virtual bool Save(float* value, int param) override;
  // update delta_score and unseen_days after save
  virtual void UpdateStatAfterSave(float* value, int param) override;
  // virtual bool save_cache(float* value, int param, double
  // global_cache_threshold) override;
  // Generates initial values when the keys do not exist.
  // The memory behind `value` must already be allocated by the caller.
  virtual int32_t Create(float** value, size_t num);
  // Selects fields from `values` into `select_values`.
  virtual int32_t Select(float** select_values, const float** values,
                         size_t num);
  // Aggregates `other_update_values` into `update_values`.
  virtual int32_t Merge(float** update_values,
                        const float** other_update_values, size_t num);
  // Aggregates update_values, using it.next to decide whether to move on to
  // the next key.
  // virtual int32_t Merge(float** update_values, iterator it);
  // Applies `update_values` to `values`.
  virtual int32_t Update(float** values, const float** update_values,
                         size_t num);
  virtual std::string ParseToString(const float* value, int param) override;
  virtual int32_t ParseFromString(const std::string& str, float* v) override;
  virtual bool CreateValue(int type, const float* value);

  // Currently only used to fetch "show"; any other name aborts via CHECK.
  // Returns the value's show count multiplied by the decay rate for its day
  // difference (current day id minus the stored unseen-days field).
  virtual float GetField(float* value, const std::string& name) override {
    CHECK(name == "show");
    if (name == "show") {
      auto unseen_days = DownpourCtrFeatureValue::UnseenDays(value);
      int16_t day_diff = _day_id - unseen_days;
      // Clamp the table index so an out-of-range day difference cannot read
      // outside the vector: a negative difference uses the first rate, an
      // overly large one the last rate.
      double decay_rate = 1.0;
      if (!_time_decay_rates.empty()) {
        const int last_index = static_cast<int>(_time_decay_rates.size()) - 1;
        const int index =
            day_diff < 0 ? 0 : (day_diff > last_index ? last_index : day_diff);
        decay_rate = _time_decay_rates[index];
      }
      auto show_right = DownpourCtrFeatureValue::Show(value) * decay_rate;
      return static_cast<float>(show_right);
    }
    return 0.0;
  }
  // DEFINE_GET_INDEX(DownpourCtrFeatureValue, show)
  // DEFINE_GET_INDEX(DownpourCtrFeatureValue, click)
  // DEFINE_GET_INDEX(DownpourCtrFeatureValue, embed_w)
  // DEFINE_GET_INDEX(DownpourCtrFeatureValue, embedx_w)
  virtual void update_time_decay(float* value, bool is_update_seen_day);
  virtual void set_day_id(int day_id);
  virtual int get_day_id();
  bool test_func() { return false; }

 private:
  float ShowClickScore(float show, float click);
  void set_time_decay_rates();

 private:
  // Default member initializers keep every field in a defined state even
  // before Initialize() / set_day_id() have run.
  SparseValueSGDRule* _embed_sgd_rule = nullptr;
  SparseValueSGDRule* _embedx_sgd_rule = nullptr;
  float _show_click_decay_rate = 0.0f;
  int32_t _ssd_unseenday_threshold = 0;
  // Entry d holds the decay rate applied for a day difference of d.
  std::vector<double> _time_decay_rates;
  int _day_id = 0;
};
} // namespace distributed
} // namespace paddle
......@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/common_dense_table.h"
#include "paddle/fluid/distributed/ps/table/memory_dense_table.h"
#include "paddle/fluid/platform/enforce.h"
......@@ -21,7 +21,7 @@ namespace distributed {
int FLAGS_pslib_table_save_max_retry_dense = 3;
void CommonDenseTable::CreateInitializer(const std::string& attr,
void MemoryDenseTable::CreateInitializer(const std::string& attr,
const std::string& name) {
auto slices = string::split_string<std::string>(attr, "&");
......@@ -39,7 +39,7 @@ void CommonDenseTable::CreateInitializer(const std::string& attr,
}
}
int32_t CommonDenseTable::Initialize() {
int32_t MemoryDenseTable::Initialize() {
_shards_task_pool.resize(task_pool_size_);
for (int i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
......@@ -54,7 +54,7 @@ int32_t CommonDenseTable::Initialize() {
return 0;
}
int32_t CommonDenseTable::InitializeValue() {
int32_t MemoryDenseTable::InitializeValue() {
auto common = _config.common();
int size = static_cast<int>(common.params().size());
values_.resize(size);
......@@ -92,14 +92,14 @@ int32_t CommonDenseTable::InitializeValue() {
param_col_ids_.insert(param_col_ids_.begin() + 1, -1);
}
VLOG(1) << "CommonDenseTable::InitializeValue total dim: " << total_dim_
VLOG(1) << "MemoryDenseTable::InitializeValue total dim: " << total_dim_
<< " fixed_len_params_dim: " << fixed_len_params_dim_;
pull_reservoir_ = ReservoirValue<float>(param_dim_);
return 0;
}
int32_t CommonDenseTable::InitializeOptimizer() {
int32_t MemoryDenseTable::InitializeOptimizer() {
auto common = _config.common();
auto name = common.name();
auto attrs = common.attributes();
......@@ -124,19 +124,19 @@ int32_t CommonDenseTable::InitializeOptimizer() {
return 0;
}
int32_t CommonDenseTable::SetGlobalLR(float* lr) {
int32_t MemoryDenseTable::SetGlobalLR(float* lr) {
_global_lr = lr;
optimizer_->SetGlobalLR(_global_lr);
return 0;
}
int32_t CommonDenseTable::Pull(TableContext& context) {
int32_t MemoryDenseTable::Pull(TableContext& context) {
CHECK(context.value_type == Dense);
float* pull_values = context.pull_context.values;
return PullDense(pull_values, context.num);
}
int32_t CommonDenseTable::Push(TableContext& context) {
int32_t MemoryDenseTable::Push(TableContext& context) {
CHECK(context.value_type == Dense);
if (context.push_context.values != nullptr) {
if (!context.push_context.is_param) {
......@@ -148,13 +148,13 @@ int32_t CommonDenseTable::Push(TableContext& context) {
return 0;
}
int32_t CommonDenseTable::PullDense(float* pull_values, size_t num) {
int32_t MemoryDenseTable::PullDense(float* pull_values, size_t num) {
std::copy(values_[param_idx_].begin(), values_[param_idx_].end(),
pull_values);
return 0;
}
int32_t CommonDenseTable::PushDenseParam(const float* values, size_t num) {
int32_t MemoryDenseTable::PushDenseParam(const float* values, size_t num) {
PADDLE_ENFORCE_GE(
num, param_dim_,
paddle::platform::errors::InvalidArgument(
......@@ -163,14 +163,14 @@ int32_t CommonDenseTable::PushDenseParam(const float* values, size_t num) {
return 0;
}
int32_t CommonDenseTable::Pour() {
int32_t MemoryDenseTable::Pour() {
pull_reservoir_.avg();
_PushDense(pull_reservoir_.values.data(), pull_reservoir_.values.size());
pull_reservoir_.reset();
return 0;
}
int32_t CommonDenseTable::PushDense(const float* values, size_t num) {
int32_t MemoryDenseTable::PushDense(const float* values, size_t num) {
if (sync) {
std::future<int> task =
_shards_task_pool[0]->enqueue([this, &values]() -> int {
......@@ -184,7 +184,7 @@ int32_t CommonDenseTable::PushDense(const float* values, size_t num) {
return 0;
}
int32_t CommonDenseTable::_PushDense(const float* values, size_t num) {
int32_t MemoryDenseTable::_PushDense(const float* values, size_t num) {
PADDLE_ENFORCE_GE(
num, param_dim_,
paddle::platform::errors::InvalidArgument(
......@@ -206,11 +206,11 @@ int32_t CommonDenseTable::_PushDense(const float* values, size_t num) {
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
VLOG(2) << "debug CommonDenseTable::_push_dense done";
VLOG(2) << "debug MemoryDenseTable::_push_dense done";
return 0;
}
int32_t CommonDenseTable::Load(const std::string& path,
int32_t MemoryDenseTable::Load(const std::string& path,
const std::string& param) {
if (param_dim_ <= 0) {
return 0;
......@@ -281,7 +281,7 @@ int32_t CommonDenseTable::Load(const std::string& path,
continue;
}
values_[param_col_ids_[col_idx]][dim_idx] = data_buffer[col_idx];
VLOG(2) << "CommonDenseTable::load param x: "
VLOG(2) << "MemoryDenseTable::load param x: "
<< param_col_ids_[col_idx] << " y: " << dim_idx
<< " value: " << values_[param_col_ids_[col_idx]][dim_idx]
<< " line " << file_dim_idx;
......@@ -318,11 +318,11 @@ int32_t CommonDenseTable::Load(const std::string& path,
return 0;
}
int32_t CommonDenseTable::Save(const std::string& path,
int32_t MemoryDenseTable::Save(const std::string& path,
const std::string& param) {
int save_param = atoi(param.c_str());
uint32_t feasign_size;
VLOG(0) << "CommonDenseTable::save path " << path;
VLOG(0) << "MemoryDenseTable::save path " << path;
FsChannelConfig channel_config;
if (_config.compress_in_save()) {
......@@ -356,7 +356,7 @@ int32_t CommonDenseTable::Save(const std::string& path,
for (int x = 0; x < size; ++x) {
auto& varname = common.params()[x];
auto& dim = common.dims()[x];
VLOG(3) << "CommonDenseTable::save dim " << x << " size: " << dim;
VLOG(3) << "MemoryDenseTable::save dim " << x << " size: " << dim;
for (int y = 0; y < dim; ++y) {
os.clear();
os.str("");
......
......@@ -30,10 +30,10 @@ namespace distributed {
class DenseOptimizer;
class CommonDenseTable : public Table {
class MemoryDenseTable : public Table {
public:
CommonDenseTable() {}
virtual ~CommonDenseTable() {}
MemoryDenseTable() {}
virtual ~MemoryDenseTable() {}
int32_t Initialize() override;
int32_t InitializeShard() override { return 0; }
void CreateInitializer(const std::string& attr, const std::string& name);
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h"
namespace paddle {
namespace distributed {
// Fetches the ids recorded for a trainer and pulls their current values.
//
// @param trainer_id  trainer whose recorded ids are fetched and cleared.
// @param values      output; resized to ids->size() * dim and filled by
//                    CommonSparseTable::PullSparse.
// @param ids         output; filled by the geo recorder.
// @return always 0.
int32_t SparseGeoTable::PullGeoParam(const uint32_t trainer_id,
                                     std::vector<float>* values,
                                     std::vector<uint64_t>* ids) {
  geo_recorder->GetAndClear(trainer_id, ids);

  const auto dim = _config.common().dims()[0];
  const auto id_count = ids->size();

  // Every id is pulled with a frequency of 1.
  std::vector<uint32_t> frequencies(id_count, 1);

  auto pull_value = PullSparseValue(id_count, dim);
  pull_value.is_training_ = true;
  pull_value.feasigns_ = ids->data();
  pull_value.frequencies_ = frequencies.data();

  values->resize(id_count * dim);
  CommonSparseTable::PullSparse(values->data(), pull_value);
  return 0;
}
// Records the pushed keys in the geo recorder, then forwards the push to
// CommonSparseTable::PushSparse.
//
// @param keys    array of `num` keys.
// @param values  values passed through unchanged to the base table.
// @param num     number of keys.
// @return always 0.
int32_t SparseGeoTable::PushSparse(const uint64_t* keys, const float* values,
                                   size_t num) {
  std::vector<uint64_t> pushed_ids(keys, keys + num);
  geo_recorder->Update(pushed_ids);
  CommonSparseTable::PushSparse(keys, values, num);
  return 0;
}
// Creates one value block per task-pool shard, then pulls every id in
// [0, fea_dim) that belongs to this shard, in 10 buckets; the pulled values
// are discarded.
//
// @return always 0.
int32_t SparseGeoTable::InitializeValue() {
  const auto& common = _config.common();
  shard_values_.reserve(task_pool_size_);
  for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
    shard_values_.emplace_back(std::make_shared<ValueBlock>(
        value_names_, value_dims_, value_offsets_, value_idx_,
        initializer_attrs_, common.entry()));
  }

  // Collect the ids owned by this shard.
  const auto& accessor = _config.accessor();
  std::vector<uint64_t> feasigns;
  for (size_t id = 0; id < accessor.fea_dim(); ++id) {
    if (id % _shard_num == _shard_idx) {
      feasigns.push_back(id);
    }
  }
  VLOG(3) << "has " << feasigns.size() << " ids need to be pre inited";

  const int kBucketCount = 10;
  auto buckets = bucket(feasigns.size(), kBucketCount);
  for (int b = 0; b < kBucketCount; ++b) {
    auto bucket_feasigns = buckets[b + 1] - buckets[b];
    std::vector<uint64_t> ids(feasigns.begin() + buckets[b],
                              feasigns.begin() + buckets[b + 1]);
    std::vector<uint32_t> fres(ids.size(), 1);
    auto pull_value = PullSparseValue(ids, fres, param_dim_);

    std::vector<float> pulls;
    pulls.resize(bucket_feasigns * param_dim_);
    PullSparse(pulls.data(), pull_value);
  }
  return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <utility>
#include <vector>
#include "Eigen/Dense"
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
#include "paddle/fluid/distributed/ps/table/depends/geo_recorder.h"
#include "paddle/fluid/distributed/ps/table/depends/initializers.h"
#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/ps/table/depends/sparse.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/phi/core/utils/rw_lock.h"
namespace paddle {
namespace distributed {
class GeoRecorder;
class SparseGeoTable : public CommonSparseTable {
public:
explicit SparseGeoTable() : CommonSparseTable() { geo_recorder = nullptr; }
virtual ~SparseGeoTable() {}
virtual int32_t InitializeValue();
int32_t PullGeoParam(const uint32_t trainer_id, std::vector<float>* values,
std::vector<uint64_t>* keys);
int32_t PushSparse(const uint64_t* keys, const float* values,
size_t num) override;
virtual int32_t InitializeRecorder() {
if (!geo_recorder) {
auto trainers = _config.common().trainer_num();
geo_recorder = std::make_shared<GeoRecorder>(trainers);
}
return 0;
}
private:
std::shared_ptr<GeoRecorder> geo_recorder;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/distributed/ps/table/ssd_sparse_table.h"
DEFINE_string(rocksdb_path, "database", "path of sparse table rocksdb file");
namespace paddle {
namespace distributed {
// Sets up the table: one single-thread pool per shard, the sync flag and
// global learning rate, the per-parameter name / dim / offset bookkeeping
// from the common config, then values, optimizer, recorder and the RocksDB
// handle.
//
// @return always 0.
int32_t SSDSparseTable::Initialize() {
  _shards_task_pool.resize(task_pool_size_);
  for (auto& pool : _shards_task_pool) {
    pool.reset(new ::ThreadPool(1));
  }

  sync = _config.common().sync();
  VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
  _global_lr = new float(1.0);

  auto common = _config.common();
  const int param_count = static_cast<int>(common.params().size());
  size_t running_offset = 0;
  for (int idx = 0; idx < param_count; ++idx) {
    auto& varname = common.params()[idx];
    auto& dim = common.dims()[idx];

    value_idx_[varname] = idx;
    value_names_.push_back(varname);
    value_dims_.push_back(dim);
    value_offsets_.push_back(running_offset);
    initializer_attrs_.push_back(common.initializers()[idx]);

    // Remember where the "Param" slice lives inside a value.
    if (varname == "Param") {
      param_dim_ = dim;
      param_offset_ = running_offset;
    }
    running_offset += dim;
  }

  InitializeValue();
  InitializeOptimizer();
  InitializeRecorder();

  _db = paddle::distributed::RocksDBHandler::GetInstance();
  _db->initialize(FLAGS_rocksdb_path, task_pool_size_);
  return 0;
}
// Context-based pull entry point. Requires a Sparse context (CHECK) and
// dispatches to PullSparsePtr when `context.use_ptr` is set, otherwise to
// PullSparse.
//
// @return the result of the selected pull function.
int32_t SSDSparseTable::Pull(TableContext& context) {
  CHECK(context.value_type == Sparse);
  if (!context.use_ptr) {
    float* pull_values = context.pull_context.values;
    const PullSparseValue& pull_value = context.pull_context.pull_value;
    return PullSparse(pull_values, pull_value);
  }
  char** pull_values = context.pull_context.ptr_values;
  const uint64_t* keys = context.pull_context.keys;
  return PullSparsePtr(pull_values, keys, context.num);
}
// Context-based push entry point; it performs no work and returns 0.
int32_t SSDSparseTable::Push(TableContext& context) {
  return 0;
}
// Pulls the "Param" slice of every requested feasign into `pull_values`.
//
// @param pull_values  output buffer; entry `offset` receives `param_dim_`
//                     floats at pull_values + param_dim_ * offset.
// @param pull_value   the request: feasigns, frequencies and training flag.
// @return always 0.
//
// The work is split across the shard task pools and this call waits for all
// of them. For each feasign the value is taken from the in-memory block when
// present; otherwise it is looked up in the database and either loaded into
// memory or, when the lookup reports nothing, newly created.
int32_t SSDSparseTable::PullSparse(float* pull_values,
                                   const PullSparseValue& pull_value) {
  auto shard_num = task_pool_size_;
  std::vector<std::future<int>> tasks(shard_num);

  for (int shard_id = 0; shard_id < shard_num; ++shard_id) {
    tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
        [this, shard_id, shard_num, &pull_value, pull_values]() -> int {
          auto& block = shard_values_[shard_id];

          std::vector<int> offsets;
          pull_value.Fission(shard_id, shard_num, &offsets);

          for (auto& offset : offsets) {
            auto feasign = pull_value.feasigns_[offset];
            auto frequencie = pull_value.frequencies_[offset];
            float* embedding = nullptr;

            auto iter = block->Find(feasign);
            if (iter != block->end()) {
              // In memory: only a found entry may be dereferenced.
              embedding = iter->second->data_.data();
              if (pull_value.is_training_) {
                block->AttrUpdate(iter->second, frequencie);
              }
            } else {
              std::string tmp_str("");
              // NOTE(review): a positive return is treated as "not in the
              // database" - confirm against RocksDBHandler::get.
              if (_db->get(shard_id, reinterpret_cast<char*>(&feasign),
                           sizeof(uint64_t), tmp_str) > 0) {
                // Need create.
                embedding = block->Init(feasign, true, frequencie);
              } else {
                // In db. The record is read as `value_length_` floats
                // followed by count, unseen_days and is_entry.
                // NOTE(review): the record length is not validated here.
                int value_size = block->value_length_;
                const float* db_value =
                    reinterpret_cast<const float*>(tmp_str.data());
                VALUE* value = block->InitGet(feasign);
                // Copy to mem.
                memcpy(value->data_.data(), db_value,
                       value_size * sizeof(float));
                // Point at the in-memory copy: `tmp_str` is destroyed at the
                // end of this scope, before the copy_n below runs.
                embedding = value->data_.data();
                // param, count, unseen_day
                value->count_ = db_value[value_size];
                value->unseen_days_ = db_value[value_size + 1];
                value->is_entry_ = db_value[value_size + 2];
                if (pull_value.is_training_) {
                  block->AttrUpdate(value, frequencie);
                }
              }
            }

            std::copy_n(embedding + param_offset_, param_dim_,
                        pull_values + param_dim_ * offset);
          }
          return 0;
        });
  }

  for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
    tasks[shard_id].wait();
  }
  return 0;
}
int32_t SSDSparseTable::PullSparsePtr(char** pull_values, const uint64_t* keys,
size_t num) {
auto shard_num = task_pool_size_;
std::vector<std::future<int>> tasks(shard_num);
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
for (int x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}
for (int shard_id = 0; shard_id < shard_num; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &pull_values, &offset_bucket]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];
for (auto& offset : offsets) {
auto feasign = keys[offset];
auto iter = block->Find(feasign);
VALUE* value = nullptr;
// in mem
if (iter != block->end()) {
value = iter->second;
} else {
// need create
std::string tmp_str("");
if (_db->get(shard_id, (char*)&feasign, sizeof(uint64_t),
tmp_str) > 0) {
value = block->InitGet(feasign);
} else {
// in db
int data_size = tmp_str.size() / sizeof(float);
int value_size = block->value_length_;
float* db_value = (float*)const_cast<char*>(tmp_str.c_str());
value = block->InitGet(feasign);
// copy to mem
memcpy(value->data_.data(), db_value,
value_size * sizeof(float));
// param, count, unseen_day
value->count_ = db_value[value_size];
value->unseen_days_ = db_value[value_size + 1];
value->is_entry_ = db_value[value_size + 2];
}
}
pull_values[offset] = (char*)value;
}
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
return 0;
}
// No-op: `param` is ignored and 0 is returned.
int32_t SSDSparseTable::Shrink(const std::string& param) {
  return 0;
}
// Moves every in-memory value whose unseen_days_ is >= 1 out to the db.
//
// Each evicted value is serialized as value_size floats of data followed by
// count_, unseen_days_ and is_entry_ (db_size = value_size + 3 floats), written
// with _db->put() under its 64-bit key, handed back via butil::return_object
// and erased from the in-memory table. _db->flush() is called once per shard.
// The number of evicted values is logged; always returns 0.
int32_t SSDSparseTable::UpdateTable() {
  int count = 0;
  int value_size = shard_values_[0]->value_length_;
  int db_size = 3 + value_size;
  // Heap-backed scratch record (a runtime-sized stack array is not standard
  // C++ and its size is unbounded here).
  std::vector<float> tmp_value(db_size);

  for (size_t i = 0; i < task_pool_size_; ++i) {
    auto& block = shard_values_[i];

    for (auto& table : block->values_) {
      for (auto iter = table.begin(); iter != table.end();) {
        VALUE* value = iter->second;
        if (value->unseen_days_ >= 1) {
          tmp_value[value_size] = value->count_;
          tmp_value[value_size + 1] = value->unseen_days_;
          tmp_value[value_size + 2] = value->is_entry_;
          memcpy(tmp_value.data(), value->data_.data(),
                 sizeof(float) * value_size);
          _db->put(i, (char*)&(iter->first), sizeof(uint64_t),
                   (char*)tmp_value.data(), db_size * sizeof(float));
          count++;

          butil::return_object(iter->second);
          // erase() returns the next valid iterator.
          iter = table.erase(iter);
        } else {
          ++iter;
        }
      }
    }
    _db->flush(i);
  }
  VLOG(1) << "Table>> update count: " << count;
  return 0;
}
int64_t SSDSparseTable::SaveValueToText(std::ostream* os,
std::shared_ptr<ValueBlock> block,
std::shared_ptr<::ThreadPool> pool,
const int mode, int shard_id) {
int64_t save_num = 0;
for (auto& table : block->values_) {
for (auto& value : table) {
if (mode == SaveMode::delta && !value.second->need_save_) {
continue;
}
++save_num;
std::stringstream ss;
auto* vs = value.second->data_.data();
auto id = value.first;
ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_
<< "\t";
for (int i = 0; i < block->value_length_ - 1; i++) {
ss << std::to_string(vs[i]) << ",";
}
ss << std::to_string(vs[block->value_length_ - 1]);
ss << "\n";
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
if (mode == SaveMode::base || mode == SaveMode::delta) {
value.second->need_save_ = false;
}
}
}
if (mode != 1) {
int value_size = block->value_length_;
auto* it = _db->get_iterator(shard_id);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
float* value = (float*)const_cast<char*>(it->value().data());
std::stringstream ss;
ss << *((uint64_t*)const_cast<char*>(it->key().data())) << "\t"
<< value[value_size] << "\t" << value[value_size + 1] << "\t"
<< value[value_size + 2] << "\t";
for (int i = 0; i < block->value_length_ - 1; i++) {
ss << std::to_string(value[i]) << ",";
}
ss << std::to_string(value[block->value_length_ - 1]);
ss << "\n";
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
}
}
return save_num;
}
// Loads table contents from the text file at `path` (with `param` passed as
// the meta path) while holding the table's write lock. The result of
// LoadFromText is not inspected; always returns 0.
int32_t SSDSparseTable::Load(const std::string& path,
                             const std::string& param) {
  rwlock_->WRLock();

  VLOG(3) << "ssd sparse table load with " << path << " with meta " << param;
  auto* target_blocks = &shard_values_;
  LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_,
               target_blocks);

  rwlock_->UNLock();
  return 0;
}
// Reads tab-separated rows from `valuepath` and loads the ones that belong to
// this server (id % pserver_num == pserver_id) into `blocks`.
//
// For each accepted row the value is created in the shard selected by
// id % local_shard_num and filled from the parsed columns (ProcessALine);
// when the row has exactly 5 fields, count_/unseen_days_/is_entry_ are taken
// from fields 1..3. A value whose unseen_days_ is >= 1 is then written to the
// db (value_size floats of data followed by count, unseen days, entry flag)
// and erased from memory again.
//
// Empty lines are skipped. Always returns 0.
int64_t SSDSparseTable::LoadFromText(
    const std::string& valuepath, const std::string& metapath,
    const int pserver_id, const int pserver_num, const int local_shard_num,
    std::vector<std::shared_ptr<ValueBlock>>* blocks) {
  Meta meta = Meta(metapath);

  std::ifstream file(valuepath);
  std::string line;

  int value_size = shard_values_[0]->value_length_;
  int db_size = 3 + value_size;
  // Heap-backed scratch record (a runtime-sized stack array is not standard
  // C++ and its size is unbounded here).
  std::vector<float> tmp_value(db_size);

  while (std::getline(file, line)) {
    // A blank line (e.g. a trailing newline) has no id field to parse.
    if (line.empty()) {
      continue;
    }
    auto values = paddle::string::split_string<std::string>(line, "\t");
    auto id = std::stoull(values[0]);

    if (id % pserver_num != pserver_id) {
      VLOG(3) << "will not load " << values[0] << " from " << valuepath
              << ", please check id distribution";
      continue;
    }

    auto shard_id = id % local_shard_num;
    auto block = blocks->at(shard_id);

    std::vector<std::vector<float>> kvalues;
    ProcessALine(values, meta, id, &kvalues);

    block->Init(id, false);

    VALUE* value_instant = block->GetValue(id);

    if (values.size() == 5) {
      value_instant->count_ = std::stoi(values[1]);
      value_instant->unseen_days_ = std::stoi(values[2]);
      value_instant->is_entry_ = static_cast<bool>(std::stoi(values[3]));
    }

    std::vector<float*> block_values = block->Get(id, meta.names, meta.dims);
    auto blas = GetBlas<float>();
    for (int x = 0; x < meta.names.size(); ++x) {
      blas.VCOPY(meta.dims[x], kvalues[x].data(), block_values[x]);
    }
    VLOG(3) << "loading: " << id
            << "unseen day: " << value_instant->unseen_days_;
    if (value_instant->unseen_days_ >= 1) {
      tmp_value[value_size] = value_instant->count_;
      tmp_value[value_size + 1] = value_instant->unseen_days_;
      tmp_value[value_size + 2] = value_instant->is_entry_;
      memcpy(tmp_value.data(), value_instant->data_.data(),
             sizeof(float) * value_size);
      _db->put(shard_id, (char*)&(id), sizeof(uint64_t),
               (char*)tmp_value.data(), db_size * sizeof(float));
      block->erase(id);
    }
  }

  return 0;
}
} // namespace ps
} // namespace paddle
#endif
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/depends/rocksdb_warpper.h"
#ifdef PADDLE_WITH_HETERPS
namespace paddle {
namespace distributed {
// Sparse table derived from CommonSparseTable that additionally holds a
// RocksDBHandler (_db).
class SSDSparseTable : public CommonSparseTable {
 public:
  SSDSparseTable() {}
  virtual ~SSDSparseTable() {}

  // Sets the table up; returns an int32_t status code.
  virtual int32_t Initialize() override;

  // Text (de)serialization helpers.
  void SaveMetaToText(std::ostream* os, const CommonAccessorParameter& common,
                      const size_t shard_idx, const int64_t total);

  int64_t SaveValueToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
                          std::shared_ptr<::ThreadPool> pool, const int mode,
                          int shard_id);

  virtual int64_t LoadFromText(
      const std::string& valuepath, const std::string& metapath,
      const int pserver_id, const int pserver_num, const int local_shard_num,
      std::vector<std::shared_ptr<ValueBlock>>* blocks);

  virtual int32_t Load(const std::string& path, const std::string& param);

  // exchange data
  virtual int32_t UpdateTable();

  // Context-based pull / push entry points.
  virtual int32_t Pull(TableContext& context);
  virtual int32_t Push(TableContext& context);

  // Pulls values for the keys described by pull_value into `values`.
  virtual int32_t PullSparse(float* values, const PullSparseValue& pull_value);

  // Pulls one pointer per key (`num` keys) into pull_values.
  virtual int32_t PullSparsePtr(char** pull_values, const uint64_t* keys,
                                size_t num);

  // Flush and Clear do nothing in this class.
  virtual int32_t Flush() override { return 0; }
  virtual int32_t Shrink(const std::string& param) override;
  virtual void Clear() override {}

 private:
  // Both members get defined initial values; the constructor body is empty
  // and would otherwise leave them indeterminate.
  RocksDBHandler* _db = nullptr;
  int64_t _cache_tk_size = 0;
};
} // namespace ps
} // namespace paddle
#endif
......@@ -17,15 +17,11 @@
#include "glog/logging.h"
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps/table/common_dense_table.h"
#include "paddle/fluid/distributed/ps/table/common_graph_table.h"
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/memory_sparse_geo_table.h"
#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h"
#ifdef PADDLE_WITH_HETERPS
#include "paddle/fluid/distributed/ps/table/ssd_sparse_table.h"
#endif
#include "paddle/fluid/distributed/ps/table/memory_dense_table.h"
#include "paddle/fluid/distributed/ps/table/ctr_accessor.h"
#include "paddle/fluid/distributed/ps/table/memory_sparse_geo_table.h"
#include "paddle/fluid/distributed/ps/table/memory_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/sparse_accessor.h"
#include "paddle/fluid/distributed/ps/table/tensor_accessor.h"
......@@ -34,14 +30,11 @@
namespace paddle {
namespace distributed {
REGISTER_PSCORE_CLASS(Table, GraphTable);
REGISTER_PSCORE_CLASS(Table, CommonDenseTable);
REGISTER_PSCORE_CLASS(Table, CommonSparseTable);
REGISTER_PSCORE_CLASS(Table, MemoryDenseTable);
#ifdef PADDLE_WITH_HETERPS
REGISTER_PSCORE_CLASS(Table, SSDSparseTable);
REGISTER_PSCORE_CLASS(GraphSampler, CompleteGraphSampler);
REGISTER_PSCORE_CLASS(GraphSampler, BasicBfsGraphSampler);
#endif
REGISTER_PSCORE_CLASS(Table, SparseGeoTable);
REGISTER_PSCORE_CLASS(Table, BarrierTable);
REGISTER_PSCORE_CLASS(Table, TensorTable);
REGISTER_PSCORE_CLASS(Table, DenseTensorTable);
......
......@@ -63,7 +63,7 @@ void InitTensorsOnClient(framework::Scope* scope, platform::CPUPlace* place,
void GetDownpourDenseTableProto(
::paddle::distributed::TableParameter* dense_table_proto) {
dense_table_proto->set_table_id(0);
dense_table_proto->set_table_class("CommonDenseTable");
dense_table_proto->set_table_class("MemoryDenseTable");
dense_table_proto->set_shard_num(256);
dense_table_proto->set_type(::paddle::distributed::PS_DENSE_TABLE);
::paddle::distributed::TableAccessorParameter* accessor_proto =
......
......@@ -164,7 +164,7 @@ TEST(downpour_feature_value_accessor_test, test_update) {
for (auto i = 0u; i < item_size; ++i) {
float* p = new float[acc->GetAccessorInfo().update_dim];
for (auto j = 0u; j < acc->GetAccessorInfo().update_dim; ++j) {
p[j] = i;
p[j] = i + 1;
}
grad[i] = p;
}
......@@ -247,9 +247,9 @@ TEST(downpour_feature_value_accessor_test, test_update) {
v.delta_score += acc->ShowClickScore(push_v.show, push_v.click);
acc->_embed_sgd_rule->UpdateValue(&v.embed_w, &v.embed_g2sum[0],
&push_v.embed_g);
&push_v.embed_g, push_v.show);
acc->_embedx_sgd_rule->UpdateValue(&v.embedx_w[0], &v.embedx_g2sum[0],
&push_v.embedx_g[0]);
&push_v.embedx_g[0], push_v.show);
float* ptr = new float[acc->GetAccessorInfo().dim];
v.to_array(ptr, parameter.embedx_dim());
......
......@@ -16,22 +16,22 @@ limitations under the License. */
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/common_dense_table.h"
#include "paddle/fluid/distributed/ps/table/memory_dense_table.h"
namespace paddle {
namespace distributed {
// CommonDenseTable + Adam
// MemoryDenseTable + Adam
class Table;
TEST(CommonDenseTable, Adam) {
TEST(MemoryDenseTable, Adam) {
int fea_dim = 10;
int trainers = 2;
TableParameter table_config;
table_config.set_table_class("CommonDenseTable");
table_config.set_table_class("MemoryDenseTable");
FsClientParameter fs_config;
Table *table = new CommonDenseTable();
Table *table = new MemoryDenseTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
......@@ -141,15 +141,15 @@ TEST(CommonDenseTable, Adam) {
}
}
// CommonDenseTable + Adam
TEST(CommonDenseTable, SGD) {
// MemoryDenseTable + SGD
TEST(MemoryDenseTable, SGD) {
int fea_dim = 10;
int trainers = 2;
TableParameter table_config;
table_config.set_table_class("CommonDenseTable");
table_config.set_table_class("MemoryDenseTable");
FsClientParameter fs_config;
Table *table = new CommonDenseTable();
Table *table = new MemoryDenseTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
......
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/common_dense_table.h"
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/depends/sparse_utils.h"
#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/ps/table/table.h"
namespace paddle {
namespace distributed {
// SparseGeoTable + SSUM
// Exercises SparseGeoTable with the "sum" rule:
//   1. creates five keys with all-zero parameters and checks they pull back
//      unchanged,
//   2. pushes per-trainer values concurrently while accumulating the expected
//      sums locally,
//   3. pulls the geo parameters for every trainer and compares them against
//      the accumulated expectation.
TEST(SparseGeoTable, SSUM) {
  int emb_dim = 10;
  int trainers = 2;

  TableParameter table_config;
  table_config.set_table_class("SparseGeoTable");
  FsClientParameter fs_config;
  Table *table = new SparseGeoTable();
  TableAccessorParameter *accessor_config = table_config.mutable_accessor();
  accessor_config->set_accessor_class("CommMergeAccessor");
  CommonAccessorParameter *common_config = table_config.mutable_common();
  common_config->set_name("sum");
  common_config->set_table_name("ssum_test_table");
  common_config->set_trainer_num(trainers);
  common_config->add_params("Param");
  common_config->add_dims(emb_dim);
  common_config->add_initializers("fill_constant&1.0");

  auto ret = table->initialize(table_config, fs_config);
  ASSERT_EQ(ret, 0);

  // test push_sparse_param, and create params
  std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
  std::vector<uint32_t> init_fres = {1, 1, 1, 1, 1};
  std::vector<float> init_values;
  for (size_t i = 0; i < init_keys.size() * emb_dim; i++) {
    init_values.push_back(0.0);
  }
  table->push_sparse_param(init_keys.data(), init_values.data(),
                           init_keys.size());

  std::vector<float> pull_values(init_values.size());
  auto value = PullSparseValue(init_keys, init_fres, emb_dim);
  table->pull_sparse(pull_values.data(), value);

  // ASSERT_NEAR compares in floating point; an unqualified abs() may pick the
  // integer overload and truncate the difference to 0.
  for (size_t i = 0; i < init_keys.size() * emb_dim; i++) {
    ASSERT_NEAR(pull_values[i], init_values[i], 1e-5);
  }

  std::vector<std::vector<uint64_t>> trainer_keys;
  std::vector<std::vector<float>> trainer_values;
  trainer_keys.resize(trainers);
  trainer_values.resize(trainers);
  float start = 0.0;
  for (int i = 0; i < trainers; i++) {
    trainer_keys[i] = init_keys;
    for (size_t j = 0; j < trainer_keys[i].size(); j++) {
      auto id = trainer_keys[i][j];
      for (int k = 0; k < emb_dim; k++) {
        trainer_values[i].push_back(start);
        // pull_values doubles as the locally accumulated expectation.
        pull_values[id * emb_dim + k] += start;
        start += 0.1;
      }
    }
  }

  std::shared_ptr<::ThreadPool> pool_ =
      std::make_shared<::ThreadPool>(trainers);
  std::vector<std::future<void>> task_status;
  for (int i = 0; i < trainers; i++) {
    auto &push_keys = trainer_keys[i];
    auto &push_values = trainer_values[i];
    auto task = [table, &push_keys, &push_values] {
      table->push_sparse(push_keys.data(), push_values.data(),
                         push_keys.size());
    };
    task_status.push_back(pool_->enqueue(std::move(task)));
  }
  for (auto &status : task_status) {
    status.wait();
  }

  std::vector<std::vector<uint64_t>> geo_pull_ids;
  std::vector<std::vector<float>> geo_pull_values;
  geo_pull_ids.resize(trainers);
  geo_pull_values.resize(trainers);
  for (int i = 0; i < trainers; i++) {
    table->pull_geo_param(i, &geo_pull_values[i], &geo_pull_ids[i]);
    ASSERT_EQ(geo_pull_values[i].size(), geo_pull_ids[i].size() * emb_dim);
    for (size_t j = 0; j < geo_pull_ids[i].size(); ++j) {
      auto id = geo_pull_ids[i][j];
      for (int k = 0; k < emb_dim; k++) {
        ASSERT_NEAR(geo_pull_values[i][j * emb_dim + k],
                    pull_values[id * emb_dim + k], 1e-5);
      }
    }
  }
}
} // namespace distributed
} // namespace paddle
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/ps/table/table.h"
namespace paddle {
namespace distributed {
// Builds an "adam" configuration for a CommonSparseTable (Param, LearningRate,
// Moment1, Moment2, Beta1Pow, Beta2Pow) and checks that the table initializes
// successfully with it.
TEST(BENCHMARK, LargeScaleKV) {
  const int emb_dim = 10;
  const int trainers = 2;

  TableParameter table_config;
  table_config.set_table_class("CommonSparseTable");
  FsClientParameter fs_config;
  Table *table = new CommonSparseTable();

  TableAccessorParameter *accessor = table_config.mutable_accessor();
  accessor->set_accessor_class("CommMergeAccessor");

  CommonAccessorParameter *optimizer = table_config.mutable_common();
  optimizer->set_name("adam");
  optimizer->set_table_name("adam_test_table");
  optimizer->set_trainer_num(trainers);

  // Registers one optimizer variable: its name, width and initializer spec.
  const auto declare = [optimizer](const char *name, int dim,
                                   const char *initializer) {
    optimizer->add_params(name);
    optimizer->add_dims(dim);
    optimizer->add_initializers(initializer);
  };
  declare("Param", emb_dim, "uniform_random&0&-1.0&1.0");
  declare("LearningRate", 1, "fill_constant&1.0");
  declare("Moment1", emb_dim, "fill_constant&0.0");
  declare("Moment2", emb_dim, "fill_constant&0.0");
  declare("Beta1Pow", 1, "fill_constant&1.0");
  declare("Beta2Pow", 1, "fill_constant&1.0");

  auto ret = table->initialize(table_config, fs_config);
  ASSERT_EQ(ret, 0);
}
} // namespace distributed
} // namespace paddle
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/common_dense_table.h"
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/ps/table/table.h"
namespace paddle {
namespace distributed {
// CommonSparseTable + SSGD
// CommonSparseTable with the "sgd" rule and learning rate 1.0:
//   1. pulls five keys to create them and capture their initial values,
//   2. pushes per-trainer gradients concurrently while summing them locally,
//   3. pulls again and expects  new == initial - 1.0 * summed_gradient.
TEST(CommonSparseTable, SGD) {
  int emb_dim = 10;
  int trainers = 2;

  TableParameter table_config;
  table_config.set_table_class("CommonSparseTable");
  FsClientParameter fs_config;
  Table *table = new CommonSparseTable();
  TableAccessorParameter *accessor_config = table_config.mutable_accessor();
  accessor_config->set_accessor_class("CommMergeAccessor");
  CommonAccessorParameter *common_config = table_config.mutable_common();
  common_config->set_name("sgd");
  common_config->set_table_name("sgd_test_table");
  common_config->set_trainer_num(trainers);
  common_config->add_params("Param");
  common_config->add_dims(emb_dim);
  common_config->add_initializers("uniform_random&0&-1.0&1.0");  // param
  common_config->add_params("LearningRate");
  common_config->add_dims(1);
  common_config->add_initializers("fill_constant&1.0");  // learning_rate

  auto ret = table->initialize(table_config, fs_config);
  ASSERT_EQ(ret, 0);

  // pull parameters for create and check
  std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
  std::vector<uint32_t> init_fres = {1, 1, 1, 1, 1};

  std::vector<float> init_values;
  init_values.resize(init_keys.size() * emb_dim);

  auto value = PullSparseValue(init_keys, init_fres, emb_dim);
  table->pull_sparse(init_values.data(), value);

  // for check: per-element sum of all pushed gradients
  std::vector<float> total_gradients(init_keys.size() * emb_dim, 0.0f);

  // push gradient
  std::vector<std::vector<uint64_t>> trainer_keys;
  std::vector<std::vector<float>> trainer_gradient_values;
  trainer_keys.resize(trainers);
  trainer_gradient_values.resize(trainers);
  float start = 0.0;
  for (int i = 0; i < trainers; i++) {
    trainer_keys[i] = init_keys;
    for (size_t j = 0; j < trainer_keys[i].size(); j++) {
      auto id = trainer_keys[i][j];
      for (int k = 0; k < emb_dim; k++) {
        trainer_gradient_values[i].push_back(start);
        total_gradients[id * emb_dim + k] += start;
        start += 0.1;
      }
    }
  }

  std::shared_ptr<::ThreadPool> pool_ =
      std::make_shared<::ThreadPool>(trainers);
  std::vector<std::future<void>> task_status;
  for (int i = 0; i < trainers; i++) {
    auto &push_keys = trainer_keys[i];
    auto &push_values = trainer_gradient_values[i];
    auto task = [table, &push_keys, &push_values] {
      table->push_sparse(push_keys.data(), push_values.data(),
                         push_keys.size());
    };
    task_status.push_back(pool_->enqueue(std::move(task)));
  }
  for (auto &status : task_status) {
    status.wait();
  }

  // Pull the updated parameters into their own buffer so that init_values
  // keeps the pre-update baseline used in the expectation below.
  std::vector<float> pull_values;
  pull_values.resize(init_keys.size() * emb_dim);
  table->pull_sparse(pull_values.data(), value);

  for (size_t i = 0; i < init_values.size(); ++i) {
    auto update_val = init_values[i] - 1.0 * total_gradients[i];
    // Floating-point comparison; an unqualified abs() may truncate to int.
    ASSERT_NEAR(update_val, pull_values[i], 1e-5);
  }
}
// CommonSparseTable + Adam
// CommonSparseTable with the "adam" rule:
//   1. pulls five keys to create them and capture their initial values,
//   2. pushes each trainer's gradients sequentially,
//   3. pulls again and compares every key against a reference Adam update
//      computed locally (beta1 = 0.9, beta2 = 0.999, epsilon = 1e-8, lr = 1).
TEST(CommonSparseTable, Adam) {
  int emb_dim = 10;
  int trainers = 2;
  float beta1 = 0.9;
  float beta2 = 0.999;
  float epsilon = 1.0e-8;

  TableParameter table_config;
  table_config.set_table_class("CommonSparseTable");
  FsClientParameter fs_config;
  Table *table = new CommonSparseTable();
  TableAccessorParameter *accessor_config = table_config.mutable_accessor();
  accessor_config->set_accessor_class("CommMergeAccessor");
  CommonAccessorParameter *common_config = table_config.mutable_common();
  common_config->set_name("adam");
  common_config->set_table_name("adam_test_table");
  common_config->set_trainer_num(trainers);
  common_config->add_params("Param");
  common_config->add_dims(emb_dim);
  common_config->add_initializers("uniform_random&0&-1.0&1.0");
  common_config->add_params("LearningRate");
  common_config->add_dims(1);
  common_config->add_initializers("fill_constant&1.0");
  common_config->add_params("Moment1");
  common_config->add_dims(emb_dim);
  common_config->add_initializers("fill_constant&0.0");
  common_config->add_params("Moment2");
  common_config->add_dims(emb_dim);
  common_config->add_initializers("fill_constant&0.0");
  common_config->add_params("Beta1Pow");
  common_config->add_dims(1);
  common_config->add_initializers("fill_constant&1.0");
  common_config->add_params("Beta2Pow");
  common_config->add_dims(1);
  common_config->add_initializers("fill_constant&1.0");

  auto ret = table->initialize(table_config, fs_config);
  ASSERT_EQ(ret, 0);

  // pull parameters for create and check
  std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
  std::vector<uint32_t> init_fres = {1, 1, 1, 1, 1};

  std::vector<float> init_values;
  init_values.resize(init_keys.size() * emb_dim);

  auto value = PullSparseValue(init_keys, init_fres, emb_dim);
  table->pull_sparse(init_values.data(), value);

  // push gradient
  std::vector<std::vector<uint64_t>> trainer_keys;
  std::vector<std::vector<float>> trainer_gradient_values;
  trainer_keys.resize(trainers);
  trainer_gradient_values.resize(trainers);
  float start = 0.0;
  for (int i = 0; i < trainers; i++) {
    trainer_keys[i] = init_keys;
    for (size_t j = 0; j < trainer_keys[i].size(); j++) {
      for (int k = 0; k < emb_dim; k++) {
        trainer_gradient_values[i].push_back(start);
        start += 0.1;
      }
    }
  }

  for (int i = 0; i < trainers; i++) {
    auto &push_keys = trainer_keys[i];
    auto &push_values = trainer_gradient_values[i];
    table->push_sparse(push_keys.data(), push_values.data(), push_keys.size());
  }

  // Same pull_sparse form as the initial pull above.
  std::vector<float> pull_values;
  pull_values.resize(init_keys.size() * emb_dim);
  table->pull_sparse(pull_values.data(), value);

  // idx is a flat offset into the (key, dim) buffers, so it has to run over
  // all init_values.size() floats — stepping by emb_dim visits every key.
  for (size_t idx = 0; idx < init_values.size(); idx += emb_dim) {
    std::vector<float> beta1_pow, beta2_pow, lr, mom1, mom2, param;
    beta1_pow.push_back(beta1);
    beta2_pow.push_back(beta2);
    lr.push_back(1.0);
    for (int i = 0; i < emb_dim; i++) {
      mom1.push_back(0.0);
      mom2.push_back(0.0);
      param.push_back(init_values[idx + i]);
    }
    // Apply one reference Adam step per trainer push, in push order.
    for (int i = 0; i < trainers; i++) {
      auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
      for (int j = 0; j < emb_dim; j++) {
        mom1[j] =
            beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][idx + j];
        mom2[j] = beta2 * mom2[j] +
                  (1 - beta2) * trainer_gradient_values[i][idx + j] *
                      trainer_gradient_values[i][idx + j];
        param[j] = param[j] -
                   lr_ * (mom1[j] /
                          (sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0])));
      }
      beta1_pow[0] *= beta1;
      beta2_pow[0] *= beta2;
    }
    for (int i = 0; i < emb_dim; i++) {
      // Floating-point comparison; an unqualified abs() may truncate to int.
      ASSERT_NEAR(param[i], pull_values[idx + i], 1e-5);
    }
  }
}
} // namespace distributed
} // namespace paddle
......@@ -14,18 +14,18 @@ limitations under the License. */
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/common_sparse_table.h"
#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/ps/table/memory_dense_table.h"
//#include "paddle/fluid/distributed/ps/table/sparse_geo_table.h"
namespace paddle {
namespace distributed {
TEST(Table, Initialize) {
TableParameter table_config;
table_config.set_table_class("SparseGeoTable");
table_config.set_table_class("MemoryDenseTable");
FsClientParameter fs_config;
// case 1. no accessor
Table *table = new SparseGeoTable();
Table *table = new MemoryDenseTable();
auto ret = table->Initialize(table_config, fs_config);
ASSERT_EQ(ret, -1);
}
......
......@@ -47,7 +47,7 @@ class SendOp : public framework::OperatorBase {
auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
// for common_dense_table, distributed_push_sparse op for push sparse in
// for memory_dense_table, distributed_push_sparse op for push sparse in
// async
if (is_sparse == 0 && send_varnames.size() >= 1 &&
send_varnames[0] != "@PS_STEP_COUNTER@") {
......
......@@ -984,7 +984,7 @@ class TheOnePSRuntime(RuntimeBase):
table_proto.accessor)
else:
table.type = "PS_DENSE_TABLE"
table.table_class = "CommonDenseTable"
table.table_class = "MemoryDenseTable"
table.shard_num = 256
common.table_name = "MergedDense"
......
......@@ -665,7 +665,7 @@ class DenseTable(Table):
table_proto.table_id = ctx.table_id()
table_proto.type = the_one_ps_pb2.PS_DENSE_TABLE
table_proto.table_class = "CommonDenseTable"
table_proto.table_class = "MemoryDenseTable"
table_proto.shard_num = 256
table_proto.accessor.accessor_class = 'CommMergeAccessor'
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册