Unverified commit ae3f7a71 authored by: T tangwei12, committed by: GitHub

add ps table (#29463)

* add ps table

Change-Id: I468a04bd071d21ff52654926fcf4d5f3da19e178
Parent 36ec9456
add_subdirectory(memory)
add_subdirectory(platform)
add_subdirectory(distributed)
add_subdirectory(framework)
add_subdirectory(imperative)
add_subdirectory(operators)
......
if(NOT WITH_DISTRIBUTE)
return()
endif()
proto_library(ps_framework_proto SRCS ps.proto)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-error=unused-value -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()
add_subdirectory(table)
add_subdirectory(test)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <glog/logging.h>
#include <iostream>
#include <map>
#include <string>
#include <vector>
namespace paddle {
namespace distributed {
class Any {
public:
Any() : content_(NULL) {}
template <typename ValueType>
Any(const ValueType &value) : content_(new Holder<ValueType>(value)) {}
Any(const Any &other)
: content_(other.content_ ? other.content_->clone() : NULL) {}
~Any() { delete content_; }
template <typename ValueType>
ValueType *any_cast() {
return content_ ? &static_cast<Holder<ValueType> *>(content_)->held_ : NULL;
}
private:
class PlaceHolder {
public:
virtual ~PlaceHolder() {}
virtual PlaceHolder *clone() const = 0;
};
template <typename ValueType>
class Holder : public PlaceHolder {
public:
explicit Holder(const ValueType &value) : held_(value) {}
virtual PlaceHolder *clone() const { return new Holder(held_); }
ValueType held_;
};
PlaceHolder *content_;
};
class ObjectFactory {
public:
ObjectFactory() {}
virtual ~ObjectFactory() {}
virtual Any NewInstance() { return Any(); }
private:
};
typedef std::map<std::string, ObjectFactory *> FactoryMap;
typedef std::map<std::string, FactoryMap> BaseClassMap;
#ifdef __cplusplus
extern "C" {
#endif
inline BaseClassMap &global_factory_map() {
static BaseClassMap *base_class = new BaseClassMap();
return *base_class;
}
#ifdef __cplusplus
}
#endif
inline BaseClassMap &global_factory_map_cpp() { return global_factory_map(); }
// typedef pa::Any Any;
// typedef ::FactoryMap FactoryMap;
#define REGISTER_REGISTERER(base_class) \
class base_class##Registerer { \
public: \
static base_class *CreateInstanceByName(const ::std::string &name) { \
if (global_factory_map_cpp().find(#base_class) == \
global_factory_map_cpp().end()) { \
LOG(ERROR) << "Can't Find BaseClass For CreateClass with:" \
<< #base_class; \
return NULL; \
} \
FactoryMap &map = global_factory_map_cpp()[#base_class]; \
FactoryMap::iterator iter = map.find(name); \
if (iter == map.end()) { \
LOG(ERROR) << "Can't Find Class For Create with:" << name; \
return NULL; \
} \
Any object = iter->second->NewInstance(); \
return *(object.any_cast<base_class *>()); \
} \
};
#define REGISTER_CLASS(clazz, name) \
class ObjectFactory##name : public ObjectFactory { \
public: \
Any NewInstance() { return Any(new name()); } \
}; \
void register_factory_##name() { \
FactoryMap &map = global_factory_map_cpp()[#clazz]; \
if (map.find(#name) == map.end()) { \
map[#name] = new ObjectFactory##name(); \
} \
} \
void register_factory_##name() __attribute__((constructor));
#define CREATE_CLASS(base_class, name) \
base_class##Registerer::CreateInstanceByName(name);
} // namespace distributed
} // namespace paddle
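For reference, a minimal sketch of how the registration macros above are meant to be used; DemoTable and DemoDenseTable are hypothetical names invented for this example (the commit itself registers ValueAccessor and Table subclasses the same way):

#include "paddle/fluid/distributed/common/registerer.h"

namespace paddle {
namespace distributed {

// A hypothetical base class and its generated registerer.
class DemoTable {
 public:
  virtual ~DemoTable() {}
  virtual int initialize() = 0;
};
REGISTER_REGISTERER(DemoTable);

// A hypothetical concrete class, registered under its own class name.
class DemoDenseTable : public DemoTable {
 public:
  int initialize() override { return 0; }
};
REGISTER_CLASS(DemoTable, DemoDenseTable);

// At runtime an instance is created from the registered class name.
DemoTable *MakeDemoDenseTable() {
  return CREATE_CLASS(DemoTable, "DemoDenseTable");
}

}  // namespace distributed
}  // namespace paddle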
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace distributed {
template <typename T>
inline paddle::operators::math::BlasT<paddle::platform::CPUDeviceContext, T>
GetBlas() {
auto cpu_ctx = paddle::platform::CPUDeviceContext();
return paddle::operators::math::GetBlas<paddle::platform::CPUDeviceContext,
T>(cpu_ctx);
}
template <typename T>
inline void SQRT(int n, const T* x, T* z) {
for (int i = 0; i < n; ++i) {
z[i] = sqrt(x[i]);
}
}
template <typename T>
inline void ADD(int n, const T* x, const T y, T* z) {
for (int i = 0; i < n; ++i) {
z[i] = x[i] + y;
}
}
static bool StartWith(const std::string& str, const std::string& substr) {
return str.find(substr) == 0;
}
static bool EndWith(const std::string& str, const std::string& substr) {
return str.rfind(substr) == (str.length() - substr.length());
}
inline std::vector<int> bucket(const int v_size, const int b_size) {
int remainder = v_size % b_size;
int bucket = v_size / b_size;
std::vector<int> ret_vec(b_size, bucket);
for (int i = 0; i < remainder; ++i) {
ret_vec[i] = ret_vec[i] + 1;
}
int cur_bucket = 0;
for (int& j : ret_vec) {
int tmp = j;
j = cur_bucket;
cur_bucket += tmp;
}
ret_vec.push_back(cur_bucket);
return ret_vec;
}
template <typename T>
std::string to_string(const std::vector<T>& vec) {
std::stringstream ss;
for (const auto& c : vec) {
ss << c << " ";
}
return ss.str();
}
}
}
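As an illustration (not part of the commit), bucket(v_size, b_size) returns b_size + 1 offsets that split [0, v_size) into near-equal ranges, handing the remainder to the first shards; CommonDenseTable::_push_dense below uses it to split the parameter range across its thread pool:

#include <iostream>
#include "paddle/fluid/distributed/common/utils.h"

int main() {
  // bucket(10, 3) -> {0, 4, 7, 10}: shard i owns [offsets[i], offsets[i + 1])
  auto offsets = paddle::distributed::bucket(10, 3);
  for (size_t i = 0; i + 1 < offsets.size(); ++i) {
    std::cout << "shard " << i << ": [" << offsets[i] << ", "
              << offsets[i + 1] << ")" << std::endl;
  }
  return 0;
}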
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
namespace paddle {
namespace distributed {
struct CommContext {
CommContext() = default;
CommContext(const std::string &name, const std::vector<std::string> &names,
const std::vector<std::string> &emap,
const std::vector<int64_t> &sections,
const std::vector<std::string> &origin_names, int id,
bool merge_add_ = true, bool is_sparse_ = true,
bool is_distributed_ = false, int table_id_ = -1)
: var_name(name),
splited_varnames(names),
epmap(emap),
height_sections(sections),
origin_varnames(origin_names),
trainer_id(id),
merge_add(merge_add_),
is_sparse(is_sparse_),
is_distributed(is_distributed_),
table_id(table_id_) {}
CommContext(const CommContext &ctx) {
var_name = ctx.var_name;
splited_varnames = ctx.splited_varnames;
epmap = ctx.epmap;
height_sections = ctx.height_sections;
trainer_id = ctx.trainer_id;
merge_add = ctx.merge_add;
is_sparse = ctx.is_sparse;
origin_varnames = ctx.origin_varnames;
is_distributed = ctx.is_distributed;
table_id = ctx.table_id;
}
std::string print() const {
std::stringstream ss;
ss << "varname: " << var_name << " trainer_id: " << trainer_id << " ";
ss << " table_id: " << table_id;
for (size_t i = 0; i < splited_varnames.size(); i++) {
ss << "slice varname: " << splited_varnames[i] << " ep: " << epmap[i]
<< " section: " << height_sections[i] << " ";
}
ss << "origin varnames: ";
for (size_t i = 0; i < origin_varnames.size(); i++) {
ss << origin_varnames[i] << " ";
}
ss << " aggregation->add: " << merge_add;
ss << " is_sparse: " << is_sparse;
ss << " is_distributed: " << is_distributed << "\n";
ss << " table_id: " << table_id << "\n";
return ss.str();
}
std::string var_name;
std::vector<std::string> splited_varnames;
std::vector<std::string> epmap;
std::vector<int64_t> height_sections;
std::vector<std::string> origin_varnames;
int trainer_id;
bool merge_add;
bool is_sparse;
bool is_distributed;
int table_id;
};
} // namespace distributed
} // namespace paddle
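A hypothetical construction of a CommContext for one sparse variable split across two parameter servers; every variable name, endpoint, and section size below is made up purely for illustration:

#include <iostream>
// assumes the CommContext header defined above is included

int main() {
  paddle::distributed::CommContext ctx(
      "embedding_0.w_0",                                     // var_name
      {"embedding_0.w_0.block0", "embedding_0.w_0.block1"},  // splited_varnames
      {"127.0.0.1:6000", "127.0.0.1:6001"},                  // epmap
      {500000, 500000},                                      // height_sections
      {"embedding_0.w_0"},                                   // origin_varnames
      /*id=*/0, /*merge_add_=*/true, /*is_sparse_=*/true,
      /*is_distributed_=*/false, /*table_id_=*/1);
  std::cout << ctx.print();
  return 0;
}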
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package paddle.distributed;
option cc_generic_services = true;
option cc_enable_arenas = true;
message FsClientParameter {
enum FsApiType {
HDFS = 0;
AFS = 1;
}
optional FsApiType fs_type = 1 [ default = HDFS ];
optional string uri = 2; // such as afs://xxx.afs.com:9902
optional string user = 3; // user_name to access fs
optional string passwd = 4; // password
optional int32 buffer_size = 5; // buffer for read/write
optional string hadoop_bin = 51;
optional string afs_conf = 101;
}
message PSParameter {
optional string worker_class = 1;
optional string server_class = 2;
optional string instance_class = 3;
optional string init_gflags = 4 [ default = "" ];
optional WorkerParameter worker_param = 101;
optional ServerParameter server_param = 102;
repeated DownpourTrainerParameter trainer_param = 301;
optional FsClientParameter fs_client_param = 501;
}
message WorkerParameter {
optional DownpourWorkerParameter downpour_worker_param = 1;
}
message DownpourWorkerParameter {
repeated TableParameter downpour_table_param = 1;
}
message DownpourServerParameter {
repeated TableParameter downpour_table_param = 1;
optional ServerServiceParameter service_param = 2;
}
message ServerParameter {
optional DownpourServerParameter downpour_server_param = 1;
}
message DownpourTrainerParameter {
repeated DenseTableParameter dense_table = 1;
repeated SparseTableParameter sparse_table = 2;
optional int32 push_sparse_per_batch = 3;
optional int32 push_dense_per_batch = 4;
repeated string skip_op = 5;
repeated ProgramConfig program_config = 6;
}
message DenseTableParameter {
optional int32 table_id = 1;
repeated string dense_variable_name = 2;
repeated string dense_gradient_variable_name = 3;
optional int32 fea_dim = 4;
}
message SparseTableParameter {
optional int32 table_id = 1;
optional int32 feature_dim = 2;
repeated string slot_key = 3;
repeated string slot_value = 4;
repeated string slot_gradient = 5;
}
message ServerServiceParameter {
optional string server_class = 1 [ default = "BrpcPsServer" ];
optional string client_class = 2 [ default = "BrpcPsClient" ];
optional string service_class = 3 [ default = "PsService" ];
optional uint32 start_server_port = 4
[ default = 0 ]; // will find an available port from it
optional uint32 server_thread_num = 5 [ default = 12 ];
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
repeated int32 push_dense_table_id = 3;
repeated int32 pull_sparse_table_id = 4;
repeated int32 pull_dense_table_id = 5;
}
enum TableType {
PS_SPARSE_TABLE = 0;
PS_DENSE_TABLE = 1;
PS_OTHER_TABLE = 2;
}
message TableParameter {
optional uint64 table_id = 1;
optional string table_class = 2;
optional uint64 shard_num = 3 [ default = 1000 ];
optional TableAccessorParameter accessor = 4;
optional TensorAccessorParameter tensor = 5;
optional CommonAccessorParameter common = 6;
optional TableType type = 7;
optional bool compress_in_save = 8 [ default = false ];
}
message TableAccessorParameter {
optional string accessor_class = 1;
optional uint32 fea_dim = 4 [ default = 11 ];
optional uint32 embedx_dim = 5 [ default = 8 ];
optional uint32 embedx_threshold = 6 [ default = 10 ];
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
}
message TensorAccessorParameter {
optional string tensor_class = 1;
optional uint32 fea_dim = 2;
optional uint32 emb_dim = 3;
optional string param = 4;
optional string grad = 5;
optional string common_block_map = 6;
}
message CommonAccessorParameter {
optional string name = 1;
optional string table_name = 2;
repeated string attributes = 3;
repeated string params = 4;
repeated uint32 dims = 5;
repeated string initializers = 6;
optional int32 trainer_num = 7;
optional bool sync = 8;
}
message TableAccessorSaveParameter {
optional uint32 param = 1;
optional string converter = 2;
optional string deconverter = 3;
}
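For illustration only, a TableParameter for a sparse table could be filled in from C++ as sketched below; the field names come from ps.proto above and the "&"-separated initializer format comes from create_initializer later in this commit, while every concrete value, the variable name, and the fill_constant attribute layout are assumptions:

#include "paddle/fluid/distributed/ps.pb.h"

paddle::distributed::TableParameter MakeSparseTableConfig() {
  paddle::distributed::TableParameter table;
  table.set_table_id(1);
  table.set_table_class("CommonSparseTable");
  table.set_shard_num(256);
  table.set_type(paddle::distributed::PS_SPARSE_TABLE);

  auto* common = table.mutable_common();
  common->set_name("sgd");                    // picks the SSGD optimizer
  common->set_table_name("embedding_0.w_0");  // hypothetical variable name
  common->add_params("Param");
  common->add_dims(8);
  common->add_initializers("uniform_random&0&-0.1&0.1");  // type&seed&min&max
  common->add_params("LearningRate");
  common->add_dims(1);
  common->add_initializers("fill_constant&0.01");  // assumed attribute layout
  common->set_trainer_num(2);
  common->set_sync(false);
  return table;
}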
set_property(GLOBAL PROPERTY TABLE_DEPS string_helper)
get_property(TABLE_DEPS GLOBAL PROPERTY TABLE_DEPS)
set_source_files_properties(common_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(common_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(barrier_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(common_table SRCS common_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc DEPS ${TABLE_DEPS} device_context string_helper simple_threadpool xxhash generator)
set_source_files_properties(tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(tensor_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context)
cc_library(tensor_table SRCS tensor_table.cc DEPS ps_framework_proto proto_desc enforce executor tensor device_context simple_threadpool gflags glog )
set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(table SRCS table.cc DEPS common_table tensor_table tensor_accessor ps_framework_proto string_helper device_context gflags glog boost)
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
namespace paddle {
namespace distributed {
struct FsDataConverter {
std::string converter;
std::string deconverter;
};
struct Region {
Region() : data(NULL), size(0) {}
Region(char* data, size_t data_num) : data(data), size(data_num) {}
Region(float* data, size_t data_num)
: data(reinterpret_cast<char*>(data)), size(data_num << 2) {}
Region(int16_t* data, size_t data_num)
: data(reinterpret_cast<char*>(data)), size(data_num << 1) {}
Region(int32_t* data, size_t data_num)
: data(reinterpret_cast<char*>(data)), size(data_num << 2) {}
Region(int64_t* data, size_t data_num)
: data(reinterpret_cast<char*>(data)), size(data_num << 3) {}
char* data;
size_t size;
};
struct DataConverter {
int param;
std::string converter;
std::string deconverter;
};
class ValueAccessor {
public:
explicit ValueAccessor(){};
virtual ~ValueAccessor(){};
virtual int configure(const TableAccessorParameter& parameter) {
_config = parameter;
// initialize the DataConverter structs
if (_config.table_accessor_save_param_size() != 0) {
for (int i = 0; i < _config.table_accessor_save_param_size(); ++i) {
int param = _config.table_accessor_save_param(i).param();
std::string converter =
_config.table_accessor_save_param(i).converter();
std::string deconverter =
_config.table_accessor_save_param(i).deconverter();
_data_coverter_map[param] = std::make_shared<DataConverter>();
*(_data_coverter_map[param]) = {param, converter, deconverter};
}
}
return 0;
}
virtual int initialize() = 0;
// dimension of value
virtual size_t dim() = 0;
// size of each dimension of value
virtual size_t dim_size(size_t dim) = 0;
// total size of value, summed over all dimensions
virtual size_t size() = 0;
// total size of the variable-length mf part of value; only used for sparse
virtual size_t mf_size() { return 0; }
virtual bool need_extend_mf(float* value) { return false; }
virtual bool has_mf(size_t size) { return false; }
// dimension of pull value
virtual size_t select_dim() = 0;
// size of each dimension of pull value
virtual size_t select_dim_size(size_t dim) = 0;
// total size of pull value, summed over all dimensions
virtual size_t select_size() = 0;
// dimension of push value
virtual size_t update_dim() = 0;
// size of each dimension of push value
virtual size_t update_dim_size(size_t dim) = 0;
// total size of push value, summed over all dimensions
virtual size_t update_size() = 0;
// fea total for dense
virtual size_t fea_dim() { return _config.fea_dim(); }
// converter for save
virtual std::string get_converter(int param) {
auto itr = _data_coverter_map.find(param);
if (itr == _data_coverter_map.end()) {
return "";
} else {
return (*itr).second->converter;
}
}
// deconverter for load
virtual std::string get_deconverter(int param) {
auto itr = _data_coverter_map.find(param);
if (itr == _data_coverter_map.end()) {
return "";
} else {
return (*itr).second->deconverter;
}
}
// decide whether this value should be shrunk
virtual bool shrink(float* value) = 0;
// decide whether this value is dumped during the save stage;
// param identifies the save stage, e.g. downpour's xbox vs batch_model
virtual bool save(float* value, int param) = 0;
// update delta_score and unseen_days after save
virtual void update_stat_after_save(float* value, int param) {}
// when a key does not exist, generate random values for it
virtual int32_t create(float** value, size_t num) = 0;
virtual bool create_value(int type, const float* value) { return true; }
// select from values into select_values
virtual int32_t select(float** select_values, const float** values,
size_t num) = 0;
// merge update_values together
virtual int32_t merge(float** update_values,
const float** other_update_values, size_t num) = 0;
// merge update_values together; it.next decides whether to move to the next key
// virtual int32_t merge(float** update_values, iterator it);
// apply update_values to values
virtual int32_t update(float** values, const float** update_values,
size_t num) = 0;
// used to save model, will filter feature
virtual std::string parse_to_string(const float* value, int param) = 0;
// parse value from string, used to load model
virtual int32_t parse_from_string(const std::string& data, float* value) = 0;
virtual FsDataConverter converter(int param) {
FsDataConverter data_convert;
data_convert.converter = this->get_converter(param);
data_convert.deconverter = this->get_deconverter(param);
return data_convert;
}
virtual int set_weight(float** values, const float** update_values,
size_t num) {
return 0;
}
virtual float get_field(float* value, const std::string& name) { return 0.0; }
protected:
size_t _value_size;
size_t _select_value_size;
size_t _update_value_size;
TableAccessorParameter _config;
std::unordered_map<int, std::shared_ptr<struct DataConverter>>
_data_coverter_map;
};
REGISTER_REGISTERER(ValueAccessor);
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono> // NOLINT
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/common_table.h"
namespace paddle {
namespace distributed {
int32_t BarrierTable::initialize() {
auto trainers = _config.common().trainer_num();
trigger_.store(trainers);
for (int x = 0; x < trainers; ++x) {
trainer_all_.insert(x);
}
VLOG(1) << "BarrierTable init trigger: " << trigger_.load();
return 0;
}
// 0: send_barrier 1: recv_barrier 2: complete
int32_t BarrierTable::barrier(const uint32_t trainer_id,
const std::string barrier_type) {
std::unique_lock<std::mutex> lock(mutex_);
if (barrier_type == "2") {
trigger_.fetch_sub(1, std::memory_order::memory_order_relaxed);
VLOG(1) << "trigger sub to : " << trigger_.load();
} else {
trainer_ids_.insert(trainer_id);
VLOG(1) << "barrier type: " << barrier_type
<< " add trainer id: " << trainer_id;
}
if (trainer_ids_.size() < trigger_.load()) {
std::vector<uint32_t> diffs(trainer_all_.size());
auto iter = std::set_difference(trainer_all_.begin(), trainer_all_.end(),
trainer_ids_.begin(), trainer_ids_.end(),
diffs.begin());
diffs.resize(iter - diffs.begin());
auto diff = to_string<uint32_t>(diffs);
VLOG(1) << "still need trainers: " << diff;
trainer_wait_.wait(lock, [&] { return trainer_ids_.size() == 0; });
} else {
VLOG(1) << "barrier table optimize begin";
for (auto& x : *table_map_) {
auto table = x.second;
table->pour();
}
VLOG(1) << "barrier table optimize done";
trainer_ids_.clear();
trainer_wait_.notify_all();
}
return 0;
}
int32_t BarrierTable::set_table_map(
std::unordered_map<uint32_t, std::shared_ptr<Table>>* table_map) {
table_map_ = table_map;
return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/table/common_dense_table.h"
#include "paddle/fluid/distributed/common/utils.h"
namespace paddle {
namespace distributed {
void CommonDenseTable::create_initializer(const std::string& attr,
const std::string& name) {
auto slices = string::split_string<std::string>(attr, "&");
if (slices[0] == "gaussian_random") {
initializers_[name] = new GaussianInitializer(slices);
} else if (slices[0] == "fill_constant") {
initializers_[name] = new FillConstantInitializer(slices);
} else if (slices[0] == "uniform_random") {
initializers_[name] = new UniformInitializer(slices);
} else {
PADDLE_THROW(
platform::errors::InvalidArgument("%s is not supported", name));
}
}
int32_t CommonDenseTable::initialize() {
_shards_task_pool.resize(task_pool_size_);
for (int i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
}
sync = _config.common().sync();
VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
initialize_value();
initialize_optimizer();
return 0;
}
int32_t CommonDenseTable::initialize_value() {
auto common = _config.common();
int size = static_cast<int>(common.params().size());
values_.resize(size);
for (int x = 0; x < size; ++x) {
auto& varname = common.params()[x];
auto& dim = common.dims()[x];
if (varname == "Param") {
param_dim_ = dim;
param_idx_ = x;
}
auto& initializer = common.initializers()[x];
create_initializer(initializer, varname);
values_[x].resize(dim);
names_index_[varname] = x;
for (int y = 0; y < dim; ++y) {
values_[x][y] = initializers_[varname]->GetValue();
}
}
pull_reservoir_ = ReservoirValue<float>(param_dim_);
return 0;
}
int32_t CommonDenseTable::initialize_optimizer() {
auto common = _config.common();
auto name = common.name();
auto attrs = common.attributes();
if (name == "sgd") {
optimizer_ = std::make_shared<DSGD>(common, &values_);
} else if (name == "adam") {
optimizer_ = std::make_shared<DAdam>(common, &values_);
} else if (name == "sum") {
optimizer_ = std::make_shared<DSUM>(common, &values_);
} else {
VLOG(0) << "init optimizer failed";
}
VLOG(0) << "init optimizer " << name << " done";
return 0;
}
int32_t CommonDenseTable::pull_dense(float* pull_values, size_t num) {
std::copy(values_[param_idx_].begin(), values_[param_idx_].end(),
pull_values);
return 0;
}
int32_t CommonDenseTable::push_dense_param(const float* values, size_t num) {
PADDLE_ENFORCE_GE(
num, param_dim_,
paddle::platform::errors::InvalidArgument(
"update desne param numel expected %d, but got %d", param_dim_, num));
std::copy_n(values, param_dim_, values_[param_idx_].begin());
return 0;
}
int32_t CommonDenseTable::pour() {
_push_dense(pull_reservoir_.values.data(), pull_reservoir_.values.size());
pull_reservoir_.reset();
return 0;
}
int32_t CommonDenseTable::push_dense(const float* values, size_t num) {
if (sync) {
std::future<int> task =
_shards_task_pool[0]->enqueue([this, &values]() -> int {
pull_reservoir_.add(values, param_dim_);
return 0;
});
task.wait();
} else {
_push_dense(values, num);
}
return 0;
}
int32_t CommonDenseTable::_push_dense(const float* values, size_t num) {
PADDLE_ENFORCE_GE(
num, param_dim_,
paddle::platform::errors::InvalidArgument(
"update desne numel expected %d, but got %d", param_dim_, num));
std::vector<int> buckets = bucket(param_dim_, task_pool_size_);
std::vector<std::future<int>> tasks(task_pool_size_);
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &buckets, &values]() -> int {
auto begin = buckets[shard_id];
auto end = buckets[shard_id + 1];
optimizer_->update(values, param_dim_, begin, end);
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ThreadPool.h>
#include <assert.h>
#include <pthread.h>
#include <string>
#include "Eigen/Dense"
#include "paddle/fluid/distributed/table/accessor.h"
#include "paddle/fluid/distributed/table/common_table.h"
#include "paddle/fluid/distributed/table/depends/dense.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
class CommonDenseTable : public DenseTable {
public:
explicit CommonDenseTable() {}
virtual ~CommonDenseTable() {}
virtual int32_t initialize() override;
virtual int32_t initialize_shard() override { return 0; }
virtual void create_initializer(const std::string& attr,
const std::string& name);
virtual int32_t initialize_value();
virtual int32_t initialize_optimizer();
virtual int32_t pull_dense(float* pull_values, size_t num) override;
virtual int32_t push_dense_param(const float* values, size_t num) override;
virtual int32_t push_dense(const float* values, size_t num) override;
virtual int32_t pour() override;
int32_t load(const std::string& path, const std::string& param) override {
VLOG(0) << "Dense table may load by "
"paddle.distributed.fleet.init_server";
return 0;
}
int32_t save(const std::string& path, const std::string& param) override {
VLOG(0)
<< "Dense table may be saved by "
"paddle.distributed.fleet.save_persistables/save_inference_model";
return 0;
}
virtual int32_t flush() override { return 0; }
virtual int32_t shrink() override { return 0; }
virtual void clear() override { return; }
protected:
int32_t _push_dense(const float* values, size_t num);
private:
const int task_pool_size_ = 1;
bool sync = true;
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
int param_dim_ = 0;
int param_idx_ = 0;
std::shared_ptr<DenseOptimizer> optimizer_;
std::vector<std::vector<float>> values_;
ReservoirValue<float> pull_reservoir_;
std::unordered_map<std::string, Initializer*> initializers_;
std::unordered_map<std::string, int> names_index_;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include <algorithm>
#include <sstream>
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
struct Meta {
std::string param;
int shard_id;
std::vector<std::string> names;
std::vector<int> dims;
uint64_t count;
std::unordered_map<std::string, int> dims_map;
explicit Meta(const std::string& metapath) {
std::ifstream file(metapath);
std::string line;
int num_lines = 0;
while (std::getline(file, line)) {
if (StartWith(line, "#")) {
continue;
}
auto pairs = paddle::string::split_string<std::string>(line, "=");
PADDLE_ENFORCE_EQ(
pairs.size(), 2,
paddle::platform::errors::InvalidArgument(
"info in %s except k=v, but got %s", metapath, line));
if (pairs[0] == "param") {
param = pairs[1];
}
if (pairs[0] == "shard_id") {
shard_id = std::stoi(pairs[1]);
}
if (pairs[0] == "row_names") {
names = paddle::string::split_string<std::string>(pairs[1], ",");
}
if (pairs[0] == "row_dims") {
auto dims_strs =
paddle::string::split_string<std::string>(pairs[1], ",");
for (auto& str : dims_strs) {
dims.push_back(std::stoi(str));
}
}
if (pairs[0] == "count") {
count = std::stoull(pairs[1]);
}
}
for (int x = 0; x < names.size(); ++x) {
dims_map[names[x]] = dims[x];
}
}
Meta(std::string param, int shard_id, std::vector<std::string> row_names,
std::vector<int> dims, uint64_t count) {
this->param = param;
this->shard_id = shard_id;
this->names = row_names;
this->dims = dims;
this->count = count;
}
std::string ToString() {
std::stringstream ss;
ss << "param=" << param << "\n";
ss << "shard_id=" << shard_id << "\n";
ss << "row_names=" << paddle::string::join_strings(names, ',') << "\n";
ss << "row_dims=" << paddle::string::join_strings(dims, ',') << "\n";
ss << "count=" << count << "\n";
return ss.str();
}
};
void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
std::vector<std::vector<float>>* values) {
PADDLE_ENFORCE_EQ(columns.size(), meta.names.size() + 1,
paddle::platform::errors::InvalidArgument(
"record in txt do not match meta."));
values->reserve(columns.size() - 1);
for (int x = 1; x < columns.size(); ++x) {
auto& column = columns[x];
auto val_ = paddle::string::split_string<std::string>(column, ",");
std::vector<float> val;
std::transform(val_.begin(), val_.end(), std::back_inserter(val),
[](std::string va) { return std::stof(va); });
PADDLE_ENFORCE_EQ(val.size(), meta.dims[x - 1],
paddle::platform::errors::InvalidArgument(
"record in txt do not match meta."));
values->push_back(val);
}
}
int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const std::vector<std::string>& saved_names,
const int mode) {
for (auto value : block->values_) {
std::vector<std::vector<float>*> vss = value.second->get(saved_names);
std::stringstream ss;
auto id = value.first;
ss << id << "\t";
for (int i = 0; i < static_cast<int>(vss.size()); i++) {
auto& vs = vss[i];
ss << paddle::string::join_strings((*vs), ',');
ss << "\t";
}
ss << "\n";
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
}
return block->values_.size();
}
int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
const int pserver_id, const int pserver_num,
const int local_shard_num,
std::vector<std::shared_ptr<ValueBlock>>* blocks) {
Meta meta = Meta(metapath);
int num_lines = 0;
std::ifstream file(valuepath);
std::string line;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
auto id = std::stoull(values[0]);
if (id % pserver_num != pserver_id) {
VLOG(0) << "will not load " << values[0] << " from " << valuepath
<< ", please check id distribution";
continue;
}
auto shard_id = id % local_shard_num;
auto block = blocks->at(shard_id);
std::vector<std::vector<float>> kvalues;
ProcessALine(values, meta, &kvalues);
block->Init(id, &kvalues, 1);
}
return 0;
}
void SaveShard(std::shared_ptr<ValueBlock> block, const std::string& dirname,
const CommonAccessorParameter& common, const int mode,
const int pserver_id, const int shard_id) {
auto varname = common.table_name();
std::string var_store = string::Sprintf("%s/%s", dirname, varname);
VLOG(3) << "save " << varname << " in dir: " << var_store << " begin";
MkDirRecursively(var_store.c_str());
std::string shard_var_pre =
string::Sprintf("%s.block%d.%d", varname, pserver_id, shard_id);
std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre);
std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre);
// save values
std::vector<std::string> params(common.params().begin(),
common.params().end());
std::unique_ptr<std::ofstream> value_out(new std::ofstream(value_));
SaveToText(value_out.get(), block, params, mode);
// save meta
std::stringstream stream;
stream << "param=" << common.table_name() << "\n";
stream << "server_id=" << pserver_id << "\n";
stream << "shard_id=" << shard_id << "\n";
stream << "row_names=" << paddle::string::join_strings(common.params(), ',')
<< "\n";
stream << "row_dims=" << paddle::string::join_strings(common.dims(), ',')
<< "\n";
stream << "count=" << block->values_.size() << "\n";
std::unique_ptr<std::ofstream> meta_out(new std::ofstream(meta_));
meta_out->write(stream.str().c_str(), sizeof(char) * stream.str().size());
meta_out->close();
VLOG(3) << "save " << varname << " in dir: " << var_store << " done";
}
void CommonSparseTable::create_initializer(const std::string& attr,
const std::string& name) {
auto slices = string::split_string<std::string>(attr, "&");
if (slices[0] == "gaussian_random") {
initializers_[name] = new GaussianInitializer(slices);
} else if (slices[0] == "fill_constant") {
initializers_[name] = new FillConstantInitializer(slices);
} else if (slices[0] == "uniform_random") {
initializers_[name] = new UniformInitializer(slices);
} else {
PADDLE_THROW(
platform::errors::InvalidArgument("%s is not supported", name));
}
}
int32_t CommonSparseTable::initialize() {
_shards_task_pool.resize(task_pool_size_);
for (int i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
}
sync = _config.common().sync();
VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
initialize_value();
initialize_optimizer();
initialize_recorder();
return 0;
}
int32_t CommonSparseTable::initialize_recorder() { return 0; }
int32_t CommonSparseTable::initialize_value() {
auto common = _config.common();
int size = static_cast<int>(common.params().size());
for (int x = 0; x < size; ++x) {
auto& varname = common.params()[x];
auto& dim = common.dims()[x];
if (varname == "Param") {
param_dim_ = dim;
}
auto& initializer = common.initializers()[x];
create_initializer(initializer, varname);
}
shard_values_.reserve(task_pool_size_);
for (int x = 0; x < task_pool_size_; ++x) {
auto shard = std::make_shared<ValueBlock>(common, &initializers_);
shard_values_.emplace_back(shard);
}
return 0;
}
int32_t CommonSparseTable::initialize_optimizer() {
auto common = _config.common();
auto name = common.name();
auto attrs = common.attributes();
if (name == "sgd") {
optimizer_ = std::make_shared<SSGD>(common);
} else if (name == "adam") {
optimizer_ = std::make_shared<SAdam>(common);
} else if (name == "sum") {
optimizer_ = std::make_shared<SSUM>(common);
} else {
VLOG(0) << "init optimizer failed";
}
VLOG(0) << "init optimizer " << name << " done";
return 0;
}
int32_t CommonSparseTable::load(const std::string& path,
const std::string& param) {
rwlock_->WRLock();
VLOG(0) << "sparse table load with " << path << " with meta " << param;
LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_,
&shard_values_);
rwlock_->UNLock();
return 0;
}
int32_t CommonSparseTable::save(const std::string& dirname,
const std::string& param) {
rwlock_->WRLock();
int mode = std::stoi(param);
VLOG(0) << "sparse table save: " << dirname << " mode: " << mode;
auto varname = _config.common().table_name();
std::string var_store = string::Sprintf("%s/%s", dirname, varname);
MkDirRecursively(var_store.c_str());
VLOG(3) << "save " << varname << " in dir: " << var_store << " begin";
std::vector<std::string> params(_config.common().params().begin(),
_config.common().params().end());
std::string shard_var_pre =
string::Sprintf("%s.block%d", varname, _shard_idx);
std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre);
std::unique_ptr<std::ofstream> value_out(new std::ofstream(value_));
int64_t total_ins = 0;
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
// save values
total_ins +=
SaveToText(value_out.get(), shard_values_[shard_id], params, mode);
}
value_out->close();
// save meta
std::stringstream stream;
stream << "param=" << _config.common().table_name() << "\n";
stream << "shard_id=" << _shard_idx << "\n";
stream << "row_names="
<< paddle::string::join_strings(_config.common().params(), ',')
<< "\n";
stream << "row_dims="
<< paddle::string::join_strings(_config.common().dims(), ',') << "\n";
stream << "count=" << total_ins << "\n";
std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre);
std::unique_ptr<std::ofstream> meta_out(new std::ofstream(meta_));
meta_out->write(stream.str().c_str(), sizeof(char) * stream.str().size());
meta_out->close();
VLOG(3) << "save " << varname << " in dir: " << var_store << " done";
rwlock_->UNLock();
return 0;
}
std::pair<int64_t, int64_t> CommonSparseTable::print_table_stat() {
int64_t feasign_size = 0;
int64_t mf_size = 0;
for (auto& value : shard_values_) {
feasign_size += value->values_.size();
}
return {feasign_size, mf_size};
}
int32_t CommonSparseTable::pour() {
rwlock_->RDLock();
std::vector<float> values;
std::vector<uint64_t> keys;
keys.reserve(pull_reservoir_.size());
values.reserve(pull_reservoir_.size() * param_dim_);
for (auto& val : pull_reservoir_) {
keys.push_back(val.first);
auto& reservoir = val.second;
reservoir.avg();
std::copy(reservoir.values.begin(), reservoir.values.end(),
std::back_inserter(values));
}
_push_sparse(keys.data(), values.data(), pull_reservoir_.size());
pull_reservoir_.clear();
rwlock_->UNLock();
return 0;
}
int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys,
size_t num) {
rwlock_->RDLock();
std::vector<std::string> value_names;
for (auto name : _config.common().params()) {
value_names.push_back(name);
}
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
for (int x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}
std::vector<std::future<int>> tasks(task_pool_size_);
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &offset_bucket, &value_names,
&pull_values]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];
for (int i = 0; i < offsets.size(); ++i) {
auto offset = offsets[i];
auto id = keys[offset];
block->InitFromInitializer(id, value_names);
auto values = block->Get(id, {"Param"});
auto dim = values[0]->size();
std::copy(values[0]->begin(), values[0]->end(),
pull_values + dim * offset);
}
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
rwlock_->UNLock();
return 0;
}
int32_t CommonSparseTable::_push_sparse(const uint64_t* keys,
const float* values, size_t num) {
rwlock_->RDLock();
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
for (int x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}
std::vector<std::future<int>> tasks(task_pool_size_);
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &values, num, &offset_bucket]() -> int {
auto& offsets = offset_bucket[shard_id];
optimizer_->update(keys, values, num, offsets,
shard_values_[shard_id].get());
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
rwlock_->UNLock();
return 0;
}
int32_t CommonSparseTable::push_sparse(const uint64_t* keys,
const float* values, size_t num) {
if (sync) {
std::future<int> task =
_shards_task_pool[0]->enqueue([this, &keys, &values, num]() -> int {
for (int x = 0; x < num; ++x) {
auto id = keys[x];
auto has = pull_reservoir_.find(id);
if (has == pull_reservoir_.end()) {
pull_reservoir_[id] = ReservoirValue<float>(param_dim_);
}
auto& reservoir = pull_reservoir_[id];
reservoir.add(values + x * param_dim_, param_dim_);
}
return 0;
});
task.wait();
} else {
_push_sparse(keys, values, num);
}
return 0;
}
int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys,
const float* values, size_t num) {
rwlock_->RDLock();
std::vector<std::string> value_names;
for (auto name : _config.common().params()) {
value_names.push_back(name);
}
std::vector<std::vector<uint64_t>> offset_bucket;
offset_bucket.resize(task_pool_size_);
for (int x = 0; x < num; ++x) {
auto y = keys[x] % task_pool_size_;
offset_bucket[y].push_back(x);
}
std::vector<std::future<int>> tasks(task_pool_size_);
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
tasks[shard_id] = _shards_task_pool[shard_id]->enqueue(
[this, shard_id, &keys, &offset_bucket, &value_names,
&values]() -> int {
auto& block = shard_values_[shard_id];
auto& offsets = offset_bucket[shard_id];
for (int i = 0; i < offsets.size(); ++i) {
auto offset = offsets[i];
auto id = keys[offset];
block->InitFromInitializer(id, value_names);
auto values_ = block->Get(id, {"Param"});
auto dim = values_[0]->size();
std::copy_n(values + dim * offset, dim, values_[0]->data());
}
return 0;
});
}
for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) {
tasks[shard_id].wait();
}
rwlock_->UNLock();
return 0;
}
int32_t CommonSparseTable::flush() { return 0; }
int32_t CommonSparseTable::shrink() {
VLOG(0) << "shrink coming soon";
return 0;
}
void CommonSparseTable::clear() { VLOG(0) << "clear coming soon"; }
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ThreadPool.h>
#include <assert.h>
#include <pthread.h>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "Eigen/Dense"
#include "paddle/fluid/distributed/table/accessor.h"
#include "paddle/fluid/distributed/table/common_table.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/table/depends/sparse.h"
#include "paddle/fluid/framework/rw_lock.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
class CommonSparseTable : public SparseTable {
public:
CommonSparseTable() { rwlock_.reset(new framework::RWLock); }
virtual ~CommonSparseTable() {}
// unused method begin
virtual int32_t pull_dense(float* pull_values, size_t num) { return 0; }
virtual int32_t push_dense_param(const float* values, size_t num) {
return 0;
}
virtual int32_t push_dense(const float* values, size_t num) { return 0; }
// unused method end
virtual int32_t initialize();
virtual int32_t initialize_shard() { return 0; }
virtual void create_initializer(const std::string& attr,
const std::string& name);
virtual int32_t initialize_value();
virtual int32_t initialize_optimizer();
virtual int32_t initialize_recorder();
int32_t load(const std::string& path, const std::string& param);
int32_t save(const std::string& path, const std::string& param);
virtual std::pair<int64_t, int64_t> print_table_stat();
virtual int32_t pull_sparse(float* pull_values, const uint64_t* keys,
size_t num);
virtual int32_t push_sparse(const uint64_t* keys, const float* values,
size_t num);
// only for sparse geo table
virtual int32_t push_sparse_param(const uint64_t* keys, const float* values,
size_t num);
virtual int32_t pour();
virtual int32_t flush();
virtual int32_t shrink();
virtual void clear();
protected:
virtual int32_t _push_sparse(const uint64_t* keys, const float* values,
size_t num);
private:
const int task_pool_size_ = 11;
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
bool sync = false;
int param_dim_ = 0;
std::shared_ptr<SparseOptimizer> optimizer_;
std::unordered_map<std::string, Initializer*> initializers_;
std::vector<std::shared_ptr<ValueBlock>> shard_values_;
std::unordered_map<uint64_t, ReservoirValue<float>> pull_reservoir_;
std::unique_ptr<framework::RWLock> rwlock_{nullptr};
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <algorithm>
#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <set>
#include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/distributed/common/utils.h"
namespace paddle {
namespace distributed {
template <typename T>
struct ReservoirValue {
std::vector<T> values;
uint32_t counter;
uint32_t dim;
ReservoirValue() {
dim = 0;
values.resize(dim);
counter = 0;
}
ReservoirValue(uint32_t dim) {
this->dim = dim;
values.resize(dim);
counter = 0;
}
void add(const T *value, int numel) {
GetBlas<T>().VADD(numel, values.data(), value, values.data());
counter++;
}
void add(T *value, int numel) {
GetBlas<T>().VADD(numel, values.data(), value, values.data());
counter++;
}
void avg() {
auto scale = 1 / static_cast<T>(counter);
GetBlas<T>().SCAL(values.size(), scale, values.data());
}
void reset() {
values.resize(dim, 0);
counter = 0;
}
};
class SparseTable : public Table {
public:
SparseTable() {}
virtual ~SparseTable() {}
virtual void *get_shard(size_t shard_idx) { return 0; }
int32_t pull_dense(float *values, size_t num) override { return 0; }
int32_t push_dense(const float *values, size_t num) override { return 0; }
static int32_t sparse_local_shard_num(uint32_t shard_num,
uint32_t server_num) {
if (shard_num % server_num == 0) {
return shard_num / server_num;
}
size_t local_shard_num = shard_num / server_num + 1;
return local_shard_num;
}
static size_t get_sparse_shard(uint32_t shard_num, uint32_t server_num,
uint64_t key) {
return (key % shard_num) / sparse_local_shard_num(shard_num, server_num);
}
};
class DenseTable : public Table {
public:
DenseTable() {}
virtual ~DenseTable() {}
virtual void *get_shard(size_t shard_idx) { return 0; }
int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
return 0;
}
int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) override {
return 0;
}
int32_t push_dense_param(const float *values, size_t num) override {
return 0;
}
int32_t shrink() override { return 0; }
};
class BarrierTable : public Table {
public:
BarrierTable() {}
virtual ~BarrierTable() {}
virtual void *get_shard(size_t shard_idx) { return 0; }
int32_t pull_dense(float *values, size_t num) override { return 0; }
int32_t push_dense(const float *values, size_t num) override { return 0; }
int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
return 0;
}
int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) override {
return 0;
}
int32_t push_dense_param(const float *values, size_t num) override {
return 0;
}
int32_t shrink() override { return 0; }
virtual void clear(){};
virtual int32_t flush() { return 0; };
virtual int32_t load(const std::string &path, const std::string &param) {
return 0;
}
virtual int32_t save(const std::string &path, const std::string &param) {
return 0;
}
virtual int32_t initialize_shard() { return 0; };
virtual int32_t initialize() override;
// only for barrier
// 0: send_barrier 1: recv_barrier 2: complete
virtual int32_t barrier(const uint32_t trainer_id,
const std::string barrier_type) override;
virtual int32_t set_table_map(
std::unordered_map<uint32_t, std::shared_ptr<Table>> *table_map) override;
private:
std::mutex mutex_;
std::condition_variable trainer_wait_;
std::set<uint64_t> trainer_ids_;
std::set<uint64_t> trainer_all_;
std::atomic<int> trigger_;
std::atomic<bool> exit_;
std::unordered_map<uint32_t, std::shared_ptr<Table>> *table_map_;
};
} // namespace distributed
} // namespace paddle
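An illustrative check of the static shard helpers above, with arbitrarily chosen numbers: for shard_num = 1000 and server_num = 3 each server owns ceil(1000 / 3) = 334 shards, and get_sparse_shard maps a key to the server that owns its shard:

#include <cstdint>
#include <iostream>
#include "paddle/fluid/distributed/table/common_table.h"

int main() {
  const uint32_t shard_num = 1000, server_num = 3;
  const uint64_t key = 123456;
  auto local = paddle::distributed::SparseTable::sparse_local_shard_num(
      shard_num, server_num);       // 334 shards per server
  auto server = paddle::distributed::SparseTable::get_sparse_shard(
      shard_num, server_num, key);  // (123456 % 1000) / 334 == 1
  std::cout << "local shards: " << local << ", key " << key << " -> server "
            << server << std::endl;
  return 0;
}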
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <gflags/gflags.h>
#include <math.h> // for sqrt in CPU and CUDA
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/distributed/common/utils.h"
namespace paddle {
namespace distributed {
// dense optimizer
// TODO(tangwei12) integrate with sparse optimizer later.
class DenseOptimizer {
public:
DenseOptimizer() {}
explicit DenseOptimizer(const CommonAccessorParameter& accessor,
std::vector<std::vector<float>>* values) {}
virtual void update(const float* update_values, size_t num, int begin,
int end) = 0;
};
// sum calc for dense tensor
class DSUM : public DenseOptimizer {
public:
explicit DSUM(const CommonAccessorParameter& accessor,
std::vector<std::vector<float>>* values) {
auto& names = accessor.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "Param") {
param = (*values)[x].data();
}
}
}
void update(const float* update_values, size_t num, int begin,
int end) override {
auto update_numel = end - begin;
GetBlas<float>().VADD(update_numel, update_values + begin, param + begin,
param + begin);
}
float* param;
};
// sgd optimizer for dense tensor
class DSGD : public DenseOptimizer {
public:
explicit DSGD(const CommonAccessorParameter& accessor,
std::vector<std::vector<float>>* values) {
auto& names = accessor.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "LearningRate") {
learning_rate = (*values)[x].data();
}
if (names[x] == "Param") {
param = (*values)[x].data();
}
}
}
void update(const float* update_values, size_t num, int begin,
int end) override {
auto update_numel = end - begin;
std::vector<float> grads;
grads.resize(update_numel);
auto blas = GetBlas<float>();
blas.VCOPY(update_numel, update_values + begin, grads.data());
blas.SCAL(update_numel, *learning_rate, grads.data());
blas.VSUB(update_numel, param + begin, grads.data(), param + begin);
}
float* learning_rate;
float* param;
};
// adam optimizer for dense tensor
class DAdam : public DenseOptimizer {
public:
explicit DAdam(const CommonAccessorParameter& accessor,
std::vector<std::vector<float>>* values) {
auto& names = accessor.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "LearningRate") {
learning_rate = (*values)[x].data();
}
if (names[x] == "Param") {
param = (*values)[x].data();
}
if (names[x] == "Moment1") {
moment1 = (*values)[x].data();
}
if (names[x] == "Moment2") {
moment2 = (*values)[x].data();
}
if (names[x] == "Beta1Pow") {
beta1_pow = (*values)[x].data();
}
if (names[x] == "Beta2Pow") {
beta2_pow = (*values)[x].data();
}
}
// add attr later
beta1 = 0.9;
beta2 = 0.999;
epsilon = 1.0e-8;
}
void update(const float* update_values, size_t num, int begin,
int end) override {
auto update_numel = end - begin;
std::vector<float> grad, grad2, tmp;
grad.resize(update_numel);
grad2.resize(update_numel);
tmp.resize(update_numel);
auto blas = GetBlas<float>();
blas.VCOPY(update_numel, update_values + begin, grad.data());
blas.VCOPY(update_numel, update_values + begin, grad2.data());
blas.SCAL(update_numel, 1 - beta1, grad.data());
blas.VSQUARE(update_numel, grad2.data(), grad2.data());
blas.SCAL(update_numel, 1 - beta2, grad2.data());
blas.SCAL(update_numel, beta1, moment1 + begin);
blas.VADD(update_numel, moment1 + begin, grad.data(), moment1 + begin);
blas.SCAL(update_numel, beta2, moment2 + begin);
blas.VADD(update_numel, moment2 + begin, grad2.data(), moment2 + begin);
beta1_pow[0] = beta1_pow[0] * beta1;
beta2_pow[0] = beta2_pow[0] * beta2;
float lr_ = learning_rate[0];
lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
float* tmp_ = tmp.data();
float eps_ = epsilon * sqrt(1 - beta2_pow[0]);
SQRT<float>(update_numel, moment2 + begin, tmp_);
ADD<float>(update_numel, tmp_, eps_, tmp_);
blas.VDIV(update_numel, moment1 + begin, tmp_, tmp_);
blas.SCAL(update_numel, lr_, tmp_);
blas.VSUB(update_numel, param + begin, tmp_, param + begin);
}
float* learning_rate;
float* param;
float* moment1;
float* moment2;
float* beta1_pow;
float* beta2_pow;
float beta1;
float beta2;
float epsilon;
};
} // namespace distributed
} // namespace paddle
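For reference, DAdam::update above applies the standard Adam rule to the slice [begin, end), with the bias corrections folded into the effective learning rate and epsilon; writing g for the incoming gradient slice (update_values + begin) and theta for the matching slice of Param:

\[
\begin{aligned}
m_1 &\leftarrow \beta_1 m_1 + (1-\beta_1)\,g, &
m_2 &\leftarrow \beta_2 m_2 + (1-\beta_2)\,g \odot g, \\
\beta_1^t &\leftarrow \beta_1^t\,\beta_1, &
\beta_2^t &\leftarrow \beta_2^t\,\beta_2, \\
lr' &= lr\cdot\frac{\sqrt{1-\beta_2^t}}{1-\beta_1^t}, &
\theta &\leftarrow \theta - lr'\cdot\frac{m_1}{\sqrt{m_2}+\epsilon\sqrt{1-\beta_2^t}}.
\end{aligned}
\]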
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ThreadPool.h>
#include <functional>
#include <future> // NOLINT
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "glog/logging.h"
namespace paddle {
namespace distributed {
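// ConcurrentSet serializes every mutation of the underlying unordered_set
// through a single-threaded ThreadPool, so callers get a future back instead
// of taking a lock.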
class ConcurrentSet {
public:
ConcurrentSet() : pool_(new ::ThreadPool(1)) {}
~ConcurrentSet() {}
std::future<void> Update(const std::vector<uint64_t>& rows) {
auto task = [this, rows] {
for (auto row : rows) {
set_.insert(row);
}
};
return pool_->enqueue(std::move(task));
}
std::future<void> GetAndClear(std::vector<uint64_t>* result) {
    auto task = [this, result] {
result->clear();
for (auto& id : set_) {
result->push_back(id);
}
set_.clear();
};
return pool_->enqueue(std::move(task));
}
private:
std::unordered_set<uint64_t> set_;
std::unique_ptr<::ThreadPool> pool_{nullptr};
};
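// GeoRecorder keeps one ConcurrentSet per trainer: Update() fans a batch of
// pushed sparse ids out to every trainer's set, and GetAndClear() drains the
// ids recorded for a single trainer (used to answer pull_geo_param).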
class GeoRecorder {
public:
explicit GeoRecorder(int trainer_num) : trainer_num_(trainer_num) {
trainer_rows_.reserve(trainer_num);
for (auto i = 0; i < trainer_num; ++i) {
trainer_rows_.emplace_back(new ConcurrentSet());
}
}
~GeoRecorder() = default;
void Update(const std::vector<uint64_t>& update_rows) {
VLOG(3) << " row size: " << update_rows.size();
std::vector<std::future<void>> fs;
for (auto& set : trainer_rows_) {
fs.push_back(set->Update(update_rows));
}
for (auto& f : fs) {
f.wait();
}
}
void GetAndClear(uint32_t trainer_id, std::vector<uint64_t>* result) {
VLOG(3) << "GetAndClear for trainer: " << trainer_id;
trainer_rows_.at(trainer_id)->GetAndClear(result).wait();
}
private:
const int trainer_num_;
std::vector<std::unique_ptr<ConcurrentSet>> trainer_rows_;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <gflags/gflags.h>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/generator.h"
namespace paddle {
namespace distributed {
class Initializer {
public:
Initializer() {}
explicit Initializer(const std::vector<std::string> &attrs) {}
virtual float GetValue() = 0;
virtual ~Initializer() {}
protected:
std::string name_;
unsigned int seed_;
};
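// Concrete initializers are constructed from "&"-separated initializer
// strings in the table config (see the tests), split into attrs:
//   UniformInitializer:      {name, seed, min, max}
//   GaussianInitializer:     {name, seed, mean, std}
//   FillConstantInitializer: {name, value}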
class UniformInitializer : public Initializer {
public:
explicit UniformInitializer(const std::vector<std::string> &attrs) {
name_ = attrs[0];
seed_ = static_cast<unsigned int>(std::stoi(attrs[1]));
min_ = std::stof(attrs[2]);
max_ = std::stof(attrs[3]);
dist_ = std::uniform_real_distribution<float>(min_, max_);
random_engine_ = framework::GetCPURandomEngine(seed_);
}
float GetValue() override { return dist_(*random_engine_); }
private:
float min_;
float max_;
std::shared_ptr<std::mt19937_64> random_engine_;
std::uniform_real_distribution<float> dist_;
};
class GaussianInitializer : public Initializer {
public:
explicit GaussianInitializer(const std::vector<std::string> &attrs) {
name_ = attrs[0];
seed_ = static_cast<unsigned int>(std::stoi(attrs[1]));
mean_ = std::stof(attrs[2]);
std_ = std::stof(attrs[3]);
random_engine_ = framework::GetCPURandomEngine(seed_);
dist_ = std::normal_distribution<float>(mean_, std_);
}
float GetValue() override { return dist_(*random_engine_); }
private:
float std_;
float mean_;
std::shared_ptr<std::mt19937_64> random_engine_;
std::normal_distribution<float> dist_;
};
class FillConstantInitializer : public Initializer {
public:
explicit FillConstantInitializer(const std::vector<std::string> &attrs) {
name_ = attrs[0];
value_ = std::stof(attrs[1]);
}
float GetValue() override { return value_; }
private:
float value_;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ThreadPool.h>
#include <gflags/gflags.h>
#include <functional>
#include <future> // NOLINT
#include <memory>
#include <string>
#include <thread> // NOLINT
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/rw_lock.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
enum Mode { training, infer };
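// entry<T> decides whether a sparse feature row gets marked as an entry
// (VALUE::is_entry_): "none" always admits, count_filter admits once the
// fetch count reaches the threshold, and probability admits when a uniform
// [0, 1) draw is at least the threshold.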
template <typename T>
inline bool entry(const int count, const T threshold);
template <>
inline bool entry<std::string>(const int count, const std::string threshold) {
return true;
}
template <>
inline bool entry<int>(const int count, const int threshold) {
return count >= threshold;
}
template <>
inline bool entry<float>(const int count, const float threshold) {
  // attrs are {name, seed, min, max}
  UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"});
return uniform.GetValue() >= threshold;
}
struct VALUE {
  explicit VALUE(const std::vector<std::string> &names)
      : names_(names),
        count_(0),
        seen_after_last_save_(false),
        unseen_days_(0),
        is_entry_(false) {
values_.resize(names.size());
for (int i = 0; i < static_cast<int>(names.size()); i++) {
places[names[i]] = i;
}
}
void set(std::vector<std::vector<float>> *values) {
values_ = std::move(*values);
}
void set(const std::vector<std::string> &names,
const std::vector<std::vector<float>> &values) {
for (int i = 0; i < static_cast<int>(names.size()); i++) {
auto idx = places[names[i]];
auto value = values[i];
values_[idx].assign(value.begin(), value.end());
}
}
std::vector<std::vector<float> *> get() {
auto pts = std::vector<std::vector<float> *>();
pts.reserve(values_.size());
for (auto &value : values_) {
pts.push_back(&value);
}
return pts;
}
int fetch_count() { return ++count_; }
void reset_unseen_days() { unseen_days_ = 0; }
void set_entry(bool is_entry) { is_entry_ = is_entry; }
bool get_entry() { return is_entry_; }
  std::vector<std::vector<float> *> get(
      const std::vector<std::string> &names) {
    auto pts = std::vector<std::vector<float> *>();
    pts.reserve(names.size());
for (int i = 0; i < static_cast<int>(names.size()); i++) {
pts.push_back(&(values_[places[names[i]]]));
}
return pts;
}
std::vector<std::string> names_;
int count_;
bool seen_after_last_save_;
int unseen_days_;
bool is_entry_;
std::vector<std::vector<float>> values_;
std::unordered_map<std::string, int> places;
};
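// ValueBlock is one shard of the large-scale kv storage: it maps feature id
// -> VALUE, lazily creates missing rows from the configured initializers
// (InitFromInitializer), and tracks each row's entry state via entry_func_.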
class ValueBlock {
public:
explicit ValueBlock(
const CommonAccessorParameter &common,
std::unordered_map<std::string, Initializer *> *initializers) {
initializers_ = initializers;
int size = static_cast<int>(common.params().size());
for (int x = 0; x < size; ++x) {
auto varname = common.params()[x];
auto dim = common.dims()[x];
value_names_.push_back(varname);
value_dims_.push_back(dim);
}
// for Entry
{
// entry will add later
std::string entry_attr = "none";
if (entry_attr == "none") {
entry_func_ =
std::bind(entry<std::string>, std::placeholders::_1, "none");
} else {
auto slices = string::split_string<std::string>(entry_attr, "&");
if (slices[0] == "count_filter") {
int threshold = std::stoi(slices[1]);
entry_func_ = std::bind(entry<int>, std::placeholders::_1, threshold);
} else if (slices[0] == "probability") {
float threshold = std::stof(slices[1]);
entry_func_ =
std::bind(entry<float>, std::placeholders::_1, threshold);
}
}
}
}
~ValueBlock() {}
void Init(const uint64_t &id, std::vector<std::vector<float>> *values,
int count) {
if (Has(id)) {
PADDLE_THROW(platform::errors::AlreadyExists("id already exist, error"));
}
if (values->size() != value_names_.size()) {
PADDLE_THROW(
platform::errors::AlreadyExists("values can not match, error"));
}
auto value = new VALUE(value_names_);
value->set(values);
value->seen_after_last_save_ = true;
value->count_ = count;
values_[id] = value;
}
std::vector<std::vector<float> *> Get(
const uint64_t &id, const std::vector<std::string> &value_names) {
auto ret_values = values_.at(id)->get(value_names);
return ret_values;
}
std::vector<std::vector<float> *> Get(const uint64_t &id) {
auto ret_values = values_.at(id)->get(value_names_);
return ret_values;
}
void InitFromInitializer(const uint64_t &id,
const std::vector<std::string> &value_names) {
if (Has(id)) {
Update(id);
return;
}
auto rets = std::vector<std::vector<float>>();
rets.resize(value_names_.size());
for (int i = 0; i < static_cast<int>(value_names_.size()); i++) {
auto name = value_names_[i];
auto *init = initializers_->at(name);
auto dim = value_dims_[i];
rets[i].resize(dim);
for (int j = 0; j < static_cast<int>(dim); j++) {
rets[i][j] = init->GetValue();
}
}
Init(id, &rets, 0);
Update(id);
}
bool GetEntry(const uint64_t &id) {
auto value = values_.at(id);
auto entry = value->get_entry();
return entry;
}
void Set(const uint64_t &id, const std::vector<std::string> &value_names,
const std::vector<std::vector<float>> &values) {
auto value = values_.at(id);
value->set(value_names, values);
}
void Update(const uint64_t id) {
auto *value = values_.at(id);
value->reset_unseen_days();
auto count = value->fetch_count();
if (!value->get_entry()) {
value->set_entry(entry_func_(count));
}
}
private:
bool Has(const uint64_t id) {
auto got = values_.find(id);
if (got == values_.end()) {
return false;
} else {
return true;
}
}
public:
std::unordered_map<uint64_t, VALUE *> values_;
private:
std::vector<std::string> value_names_;
std::vector<int> value_dims_;
std::function<bool(uint64_t)> entry_func_;
std::unordered_map<std::string, Initializer *> *initializers_;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <gflags/gflags.h>
#include <math.h> // for sqrt in CPU and CUDA
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
namespace paddle {
namespace distributed {
class SparseOptimizer {
public:
SparseOptimizer() {}
explicit SparseOptimizer(const CommonAccessorParameter& common) {}
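  // keys/update_values hold the whole pushed batch; offsets lists the
  // positions in that batch which belong to this block.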
virtual void update(const uint64_t* keys, const float* update_values,
size_t num, const std::vector<uint64_t>& offsets,
ValueBlock* block) = 0;
};
// sum calc for sparse tensor
class SSUM : public SparseOptimizer {
public:
  SSUM() {}
explicit SSUM(const CommonAccessorParameter& common) {
auto& names = common.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "Param") {
param_idx = x;
update_numel = common.dims()[x];
}
}
}
void update(const uint64_t* keys, const float* update_values, size_t num,
const std::vector<uint64_t>& offsets,
ValueBlock* block) override {
auto blas = GetBlas<float>();
for (auto x : offsets) {
auto id = keys[x];
auto values = block->Get(id);
float* param = values[param_idx]->data();
std::vector<float> delta;
delta.resize(update_numel);
blas.VCOPY(update_numel, update_values + x * update_numel, delta.data());
blas.VADD(update_numel, delta.data(), param, param);
}
}
int param_idx;
int update_numel;
};
// sgd optimizer for sparse tensor
class SSGD : public SparseOptimizer {
public:
  SSGD() {}
explicit SSGD(const CommonAccessorParameter& common) {
auto& names = common.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "LearningRate") {
learning_rate_idx = x;
}
if (names[x] == "Param") {
param_idx = x;
update_numel = common.dims()[x];
}
}
}
void update(const uint64_t* keys, const float* update_values, size_t num,
const std::vector<uint64_t>& offsets,
ValueBlock* block) override {
auto blas = GetBlas<float>();
for (auto x : offsets) {
auto id = keys[x];
auto values = block->Get(id);
float* learning_rate = values[learning_rate_idx]->data();
float* param = values[param_idx]->data();
std::vector<float> grads;
grads.resize(update_numel);
blas.VCOPY(update_numel, update_values + x * update_numel, grads.data());
blas.SCAL(update_numel, learning_rate[0], grads.data());
blas.VSUB(update_numel, param, grads.data(), param);
}
}
int learning_rate_idx;
int param_idx;
int update_numel;
};
// adam optimizer for sparse tensor
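// SAdam applies the same Adam recurrence as DAdam, but row by row: the beta
// powers, moments and parameter of every id live inside that id's VALUE.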
class SAdam : public SparseOptimizer {
public:
SAdam() {}
explicit SAdam(const CommonAccessorParameter& common) {
auto& names = common.params();
for (int x = 0; x < static_cast<int>(names.size()); ++x) {
if (names[x] == "LearningRate") {
learning_rate_idx = x;
}
if (names[x] == "Param") {
param_idx = x;
update_numel = common.dims()[x];
}
if (names[x] == "Moment1") {
moment1_idx = x;
}
if (names[x] == "Moment2") {
moment2_idx = x;
}
if (names[x] == "Beta1Pow") {
beta1_pow_idx = x;
}
if (names[x] == "Beta2Pow") {
beta2_pow_idx = x;
}
}
// add attr later
beta1 = 0.9;
beta2 = 0.999;
epsilon = 1.0e-8;
}
void update(const uint64_t* keys, const float* update_values, size_t num,
const std::vector<uint64_t>& offsets,
ValueBlock* block) override {
auto blas = GetBlas<float>();
for (auto x : offsets) {
auto id = keys[x];
auto values = block->Get(id);
float* learning_rate = values[learning_rate_idx]->data();
float* param = values[param_idx]->data();
float* moment1 = values[moment1_idx]->data();
float* moment2 = values[moment2_idx]->data();
float* beta1_pow = values[beta1_pow_idx]->data();
float* beta2_pow = values[beta2_pow_idx]->data();
beta1_pow[0] = beta1_pow[0] * beta1;
beta2_pow[0] = beta2_pow[0] * beta2;
float lr_ = learning_rate[0];
lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
std::vector<float> grad, grad2, tmp;
grad.resize(update_numel);
grad2.resize(update_numel);
tmp.resize(update_numel);
blas.VCOPY(update_numel, update_values + x * update_numel, grad.data());
blas.VCOPY(update_numel, update_values + x * update_numel, grad2.data());
blas.SCAL(update_numel, 1 - beta1, grad.data());
blas.VSQUARE(update_numel, grad2.data(), grad2.data());
blas.SCAL(update_numel, 1 - beta2, grad2.data());
blas.SCAL(update_numel, beta1, moment1);
blas.VADD(update_numel, moment1, grad.data(), moment1);
blas.SCAL(update_numel, beta2, moment2);
blas.VADD(update_numel, moment2, grad2.data(), moment2);
float* tmp_ = tmp.data();
float eps_ = epsilon * sqrt(1 - beta2_pow[0]);
SQRT<float>(update_numel, moment2, tmp_);
ADD<float>(update_numel, tmp_, eps_, tmp_);
blas.VDIV(update_numel, moment1, tmp_, tmp_);
blas.SCAL(update_numel, lr_, tmp_);
blas.VSUB(update_numel, param, tmp_, param);
}
}
int learning_rate_idx;
int param_idx;
int moment1_idx;
int moment2_idx;
int beta1_pow_idx;
int beta2_pow_idx;
float beta1;
float beta2;
float epsilon;
int update_numel;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/table/sparse_geo_table.h"
namespace paddle {
namespace distributed {
int32_t SparseGeoTable::pull_geo_param(const uint32_t trainer_id,
std::vector<float>* values,
std::vector<uint64_t>* ids) {
geo_recorder->GetAndClear(trainer_id, ids);
auto dim = _config.common().dims()[0];
values->resize(ids->size() * dim);
CommonSparseTable::pull_sparse(values->data(), ids->data(), ids->size());
return 0;
}
int32_t SparseGeoTable::push_sparse(const uint64_t* keys, const float* values,
size_t num) {
std::vector<uint64_t> ids;
ids.resize(num);
std::copy_n(keys, num, ids.begin());
geo_recorder->Update(ids);
CommonSparseTable::push_sparse(keys, values, num);
return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <assert.h>
#include <pthread.h>
#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <utility>
#include <vector>
#include "Eigen/Dense"
#include "paddle/fluid/distributed/table/accessor.h"
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/common_table.h"
#include "paddle/fluid/distributed/table/depends/geo_recorder.h"
#include "paddle/fluid/distributed/table/depends/initializers.h"
#include "paddle/fluid/distributed/table/depends/large_scale_kv.h"
#include "paddle/fluid/distributed/table/depends/sparse.h"
#include "paddle/fluid/framework/rw_lock.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
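// SparseGeoTable extends CommonSparseTable for geo-sgd style async training:
// every push_sparse also records the pushed ids per trainer in geo_recorder,
// and pull_geo_param later drains those ids and returns their current values.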
class SparseGeoTable : public CommonSparseTable {
public:
explicit SparseGeoTable() : CommonSparseTable() { geo_recorder = nullptr; }
virtual ~SparseGeoTable() {}
int32_t pull_geo_param(const uint32_t trainer_id, std::vector<float>* values,
std::vector<uint64_t>* keys);
virtual int32_t push_sparse(const uint64_t* keys, const float* values,
size_t num) override;
virtual int32_t initialize_recorder() {
if (!geo_recorder) {
auto trainers = _config.common().trainer_num();
geo_recorder = std::make_shared<GeoRecorder>(trainers);
}
return 0;
}
private:
std::shared_ptr<GeoRecorder> geo_recorder;
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/table/table.h"
#include <boost/preprocessor/repetition/repeat_from_to.hpp>
#include <boost/preprocessor/seq/elem.hpp>
#include "glog/logging.h"
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/table/common_dense_table.h"
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/table/tensor_accessor.h"
#include "paddle/fluid/distributed/table/tensor_table.h"
namespace paddle {
namespace distributed {
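// Register the concrete table and accessor implementations so they can be
// created by class name (CREATE_CLASS) from the TableParameter config.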
REGISTER_CLASS(Table, CommonDenseTable);
REGISTER_CLASS(Table, CommonSparseTable);
REGISTER_CLASS(Table, DenseTensorTable);
REGISTER_CLASS(Table, SparseGeoTable);
REGISTER_CLASS(Table, BarrierTable);
REGISTER_CLASS(ValueAccessor, CommMergeAccessor);
int32_t TableManager::initialize() {
static bool initialized = false;
if (initialized) {
return 0;
}
initialized = true;
return 0;
}
int32_t Table::initialize(const TableParameter &config,
const FsClientParameter &fs_config) {
_config = config;
if (initialize_accessor() != 0) {
LOG(WARNING) << "Table accessor initialize failed";
return -1;
}
return initialize();
}
int32_t Table::initialize_accessor() {
if (!_config.has_accessor() || !_config.accessor().has_accessor_class()) {
LOG(ERROR) << "missing accessor config in table, table_id:"
<< _config.table_id();
return -1;
}
auto *accessor =
CREATE_CLASS(ValueAccessor,
_config.accessor().accessor_class()) if (accessor == NULL) {
LOG(ERROR) << "accessor is unregisteg, table_id:" << _config.table_id()
<< ", accessor_name:" << _config.accessor().accessor_class();
return -1;
}
if (accessor->configure(_config.accessor()) || accessor->initialize() != 0) {
LOG(ERROR) << " accessor initialize failed, table_id:" << _config.table_id()
<< ", accessor_name:" << _config.accessor().accessor_class();
return -1;
}
_value_accesor.reset(accessor);
return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <assert.h>
#include <atomic>
#include <future> // NOLINT
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "paddle/fluid/distributed/table/accessor.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
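// Table is the base interface of every parameter-server table: dense and
// sparse pull/push are pure virtual, while the geo / barrier specific hooks
// default to no-ops so ordinary tables can ignore them.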
class Table {
public:
Table() {}
virtual ~Table() {}
virtual int32_t initialize(const TableParameter &config,
const FsClientParameter &fs_config) final;
virtual int32_t pull_dense(float *values, size_t num) = 0;
virtual int32_t push_dense(const float *values, size_t num) = 0;
virtual int32_t push_dense_param(const float *values, size_t num) {
return 0;
}
virtual int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) = 0;
virtual int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) = 0;
virtual int32_t push_sparse_param(const uint64_t *keys, const float *values,
size_t num) {
return 0;
}
// only for sparse geo table
virtual int32_t pull_geo_param(const uint32_t trainer_id,
std::vector<float> *values,
std::vector<uint64_t> *keys) {
return 0;
}
// only for barrier
virtual int32_t barrier(const uint32_t trainer_id,
const std::string barrier_type) {
return 0;
}
// only for barrier table
virtual int32_t set_table_map(
std::unordered_map<uint32_t, std::shared_ptr<Table>> *table_map) {
return 0;
}
virtual int32_t pour() { return 0; }
virtual void clear() = 0;
virtual int32_t flush() = 0;
virtual int32_t shrink() = 0;
  // specify the path to load from
virtual int32_t load(const std::string &path,
const std::string &converter) = 0;
  // specify the path to save to
virtual int32_t save(const std::string &path,
const std::string &converter) = 0;
virtual int32_t set_shard(size_t shard_idx, size_t shard_num) final {
_shard_idx = shard_idx;
_shard_num = shard_num;
return initialize_shard();
}
inline std::shared_ptr<ValueAccessor> value_accesor() {
return _value_accesor;
}
virtual void *get_shard(size_t shard_idx) = 0;
virtual std::pair<int64_t, int64_t> print_table_stat() { return {0, 0}; }
protected:
virtual int32_t initialize() = 0;
virtual int32_t initialize_accessor() final;
virtual int32_t initialize_shard() = 0;
virtual std::string table_dir(const std::string &model_dir) {
return paddle::string::format_string("%s/%03d/", model_dir.c_str(),
_config.table_id());
}
  size_t _shard_idx;  // index of this table shard
  size_t _shard_num;  // total number of shards for this table
TableParameter _config;
std::shared_ptr<ValueAccessor> _value_accesor;
};
REGISTER_REGISTERER(Table);
class TableManager {
public:
static TableManager &instance() {
static TableManager manager;
return manager;
}
int32_t initialize();
private:
TableManager() {}
~TableManager() {}
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/table/tensor_accessor.h"
#include "Eigen/Dense"
namespace paddle {
namespace distributed {
int CommMergeAccessor::initialize() { return 0; }
// dimension of the value
size_t CommMergeAccessor::dim() { return 0; }
// size of each dimension of the value
size_t CommMergeAccessor::dim_size(size_t dim) { return 0; }
// total size of the value, summed over all dimensions
size_t CommMergeAccessor::size() { return 0; }
// dimension of the pull value
size_t CommMergeAccessor::select_dim() { return _config.embedx_dim(); }
// size of each dimension of the pull value
size_t CommMergeAccessor::select_dim_size(size_t dim) { return sizeof(float); }
// total size of the pull value, summed over all dimensions
size_t CommMergeAccessor::select_size() { return select_dim() * sizeof(float); }
// dimension of the push value
size_t CommMergeAccessor::update_dim() { return _config.embedx_dim(); }
// size of each dimension of the push value
size_t CommMergeAccessor::update_dim_size(size_t dim) { return sizeof(float); }
// total size of the push value, summed over all dimensions
size_t CommMergeAccessor::update_size() { return update_dim() * sizeof(float); }
// decide whether this value should be shrunk
bool CommMergeAccessor::shrink(float * /*value*/) { return false; }
// decide whether this value should be dumped in the save stage;
// param marks which save stage it is, e.g. downpour's xbox vs. batch_model
bool CommMergeAccessor::save(float * /*value*/, int /*param*/) { return true; }
// generate random values when the keys do not exist yet
int32_t CommMergeAccessor::create(float **value, size_t num) { return 0; }
// copy from values into select_values
int32_t CommMergeAccessor::select(float **select_values, const float **values,
size_t num) {
return 0;
}
// merge other_update_values into update_values
int32_t CommMergeAccessor::merge(float **update_values,
const float **other_update_values,
size_t num) {
Eigen::Map<Eigen::MatrixXf> u_mat(update_values[0], 1, num);
Eigen::Map<const Eigen::MatrixXf> o_mat(other_update_values[0], 1, num);
u_mat += o_mat;
return 0;
}
// merge update_values; it.next decides when to move to the next key
// int32_t merge(float** update_values, iterator it);
// apply update_values to values
int32_t CommMergeAccessor::update(float **values, const float **update_values,
size_t num) {
return 0;
}
int CommMergeAccessor::set_weight(float **values, const float **update_values,
size_t num) {
return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <stdio.h>
#include <string>
#include <vector>
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/table/accessor.h"
namespace paddle {
namespace distributed {
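// CommMergeAccessor only really implements merge(), which adds pushed
// gradients together element-wise; the remaining hooks are stubs that satisfy
// the ValueAccessor interface.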
class CommMergeAccessor : public ValueAccessor {
public:
CommMergeAccessor() {}
virtual ~CommMergeAccessor() {}
virtual int initialize();
  // dimension of the value
  virtual size_t dim();
  // size of each dimension of the value
  virtual size_t dim_size(size_t dim);
  // total size of the value, summed over all dimensions
  virtual size_t size();
  // dimension of the pull value
  virtual size_t select_dim();
  // size of each dimension of the pull value
  virtual size_t select_dim_size(size_t dim);
  // total size of the pull value, summed over all dimensions
  virtual size_t select_size();
  // dimension of the push value
  virtual size_t update_dim();
  // size of each dimension of the push value
  virtual size_t update_dim_size(size_t dim);
  // total size of the push value, summed over all dimensions
  virtual size_t update_size();
  // decide whether this value should be shrunk
  virtual bool shrink(float * /*value*/);
  // decide whether this value should be dumped in the save stage;
  // param marks which save stage it is, e.g. downpour's xbox vs. batch_model
  virtual bool save(float * /*value*/, int /*param*/);
  // generate random values when the keys do not exist yet
  virtual int32_t create(float **value, size_t num);
  // copy from values into select_values
virtual int32_t select(float **select_values, const float **values,
size_t num);
  // merge other_update_values into update_values
virtual int32_t merge(float **update_values,
const float **other_update_values, size_t num);
  // merge update_values; it.next decides when to move to the next key
  // virtual int32_t merge(float** update_values, iterator it);
  // apply update_values to values
virtual int32_t update(float **values, const float **update_values,
size_t num);
virtual int set_weight(float **values, const float **update_values,
size_t num);
virtual std::string parse_to_string(const float *value, int param) {
return "";
}
virtual int parse_from_string(const std::string &str, float *v) { return 0; }
};
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/distributed/table/tensor_table.h"
#include "paddle/fluid/distributed/common/utils.h"
namespace paddle {
namespace distributed {
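// DenseTensorTable keeps its parameter in the _data tensor; push_dense copies
// the received gradient into a temporary scope variable and runs the prepared
// "push" program block through the Executor to apply the optimizer.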
int32_t DenseTensorTable::initialize() {
_shards_task_pool.resize(10);
for (int i = 0; i < _shards_task_pool.size(); ++i) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
}
return 0;
}
int32_t DenseTensorTable::initialize_tensor(framework::Scope *scope,
framework::ProgramDesc *program,
framework::Executor *executor) {
scope_ = scope;
program_ = program;
executor_ = executor;
auto tensor_config = _config.tensor();
if (tensor_config.has_common_block_map()) {
auto block_maps =
paddle::string::split_string(tensor_config.common_block_map(), "#");
for (auto &block_map : block_maps) {
auto block = paddle::string::split_string(block_map, ":");
auto block_id = std::stoi(block[0]);
std::vector<int> block_ids{block_id};
auto block_cmd = block[1];
auto prepared = executor_->Prepare(*program_, block_ids);
(*prepared_ctx_)[block_cmd] = prepared[0];
}
}
  return 0;
}
int32_t DenseTensorTable::pull_dense(float *values, size_t numel) {
PADDLE_ENFORCE_EQ(numel, _data.numel(),
paddle::platform::errors::PreconditionNotMet(
"pull dense error, excepted numel %d, but actually %d.",
_data.numel(), numel));
GetBlas<float>().VCOPY(numel, _data.data<float>(), values);
return 0;
}
int32_t DenseTensorTable::push_dense(const float *values, size_t numel) {
auto varname = _config.tensor().grad();
auto local_scope = scope_->NewTmpScope();
auto *var = local_scope->Var(varname);
auto *t = var->GetMutable<framework::LoDTensor>();
auto dims = paddle::framework::make_ddim({});
auto ctx = paddle::platform::CPUDeviceContext();
t->mutable_data<float>(_data.dims(), ctx.GetPlace());
GetBlas<float>().VCOPY(numel, values, t->data<float>());
executor_->RunPreparedContext((*prepared_ctx_)["push"].get(),
local_scope.get());
  return 0;
}
int32_t DenseTensorTable::push_dense_param(const float *values, size_t numel) {
auto ctx = paddle::platform::CPUDeviceContext();
if (_data.IsInitialized()) {
PADDLE_ENFORCE_EQ(
numel, _data.numel(),
paddle::platform::errors::PreconditionNotMet(
"pull dense error, excepted numel %d, but actually %d.",
_data.numel(), numel));
} else {
_data.mutable_data<float>(
framework::make_ddim({static_cast<int64_t>(numel), 1}), ctx.GetPlace());
}
GetBlas<float>().VCOPY(numel, values, _data.data<float>());
return 0;
}
} // namespace distributed
} // namespace paddle
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include <ThreadPool.h>
#include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace distributed {
class TensorTable : public Table {
public:
TensorTable() : Table() {}
virtual ~TensorTable() {}
virtual int32_t initialize() { return 0; }
virtual int32_t pull_dense(float *values, size_t num) override { return 0; };
virtual int32_t push_dense(const float *values, size_t num) override {
return 0;
};
virtual void *get_shard(size_t shard_idx) override { return 0; }
virtual int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
return 0;
};
virtual int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) override {
return 0;
};
virtual int32_t push_dense_param(const float *values, size_t num) {
return 0;
}
virtual int32_t shrink() { return 0; }
virtual void clear() {}
virtual int32_t flush() { return 0; }
  // specify the path to load from
virtual int32_t load(const std::string &path, const std::string &converter) {
return 0;
}
  // specify the path to save to
virtual int32_t save(const std::string &path, const std::string &converter) {
return 0;
}
protected:
virtual int32_t initialize_shard() { return 0; }
virtual int32_t initialize_tensor(paddle::framework::Scope *scope,
paddle::framework::ProgramDesc *program,
paddle::framework::Executor *executor) {
return 0;
}
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
framework::Executor *executor_;
framework::Scope *scope_;
framework::ProgramDesc *program_;
std::unordered_map<std::string,
std::shared_ptr<framework::ExecutorPrepareContext>>
*prepared_ctx_;
};
class DenseTensorTable : public TensorTable {
public:
DenseTensorTable() : TensorTable() {}
~DenseTensorTable() {}
virtual int32_t initialize();
void *get_shard(size_t shard_idx) { return 0; }
int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) {
return 0;
}
int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) {
return 0;
}
int32_t shrink() { return 0; }
int32_t pull_dense(float *values, size_t num) override;
int32_t push_dense_param(const float *values, size_t num) override;
int32_t push_dense(const float *values, size_t num) override;
virtual void clear() {}
virtual int32_t flush() { return 0; }
  // specify the path to load from
virtual int32_t load(const std::string &path, const std::string &converter) {
return 0;
}
  // specify the path to save to
virtual int32_t save(const std::string &path, const std::string &converter) {
return 0;
}
protected:
virtual int32_t initialize_shard() { return 0; }
virtual int32_t initialize_tensor(paddle::framework::Scope *scope,
paddle::framework::ProgramDesc *program,
paddle::framework::Executor *executor);
protected:
framework::Tensor _data;
};
//
//// common sparse table [0, N) with out large scale
// class SparseTensorTable : public TensorTable {
// void *get_shard(size_t shard_idx) { return 0; }
//
// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num)
// override;
// int32_t push_sparse(const uint64_t *keys, const float *values, size_t num)
// override ;
// int32_t shrink() { return 0; }
// void *get_shard(size_t shard_idx) { return 0; };
//
// int32_t pull_dense(float *values, size_t num) { return 0; };
// int32_t push_dense_param(const float *values, size_t num) { return 0; };
// int32_t push_dense(const float *values, size_t num) { return 0; };
//
// protected:
// framework::Tensor _data;
//};
//// for Large scale kv tensor [0, int64] do not use specific optimizer
// class KvTensorTable : public TensorTable {
// int32_t pull_dense(float *values, size_t num) { return 0; };
// int32_t push_dense_param(const float *values, size_t num) { return 0; };
// int32_t push_dense(const float *values, size_t num) { return 0; };
//
// void *get_shard(size_t shard_idx) override;
// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num)
// override;
// int32_t push_sparse(const uint64_t *keys, const float *values,
// size_t num) override;
// int32_t shrink() override;
// void *get_shard(size_t shard_idx) override;
//};
//
//// for Geo sparse handle
// class GeoSparseTensorTable : public TensorTable {};
} // namespace distributed
} // namespace paddle
if(APPLE)
return()
endif()
set_source_files_properties(table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(table_test SRCS table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS})
set_source_files_properties(dense_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(dense_table_test SRCS dense_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS})
set_source_files_properties(sparse_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(sparse_table_test SRCS sparse_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS})
set_source_files_properties(geo_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(geo_table_test SRCS geo_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS})
set_source_files_properties(barrier_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(barrier_table_test SRCS barrier_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS})
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/table/common_table.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
TEST(BarrierTable, Barrier) {
int emb_dim = 10;
int trainers = 2;
bool sync = true;
TableParameter table_config;
table_config.set_table_class("BarrierTable");
FsClientParameter fs_config;
Table *table = new BarrierTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
common_config->set_table_name("barrier_table");
common_config->set_trainer_num(trainers);
common_config->set_sync(sync);
auto ret = table->initialize(table_config, fs_config);
std::unordered_map<uint32_t, std::shared_ptr<Table>> maps =
std::unordered_map<uint32_t, std::shared_ptr<Table>>();
table->set_table_map(&maps);
std::shared_ptr<::ThreadPool> pool_ =
std::make_shared<::ThreadPool>(trainers);
std::vector<std::future<void>> task_status;
for (auto x = 0; x < trainers; x++) {
    auto task = [table, x] { table->barrier(x, ""); };
task_status.push_back(pool_->enqueue(std::move(task)));
}
for (auto &status : task_status) {
status.wait();
}
ASSERT_EQ(ret, 0);
}
} // namespace distributed
} // namespace paddle
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/table/common_dense_table.h"
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
// CommonDenseTable + Adam
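// Pushes each trainer's gradients sequentially, replays the same Adam
// recurrence locally (moments, beta powers, bias-corrected lr), and checks
// the pulled parameters agree with the replay within 1e-6.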
TEST(CommonDenseTable, Adam) {
int fea_dim = 10;
int trainers = 2;
float beta1 = 0.9;
float beta2 = 0.999;
float epsilon = 1.0e-8;
TableParameter table_config;
table_config.set_table_class("CommonDenseTable");
FsClientParameter fs_config;
Table *table = new CommonDenseTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
// set adam optimize config
common_config->set_name("adam");
common_config->set_table_name("adam_test_table");
common_config->set_trainer_num(trainers);
common_config->add_params("Param");
common_config->add_dims(fea_dim);
common_config->add_initializers("gaussian_random&0&0.0&1.0");
common_config->add_params("LearningRate");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
common_config->add_params("Moment1");
common_config->add_dims(fea_dim);
common_config->add_initializers("fill_constant&0.0");
common_config->add_params("Moment2");
common_config->add_dims(fea_dim);
common_config->add_initializers("fill_constant&0.0");
common_config->add_params("Beta1Pow");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
common_config->add_params("Beta2Pow");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
auto ret = table->initialize(table_config, fs_config);
ASSERT_EQ(ret, 0);
// pull parameters for create and check
std::vector<float> init_values;
init_values.resize(fea_dim);
table->pull_dense(init_values.data(), fea_dim);
// push gradient
std::vector<std::vector<float>> trainer_gradient_values;
trainer_gradient_values.resize(trainers);
float start = 10.0;
for (int i = 0; i < trainers; i++) {
for (int k = 0; k < fea_dim; k++) {
trainer_gradient_values[i].push_back(start);
start += 0.1;
}
}
// for adam
for (int i = 0; i < trainers; i++) {
auto &push_values = trainer_gradient_values[i];
table->push_dense(push_values.data(), push_values.size());
}
std::vector<float> pull_values;
pull_values.resize(fea_dim);
table->pull_dense(pull_values.data(), fea_dim);
std::vector<float> beta1_pow, beta2_pow, lr, mom1, mom2, param;
beta1_pow.push_back(beta1);
beta2_pow.push_back(beta2);
lr.push_back(1.0);
for (int i = 0; i < fea_dim; i++) {
mom1.push_back(0.0);
mom2.push_back(0.0);
param.push_back(init_values[i]);
}
for (int i = 0; i < trainers; i++) {
auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
for (int j = 0; j < fea_dim; j++) {
mom1[j] = beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][j];
mom2[j] = beta2 * mom2[j] +
(1 - beta2) * trainer_gradient_values[i][j] *
trainer_gradient_values[i][j];
param[j] =
param[j] -
lr_ * (mom1[j] / (sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0])));
}
beta1_pow[0] *= beta1;
beta2_pow[0] *= beta2;
}
for (int j = 0; j < fea_dim; j++) {
ASSERT_TRUE(abs(param[j] - pull_values[j]) < 1e-6);
}
}
// CommonDenseTable + SGD
TEST(CommonDenseTable, SGD) {
int fea_dim = 10;
int trainers = 2;
TableParameter table_config;
table_config.set_table_class("CommonDenseTable");
FsClientParameter fs_config;
Table *table = new CommonDenseTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
common_config->set_name("sgd");
common_config->set_table_name("sgd_test_table");
common_config->set_trainer_num(trainers);
common_config->add_params("Param");
common_config->add_dims(fea_dim);
common_config->add_initializers("gaussian_random&0&0.0&1.0");
common_config->add_params("LearningRate");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
auto ret = table->initialize(table_config, fs_config);
ASSERT_EQ(ret, 0);
// pull parameters for create and check
std::vector<float> init_values;
init_values.resize(fea_dim);
table->pull_dense(init_values.data(), fea_dim);
std::vector<float> total_gradients;
total_gradients.resize(fea_dim);
memset(total_gradients.data(), 0, sizeof(float) * total_gradients.size());
// push gradient
std::vector<std::vector<float>> trainer_gradient_values;
trainer_gradient_values.resize(trainers);
float start = 10.0;
for (int i = 0; i < trainers; i++) {
for (int k = 0; k < fea_dim; k++) {
trainer_gradient_values[i].push_back(start);
total_gradients[k] += start;
start += 0.1;
}
}
std::shared_ptr<::ThreadPool> pool_ =
std::make_shared<::ThreadPool>(trainers);
std::vector<std::future<void>> task_status;
for (int i = 0; i < trainers; i++) {
auto &push_values = trainer_gradient_values[i];
auto task = [table, &push_values] {
table->push_dense(push_values.data(), push_values.size());
};
task_status.push_back(pool_->enqueue(std::move(task)));
}
for (auto &status : task_status) {
status.wait();
}
std::vector<float> pull_values;
pull_values.resize(fea_dim);
table->pull_dense(pull_values.data(), fea_dim);
for (int j = 0; j < fea_dim; j++) {
auto update_val = init_values[j] - 1.0 * total_gradients[j];
ASSERT_TRUE(abs(update_val - pull_values[j]) < 1e-5);
}
}
} // namespace distributed
} // namespace paddle
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/table/common_dense_table.h"
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
// SparseGeoTable + SSUM
TEST(SparseGeoTable, SSUM) {
int emb_dim = 10;
int trainers = 2;
TableParameter table_config;
table_config.set_table_class("SparseGeoTable");
FsClientParameter fs_config;
Table *table = new SparseGeoTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
common_config->set_name("sum");
common_config->set_table_name("ssum_test_table");
common_config->set_trainer_num(trainers);
common_config->add_params("Param");
common_config->add_dims(emb_dim);
common_config->add_initializers("fill_constant&1.0");
auto ret = table->initialize(table_config, fs_config);
ASSERT_EQ(ret, 0);
// test push_sparse_param, and create params
std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
std::vector<float> init_values;
for (size_t i = 0; i < init_keys.size() * emb_dim; i++) {
init_values.push_back(0.0);
}
table->push_sparse_param(init_keys.data(), init_values.data(),
init_keys.size());
std::vector<float> pull_values(init_values.size());
table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size());
for (size_t i = 0; i < init_keys.size() * emb_dim; i++) {
ASSERT_TRUE(abs(pull_values[i] - init_values[i]) < 1e-6);
}
std::vector<std::vector<uint64_t>> trainer_keys;
std::vector<std::vector<float>> trainer_values;
trainer_keys.resize(trainers);
trainer_values.resize(trainers);
float start = 0.0;
for (int i = 0; i < trainers; i++) {
trainer_keys[i] = init_keys;
for (size_t j = 0; j < trainer_keys[i].size(); j++) {
auto id = trainer_keys[i][j];
for (int k = 0; k < emb_dim; k++) {
trainer_values[i].push_back(start);
pull_values[id * emb_dim + k] += start;
start += 0.1;
}
}
}
std::shared_ptr<::ThreadPool> pool_ =
std::make_shared<::ThreadPool>(trainers);
std::vector<std::future<void>> task_status;
for (int i = 0; i < trainers; i++) {
auto &push_keys = trainer_keys[i];
auto &push_values = trainer_values[i];
auto task = [table, &push_keys, &push_values] {
table->push_sparse(push_keys.data(), push_values.data(),
push_keys.size());
};
task_status.push_back(pool_->enqueue(std::move(task)));
}
for (auto &status : task_status) {
status.wait();
}
std::vector<std::vector<uint64_t>> geo_pull_ids;
std::vector<std::vector<float>> geo_pull_values;
geo_pull_ids.resize(trainers);
geo_pull_values.resize(trainers);
for (int i = 0; i < trainers; i++) {
table->pull_geo_param(i, &geo_pull_values[i], &geo_pull_ids[i]);
ASSERT_EQ(geo_pull_values[i].size(), geo_pull_ids[i].size() * emb_dim);
for (size_t j = 0; j < geo_pull_ids[i].size(); ++j) {
auto id = geo_pull_ids[i][j];
for (int k = 0; k < emb_dim; k++) {
ASSERT_TRUE(abs(geo_pull_values[i][j * emb_dim + k] -
pull_values[id * emb_dim + k]) < 1e-6);
}
}
}
}
} // namespace distributed
} // namespace paddle
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/distributed/service/heter_serde.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
namespace framework = paddle::framework;
namespace platform = paddle::platform;
namespace operators = paddle::operators;
namespace math = paddle::operators::math;
namespace memory = paddle::memory;
namespace distributed = paddle::distributed;
void CreateVarsOnScope(framework::Scope* scope, platform::Place* place,
const platform::DeviceContext& ctx) {
// var 1
framework::Variable* var1 = scope->Var("x1");
auto* tensor1 = var1->GetMutable<framework::LoDTensor>();
tensor1->Resize(framework::make_ddim({512, 8, 4, 2}));
framework::LoD lod1;
lod1.push_back(framework::Vector<size_t>({1, 3, 8}));
tensor1->set_lod(lod1);
tensor1->mutable_data<float>(*place);
math::set_constant(ctx, tensor1, 31.9);
// var 2
framework::Variable* var2 = scope->Var("x2");
auto* tensor2 = var2->GetMutable<framework::LoDTensor>();
tensor2->Resize(framework::make_ddim({1000, 64}));
framework::LoD lod2;
lod2.push_back(framework::Vector<size_t>({1, 1}));
tensor2->set_lod(lod2);
tensor2->mutable_data<int>(*place);
math::set_constant(ctx, tensor2, 100);
// var 3
framework::Variable* var3 = scope->Var("x3");
auto* slr = var3->GetMutable<framework::SelectedRows>();
slr->set_height(564);
auto* tensor3 = slr->mutable_value();
auto* rows = slr->mutable_rows();
tensor3->Resize(framework::make_ddim({564, 128}));
tensor3->mutable_data<float>(*place);
math::set_constant(ctx, tensor3, 32.7);
for (int i = 0; i < 564; ++i) rows->push_back(i);
}
void RunMultiVarMsg(platform::Place place) {
framework::Scope scope;
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
CreateVarsOnScope(&scope, &place, ctx);
::paddle::MultiVariableMessage multi_msg;
std::string message_name("se_de_test");
std::vector<std::string> send_var_name = {"x1", "x2", "x3"};
std::vector<std::string> recv_var_name = {};
LOG(INFO) << "begin SerializeToMultiVarMsg";
butil::IOBuf io_buf;
distributed::SerializeToMultiVarMsgAndIOBuf(message_name, send_var_name,
recv_var_name, ctx, &scope,
&multi_msg, &io_buf);
EXPECT_GT(multi_msg.ByteSizeLong(), static_cast<size_t>(0));
// deserialize
framework::Scope scope_recv;
LOG(INFO) << "begin DeserializeFromMultiVarMsg";
distributed::DeserializeFromMultiVarMsgAndIOBuf(multi_msg, &io_buf, ctx,
&scope_recv);
// check var1
framework::Variable* var1 = scope_recv.FindVar("x1");
auto* tensor1 = var1->GetMutable<framework::LoDTensor>();
EXPECT_EQ(tensor1->dims(), framework::make_ddim({512, 8, 4, 2}));
// EXPECT_EQ(tensor1->lod(), framework::Vector<size_t>({1, 3, 8}));
auto* tensor_data1 = const_cast<float*>(tensor1->data<float>());
int tensor_numel1 = 512 * 8 * 4 * 2;
for (int i = 0; i < tensor_numel1; ++i)
EXPECT_FLOAT_EQ(tensor_data1[i], 31.9);
// check var2
framework::Variable* var2 = scope_recv.FindVar("x2");
auto* tensor2 = var2->GetMutable<framework::LoDTensor>();
EXPECT_EQ(tensor2->dims(), framework::make_ddim({1000, 64}));
// EXPECT_EQ(tensor2->lod(), framework::Vector<size_t>({1, 1}));
auto* tensor_data2 = const_cast<int*>(tensor2->data<int>());
int tensor_numel2 = 1000 * 64;
for (int i = 0; i < tensor_numel2; ++i) EXPECT_EQ(tensor_data2[i], 100);
// check var3
framework::Variable* var3 = scope_recv.FindVar("x3");
auto* slr = var3->GetMutable<framework::SelectedRows>();
  EXPECT_EQ(slr->rows().size(), static_cast<size_t>(564));
for (int i = 0; i < 564; ++i) {
EXPECT_EQ(slr->rows()[i], i);
}
auto* tensor3 = slr->mutable_value();
EXPECT_EQ(tensor3->dims(), framework::make_ddim({564, 128}));
auto* tensor_data3 = const_cast<float*>(tensor3->data<float>());
int tensor_numel3 = 564 * 128;
for (int i = 0; i < tensor_numel3; ++i)
EXPECT_FLOAT_EQ(tensor_data3[i], 32.7);
}
TEST(MultiVarMsgCPU, Run) {
platform::CPUPlace place;
RunMultiVarMsg(place);
}
// #ifdef PADDLE_WITH_CUDA
// TEST(MultiVarMsgGPU, Run) {
// platform::CUDAPlace place;
// RunMultiVarMsg(place);
// }
// #endif
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <cmath>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/table/common_dense_table.h"
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
// CommonSparseTable + SGD
TEST(CommonSparseTable, SGD) {
int emb_dim = 10;
int trainers = 2;
TableParameter table_config;
table_config.set_table_class("CommonSparseTable");
FsClientParameter fs_config;
Table *table = new CommonSparseTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
common_config->set_name("sgd");
common_config->set_table_name("sgd_test_table");
common_config->set_trainer_num(trainers);
common_config->add_params("Param");
common_config->add_dims(emb_dim);
common_config->add_initializers("uniform_random&0&-1.0&1.0"); // param
common_config->add_params("LearningRate");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0"); // learning_rate
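  // Initialize the table with the SGD optimizer config above; 0 means success.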
auto ret = table->initialize(table_config, fs_config);
ASSERT_EQ(ret, 0);
// pull parameters for create and check
std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
std::vector<float> init_values;
init_values.resize(init_keys.size() * emb_dim);
table->pull_sparse(init_values.data(), init_keys.data(), init_keys.size());
// for check
std::vector<float> total_gradients;
total_gradients.resize(init_keys.size() * emb_dim);
memset(total_gradients.data(), 0, sizeof(float) * total_gradients.size());
// push gradient
std::vector<std::vector<uint64_t>> trainer_keys;
std::vector<std::vector<float>> trainer_gradient_values;
trainer_keys.resize(trainers);
trainer_gradient_values.resize(trainers);
float start = 0.0;
for (int i = 0; i < trainers; i++) {
trainer_keys[i] = init_keys;
for (size_t j = 0; j < trainer_keys[i].size(); j++) {
auto id = trainer_keys[i][j];
for (int k = 0; k < emb_dim; k++) {
trainer_gradient_values[i].push_back(start);
total_gradients[id * emb_dim + k] += start;
start += 0.1;
}
}
}
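  // Push each trainer's gradients concurrently through a thread pool to
  // exercise push_sparse under parallel access.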
std::shared_ptr<::ThreadPool> pool_ =
std::make_shared<::ThreadPool>(trainers);
std::vector<std::future<void>> task_status;
for (int i = 0; i < trainers; i++) {
auto &push_keys = trainer_keys[i];
auto &push_values = trainer_gradient_values[i];
auto task = [table, &push_keys, &push_values] {
table->push_sparse(push_keys.data(), push_values.data(),
push_keys.size());
};
task_status.push_back(pool_->enqueue(std::move(task)));
}
for (auto &status : task_status) {
status.wait();
}
std::vector<float> pull_values;
pull_values.resize(init_keys.size() * emb_dim);
table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size());
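  // After both pushes, every parameter should equal its initial value minus
  // the summed gradients (the learning rate is 1.0 via "fill_constant&1.0").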
for (size_t i = 0; i < init_values.size(); ++i) {
auto update_val = init_values[i] - 1.0 * total_gradients[i];
    ASSERT_TRUE(std::abs(update_val - pull_values[i]) < 1e-6);
}
}
// CommonSparseTable + Adam
TEST(CommonSparseTable, Adam) {
int emb_dim = 10;
int trainers = 2;
float beta1 = 0.9;
float beta2 = 0.999;
float epsilon = 1.0e-8;
TableParameter table_config;
table_config.set_table_class("CommonSparseTable");
FsClientParameter fs_config;
Table *table = new CommonSparseTable();
TableAccessorParameter *accessor_config = table_config.mutable_accessor();
accessor_config->set_accessor_class("CommMergeAccessor");
CommonAccessorParameter *common_config = table_config.mutable_common();
common_config->set_name("adam");
common_config->set_table_name("adam_test_table");
common_config->set_trainer_num(trainers);
common_config->add_params("Param");
common_config->add_dims(emb_dim);
common_config->add_initializers("uniform_random&0&-1.0&1.0");
common_config->add_params("LearningRate");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
common_config->add_params("Moment1");
common_config->add_dims(emb_dim);
common_config->add_initializers("fill_constant&0.0");
common_config->add_params("Moment2");
common_config->add_dims(emb_dim);
common_config->add_initializers("fill_constant&0.0");
common_config->add_params("Beta1Pow");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
common_config->add_params("Beta2Pow");
common_config->add_dims(1);
common_config->add_initializers("fill_constant&1.0");
auto ret = table->initialize(table_config, fs_config);
ASSERT_EQ(ret, 0);
// pull parameters for create and check
std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
std::vector<float> init_values;
init_values.resize(init_keys.size() * emb_dim);
table->pull_sparse(init_values.data(), init_keys.data(), init_keys.size());
// push gradient
std::vector<std::vector<uint64_t>> trainer_keys;
std::vector<std::vector<float>> trainer_gradient_values;
trainer_keys.resize(trainers);
trainer_gradient_values.resize(trainers);
float start = 0.0;
for (int i = 0; i < trainers; i++) {
trainer_keys[i] = init_keys;
for (size_t j = 0; j < trainer_keys[i].size(); j++) {
for (int k = 0; k < emb_dim; k++) {
trainer_gradient_values[i].push_back(start);
start += 0.1;
}
}
}
for (int i = 0; i < trainers; i++) {
auto &push_keys = trainer_keys[i];
auto &push_values = trainer_gradient_values[i];
table->push_sparse(push_keys.data(), push_values.data(), push_keys.size());
}
std::vector<float> pull_values;
pull_values.resize(init_keys.size() * emb_dim);
table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size());
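  // Reference Adam update computed locally and compared against the values
  // pulled back from the table:
  //   mom1   = beta1 * mom1 + (1 - beta1) * grad
  //   mom2   = beta2 * mom2 + (1 - beta2) * grad * grad
  //   lr_t   = lr * sqrt(1 - beta2_pow) / (1 - beta1_pow)
  //   param -= lr_t * mom1 / (sqrt(mom2) + epsilon * sqrt(1 - beta2_pow))
  // Note: with five keys and emb_dim = 10 the outer loop body runs only once,
  // so only the embedding of key 0 is actually verified.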
for (size_t idx = 0; idx < init_keys.size(); idx += emb_dim) {
std::vector<float> beta1_pow, beta2_pow, lr, mom1, mom2, param;
beta1_pow.push_back(beta1);
beta2_pow.push_back(beta2);
lr.push_back(1.0);
for (int i = 0; i < emb_dim; i++) {
mom1.push_back(0.0);
mom2.push_back(0.0);
param.push_back(init_values[idx + i]);
}
for (int i = 0; i < trainers; i++) {
auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
for (int j = 0; j < emb_dim; j++) {
mom1[j] =
beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][idx + j];
mom2[j] = beta2 * mom2[j] +
(1 - beta2) * trainer_gradient_values[i][idx + j] *
trainer_gradient_values[i][idx + j];
param[j] = param[j] -
lr_ * (mom1[j] /
(sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0])));
}
beta1_pow[0] *= beta1;
beta2_pow[0] *= beta2;
}
for (int i = 0; i < emb_dim; i++) {
      ASSERT_TRUE(std::abs(param[i] - pull_values[idx + i]) < 1e-5);
}
}
}
} // namespace distributed
} // namespace paddle
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <ThreadPool.h>
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/table/common_dense_table.h"
#include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/table/table.h"
namespace paddle {
namespace distributed {
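// Table::initialize should return -1 when no accessor is configured.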
TEST(Table, Initialize) {
TableParameter table_config;
table_config.set_table_class("SparseGeoTable");
FsClientParameter fs_config;
// case 1. no accessor
Table *table = new SparseGeoTable();
auto ret = table->initialize(table_config, fs_config);
ASSERT_EQ(ret, -1);
}
} // namespace distributed
}  // namespace paddle