Unverified · Commit e3975223 authored by tangwei12, committed by GitHub

fix large scale memory (#30035) (#30085)

* memory holder optimize

Change-Id: Ic91af8ac6f2853336d28a9fbbc5e8d0c57b5d05e

* memory holder optimize

Change-Id: I2fd1c14ecc17f5d5ce88b87890381ea801e6367f

* fix large scale memory holder

Change-Id: Ief0992b02b00220e16c72cc637a56e7b5788140f

* fix large scale memory holder

Change-Id: I910142a3952ead643a5604f8f80955f3e6efe655
Parent 9a6926f5
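The change in one sentence: each sparse id used to own a `std::vector<std::vector<float>>` (one inner vector per variable such as `Param` or `Moment1`) plus a per-id name-to-slot map, and now owns a single flat `std::vector<float>` whose layout is described once, table-wide, by `value_names_` / `value_dims_` / `value_offsets_` / `value_idx_`. Per id this drops the copied name list, the `places` hash map, and the nested-vector indirection, which is what "fix large scale memory" refers to. A minimal standalone sketch of the layout idea (illustrative names, not the Paddle classes):

```cpp
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// One table-wide layout shared by every id: variable name -> (offset, dim).
struct FlatLayout {
  std::vector<std::string> names;            // e.g. {"Param", "Moment1"}
  std::vector<int> dims;                     // width of each variable
  std::vector<int> offsets;                  // prefix sums of dims
  std::unordered_map<std::string, int> idx;  // name -> slot
  int total = 0;                             // sum of dims

  FlatLayout(std::vector<std::string> ns, std::vector<int> ds)
      : names(std::move(ns)), dims(std::move(ds)) {
    for (size_t x = 0; x < names.size(); ++x) {
      idx[names[x]] = static_cast<int>(x);
      offsets.push_back(total);
      total += dims[x];
    }
  }
};

// Per-id storage shrinks to a single contiguous buffer of `total` floats.
struct FlatValue {
  explicit FlatValue(int length) : data(length, 0.0f) {}
  std::vector<float> data;
};

// Reading a variable becomes pointer arithmetic instead of a per-id lookup.
inline float* Slice(FlatValue& v, const FlatLayout& l, const std::string& n) {
  return v.data.data() + l.offsets[l.idx.at(n)];
}
```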
CommonSparseTable implementation:

```diff
@@ -114,18 +114,18 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
 }
 
 int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
-                   const std::vector<std::string>& saved_names,
                    const int mode) {
   for (auto value : block->values_) {
-    std::vector<std::vector<float>*> vss = value.second->get(saved_names);
+    auto* vs = value.second->data_.data();
     std::stringstream ss;
     auto id = value.first;
     ss << id << "\t";
-    for (int i = 0; i < static_cast<int>(vss.size()); i++) {
-      auto& vs = vss[i];
-      ss << paddle::string::join_strings((*vs), ',');
-      ss << "\t";
+
+    for (int i = 0; i < block->value_length_; i++) {
+      ss << vs[i];
+      ss << ",";
     }
     ss << "\n";
     os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
@@ -159,62 +159,13 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
     std::vector<std::vector<float>> kvalues;
     ProcessALine(values, meta, &kvalues);
 
-    block->Init(id, &kvalues, 1);
+    // warning: need fix
+    block->Init(id);
   }
 
   return 0;
 }
 
-void SaveShard(std::shared_ptr<ValueBlock> block, const std::string& dirname,
-               const CommonAccessorParameter& common, const int mode,
-               const int pserver_id, const int shard_id) {
-  auto varname = common.table_name();
-  std::string var_store = string::Sprintf("%s/%s", dirname, varname);
-  VLOG(3) << "save " << varname << " in dir: " << var_store << " begin";
-  MkDirRecursively(var_store.c_str());
-
-  std::string shard_var_pre =
-      string::Sprintf("%s.block%d.%d", varname, pserver_id, shard_id);
-
-  std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre);
-  std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre);
-
-  // save values
-  std::vector<std::string> params(common.params().begin(),
-                                  common.params().end());
-  std::unique_ptr<std::ofstream> value_out(new std::ofstream(value_));
-  SaveToText(value_out.get(), block, params, mode);
-
-  // save meta
-  std::stringstream stream;
-  stream << "param=" << common.table_name() << "\n";
-  stream << "server_id=" << pserver_id << "\n";
-  stream << "shard_id=" << shard_id << "\n";
-  stream << "row_names=" << paddle::string::join_strings(common.params(), ',')
-         << "\n";
-  stream << "row_dims=" << paddle::string::join_strings(common.dims(), ',')
-         << "\n";
-  stream << "count=" << block->values_.size() << "\n";
-  std::unique_ptr<std::ofstream> meta_out(new std::ofstream(meta_));
-  meta_out->write(stream.str().c_str(), sizeof(char) * stream.str().size());
-  meta_out->close();
-  VLOG(3) << "save " << varname << " in dir: " << var_store << " done";
-}
-
-void CommonSparseTable::create_initializer(const std::string& attr,
-                                           const std::string& name) {
-  auto slices = string::split_string<std::string>(attr, "&");
-
-  if (slices[0] == "gaussian_random") {
-    initializers_[name] = new GaussianInitializer(slices);
-  } else if (slices[0] == "fill_constant") {
-    initializers_[name] = new FillConstantInitializer(slices);
-  } else if (slices[0] == "uniform_random") {
-    initializers_[name] = new UniformInitializer(slices);
-  } else {
-    PADDLE_THROW(
-        platform::errors::InvalidArgument("%s can not be supported", name));
-  }
-}
-
 int32_t CommonSparseTable::initialize() {
   _shards_task_pool.resize(task_pool_size_);
   for (int i = 0; i < _shards_task_pool.size(); ++i) {
@@ -224,31 +175,44 @@ int32_t CommonSparseTable::initialize() {
   sync = _config.common().sync();
   VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
 
-  initialize_value();
-  initialize_optimizer();
-  initialize_recorder();
-  return 0;
-}
-
-int32_t CommonSparseTable::initialize_recorder() { return 0; }
-
-int32_t CommonSparseTable::initialize_value() {
   auto common = _config.common();
   int size = static_cast<int>(common.params().size());
 
+  size_t offset = 0;
   for (int x = 0; x < size; ++x) {
     auto& varname = common.params()[x];
     auto& dim = common.dims()[x];
 
+    value_idx_[varname] = x;
+    value_names_.push_back(varname);
+    value_dims_.push_back(dim);
+    value_offsets_.push_back(offset);
+    initializer_attrs_.push_back(common.initializers()[x]);
+
     if (varname == "Param") {
       param_dim_ = dim;
+      param_offset_ = offset;
    }
 
-    auto& initializer = common.initializers()[x];
-    create_initializer(initializer, varname);
+    offset += dim;
   }
 
+  initialize_value();
+  initialize_optimizer();
+  initialize_recorder();
+  return 0;
+}
+
+int32_t CommonSparseTable::initialize_recorder() { return 0; }
+
+int32_t CommonSparseTable::initialize_value() {
   shard_values_.reserve(task_pool_size_);
   for (int x = 0; x < task_pool_size_; ++x) {
-    auto shard = std::make_shared<ValueBlock>(common, &initializers_);
+    auto shard =
+        std::make_shared<ValueBlock>(value_names_, value_dims_, value_offsets_,
+                                     value_idx_, initializer_attrs_, "none");
 
    shard_values_.emplace_back(shard);
  }
```
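The offsets pushed in `initialize()` above are plain prefix sums over the per-variable dims, computed once and shared by every shard's `ValueBlock`. A small self-contained check of the same loop (the dims here are made up):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  // Say the table holds Param (dim 8), Moment1 (dim 8), LearningRate (dim 1).
  std::vector<int> value_dims = {8, 8, 1};
  std::vector<int> value_offsets;

  std::size_t offset = 0;
  for (std::size_t x = 0; x < value_dims.size(); ++x) {
    value_offsets.push_back(static_cast<int>(offset));
    offset += value_dims[x];
  }

  // Offsets come out as 0, 8, 16; value_length_ would be 17 floats per id.
  assert(value_offsets[0] == 0);
  assert(value_offsets[1] == 8);
  assert(value_offsets[2] == 16);
  assert(offset == 17);
  return 0;
}
```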
```diff
@@ -281,14 +245,16 @@ int32_t CommonSparseTable::initialize_value() {
 int32_t CommonSparseTable::initialize_optimizer() {
   auto common = _config.common();
   auto name = common.name();
-  auto attrs = common.attributes();
 
   if (name == "sgd") {
-    optimizer_ = std::make_shared<SSGD>(common);
+    optimizer_ = std::make_shared<SSGD>(value_names_, value_dims_,
+                                        value_offsets_, value_idx_);
   } else if (name == "adam") {
-    optimizer_ = std::make_shared<SAdam>(common);
+    optimizer_ = std::make_shared<SAdam>(value_names_, value_dims_,
+                                         value_offsets_, value_idx_);
   } else if (name == "sum") {
-    optimizer_ = std::make_shared<SSUM>(common);
+    optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
+                                        value_offsets_, value_idx_);
   } else {
     VLOG(0) << "init optimizer failed";
   }
@@ -330,8 +296,7 @@ int32_t CommonSparseTable::save(const std::string& dirname,
   int64_t total_ins = 0;
   for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
     // save values
-    total_ins +=
-        SaveToText(value_out.get(), shard_values_[shard_id], params, mode);
+    total_ins += SaveToText(value_out.get(), shard_values_[shard_id], mode);
   }
   value_out->close();
@@ -391,10 +356,6 @@ int32_t CommonSparseTable::pour() {
 int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys,
                                        size_t num) {
   rwlock_->RDLock();
-  std::vector<std::string> value_names;
-  for (auto name : _config.common().params()) {
-    value_names.push_back(name);
-  }
 
   std::vector<std::vector<uint64_t>> offset_bucket;
   offset_bucket.resize(task_pool_size_);
@@ -408,20 +369,18 @@ int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys,
   for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
     tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
-        [this, shard_id, &keys, &offset_bucket, &value_names,
-         &pull_values]() -> int {
+        [this, shard_id, &keys, &offset_bucket, &pull_values]() -> int {
           auto& block = shard_values_[shard_id];
           auto& offsets = offset_bucket[shard_id];
 
           for (int i = 0; i < offsets.size(); ++i) {
             auto offset = offsets[i];
             auto id = keys[offset];
-            block->InitFromInitializer(id, value_names);
-            auto values = block->Get(id, {"Param"});
-            auto dim = values[0]->size();
-            std::copy(values[0]->begin(), values[0]->end(),
-                      pull_values + dim * offset);
+            auto* value = block->InitFromInitializer(id);
+            std::copy_n(value + param_offset_, param_dim_,
+                        pull_values + param_dim_ * offset);
           }
           return 0;
         });
   }
@@ -492,10 +451,6 @@ int32_t CommonSparseTable::push_sparse(const uint64_t* keys,
 int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys,
                                              const float* values, size_t num) {
   rwlock_->RDLock();
-  std::vector<std::string> value_names;
-  for (auto name : _config.common().params()) {
-    value_names.push_back(name);
-  }
 
   std::vector<std::vector<uint64_t>> offset_bucket;
   offset_bucket.resize(task_pool_size_);
@@ -509,18 +464,16 @@ int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys,
   for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
     tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
-        [this, shard_id, &keys, &offset_bucket, &value_names,
-         &values]() -> int {
+        [this, shard_id, &keys, &offset_bucket, &values]() -> int {
           auto& block = shard_values_[shard_id];
           auto& offsets = offset_bucket[shard_id];
 
           for (int i = 0; i < offsets.size(); ++i) {
             auto offset = offsets[i];
             auto id = keys[offset];
-            block->InitFromInitializer(id, value_names);
-            auto values_ = block->Get(id, {"Param"});
-            auto dim = values_[0]->size();
-            std::copy_n(values + dim * offset, dim, values_[0]->data());
+            auto* value = block->InitFromInitializer(id);
+            std::copy_n(values + param_dim_ * offset, param_dim_,
+                        value + param_offset_);
           }
           return 0;
         });
```
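With `param_offset_` and `param_dim_` cached on the table, pull and push each reduce to one bounded `std::copy_n` against the flat buffer; the per-request `value_names` vector and the `Get(id, {"Param"})` detour are gone. The two copies in isolation (function and buffer names here are illustrative, not Paddle's):

```cpp
#include <algorithm>
#include <cstddef>

// Pull: table -> trainer. Copy the Param slice out of one id's flat buffer.
void PullOne(const float* value, float* pull_values, std::size_t slot,
             int param_offset, int param_dim) {
  std::copy_n(value + param_offset, param_dim, pull_values + param_dim * slot);
}

// Push: trainer -> table. Overwrite the Param slice in place.
void PushOne(float* value, const float* push_values, std::size_t slot,
             int param_offset, int param_dim) {
  std::copy_n(push_values + param_dim * slot, param_dim, value + param_offset);
}
```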
CommonSparseTable header:

```diff
@@ -50,8 +50,6 @@ class CommonSparseTable : public SparseTable {
   virtual int32_t initialize();
   virtual int32_t initialize_shard() { return 0; }
 
-  virtual void create_initializer(const std::string& attr,
-                                  const std::string& name);
   virtual int32_t initialize_value();
   virtual int32_t initialize_optimizer();
   virtual int32_t initialize_recorder();
@@ -86,8 +84,15 @@ class CommonSparseTable : public SparseTable {
   bool sync = false;
   int param_dim_ = 0;
+  int param_offset_ = 0;
+
+  std::unordered_map<std::string, int> value_idx_;
+  std::vector<std::string> value_names_;
+  std::vector<int> value_dims_;
+  std::vector<int> value_offsets_;
+  std::vector<std::string> initializer_attrs_;
+
   std::shared_ptr<SparseOptimizer> optimizer_;
-  std::unordered_map<std::string, Initializer*> initializers_;
   std::vector<std::shared_ptr<ValueBlock>> shard_values_;
   std::unordered_map<uint64_t, ReservoirValue<float>> pull_reservoir_;
   std::unique_ptr<framework::RWLock> rwlock_{nullptr};
```
VALUE and ValueBlock (large-scale KV storage):

```diff
@@ -67,100 +67,47 @@ inline bool entry<float>(const int count, const float threshold) {
 }
 
 struct VALUE {
-  explicit VALUE(const std::vector<std::string> &names)
-      : names_(names), count_(1), unseen_days_(0), seen_after_last_save_(true) {
-    values_.resize(names.size());
-    for (int i = 0; i < static_cast<int>(names.size()); i++) {
-      places[names[i]] = i;
-    }
-  }
-
-  void set(std::vector<std::vector<float>> *values) {
-    values_ = std::move(*values);
-  }
-
-  void set(const std::vector<Initializer *> &inits, std::vector<int> numels) {
-    for (int x = 0; x < numels.size(); ++x) {
-      auto &value = values_[x];
-      value.resize(numels[x]);
-      inits[x]->GetValue(value.data(), numels[x]);
-    }
-  }
-
-  void set(const std::vector<std::string> &names,
-           const std::vector<std::vector<float>> &values) {
-    for (int i = 0; i < static_cast<int>(names.size()); i++) {
-      auto idx = places[names[i]];
-      auto value = values[i];
-      values_[idx].assign(value.begin(), value.end());
-    }
-  }
-
-  std::vector<std::vector<float> *> get() {
-    auto pts = std::vector<std::vector<float> *>();
-    pts.reserve(values_.size());
-    for (auto &value : values_) {
-      pts.push_back(&value);
-    }
-    return pts;
-  }
-
-  int fetch_count() { return ++count_; }
-  void reset_unseen_days() { unseen_days_ = 0; }
-  void set_entry(bool is_entry) { is_entry_ = is_entry; }
-  bool get_entry() { return is_entry_; }
-
-  std::vector<std::vector<float> *> get(const std::vector<std::string> names) {
-    auto pts = std::vector<std::vector<float> *>();
-    pts.reserve(values_.size());
-    for (int i = 0; i < static_cast<int>(names.size()); i++) {
-      pts.push_back(&(values_[places[names[i]]]));
-    }
-    return pts;
-  }
-
-  std::vector<std::string> names_;
+  explicit VALUE(size_t length)
+      : length_(length),
+        count_(1),
+        unseen_days_(0),
+        seen_after_last_save_(true),
+        is_entry_(true) {
+    data_.resize(length);
+  }
+
+  size_t length_;
+  std::vector<float> data_;
   int count_;
   int unseen_days_;
   bool seen_after_last_save_;
   bool is_entry_;
-  std::vector<std::vector<float>> values_;
-  std::unordered_map<std::string, int> places;
 };
 
 class ValueBlock {
  public:
-  explicit ValueBlock(
-      const CommonAccessorParameter &common,
-      std::unordered_map<std::string, Initializer *> *initializers) {
-    initializers_ = initializers;
-    int size = static_cast<int>(common.params().size());
-
-    for (int x = 0; x < size; ++x) {
-      auto varname = common.params()[x];
-      auto dim = common.dims()[x];
-      value_names_.push_back(varname);
-      value_dims_.push_back(dim);
-    }
-
-    for (auto &name : value_names_) {
-      initializer_list_.emplace_back(initializers_->at(name));
+  explicit ValueBlock(const std::vector<std::string> &value_names,
+                      const std::vector<int> &value_dims,
+                      const std::vector<int> &value_offsets,
+                      const std::unordered_map<std::string, int> &value_idx,
+                      const std::vector<std::string> &init_attrs,
+                      const std::string &entry_attr)
+      : value_names_(value_names),
+        value_dims_(value_dims),
+        value_offsets_(value_offsets),
+        value_idx_(value_idx) {
+    for (int x = 0; x < value_dims.size(); ++x) {
+      value_length_ += value_dims[x];
     }
 
     // for Entry
     {
-      // entry will add later
-      std::string entry_attr = "none";
-
       if (entry_attr == "none") {
-        has_entry = false;
+        has_entry_ = false;
         entry_func_ =
             std::bind(entry<std::string>, std::placeholders::_1, "none");
       } else {
-        has_entry = true;
+        has_entry_ = true;
         auto slices = string::split_string<std::string>(entry_attr, "&");
         if (slices[0] == "count_filter") {
           int threshold = std::stoi(slices[1]);
```
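A note on the entry gate wired above, before the constructor continues with the initializer wiring below: `entry_func_` is a `std::function<bool(uint64_t)>`, bound either to an always-true check (`"none"`) or to a count filter whose threshold comes from an attribute of the form `count_filter&N`. A minimal sketch of that wiring under the same attribute convention (the helper names are illustrative):

```cpp
#include <cstdint>
#include <functional>
#include <string>

// Mirrors the entry<T> helpers: an id becomes an "entry" (eligible for
// saving/serving) once its access count passes the configured gate.
inline bool CountFilterEntry(uint64_t count, int threshold) {
  return static_cast<int>(count) >= threshold;
}

// Assumes a well-formed attribute: either "none" or "count_filter&N".
inline std::function<bool(uint64_t)> MakeEntryFunc(const std::string& attr) {
  if (attr == "none") {
    return [](uint64_t) { return true; };  // every id is an entry at once
  }
  int threshold = std::stoi(attr.substr(attr.find('&') + 1));
  return std::bind(CountFilterEntry, std::placeholders::_1, threshold);
}
```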
```diff
@@ -172,85 +119,82 @@ class ValueBlock {
         }
       }
     }
+
+    // for Initializer
+    {
+      for (auto &attr : init_attrs) {
+        auto slices = string::split_string<std::string>(attr, "&");
+
+        if (slices[0] == "gaussian_random") {
+          initializers_.emplace_back(
+              std::make_shared<GaussianInitializer>(slices));
+        } else if (slices[0] == "fill_constant") {
+          initializers_.emplace_back(
+              std::make_shared<FillConstantInitializer>(slices));
+        } else if (slices[0] == "uniform_random") {
+          initializers_.emplace_back(
+              std::make_shared<UniformInitializer>(slices));
+        } else {
+          PADDLE_THROW(platform::errors::InvalidArgument(
+              "%s can not be supported", attr));
+        }
+      }
+    }
   }
 
   ~ValueBlock() {}
 
-  void Init(const uint64_t &id, std::vector<std::vector<float>> *values,
-            int count) {
-    if (Has(id)) {
-      PADDLE_THROW(platform::errors::AlreadyExists("id already exist, error"));
-    }
-
-    if (values->size() != value_names_.size()) {
-      PADDLE_THROW(
-          platform::errors::AlreadyExists("values can not match, error"));
-    }
-
-    auto value = new VALUE(value_names_);
-    value->set(values);
-    value->seen_after_last_save_ = true;
-    value->count_ = count;
+  float *Init(const uint64_t &id) {
+    auto value = std::make_shared<VALUE>(value_length_);
+    for (int x = 0; x < value_names_.size(); ++x) {
+      initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
+                                 value_dims_[x]);
+    }
     values_[id] = value;
+    return value->data_.data();
   }
 
-  void Init(const uint64_t &id, const std::vector<Initializer *> &inits,
-            int count) {
-    if (Has(id)) {
-      PADDLE_THROW(platform::errors::AlreadyExists("id already exist, error"));
-    }
-
-    if (inits.size() != value_names_.size()) {
-      PADDLE_THROW(
-          platform::errors::AlreadyExists("values can not match, error"));
-    }
-
-    auto value = new VALUE(value_names_);
-    value->set(inits, value_dims_);
-    values_[id] = value;
-  }
-
-  std::vector<std::vector<float> *> Get(
-      const uint64_t &id, const std::vector<std::string> &value_names) {
-    auto ret_values = values_.at(id)->get(value_names);
-    return ret_values;
-  }
-
-  std::vector<std::vector<float> *> Get(const uint64_t &id) {
-    auto ret_values = values_.at(id)->get(value_names_);
-    return ret_values;
+  std::vector<float *> Get(const uint64_t &id,
+                           const std::vector<std::string> &value_names) {
+    auto pts = std::vector<float *>();
+    pts.reserve(value_names.size());
+    auto &values = values_.at(id);
+    for (int i = 0; i < static_cast<int>(value_names.size()); i++) {
+      pts.push_back(values->data_.data() +
+                    value_offsets_.at(value_idx_.at(value_names[i])));
+    }
+    return pts;
+  }
+
+  float *Get(const uint64_t &id) {
+    auto &values = values_.at(id);
+    return values->data_.data();
   }
 
-  void InitFromInitializer(const uint64_t &id,
-                           const std::vector<std::string> &value_names) {
+  float *InitFromInitializer(const uint64_t &id) {
     if (Has(id)) {
-      if (has_entry) {
+      if (has_entry_) {
         Update(id);
       }
-      return;
+      return Get(id);
     }
-    Init(id, initializer_list_, 1);
+    return Init(id);
   }
 
   bool GetEntry(const uint64_t &id) {
     auto value = values_.at(id);
-    auto entry = value->get_entry();
-    return entry;
-  }
-
-  void Set(const uint64_t &id, const std::vector<std::string> &value_names,
-           const std::vector<std::vector<float>> &values) {
-    auto value = values_.at(id);
-    value->set(value_names, values);
+    return value->is_entry_;
   }
 
   void Update(const uint64_t id) {
-    auto *value = values_.at(id);
-    value->reset_unseen_days();
-    auto count = value->fetch_count();
+    auto value = values_.at(id);
+    value->unseen_days_ = 0;
+    auto count = ++value->count_;
 
-    if (!value->get_entry()) {
-      value->set_entry(entry_func_(count));
+    if (!value->is_entry_) {
+      value->is_entry_ = entry_func_(count);
     }
   }
@@ -265,15 +209,18 @@ class ValueBlock {
   }
 
  public:
-  std::unordered_map<uint64_t, VALUE *> values_;
+  std::unordered_map<uint64_t, std::shared_ptr<VALUE>> values_;
+  size_t value_length_ = 0;
 
 private:
-  bool has_entry = false;
-  std::vector<std::string> value_names_;
-  std::vector<int> value_dims_;
+  const std::vector<std::string> &value_names_;
+  const std::vector<int> &value_dims_;
+  const std::vector<int> &value_offsets_;
+  const std::unordered_map<std::string, int> &value_idx_;
 
+  bool has_entry_ = false;
   std::function<bool(uint64_t)> entry_func_;
-  std::unordered_map<std::string, Initializer *> *initializers_;
-  std::vector<Initializer *> initializer_list_;
+  std::vector<std::shared_ptr<Initializer>> initializers_;
 };
 
 }  // namespace distributed
```
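Taken together, the new access path is: `InitFromInitializer(id)` hands back the id's flat buffer, creating and initializing it on first touch, while `Update(id)` bumps the counter until `entry_func_` flips `is_entry_`. A condensed, self-contained model of that lifecycle (simplified sketch: no initializers, no sharding, illustrative names):

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

struct Value {
  explicit Value(size_t length) : data(length, 0.0f) {}
  std::vector<float> data;
  int count = 1;
  bool is_entry = false;
};

class Block {
 public:
  Block(size_t value_length, std::function<bool(uint64_t)> entry_func)
      : value_length_(value_length), entry_func_(std::move(entry_func)) {}

  // First touch creates the flat buffer; later touches only update metadata.
  float* InitFromInitializer(uint64_t id) {
    auto it = values_.find(id);
    if (it != values_.end()) {
      Update(id);
      return it->second->data.data();
    }
    auto value = std::make_shared<Value>(value_length_);
    values_[id] = value;
    return value->data.data();
  }

  // Bump the access count; promote to "entry" once the gate passes.
  void Update(uint64_t id) {
    auto& value = values_.at(id);
    auto count = ++value->count;
    if (!value->is_entry) value->is_entry = entry_func_(count);
  }

 private:
  size_t value_length_;
  std::function<bool(uint64_t)> entry_func_;
  std::unordered_map<uint64_t, std::shared_ptr<Value>> values_;
};
```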
Sparse optimizers (SparseOptimizer, SSUM, SSGD, SAdam):

```diff
@@ -19,6 +19,7 @@
 #include <functional>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
@@ -30,25 +31,38 @@ namespace distributed {
 class SparseOptimizer {
  public:
-  SparseOptimizer() {}
-  explicit SparseOptimizer(const CommonAccessorParameter& common) {}
+  explicit SparseOptimizer(
+      const std::vector<std::string>& value_names,
+      const std::vector<int>& value_dims, const std::vector<int>& value_offsets,
+      const std::unordered_map<std::string, int>& value_idx)
+      : value_names_(value_names),
+        value_dims_(value_dims),
+        value_offsets_(value_offsets),
+        value_idx_(value_idx) {}
 
   virtual void update(const uint64_t* keys, const float* update_values,
                       size_t num, const std::vector<uint64_t>& offsets,
                       ValueBlock* block) = 0;
+
+  const std::vector<std::string>& value_names_;
+  const std::vector<int>& value_dims_;
+  const std::vector<int>& value_offsets_;
+  const std::unordered_map<std::string, int>& value_idx_;
+  int param_offset = 0;
+  int update_numel = 0;
 };
 
 // sum calc for sparse tensor
 class SSUM : public SparseOptimizer {
  public:
-  SSUM(){};
-  explicit SSUM(const CommonAccessorParameter& common) {
-    auto& names = common.params();
-    for (int x = 0; x < static_cast<int>(names.size()); ++x) {
-      if (names[x] == "Param") {
-        param_idx = x;
-        update_numel = common.dims()[x];
-      }
-    }
+  explicit SSUM(const std::vector<std::string>& value_names,
+                const std::vector<int>& value_dims,
+                const std::vector<int>& value_offsets,
+                const std::unordered_map<std::string, int>& value_idx)
+      : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
+    auto idx = value_idx.at("Param");
+    param_offset = value_offsets.at(idx);
+    update_numel = value_dims.at(idx);
   }
 
   void update(const uint64_t* keys, const float* update_values, size_t num,
```
```diff
@@ -57,35 +71,27 @@ class SSUM : public SparseOptimizer {
     auto blas = GetBlas<float>();
     for (auto x : offsets) {
       auto id = keys[x];
-      auto values = block->Get(id);
-      float* param = values[param_idx]->data();
-
-      std::vector<float> delta;
-      delta.resize(update_numel);
-      blas.VCOPY(update_numel, update_values + x * update_numel, delta.data());
-      blas.VADD(update_numel, delta.data(), param, param);
+      auto* value = block->Get(id);
+      float* param = value + param_offset;
+      blas.VADD(update_numel, update_values + x * update_numel, param, param);
     }
   }
-
-  int param_idx;
-  int update_numel;
 };
 
 // sgd optimizer for sparse tensor
 class SSGD : public SparseOptimizer {
  public:
-  SSGD(){};
-  explicit SSGD(const CommonAccessorParameter& common) {
-    auto& names = common.params();
-    for (int x = 0; x < static_cast<int>(names.size()); ++x) {
-      if (names[x] == "LearningRate") {
-        learning_rate_idx = x;
-      }
-      if (names[x] == "Param") {
-        param_idx = x;
-        update_numel = common.dims()[x];
-      }
-    }
+  explicit SSGD(const std::vector<std::string>& value_names,
+                const std::vector<int>& value_dims,
+                const std::vector<int>& value_offsets,
+                const std::unordered_map<std::string, int>& value_idx)
+      : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
+    auto idx = value_idx.at("Param");
+    param_offset = value_offsets.at(idx);
+    update_numel = value_dims.at(idx);
+
+    idx = value_idx.at("LearningRate");
+    lr_offset = value_offsets.at(idx);
   }
 
   void update(const uint64_t* keys, const float* update_values, size_t num,
@@ -94,9 +100,10 @@ class SSGD : public SparseOptimizer {
     auto blas = GetBlas<float>();
     for (auto x : offsets) {
       auto id = keys[x];
-      auto values = block->Get(id);
-      float* learning_rate = values[learning_rate_idx]->data();
-      float* param = values[param_idx]->data();
+      auto* value = block->Get(id);
+
+      float* learning_rate = value + lr_offset;
+      float* param = value + param_offset;
 
       std::vector<float> grads;
       grads.resize(update_numel);
@@ -106,38 +113,35 @@ class SSGD : public SparseOptimizer {
     }
   }
 
-  int learning_rate_idx;
-  int param_idx;
-  int update_numel;
+  int lr_offset;
 };
 
 // adam optimizer for sparse tensor
 class SAdam : public SparseOptimizer {
  public:
-  SAdam() {}
-  explicit SAdam(const CommonAccessorParameter& common) {
-    auto& names = common.params();
-    for (int x = 0; x < static_cast<int>(names.size()); ++x) {
-      if (names[x] == "LearningRate") {
-        learning_rate_idx = x;
-      }
-      if (names[x] == "Param") {
-        param_idx = x;
-        update_numel = common.dims()[x];
-      }
-      if (names[x] == "Moment1") {
-        moment1_idx = x;
-      }
-      if (names[x] == "Moment2") {
-        moment2_idx = x;
-      }
-      if (names[x] == "Beta1Pow") {
-        beta1_pow_idx = x;
-      }
-      if (names[x] == "Beta2Pow") {
-        beta2_pow_idx = x;
-      }
-    }
+  explicit SAdam(const std::vector<std::string>& value_names,
+                 const std::vector<int>& value_dims,
+                 const std::vector<int>& value_offsets,
+                 const std::unordered_map<std::string, int>& value_idx)
+      : SparseOptimizer(value_names, value_dims, value_offsets, value_idx) {
+    auto idx = value_idx.at("Param");
+    param_offset = value_offsets.at(idx);
+    update_numel = value_dims.at(idx);
+
+    idx = value_idx.at("LearningRate");
+    lr_offset = value_offsets.at(idx);
+
+    idx = value_idx.at("Moment1");
+    m1_offset = value_offsets.at(idx);
+
+    idx = value_idx.at("Moment2");
+    m2_offset = value_offsets.at(idx);
+
+    idx = value_idx.at("Beta1Pow");
+    beta1_pow_offset = value_offsets.at(idx);
+
+    idx = value_idx.at("Beta2Pow");
+    beta2_pow_offset = value_offsets.at(idx);
 
     // add attr later
     beta1 = 0.9;
@@ -151,13 +155,13 @@ class SAdam : public SparseOptimizer {
     auto blas = GetBlas<float>();
     for (auto x : offsets) {
       auto id = keys[x];
-      auto values = block->Get(id);
-      float* learning_rate = values[learning_rate_idx]->data();
-      float* param = values[param_idx]->data();
-      float* moment1 = values[moment1_idx]->data();
-      float* moment2 = values[moment2_idx]->data();
-      float* beta1_pow = values[beta1_pow_idx]->data();
-      float* beta2_pow = values[beta2_pow_idx]->data();
+      auto* values = block->Get(id);
+      float* learning_rate = values + lr_offset;
+      float* param = values + param_offset;
+      float* moment1 = values + m1_offset;
+      float* moment2 = values + m2_offset;
+      float* beta1_pow = values + beta1_pow_offset;
+      float* beta2_pow = values + beta2_pow_offset;
 
       beta1_pow[0] = beta1_pow[0] * beta1;
       beta2_pow[0] = beta2_pow[0] * beta2;
@@ -194,16 +198,15 @@ class SAdam : public SparseOptimizer {
     }
   }
 
-  int learning_rate_idx;
-  int param_idx;
-  int moment1_idx;
-  int moment2_idx;
-  int beta1_pow_idx;
-  int beta2_pow_idx;
+  int lr_offset;
+  int m1_offset;
+  int m2_offset;
+  int beta1_pow_offset;
+  int beta2_pow_offset;
   float beta1;
   float beta2;
   float epsilon;
-  int update_numel;
 };
 
 }  // namespace distributed
```
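The middle of `SAdam::update` is collapsed in this view; only the `beta*_pow` bump is visible. For orientation, a textbook Adam step over the same raw-pointer slices would look roughly like the sketch below. This is standard Adam with the class's `beta1` / `beta2` / `epsilon`, not necessarily the exact elided code:

```cpp
#include <cmath>

// One id's update: every pointer is a slice into the flat value buffer.
void AdamStep(float* param, float* moment1, float* moment2, const float* grad,
              float lr, float beta1, float beta2, float epsilon,
              float beta1_pow, float beta2_pow, int numel) {
  for (int i = 0; i < numel; ++i) {
    moment1[i] = beta1 * moment1[i] + (1 - beta1) * grad[i];
    moment2[i] = beta2 * moment2[i] + (1 - beta2) * grad[i] * grad[i];
    // bias correction uses the already-bumped beta powers
    float m_hat = moment1[i] / (1 - beta1_pow);
    float v_hat = moment2[i] / (1 - beta2_pow);
    param[i] -= lr * m_hat / (std::sqrt(v_hat) + epsilon);
  }
}
```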