diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt index 16457b564ffc82a4246776dc283261bed0351ec6..c18332d3b873164a725a25316fc611aa7e7a3092 100644 --- a/paddle/fluid/CMakeLists.txt +++ b/paddle/fluid/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(memory) add_subdirectory(platform) +add_subdirectory(distributed) add_subdirectory(framework) add_subdirectory(imperative) add_subdirectory(operators) diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ee9037dec1a5d02aa74978f28034ef3ca1ab4182 --- /dev/null +++ b/paddle/fluid/distributed/CMakeLists.txt @@ -0,0 +1,16 @@ +if(NOT WITH_DISTRIBUTE) + return() +endif() + +proto_library(ps_framework_proto SRCS ps.proto) + +set(DISTRIBUTE_COMPILE_FLAGS "-Wno-error=unused-value -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=sign-compare -Wno-error=unused-variable -Wno-error=return-type -Wno-error=unused-but-set-variable -Wno-error=type-limits -Wno-error=unknown-pragmas -Wno-error=parentheses -Wno-error=unused-result") + +if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) + set(DISTRIBUTE_COMPILE_FLAGS + "${DISTRIBUTE_COMPILE_FLAGS} -faligned-new") +endif() + + +add_subdirectory(table) +add_subdirectory(test) diff --git a/paddle/fluid/distributed/common/registerer.h b/paddle/fluid/distributed/common/registerer.h new file mode 100644 index 0000000000000000000000000000000000000000..a4eab9c4a75e9ecabb183a9f41460a8b0cb516f6 --- /dev/null +++ b/paddle/fluid/distributed/common/registerer.h @@ -0,0 +1,127 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace paddle { +namespace distributed { + +class Any { + public: + Any() : content_(NULL) {} + + template + Any(const ValueType &value) : content_(new Holder(value)) {} + + Any(const Any &other) + : content_(other.content_ ? other.content_->clone() : NULL) {} + + ~Any() { delete content_; } + + template + ValueType *any_cast() { + return content_ ? 
&static_cast *>(content_)->held_ : NULL; + } + + private: + class PlaceHolder { + public: + virtual ~PlaceHolder() {} + virtual PlaceHolder *clone() const = 0; + }; + + template + class Holder : public PlaceHolder { + public: + explicit Holder(const ValueType &value) : held_(value) {} + virtual PlaceHolder *clone() const { return new Holder(held_); } + + ValueType held_; + }; + + PlaceHolder *content_; +}; + +class ObjectFactory { + public: + ObjectFactory() {} + virtual ~ObjectFactory() {} + virtual Any NewInstance() { return Any(); } + + private: +}; + +typedef std::map FactoryMap; +typedef std::map BaseClassMap; +#ifdef __cplusplus +extern "C" { +#endif + +inline BaseClassMap &global_factory_map() { + static BaseClassMap *base_class = new BaseClassMap(); + return *base_class; +} +#ifdef __cplusplus +} +#endif + +inline BaseClassMap &global_factory_map_cpp() { return global_factory_map(); } + +// typedef pa::Any Any; +// typedef ::FactoryMap FactoryMap; +#define REGISTER_REGISTERER(base_class) \ + class base_class##Registerer { \ + public: \ + static base_class *CreateInstanceByName(const ::std::string &name) { \ + if (global_factory_map_cpp().find(#base_class) == \ + global_factory_map_cpp().end()) { \ + LOG(ERROR) << "Can't Find BaseClass For CreateClass with:" \ + << #base_class; \ + return NULL; \ + } \ + FactoryMap &map = global_factory_map_cpp()[#base_class]; \ + FactoryMap::iterator iter = map.find(name); \ + if (iter == map.end()) { \ + LOG(ERROR) << "Can't Find Class For Create with:" << name; \ + return NULL; \ + } \ + Any object = iter->second->NewInstance(); \ + return *(object.any_cast()); \ + } \ + }; + +#define REGISTER_CLASS(clazz, name) \ + class ObjectFactory##name : public ObjectFactory { \ + public: \ + Any NewInstance() { return Any(new name()); } \ + }; \ + void register_factory_##name() { \ + FactoryMap &map = global_factory_map_cpp()[#clazz]; \ + if (map.find(#name) == map.end()) { \ + map[#name] = new ObjectFactory##name(); \ + } \ + } \ + void register_factory_##name() __attribute__((constructor)); + +#define CREATE_CLASS(base_class, name) \ + base_class##Registerer::CreateInstanceByName(name); + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/common/utils.h b/paddle/fluid/distributed/common/utils.h new file mode 100644 index 0000000000000000000000000000000000000000..f81f84b1e117510443a5698a6ba1574262f640a5 --- /dev/null +++ b/paddle/fluid/distributed/common/utils.h @@ -0,0 +1,87 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
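+//
+// Note on the helpers below: bucket(v_size, b_size) returns b_size + 1
+// monotonically increasing offsets that split [0, v_size) into contiguous,
+// nearly equal ranges (the first v_size % b_size ranges get one extra
+// element). For example, bucket(10, 3) yields {0, 4, 7, 10}, and callers
+// use (ret[i], ret[i + 1]) as the bounds of shard i.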
+ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "paddle/fluid/operators/math/blas.h" +#include "paddle/fluid/platform/device_context.h" + +namespace paddle { +namespace distributed { + +template +inline paddle::operators::math::BlasT +GetBlas() { + auto cpu_ctx = paddle::platform::CPUDeviceContext(); + return paddle::operators::math::GetBlas(cpu_ctx); +} + +template +inline void SQRT(int n, const T* x, T* z) { + for (int i = 0; i < n; ++i) { + z[i] = sqrt(x[i]); + } +} + +template +inline void ADD(int n, const T* x, const T y, T* z) { + for (int i = 0; i < n; ++i) { + z[i] = x[i] + y; + } +} + +static bool StartWith(const std::string& str, const std::string& substr) { + return str.find(substr) == 0; +} + +static bool EndWith(const std::string& str, const std::string& substr) { + return str.rfind(substr) == (str.length() - substr.length()); +} + +inline std::vector bucket(const int v_size, const int b_size) { + int remainder = v_size % b_size; + int bucket = v_size / b_size; + std::vector ret_vec(b_size, bucket); + for (int i = 0; i < remainder; ++i) { + ret_vec[i] = ret_vec[i] + 1; + } + int cur_bucket = 0; + for (int& j : ret_vec) { + int tmp = j; + j = cur_bucket; + cur_bucket += tmp; + } + ret_vec.push_back(cur_bucket); + return ret_vec; +} + +template +std::string to_string(const std::vector& vec) { + std::stringstream ss; + for (const auto& c : vec) { + ss << c << " "; + } + return ss.str(); +} +} +} diff --git a/paddle/fluid/distributed/communicator_common.h b/paddle/fluid/distributed/communicator_common.h new file mode 100644 index 0000000000000000000000000000000000000000..6a8ce552370bf72d95dd0d52a8e521afb0b0dfd0 --- /dev/null +++ b/paddle/fluid/distributed/communicator_common.h @@ -0,0 +1,95 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#pragma once + +#include +#include +#include +#include + +namespace paddle { +namespace distributed { + +struct CommContext { + CommContext() = default; + + CommContext(const std::string &name, const std::vector &names, + const std::vector &emap, + const std::vector §ions, + const std::vector &origin_names, int id, + bool merge_add_ = true, bool is_sparse_ = true, + bool is_distributed_ = false, int table_id_ = -1) + : var_name(name), + splited_varnames(names), + epmap(emap), + height_sections(sections), + origin_varnames(origin_names), + trainer_id(id), + merge_add(merge_add_), + is_sparse(is_sparse_), + is_distributed(is_distributed_), + table_id(table_id_) {} + + CommContext(const CommContext &ctx) { + var_name = ctx.var_name; + splited_varnames = ctx.splited_varnames; + epmap = ctx.epmap; + height_sections = ctx.height_sections; + trainer_id = ctx.trainer_id; + merge_add = ctx.merge_add; + is_sparse = ctx.is_sparse; + origin_varnames = ctx.origin_varnames; + is_distributed = ctx.is_distributed; + table_id = ctx.table_id; + } + + std::string print() const { + std::stringstream ss; + + ss << "varname: " << var_name << " trainer_id: " << trainer_id << " "; + ss << " table_id: " << table_id; + + for (size_t i = 0; i < splited_varnames.size(); i++) { + ss << "slice varname: " << splited_varnames[i] << " ep: " << epmap[i] + << " section: " << height_sections[i] << " "; + } + + ss << "origin varnames: "; + for (size_t i = 0; i < origin_varnames.size(); i++) { + ss << origin_varnames[i] << " "; + } + + ss << " aggregation->add: " << merge_add; + ss << " is_sparse: " << is_sparse; + ss << " is_distributed: " << is_distributed << "\n"; + ss << " table_id: " << table_id << "\n"; + + return ss.str(); + } + + std::string var_name; + std::vector splited_varnames; + std::vector epmap; + std::vector height_sections; + std::vector origin_varnames; + int trainer_id; + bool merge_add; + bool is_sparse; + bool is_distributed; + int table_id; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/ps.proto b/paddle/fluid/distributed/ps.proto new file mode 100644 index 0000000000000000000000000000000000000000..383ff73690bfdbb35ad87fa91c0f511c7b1a3b85 --- /dev/null +++ b/paddle/fluid/distributed/ps.proto @@ -0,0 +1,152 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
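+//
+// Configuration layout: PSParameter is the root message; it carries
+// WorkerParameter / ServerParameter (each wrapping a Downpour*Parameter with
+// repeated TableParameter entries), repeated DownpourTrainerParameter, and
+// FsClientParameter. Each TableParameter selects its per-table configuration
+// via TableAccessorParameter, TensorAccessorParameter and
+// CommonAccessorParameter.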
+ +syntax = "proto2"; +package paddle.distributed; +option cc_generic_services = true; +option cc_enable_arenas = true; + +message FsClientParameter { + enum FsApiType { + HDFS = 0; + AFS = 1; + } + optional FsApiType fs_type = 1 [ default = HDFS ]; + optional string uri = 2; // such as afs://xxx.afs.com:9902 + optional string user = 3; // user_name to access fs + optional string passwd = 4; // password + optional int32 buffer_size = 5; // buffer for read/write + optional string hadoop_bin = 51; + optional string afs_conf = 101; +} + +message PSParameter { + optional string worker_class = 1; + optional string server_class = 2; + optional string instance_class = 3; + optional string init_gflags = 4 [ default = "" ]; + optional WorkerParameter worker_param = 101; + optional ServerParameter server_param = 102; + repeated DownpourTrainerParameter trainer_param = 301; + optional FsClientParameter fs_client_param = 501; +} + +message WorkerParameter { + optional DownpourWorkerParameter downpour_worker_param = 1; +} + +message DownpourWorkerParameter { + repeated TableParameter downpour_table_param = 1; +} + +message DownpourServerParameter { + repeated TableParameter downpour_table_param = 1; + optional ServerServiceParameter service_param = 2; +} + +message ServerParameter { + optional DownpourServerParameter downpour_server_param = 1; +} + +message DownpourTrainerParameter { + repeated DenseTableParameter dense_table = 1; + repeated SparseTableParameter sparse_table = 2; + optional int32 push_sparse_per_batch = 3; + optional int32 push_dense_per_batch = 4; + repeated string skip_op = 5; + repeated ProgramConfig program_config = 6; +} + +message DenseTableParameter { + optional int32 table_id = 1; + repeated string dense_variable_name = 2; + repeated string dense_gradient_variable_name = 3; + optional int32 fea_dim = 4; +} + +message SparseTableParameter { + optional int32 table_id = 1; + optional int32 feature_dim = 2; + repeated string slot_key = 3; + repeated string slot_value = 4; + repeated string slot_gradient = 5; +} + +message ServerServiceParameter { + optional string server_class = 1 [ default = "BrpcPsServer" ]; + optional string client_class = 2 [ default = "BrpcPsClient" ]; + optional string service_class = 3 [ default = "PsService" ]; + optional uint32 start_server_port = 4 + [ default = 0 ]; // will find a avaliable port from it + optional uint32 server_thread_num = 5 [ default = 12 ]; +} + +message ProgramConfig { + required string program_id = 1; + repeated int32 push_sparse_table_id = 2; + repeated int32 push_dense_table_id = 3; + repeated int32 pull_sparse_table_id = 4; + repeated int32 pull_dense_table_id = 5; +} + +enum TableType { + PS_SPARSE_TABLE = 0; + PS_DENSE_TABLE = 1; + PS_OTHER_TABLE = 2; +} + +message TableParameter { + optional uint64 table_id = 1; + optional string table_class = 2; + optional uint64 shard_num = 3 [ default = 1000 ]; + optional TableAccessorParameter accessor = 4; + optional TensorAccessorParameter tensor = 5; + optional CommonAccessorParameter common = 6; + optional TableType type = 7; + optional bool compress_in_save = 8 [ default = false ]; +} + +message TableAccessorParameter { + optional string accessor_class = 1; + optional uint32 fea_dim = 4 [ default = 11 ]; + optional uint32 embedx_dim = 5 [ default = 8 ]; + optional uint32 embedx_threshold = 6 [ default = 10 ]; + repeated TableAccessorSaveParameter table_accessor_save_param = 8; +} + +message TensorAccessorParameter { + optional string tensor_class = 1; + optional uint32 fea_dim = 2; + 
optional uint32 emb_dim = 3; + optional string param = 4; + optional string grad = 5; + optional string common_block_map = 6; +} + +message CommonAccessorParameter { + optional string name = 1; + optional string table_name = 2; + repeated string attributes = 3; + repeated string params = 4; + repeated uint32 dims = 5; + repeated string initializers = 6; + optional int32 trainer_num = 7; + optional bool sync = 8; +} + +message TableAccessorSaveParameter { + optional uint32 param = 1; + optional string converter = 2; + optional string deconverter = 3; +} diff --git a/paddle/fluid/distributed/table/CMakeLists.txt b/paddle/fluid/distributed/table/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c0f8470b36b01d9bf4515e44a62f59ea8a0519a4 --- /dev/null +++ b/paddle/fluid/distributed/table/CMakeLists.txt @@ -0,0 +1,19 @@ +set_property(GLOBAL PROPERTY TABLE_DEPS string_helper) + +get_property(TABLE_DEPS GLOBAL PROPERTY TABLE_DEPS) + +set_source_files_properties(common_dense_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +set_source_files_properties(common_sparse_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +set_source_files_properties(sparse_geo_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +set_source_files_properties(barrier_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + +cc_library(common_table SRCS common_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc DEPS ${TABLE_DEPS} device_context string_helper simple_threadpool xxhash generator) + +set_source_files_properties(tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +set_source_files_properties(tensor_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context) +cc_library(tensor_table SRCS tensor_table.cc DEPS ps_framework_proto proto_desc enforce executor tensor device_context simple_threadpool gflags glog ) + +set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + +cc_library(table SRCS table.cc DEPS common_table tensor_table tensor_accessor ps_framework_proto string_helper device_context gflags glog boost) diff --git a/paddle/fluid/distributed/table/accessor.h b/paddle/fluid/distributed/table/accessor.h new file mode 100644 index 0000000000000000000000000000000000000000..a07a8e10b16f64539a83ebf55bbe4c43dbb7fef2 --- /dev/null +++ b/paddle/fluid/distributed/table/accessor.h @@ -0,0 +1,170 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once +#include +#include +#include +#include +#include "paddle/fluid/distributed/common/registerer.h" +#include "paddle/fluid/distributed/ps.pb.h" + +namespace paddle { +namespace distributed { +struct FsDataConverter { + std::string converter; + std::string deconverter; +}; + +struct Region { + Region() : data(NULL), size(0) {} + Region(char* data, size_t data_num) : data(data), size(data_num) {} + Region(float* data, size_t data_num) + : data(reinterpret_cast(data)), size(data_num << 2) {} + Region(int16_t* data, size_t data_num) + : data(reinterpret_cast(data)), size(data_num << 1) {} + Region(int32_t* data, size_t data_num) + : data(reinterpret_cast(data)), size(data_num << 2) {} + Region(int64_t* data, size_t data_num) + : data(reinterpret_cast(data)), size(data_num << 3) {} + char* data; + size_t size; +}; + +struct DataConverter { + int param; + std::string converter; + std::string deconverter; +}; + +class ValueAccessor { + public: + explicit ValueAccessor(){}; + virtual ~ValueAccessor(){}; + + virtual int configure(const TableAccessorParameter& parameter) { + _config = parameter; + // data_convert结构体初始化 + if (_config.table_accessor_save_param_size() != 0) { + for (int i = 0; i < _config.table_accessor_save_param_size(); ++i) { + int param = _config.table_accessor_save_param(i).param(); + std::string converter = + _config.table_accessor_save_param(i).converter(); + std::string deconverter = + _config.table_accessor_save_param(i).deconverter(); + _data_coverter_map[param] = std::make_shared(); + *(_data_coverter_map[param]) = {param, converter, deconverter}; + } + } + return 0; + } + virtual int initialize() = 0; + + // value维度 + virtual size_t dim() = 0; + // value各个维度的size + virtual size_t dim_size(size_t dim) = 0; + // value各维度相加总size + virtual size_t size() = 0; + + // value中mf动态长度部分总size大小, sparse下生效 + virtual size_t mf_size() { return 0; } + virtual bool need_extend_mf(float* value) { return false; } + virtual bool has_mf(size_t size) { return false; } + // pull value维度 + virtual size_t select_dim() = 0; + // pull value各个维度的size + virtual size_t select_dim_size(size_t dim) = 0; + // pull value各维度相加总size + virtual size_t select_size() = 0; + // push value维度 + virtual size_t update_dim() = 0; + // push value各个维度的size + virtual size_t update_dim_size(size_t dim) = 0; + // push value各维度相加总size + virtual size_t update_size() = 0; + // fea total for dense + virtual size_t fea_dim() { return _config.fea_dim(); } + // converter for save + virtual std::string get_converter(int param) { + auto itr = _data_coverter_map.find(param); + if (itr == _data_coverter_map.end()) { + return ""; + } else { + return (*itr).second->converter; + } + } + // deconverter for load + virtual std::string get_deconverter(int param) { + auto itr = _data_coverter_map.find(param); + if (itr == _data_coverter_map.end()) { + return ""; + } else { + return (*itr).second->deconverter; + } + } + // 判断该value是否进行shrink + virtual bool shrink(float* value) = 0; + + // 判断该value是否在save阶段dump, + // param作为参数用于标识save阶段,如downpour的xbox与batch_model + virtual bool save(float* value, int param) = 0; + // update delta_score and unseen_days after save + virtual void update_stat_after_save(float* value, int param) {} + + // keys不存在时,为values生成随机值 + virtual int32_t create(float** value, size_t num) = 0; + virtual bool create_value(int type, const float* value) { return true; } + // 从values中选取到select_values中 + virtual int32_t select(float** select_values, const float** values, + size_t num) = 0; + // 将update_values聚合到一起 + 
virtual int32_t merge(float** update_values, + const float** other_update_values, size_t num) = 0; + // 将update_values聚合到一起,通过it.next判定是否进入下一个key + // virtual int32_t merge(float** update_values, iterator it); + // 将update_values更新应用到values中 + virtual int32_t update(float** values, const float** update_values, + size_t num) = 0; + + // used to save model, will filter feature + virtual std::string parse_to_string(const float* value, int param) = 0; + // parse value from string, used to load model + virtual int32_t parse_from_string(const std::string& data, float* value) = 0; + + virtual FsDataConverter converter(int param) { + FsDataConverter data_convert; + data_convert.converter = this->get_converter(param); + data_convert.deconverter = this->get_deconverter(param); + return data_convert; + } + + virtual int set_weight(float** values, const float** update_values, + size_t num) { + return 0; + } + + virtual float get_field(float* value, const std::string& name) { return 0.0; } + + protected: + size_t _value_size; + size_t _select_value_size; + size_t _update_value_size; + TableAccessorParameter _config; + std::unordered_map> + _data_coverter_map; +}; +REGISTER_REGISTERER(ValueAccessor); +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/barrier_table.cc b/paddle/fluid/distributed/table/barrier_table.cc new file mode 100644 index 0000000000000000000000000000000000000000..d1e545a133e6163f75c8c2ba756be3ee420e3916 --- /dev/null +++ b/paddle/fluid/distributed/table/barrier_table.cc @@ -0,0 +1,78 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
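+//
+// BarrierTable synchronizes trainers: barrier(trainer_id, type) records the
+// caller (type "2" instead decrements trigger_, removing a finished trainer
+// from future rounds). Callers block on trainer_wait_ until trigger_ trainers
+// have arrived; the last arrival calls pour() on every registered table to
+// apply the queued sync updates, then clears the id set and wakes the waiters.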
+ +#include // NOLINT +#include "paddle/fluid/distributed/common/utils.h" +#include "paddle/fluid/distributed/table/common_table.h" + +namespace paddle { +namespace distributed { + +int32_t BarrierTable::initialize() { + auto trainers = _config.common().trainer_num(); + trigger_.store(trainers); + + for (int x = 0; x < trainers; ++x) { + trainer_all_.insert(x); + } + VLOG(1) << "BarrierTable init trigger: " << trigger_.load(); + return 0; +} + +// 0: send_barrier 1: recv_barrier 2: complete +int32_t BarrierTable::barrier(const uint32_t trainer_id, + const std::string barrier_type) { + std::unique_lock lock(mutex_); + + if (barrier_type == "2") { + trigger_.fetch_sub(1, std::memory_order::memory_order_relaxed); + VLOG(1) << "trigger sub to : " << trigger_.load(); + } else { + trainer_ids_.insert(trainer_id); + VLOG(1) << "barrier type: " << barrier_type + << " add trainer id: " << trainer_id; + } + + if (trainer_ids_.size() < trigger_.load()) { + std::vector diffs(trainer_all_.size()); + auto iter = std::set_difference(trainer_all_.begin(), trainer_all_.end(), + trainer_ids_.begin(), trainer_ids_.end(), + diffs.begin()); + diffs.resize(iter - diffs.begin()); + + auto diff = to_string(diffs); + VLOG(1) << "still need trainers: " << diff; + trainer_wait_.wait(lock, [&] { return trainer_ids_.size() == 0; }); + } else { + VLOG(1) << "barrier table optimize begin"; + for (auto& x : *table_map_) { + auto table = x.second; + table->pour(); + } + VLOG(1) << "barrier table optimize done"; + + trainer_ids_.clear(); + trainer_wait_.notify_all(); + } + return 0; +} + +int32_t BarrierTable::set_table_map( + std::unordered_map>* table_map) { + table_map_ = table_map; + return 0; +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/common_dense_table.cc b/paddle/fluid/distributed/table/common_dense_table.cc new file mode 100644 index 0000000000000000000000000000000000000000..e3d481f32eb8881505514281544ddd92b0d8f921 --- /dev/null +++ b/paddle/fluid/distributed/table/common_dense_table.cc @@ -0,0 +1,156 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
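+//
+// CommonDenseTable keeps one dense float buffer per variable name
+// (e.g. Param, LearningRate) in values_. When _config.common().sync() is
+// true, push_dense() only accumulates gradients into pull_reservoir_ and the
+// pending update is applied when pour() fires (driven by BarrierTable);
+// otherwise _push_dense() splits [0, param_dim_) with bucket() and lets the
+// optimizer update each range on the task pool.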
+ +#include "paddle/fluid/distributed/table/common_dense_table.h" +#include "paddle/fluid/distributed/common/utils.h" + +namespace paddle { +namespace distributed { + +void CommonDenseTable::create_initializer(const std::string& attr, + const std::string& name) { + auto slices = string::split_string(attr, "&"); + + if (slices[0] == "gaussian_random") { + initializers_[name] = new GaussianInitializer(slices); + } else if (slices[0] == "fill_constant") { + initializers_[name] = new FillConstantInitializer(slices); + } else if (slices[0] == "uniform_random") { + initializers_[name] = new UniformInitializer(slices); + } else { + PADDLE_THROW( + platform::errors::InvalidArgument("%s can not be supported", name)); + } +} + +int32_t CommonDenseTable::initialize() { + _shards_task_pool.resize(task_pool_size_); + for (int i = 0; i < _shards_task_pool.size(); ++i) { + _shards_task_pool[i].reset(new ::ThreadPool(1)); + } + + sync = _config.common().sync(); + VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; + + initialize_value(); + initialize_optimizer(); + return 0; +} + +int32_t CommonDenseTable::initialize_value() { + auto common = _config.common(); + int size = static_cast(common.params().size()); + values_.resize(size); + for (int x = 0; x < size; ++x) { + auto& varname = common.params()[x]; + auto& dim = common.dims()[x]; + if (varname == "Param") { + param_dim_ = dim; + param_idx_ = x; + } + auto& initializer = common.initializers()[x]; + + create_initializer(initializer, varname); + values_[x].resize(dim); + names_index_[varname] = x; + + for (int y = 0; y < dim; ++y) { + values_[x][y] = initializers_[varname]->GetValue(); + } + } + + pull_reservoir_ = ReservoirValue(param_dim_); + return 0; +} + +int32_t CommonDenseTable::initialize_optimizer() { + auto common = _config.common(); + auto name = common.name(); + auto attrs = common.attributes(); + + if (name == "sgd") { + optimizer_ = std::make_shared(common, &values_); + } else if (name == "adam") { + optimizer_ = std::make_shared(common, &values_); + } else if (name == "sum") { + optimizer_ = std::make_shared(common, &values_); + } else { + VLOG(0) << "init optimizer failed"; + } + VLOG(0) << "init optimizer " << name << " done"; + return 0; +} + +int32_t CommonDenseTable::pull_dense(float* pull_values, size_t num) { + std::copy(values_[param_idx_].begin(), values_[param_idx_].end(), + pull_values); + return 0; +} + +int32_t CommonDenseTable::push_dense_param(const float* values, size_t num) { + PADDLE_ENFORCE_GE( + num, param_dim_, + paddle::platform::errors::InvalidArgument( + "update desne param numel expected %d, but got %d", param_dim_, num)); + std::copy_n(values, param_dim_, values_[param_idx_].begin()); + return 0; +} + +int32_t CommonDenseTable::pour() { + _push_dense(pull_reservoir_.values.data(), pull_reservoir_.values.size()); + pull_reservoir_.reset(); + return 0; +} + +int32_t CommonDenseTable::push_dense(const float* values, size_t num) { + if (sync) { + std::future task = + _shards_task_pool[0]->enqueue([this, &values]() -> int { + pull_reservoir_.add(values, param_dim_); + return 0; + }); + task.wait(); + } else { + _push_dense(values, num); + } + return 0; +} + +int32_t CommonDenseTable::_push_dense(const float* values, size_t num) { + PADDLE_ENFORCE_GE( + num, param_dim_, + paddle::platform::errors::InvalidArgument( + "update desne numel expected %d, but got %d", param_dim_, num)); + + std::vector buckets = bucket(param_dim_, task_pool_size_); + std::vector> tasks(task_pool_size_); + + for (int 
shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( + [this, shard_id, &buckets, &values]() -> int { + auto begin = buckets[shard_id]; + auto end = buckets[shard_id + 1]; + optimizer_->update(values, param_dim_, begin, end); + return 0; + }); + } + + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } + return 0; +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/common_dense_table.h b/paddle/fluid/distributed/table/common_dense_table.h new file mode 100644 index 0000000000000000000000000000000000000000..eb97f3f26416a905020bcf722aee182dc2510de0 --- /dev/null +++ b/paddle/fluid/distributed/table/common_dense_table.h @@ -0,0 +1,80 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include "Eigen/Dense" +#include "paddle/fluid/distributed/table/accessor.h" +#include "paddle/fluid/distributed/table/common_table.h" +#include "paddle/fluid/distributed/table/depends/dense.h" +#include "paddle/fluid/distributed/table/depends/initializers.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace distributed { + +class CommonDenseTable : public DenseTable { + public: + explicit CommonDenseTable() {} + virtual ~CommonDenseTable() {} + virtual int32_t initialize() override; + virtual int32_t initialize_shard() override { return 0; } + virtual void create_initializer(const std::string& attr, + const std::string& name); + virtual int32_t initialize_value(); + virtual int32_t initialize_optimizer(); + virtual int32_t pull_dense(float* pull_values, size_t num) override; + virtual int32_t push_dense_param(const float* values, size_t num) override; + virtual int32_t push_dense(const float* values, size_t num) override; + virtual int32_t pour() override; + + int32_t load(const std::string& path, const std::string& param) override { + VLOG(0) << "Dense table may load by " + "paddle.distributed.fleet.init_server"; + return 0; + } + + int32_t save(const std::string& path, const std::string& param) override { + VLOG(0) + << "Dense table may be saved by " + "paddle.distributed.fleet.save_persistables/save_inference_model"; + return 0; + } + + virtual int32_t flush() override { return 0; } + virtual int32_t shrink() override { return 0; } + virtual void clear() override { return; } + + protected: + int32_t _push_dense(const float* values, size_t num); + + private: + const int task_pool_size_ = 1; + bool sync = true; + std::vector> _shards_task_pool; + int param_dim_ = 0; + int param_idx_ = 0; + std::shared_ptr optimizer_; + std::vector> values_; + ReservoirValue pull_reservoir_; + std::unordered_map initializers_; + std::unordered_map names_index_; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc 
b/paddle/fluid/distributed/table/common_sparse_table.cc new file mode 100644 index 0000000000000000000000000000000000000000..288f034c4bb3a67c202d2a4033cd43b3b71c66cc --- /dev/null +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -0,0 +1,521 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/table/common_sparse_table.h" +#include +#include +#include "paddle/fluid/distributed/common/utils.h" +#include "paddle/fluid/distributed/table/depends/large_scale_kv.h" +#include "paddle/fluid/framework/generator.h" +#include "paddle/fluid/string/printf.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace distributed { + +struct Meta { + std::string param; + int shard_id; + std::vector names; + std::vector dims; + uint64_t count; + std::unordered_map dims_map; + + explicit Meta(const std::string& metapath) { + std::ifstream file(metapath); + std::string line; + int num_lines = 0; + while (std::getline(file, line)) { + if (StartWith(line, "#")) { + continue; + } + auto pairs = paddle::string::split_string(line, "="); + PADDLE_ENFORCE_EQ( + pairs.size(), 2, + paddle::platform::errors::InvalidArgument( + "info in %s except k=v, but got %s", metapath, line)); + + if (pairs[0] == "param") { + param = pairs[1]; + } + if (pairs[0] == "shard_id") { + shard_id = std::stoi(pairs[1]); + } + if (pairs[0] == "row_names") { + names = paddle::string::split_string(pairs[1], ","); + } + if (pairs[0] == "row_dims") { + auto dims_strs = + paddle::string::split_string(pairs[1], ","); + for (auto& str : dims_strs) { + dims.push_back(std::stoi(str)); + } + } + if (pairs[0] == "count") { + count = std::stoull(pairs[1]); + } + } + for (int x = 0; x < names.size(); ++x) { + dims_map[names[x]] = dims[x]; + } + } + + Meta(std::string param, int shard_id, std::vector row_names, + std::vector dims, uint64_t count) { + this->param = param; + this->shard_id = shard_id; + this->names = row_names; + this->dims = dims; + this->count = count; + } + + std::string ToString() { + std::stringstream ss; + ss << "param=" << param << "\n"; + ss << "shard_id=" << shard_id << "\n"; + ss << "row_names=" << paddle::string::join_strings(names, ',') << "\n"; + ss << "row_dims=" << paddle::string::join_strings(dims, ',') << "\n"; + ss << "count=" << count << "\n"; + return ss.str(); + } +}; + +void ProcessALine(const std::vector& columns, const Meta& meta, + std::vector>* values) { + PADDLE_ENFORCE_EQ(columns.size(), meta.names.size() + 1, + paddle::platform::errors::InvalidArgument( + "record in txt do not match meta.")); + + values->reserve(columns.size() - 1); + + for (int x = 1; x < columns.size(); ++x) { + auto& column = columns[x]; + auto val_ = paddle::string::split_string(column, ","); + + std::vector val; + std::transform(val_.begin(), val_.end(), std::back_inserter(val), + [](std::string va) { return std::stof(va); }); + PADDLE_ENFORCE_EQ(val.size(), meta.dims[x - 1], + 
paddle::platform::errors::InvalidArgument( + "record in txt do not match meta.")); + values->push_back(val); + } +} + +int64_t SaveToText(std::ostream* os, std::shared_ptr block, + const std::vector& saved_names, + const int mode) { + for (auto value : block->values_) { + std::vector*> vss = value.second->get(saved_names); + std::stringstream ss; + auto id = value.first; + ss << id << "\t"; + for (int i = 0; i < static_cast(vss.size()); i++) { + auto& vs = vss[i]; + ss << paddle::string::join_strings((*vs), ','); + ss << "\t"; + } + ss << "\n"; + + os->write(ss.str().c_str(), sizeof(char) * ss.str().size()); + } + + return block->values_.size(); +} + +int64_t LoadFromText(const std::string& valuepath, const std::string& metapath, + const int pserver_id, const int pserver_num, + const int local_shard_num, + std::vector>* blocks) { + Meta meta = Meta(metapath); + + int num_lines = 0; + std::ifstream file(valuepath); + std::string line; + + while (std::getline(file, line)) { + auto values = paddle::string::split_string(line, "\t"); + auto id = std::stoull(values[0]); + + if (id % pserver_num != pserver_id) { + VLOG(0) << "will not load " << values[0] << " from " << valuepath + << ", please check id distribution"; + continue; + } + + auto shard_id = id % local_shard_num; + auto block = blocks->at(shard_id); + + std::vector> kvalues; + ProcessALine(values, meta, &kvalues); + block->Init(id, &kvalues, 1); + } + + return 0; +} + +void SaveShard(std::shared_ptr block, const std::string& dirname, + const CommonAccessorParameter& common, const int mode, + const int pserver_id, const int shard_id) { + auto varname = common.table_name(); + std::string var_store = string::Sprintf("%s/%s", dirname, varname); + VLOG(3) << "save " << varname << " in dir: " << var_store << " begin"; + MkDirRecursively(var_store.c_str()); + + std::string shard_var_pre = + string::Sprintf("%s.block%d.%d", varname, pserver_id, shard_id); + std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre); + std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre); + + // save values + std::vector params(common.params().begin(), + common.params().end()); + std::unique_ptr value_out(new std::ofstream(value_)); + SaveToText(value_out.get(), block, params, mode); + // save meta + std::stringstream stream; + stream << "param=" << common.table_name() << "\n"; + stream << "server_id=" << pserver_id << "\n"; + stream << "shard_id=" << shard_id << "\n"; + stream << "row_names=" << paddle::string::join_strings(common.params(), ',') + << "\n"; + stream << "row_dims=" << paddle::string::join_strings(common.dims(), ',') + << "\n"; + stream << "count=" << block->values_.size() << "\n"; + std::unique_ptr meta_out(new std::ofstream(meta_)); + meta_out->write(stream.str().c_str(), sizeof(char) * stream.str().size()); + meta_out->close(); + VLOG(3) << "save " << varname << " in dir: " << var_store << " done"; +} + +void CommonSparseTable::create_initializer(const std::string& attr, + const std::string& name) { + auto slices = string::split_string(attr, "&"); + + if (slices[0] == "gaussian_random") { + initializers_[name] = new GaussianInitializer(slices); + } else if (slices[0] == "fill_constant") { + initializers_[name] = new FillConstantInitializer(slices); + } else if (slices[0] == "uniform_random") { + initializers_[name] = new UniformInitializer(slices); + } else { + PADDLE_THROW( + platform::errors::InvalidArgument("%s can not be supported", name)); + } +} + +int32_t CommonSparseTable::initialize() { + 
_shards_task_pool.resize(task_pool_size_); + for (int i = 0; i < _shards_task_pool.size(); ++i) { + _shards_task_pool[i].reset(new ::ThreadPool(1)); + } + + sync = _config.common().sync(); + VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; + + initialize_value(); + initialize_optimizer(); + initialize_recorder(); + return 0; +} + +int32_t CommonSparseTable::initialize_recorder() { return 0; } + +int32_t CommonSparseTable::initialize_value() { + auto common = _config.common(); + int size = static_cast(common.params().size()); + + for (int x = 0; x < size; ++x) { + auto& varname = common.params()[x]; + auto& dim = common.dims()[x]; + if (varname == "Param") { + param_dim_ = dim; + } + auto& initializer = common.initializers()[x]; + create_initializer(initializer, varname); + } + + shard_values_.reserve(task_pool_size_); + for (int x = 0; x < task_pool_size_; ++x) { + auto shard = std::make_shared(common, &initializers_); + shard_values_.emplace_back(shard); + } + return 0; +} + +int32_t CommonSparseTable::initialize_optimizer() { + auto common = _config.common(); + auto name = common.name(); + auto attrs = common.attributes(); + + if (name == "sgd") { + optimizer_ = std::make_shared(common); + } else if (name == "adam") { + optimizer_ = std::make_shared(common); + } else if (name == "sum") { + optimizer_ = std::make_shared(common); + } else { + VLOG(0) << "init optimizer failed"; + } + + VLOG(0) << "init optimizer " << name << " done"; + return 0; +} + +int32_t CommonSparseTable::load(const std::string& path, + const std::string& param) { + rwlock_->WRLock(); + VLOG(0) << "sparse table load with " << path << " with meta " << param; + LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_, + &shard_values_); + rwlock_->UNLock(); + return 0; +} + +int32_t CommonSparseTable::save(const std::string& dirname, + const std::string& param) { + rwlock_->WRLock(); + int mode = std::stoi(param); + VLOG(0) << "sparse table save: " << dirname << " mode: " << mode; + + auto varname = _config.common().table_name(); + std::string var_store = string::Sprintf("%s/%s", dirname, varname); + MkDirRecursively(var_store.c_str()); + + VLOG(3) << "save " << varname << " in dir: " << var_store << " begin"; + std::vector params(_config.common().params().begin(), + _config.common().params().end()); + std::string shard_var_pre = + string::Sprintf("%s.block%d", varname, _shard_idx); + + std::string value_ = string::Sprintf("%s/%s.txt", var_store, shard_var_pre); + + std::unique_ptr value_out(new std::ofstream(value_)); + + int64_t total_ins = 0; + for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + // save values + total_ins += + SaveToText(value_out.get(), shard_values_[shard_id], params, mode); + } + value_out->close(); + + // save meta + std::stringstream stream; + stream << "param=" << _config.common().table_name() << "\n"; + stream << "shard_id=" << _shard_idx << "\n"; + stream << "row_names=" + << paddle::string::join_strings(_config.common().params(), ',') + << "\n"; + stream << "row_dims=" + << paddle::string::join_strings(_config.common().dims(), ',') << "\n"; + stream << "count=" << total_ins << "\n"; + std::string meta_ = string::Sprintf("%s/%s.meta", var_store, shard_var_pre); + std::unique_ptr meta_out(new std::ofstream(meta_)); + meta_out->write(stream.str().c_str(), sizeof(char) * stream.str().size()); + meta_out->close(); + VLOG(3) << "save " << varname << " in dir: " << var_store << " done"; + rwlock_->UNLock(); + return 0; +} + +std::pair 
CommonSparseTable::print_table_stat() { + int64_t feasign_size = 0; + int64_t mf_size = 0; + + for (auto& value : shard_values_) { + feasign_size += value->values_.size(); + } + + return {feasign_size, mf_size}; +} + +int32_t CommonSparseTable::pour() { + rwlock_->RDLock(); + + std::vector values; + std::vector keys; + + keys.reserve(pull_reservoir_.size()); + values.reserve(pull_reservoir_.size() * param_dim_); + + for (auto& val : pull_reservoir_) { + keys.push_back(val.first); + auto& reservoir = val.second; + reservoir.avg(); + std::copy(reservoir.values.begin(), reservoir.values.end(), + std::back_inserter(values)); + } + _push_sparse(keys.data(), values.data(), pull_reservoir_.size()); + + pull_reservoir_.clear(); + rwlock_->UNLock(); + return 0; +} + +int32_t CommonSparseTable::pull_sparse(float* pull_values, const uint64_t* keys, + size_t num) { + rwlock_->RDLock(); + std::vector value_names; + for (auto name : _config.common().params()) { + value_names.push_back(name); + } + + std::vector> offset_bucket; + offset_bucket.resize(task_pool_size_); + + for (int x = 0; x < num; ++x) { + auto y = keys[x] % task_pool_size_; + offset_bucket[y].push_back(x); + } + + std::vector> tasks(task_pool_size_); + + for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( + [this, shard_id, &keys, &offset_bucket, &value_names, + &pull_values]() -> int { + auto& block = shard_values_[shard_id]; + auto& offsets = offset_bucket[shard_id]; + + for (int i = 0; i < offsets.size(); ++i) { + auto offset = offsets[i]; + auto id = keys[offset]; + block->InitFromInitializer(id, value_names); + auto values = block->Get(id, {"Param"}); + auto dim = values[0]->size(); + std::copy(values[0]->begin(), values[0]->end(), + pull_values + dim * offset); + } + return 0; + }); + } + + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } + rwlock_->UNLock(); + return 0; +} + +int32_t CommonSparseTable::_push_sparse(const uint64_t* keys, + const float* values, size_t num) { + rwlock_->RDLock(); + std::vector> offset_bucket; + offset_bucket.resize(task_pool_size_); + + for (int x = 0; x < num; ++x) { + auto y = keys[x] % task_pool_size_; + offset_bucket[y].push_back(x); + } + + std::vector> tasks(task_pool_size_); + + for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( + [this, shard_id, &keys, &values, num, &offset_bucket]() -> int { + auto& offsets = offset_bucket[shard_id]; + optimizer_->update(keys, values, num, offsets, + shard_values_[shard_id].get()); + return 0; + }); + } + + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } + rwlock_->UNLock(); + return 0; +} + +int32_t CommonSparseTable::push_sparse(const uint64_t* keys, + const float* values, size_t num) { + if (sync) { + std::future task = + _shards_task_pool[0]->enqueue([this, &keys, &values, num]() -> int { + for (int x = 0; x < num; ++x) { + auto id = keys[x]; + auto has = pull_reservoir_.find(id); + + if (has == pull_reservoir_.end()) { + pull_reservoir_[id] = ReservoirValue(param_dim_); + } + + auto& reservoir = pull_reservoir_[id]; + reservoir.add(values + x * param_dim_, param_dim_); + } + return 0; + }); + task.wait(); + } else { + _push_sparse(keys, values, num); + } + + return 0; +} + +int32_t CommonSparseTable::push_sparse_param(const uint64_t* keys, + const float* values, size_t num) { + rwlock_->RDLock(); + std::vector 
value_names; + for (auto name : _config.common().params()) { + value_names.push_back(name); + } + + std::vector> offset_bucket; + offset_bucket.resize(task_pool_size_); + + for (int x = 0; x < num; ++x) { + auto y = keys[x] % task_pool_size_; + offset_bucket[y].push_back(x); + } + + std::vector> tasks(task_pool_size_); + + for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { + tasks[shard_id] = _shards_task_pool[shard_id]->enqueue( + [this, shard_id, &keys, &offset_bucket, &value_names, + &values]() -> int { + auto& block = shard_values_[shard_id]; + auto& offsets = offset_bucket[shard_id]; + + for (int i = 0; i < offsets.size(); ++i) { + auto offset = offsets[i]; + auto id = keys[offset]; + block->InitFromInitializer(id, value_names); + auto values_ = block->Get(id, {"Param"}); + auto dim = values_[0]->size(); + std::copy_n(values + dim * offset, dim, values_[0]->data()); + } + return 0; + }); + } + + for (size_t shard_id = 0; shard_id < tasks.size(); ++shard_id) { + tasks[shard_id].wait(); + } + rwlock_->UNLock(); + return 0; +} + +int32_t CommonSparseTable::flush() { return 0; } + +int32_t CommonSparseTable::shrink() { + VLOG(0) << "shrink coming soon"; + return 0; +} +void CommonSparseTable::clear() { VLOG(0) << "clear coming soon"; } + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/common_sparse_table.h b/paddle/fluid/distributed/table/common_sparse_table.h new file mode 100644 index 0000000000000000000000000000000000000000..6baf60a44c15b0055faf2d486e484edb97365e42 --- /dev/null +++ b/paddle/fluid/distributed/table/common_sparse_table.h @@ -0,0 +1,97 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
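+//
+// ReservoirValue<T> accumulates pushed values with BLAS VADD and counts
+// contributions; avg() rescales the sum by 1 / counter. For sparse tables,
+// sparse_local_shard_num(1000, 3) == 334 and get_sparse_shard(1000, 3, key)
+// == (key % 1000) / 334, so a key with key % 1000 == 700 maps to index 2.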
+ +#pragma once + +#include +#include +#include +#include +#include // NOLINT +#include +#include +#include +#include +#include "Eigen/Dense" +#include "paddle/fluid/distributed/table/accessor.h" +#include "paddle/fluid/distributed/table/common_table.h" +#include "paddle/fluid/distributed/table/depends/initializers.h" +#include "paddle/fluid/distributed/table/depends/large_scale_kv.h" +#include "paddle/fluid/distributed/table/depends/sparse.h" +#include "paddle/fluid/framework/rw_lock.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace distributed { + +class CommonSparseTable : public SparseTable { + public: + CommonSparseTable() { rwlock_.reset(new framework::RWLock); } + virtual ~CommonSparseTable() {} + + // unused method begin + virtual int32_t pull_dense(float* pull_values, size_t num) { return 0; } + virtual int32_t push_dense_param(const float* values, size_t num) { + return 0; + } + virtual int32_t push_dense(const float* values, size_t num) { return 0; } + // unused method end + + virtual int32_t initialize(); + virtual int32_t initialize_shard() { return 0; } + virtual void create_initializer(const std::string& attr, + const std::string& name); + virtual int32_t initialize_value(); + virtual int32_t initialize_optimizer(); + virtual int32_t initialize_recorder(); + + int32_t load(const std::string& path, const std::string& param); + + int32_t save(const std::string& path, const std::string& param); + + virtual std::pair print_table_stat(); + virtual int32_t pull_sparse(float* pull_values, const uint64_t* keys, + size_t num); + + virtual int32_t push_sparse(const uint64_t* keys, const float* values, + size_t num); + + // only for sparse geo table + virtual int32_t push_sparse_param(const uint64_t* keys, const float* values, + size_t num); + + virtual int32_t pour(); + virtual int32_t flush(); + virtual int32_t shrink(); + virtual void clear(); + + protected: + virtual int32_t _push_sparse(const uint64_t* keys, const float* values, + size_t num); + + private: + const int task_pool_size_ = 11; + std::vector> _shards_task_pool; + + bool sync = false; + int param_dim_ = 0; + std::shared_ptr optimizer_; + std::unordered_map initializers_; + std::vector> shard_values_; + std::unordered_map> pull_reservoir_; + std::unique_ptr rwlock_{nullptr}; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/common_table.h b/paddle/fluid/distributed/table/common_table.h new file mode 100644 index 0000000000000000000000000000000000000000..d37e6677e634d7da93b661cf02389c2f4abee19a --- /dev/null +++ b/paddle/fluid/distributed/table/common_table.h @@ -0,0 +1,166 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include // NOLINT +#include // NOLINT +#include + +#include "paddle/fluid/distributed/table/table.h" + +#include "paddle/fluid/distributed/common/utils.h" + +namespace paddle { +namespace distributed { + +template +struct ReservoirValue { + std::vector values; + uint32_t counter; + uint32_t dim; + + ReservoirValue() { + dim = 0; + values.resize(dim); + counter = 0; + } + + ReservoirValue(uint32_t dim) { + this->dim = dim; + values.resize(dim); + counter = 0; + } + + void add(const T *value, int numel) { + GetBlas().VADD(numel, values.data(), value, values.data()); + counter++; + } + + void add(T *value, int numel) { + GetBlas().VADD(numel, values.data(), value, values.data()); + counter++; + } + + void avg() { + auto scale = 1 / static_cast(counter); + GetBlas().SCAL(values.size(), scale, values.data()); + } + + void reset() { + values.resize(dim, 0); + counter = 0; + } +}; + +class SparseTable : public Table { + public: + SparseTable() {} + virtual ~SparseTable() {} + + virtual void *get_shard(size_t shard_idx) { return 0; } + + int32_t pull_dense(float *values, size_t num) override { return 0; } + + int32_t push_dense(const float *values, size_t num) override { return 0; } + + static int32_t sparse_local_shard_num(uint32_t shard_num, + uint32_t server_num) { + if (shard_num % server_num == 0) { + return shard_num / server_num; + } + size_t local_shard_num = shard_num / server_num + 1; + return local_shard_num; + } + + static size_t get_sparse_shard(uint32_t shard_num, uint32_t server_num, + uint64_t key) { + return (key % shard_num) / sparse_local_shard_num(shard_num, server_num); + } +}; + +class DenseTable : public Table { + public: + DenseTable() {} + virtual ~DenseTable() {} + + virtual void *get_shard(size_t shard_idx) { return 0; } + int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) override { + return 0; + } + int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) override { + return 0; + } + int32_t push_dense_param(const float *values, size_t num) override { + return 0; + } + int32_t shrink() override { return 0; } +}; + +class BarrierTable : public Table { + public: + BarrierTable() {} + virtual ~BarrierTable() {} + + virtual void *get_shard(size_t shard_idx) { return 0; } + + int32_t pull_dense(float *values, size_t num) override { return 0; } + + int32_t push_dense(const float *values, size_t num) override { return 0; } + + int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) override { + return 0; + } + int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) override { + return 0; + } + int32_t push_dense_param(const float *values, size_t num) override { + return 0; + } + int32_t shrink() override { return 0; } + virtual void clear(){}; + virtual int32_t flush() { return 0; }; + virtual int32_t load(const std::string &path, const std::string ¶m) { + return 0; + } + virtual int32_t save(const std::string &path, const std::string ¶m) { + return 0; + } + virtual int32_t initialize_shard() { return 0; }; + + virtual int32_t initialize() override; + // only for barrier + // 0: send_barrier 1: recv_barrier 2: complete + virtual int32_t barrier(const uint32_t trainer_id, + const std::string barrier_type) override; + + virtual int32_t set_table_map( + std::unordered_map> *table_map) override; + + private: + std::mutex mutex_; + std::condition_variable trainer_wait_; + std::set trainer_ids_; + std::set trainer_all_; + std::atomic trigger_; + std::atomic exit_; + 
std::unordered_map> *table_map_; +}; +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/depends/dense.h b/paddle/fluid/distributed/table/depends/dense.h new file mode 100644 index 0000000000000000000000000000000000000000..8a71d9b5a8b651853333d8f4ce346471407dc901 --- /dev/null +++ b/paddle/fluid/distributed/table/depends/dense.h @@ -0,0 +1,182 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include // for sqrt in CPU and CUDA +#include +#include +#include +#include +#include + +#include "paddle/fluid/distributed/common/utils.h" + +namespace paddle { +namespace distributed { + +// dense optimzier +// TODO(tangwei12) integrate with sparse optimzer later. +class DenseOptimizer { + public: + DenseOptimizer() {} + explicit DenseOptimizer(const CommonAccessorParameter& accessor, + std::vector>* values) {} + virtual void update(const float* update_values, size_t num, int begin, + int end) = 0; +}; + +// sum calc for dense tensor +class DSUM : public DenseOptimizer { + public: + explicit DSUM(const CommonAccessorParameter& accessor, + std::vector>* values) { + auto& names = accessor.params(); + for (int x = 0; x < static_cast(names.size()); ++x) { + if (names[x] == "Param") { + param = (*values)[x].data(); + } + } + } + + void update(const float* update_values, size_t num, int begin, + int end) override { + auto update_numel = end - begin; + GetBlas().VADD(update_numel, update_values + begin, param + begin, + param + begin); + } + + float* param; +}; + +// sgd optimizer for dense tensor +class DSGD : public DenseOptimizer { + public: + explicit DSGD(const CommonAccessorParameter& accessor, + std::vector>* values) { + auto& names = accessor.params(); + for (int x = 0; x < static_cast(names.size()); ++x) { + if (names[x] == "LearningRate") { + learning_rate = (*values)[x].data(); + } + if (names[x] == "Param") { + param = (*values)[x].data(); + } + } + } + + void update(const float* update_values, size_t num, int begin, + int end) override { + auto update_numel = end - begin; + std::vector grads; + grads.resize(update_numel); + + auto blas = GetBlas(); + blas.VCOPY(update_numel, update_values + begin, grads.data()); + blas.SCAL(update_numel, *learning_rate, grads.data()); + blas.VSUB(update_numel, param + begin, grads.data(), param + begin); + } + + float* learning_rate; + float* param; +}; + +// adam optimizer for dense tensor +class DAdam : public DenseOptimizer { + public: + explicit DAdam(const CommonAccessorParameter& accessor, + std::vector>* values) { + auto& names = accessor.params(); + for (int x = 0; x < static_cast(names.size()); ++x) { + if (names[x] == "LearningRate") { + learning_rate = (*values)[x].data(); + } + if (names[x] == "Param") { + param = (*values)[x].data(); + } + if (names[x] == "Moment1") { + moment1 = (*values)[x].data(); + } + if (names[x] == "Moment2") { + moment2 = (*values)[x].data(); + } + if (names[x] == "Beta1Pow") { 
+ beta1_pow = (*values)[x].data(); + } + if (names[x] == "Beta2Pow") { + beta2_pow = (*values)[x].data(); + } + } + + // add attr later + beta1 = 0.9; + beta2 = 0.999; + epsilon = 1.0e-8; + } + + void update(const float* update_values, size_t num, int begin, + int end) override { + auto update_numel = end - begin; + std::vector grad, grad2, tmp; + grad.resize(update_numel); + grad2.resize(update_numel); + tmp.resize(update_numel); + + auto blas = GetBlas(); + blas.VCOPY(update_numel, update_values + begin, grad.data()); + blas.VCOPY(update_numel, update_values + begin, grad2.data()); + + blas.SCAL(update_numel, 1 - beta1, grad.data()); + blas.VSQUARE(update_numel, grad2.data(), grad2.data()); + blas.SCAL(update_numel, 1 - beta2, grad2.data()); + + blas.SCAL(update_numel, beta1, moment1 + begin); + blas.VADD(update_numel, moment1 + begin, grad.data(), moment1 + begin); + blas.SCAL(update_numel, beta2, moment2 + begin); + blas.VADD(update_numel, moment2 + begin, grad2.data(), moment2 + begin); + + beta1_pow[0] = beta1_pow[0] * beta1; + beta2_pow[0] = beta2_pow[0] * beta2; + + float lr_ = learning_rate[0]; + lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); + + float* tmp_ = tmp.data(); + float eps_ = epsilon * sqrt(1 - beta2_pow[0]); + + SQRT(update_numel, moment2 + begin, tmp_); + ADD(update_numel, tmp_, eps_, tmp_); + + blas.VDIV(update_numel, moment1 + begin, tmp_, tmp_); + blas.SCAL(update_numel, lr_, tmp_); + blas.VSUB(update_numel, param + begin, tmp_, param + begin); + } + + float* learning_rate; + + float* param; + float* moment1; + float* moment2; + + float* beta1_pow; + float* beta2_pow; + + float beta1; + float beta2; + float epsilon; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/depends/geo_recorder.h b/paddle/fluid/distributed/table/depends/geo_recorder.h new file mode 100644 index 0000000000000000000000000000000000000000..ad094f0dfbc48aeab046b80527b0193fad4189cb --- /dev/null +++ b/paddle/fluid/distributed/table/depends/geo_recorder.h @@ -0,0 +1,94 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
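// --- Illustrative sketch (not part of the patch): the per-slice update rule the
// --- DAdam dense optimizer above applies. The real class uses the BLAS helpers
// --- from common/utils.h; this reference version uses plain loops for clarity.
#include <cmath>
#include <cstddef>

void adam_dense_reference(const float* grad, float* param, float* m1, float* m2,
                          float* beta1_pow, float* beta2_pow, float lr,
                          float beta1, float beta2, float epsilon, size_t n) {
  beta1_pow[0] *= beta1;
  beta2_pow[0] *= beta2;
  const float lr_t = lr * std::sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
  const float eps_t = epsilon * std::sqrt(1 - beta2_pow[0]);
  for (size_t i = 0; i < n; ++i) {
    m1[i] = beta1 * m1[i] + (1 - beta1) * grad[i];          // first moment
    m2[i] = beta2 * m2[i] + (1 - beta2) * grad[i] * grad[i]; // second moment
    param[i] -= lr_t * (m1[i] / (std::sqrt(m2[i]) + eps_t));
  }
}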
+ +#pragma once + +#include +#include +#include // NOLINT +#include +#include +#include +#include +#include +#include + +namespace paddle { +namespace distributed { + +class ConcurrentSet { + public: + ConcurrentSet() : pool_(new ::ThreadPool(1)) {} + ~ConcurrentSet() {} + + std::future Update(const std::vector& rows) { + auto task = [this, rows] { + for (auto row : rows) { + set_.insert(row); + } + }; + return pool_->enqueue(std::move(task)); + } + + std::future GetAndClear(std::vector* result) { + auto task = [this, &result] { + result->clear(); + for (auto& id : set_) { + result->push_back(id); + } + set_.clear(); + }; + return pool_->enqueue(std::move(task)); + } + + private: + std::unordered_set set_; + std::unique_ptr<::ThreadPool> pool_{nullptr}; +}; + +class GeoRecorder { + public: + explicit GeoRecorder(int trainer_num) : trainer_num_(trainer_num) { + trainer_rows_.reserve(trainer_num); + for (auto i = 0; i < trainer_num; ++i) { + trainer_rows_.emplace_back(new ConcurrentSet()); + } + } + + ~GeoRecorder() = default; + + void Update(const std::vector& update_rows) { + VLOG(3) << " row size: " << update_rows.size(); + + std::vector> fs; + for (auto& set : trainer_rows_) { + fs.push_back(set->Update(update_rows)); + } + for (auto& f : fs) { + f.wait(); + } + } + + void GetAndClear(uint32_t trainer_id, std::vector* result) { + VLOG(3) << "GetAndClear for trainer: " << trainer_id; + trainer_rows_.at(trainer_id)->GetAndClear(result).wait(); + } + + private: + const int trainer_num_; + std::vector> trainer_rows_; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/depends/initializers.h b/paddle/fluid/distributed/table/depends/initializers.h new file mode 100644 index 0000000000000000000000000000000000000000..e3d6e052c915863ecfd4ba5af636b4274f17f667 --- /dev/null +++ b/paddle/fluid/distributed/table/depends/initializers.h @@ -0,0 +1,102 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
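// --- Usage sketch (illustrative only, assuming uint64_t row ids and that
// --- geo_recorder.h above is included): a GeoRecorder keeps one ConcurrentSet
// --- per trainer; every push records the touched ids for all trainers, and each
// --- trainer later drains its own set to learn which rows changed since its
// --- last geo pull.
#include <cstdint>
#include <vector>

void geo_recorder_example(paddle::distributed::GeoRecorder* recorder) {
  std::vector<uint64_t> pushed = {3, 7, 7, 42};
  recorder->Update(pushed);  // fans the ids out to every trainer's set
  std::vector<uint64_t> changed;
  recorder->GetAndClear(/*trainer_id=*/0, &changed);  // drain trainer 0's delta
  // `changed` now holds {3, 7, 42} in unspecified order; trainer 0's set is cleared.
}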
+ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paddle/fluid/framework/generator.h" + +namespace paddle { +namespace distributed { + +class Initializer { + public: + Initializer() {} + + explicit Initializer(const std::vector &attrs) {} + + virtual float GetValue() = 0; + + virtual ~Initializer() {} + + protected: + std::string name_; + unsigned int seed_; +}; + +class UniformInitializer : public Initializer { + public: + explicit UniformInitializer(const std::vector &attrs) { + name_ = attrs[0]; + seed_ = static_cast(std::stoi(attrs[1])); + min_ = std::stof(attrs[2]); + max_ = std::stof(attrs[3]); + + dist_ = std::uniform_real_distribution(min_, max_); + random_engine_ = framework::GetCPURandomEngine(seed_); + } + + float GetValue() override { return dist_(*random_engine_); } + + private: + float min_; + float max_; + + std::shared_ptr random_engine_; + std::uniform_real_distribution dist_; +}; + +class GaussianInitializer : public Initializer { + public: + explicit GaussianInitializer(const std::vector &attrs) { + name_ = attrs[0]; + seed_ = static_cast(std::stoi(attrs[1])); + mean_ = std::stof(attrs[2]); + std_ = std::stof(attrs[3]); + + random_engine_ = framework::GetCPURandomEngine(seed_); + + dist_ = std::normal_distribution(mean_, std_); + } + + float GetValue() override { return dist_(*random_engine_); } + + private: + float std_; + float mean_; + + std::shared_ptr random_engine_; + std::normal_distribution dist_; +}; + +class FillConstantInitializer : public Initializer { + public: + explicit FillConstantInitializer(const std::vector &attrs) { + name_ = attrs[0]; + value_ = std::stof(attrs[1]); + } + + float GetValue() override { return value_; } + + private: + float value_; +}; +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/depends/large_scale_kv.h b/paddle/fluid/distributed/table/depends/large_scale_kv.h new file mode 100644 index 0000000000000000000000000000000000000000..c0c424e7458939c0d6e579b6bd2e4501837d07ea --- /dev/null +++ b/paddle/fluid/distributed/table/depends/large_scale_kv.h @@ -0,0 +1,264 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
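// --- Sketch (illustrative): the '&'-separated attribute strings consumed by the
// --- initializers defined in initializers.h above, e.g. "uniform_random&0&-1.0&1.0",
// --- "gaussian_random&0&0.0&1.0" or "fill_constant&1.0". attrs[0] is the
// --- initializer name and the remaining fields are its parameters (seed/min/max
// --- for uniform, seed/mean/std for gaussian, value for fill_constant).
#include <sstream>
#include <string>
#include <vector>

std::vector<std::string> split_initializer_attr(const std::string& attr) {
  std::vector<std::string> parts;
  std::stringstream ss(attr);
  std::string piece;
  while (std::getline(ss, piece, '&')) parts.push_back(piece);
  return parts;  // e.g. {"uniform_random", "0", "-1.0", "1.0"}
}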
+ +#pragma once + +#include +#include +#include +#include // NOLINT +#include +#include +#include // NOLINT +#include +#include +#include +#include + +#include "paddle/fluid/distributed/common/utils.h" +#include "paddle/fluid/distributed/table/depends/initializers.h" +#include "paddle/fluid/framework/generator.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/rw_lock.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/tensor.h" +#include "paddle/fluid/framework/threadpool.h" +#include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/fluid/platform/port.h" +#include "paddle/fluid/string/printf.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace distributed { + +enum Mode { training, infer }; + +template +inline bool entry(const int count, const T threshold); + +template <> +inline bool entry(const int count, const std::string threshold) { + return true; +} + +template <> +inline bool entry(const int count, const int threshold) { + return count >= threshold; +} + +template <> +inline bool entry(const int count, const float threshold) { + UniformInitializer uniform = UniformInitializer({"0", "0", "1"}); + return uniform.GetValue() >= threshold; +} + +struct VALUE { + explicit VALUE(const std::vector &names) + : names_(names), count_(0), unseen_days_(0) { + values_.resize(names.size()); + for (int i = 0; i < static_cast(names.size()); i++) { + places[names[i]] = i; + } + } + + void set(std::vector> *values) { + values_ = std::move(*values); + } + + void set(const std::vector &names, + const std::vector> &values) { + for (int i = 0; i < static_cast(names.size()); i++) { + auto idx = places[names[i]]; + auto value = values[i]; + values_[idx].assign(value.begin(), value.end()); + } + } + + std::vector *> get() { + auto pts = std::vector *>(); + pts.reserve(values_.size()); + + for (auto &value : values_) { + pts.push_back(&value); + } + return pts; + } + + int fetch_count() { return ++count_; } + void reset_unseen_days() { unseen_days_ = 0; } + + void set_entry(bool is_entry) { is_entry_ = is_entry; } + + bool get_entry() { return is_entry_; } + + std::vector *> get(const std::vector names) { + auto pts = std::vector *>(); + pts.reserve(values_.size()); + + for (int i = 0; i < static_cast(names.size()); i++) { + pts.push_back(&(values_[places[names[i]]])); + } + return pts; + } + + std::vector names_; + int count_; + bool seen_after_last_save_; + int unseen_days_; + bool is_entry_; + std::vector> values_; + std::unordered_map places; +}; + +class ValueBlock { + public: + explicit ValueBlock( + const CommonAccessorParameter &common, + std::unordered_map *initializers) { + initializers_ = initializers; + int size = static_cast(common.params().size()); + + for (int x = 0; x < size; ++x) { + auto varname = common.params()[x]; + auto dim = common.dims()[x]; + value_names_.push_back(varname); + value_dims_.push_back(dim); + } + + // for Entry + { + // entry will add later + std::string entry_attr = "none"; + + if (entry_attr == "none") { + entry_func_ = + std::bind(entry, std::placeholders::_1, "none"); + } else { + auto slices = string::split_string(entry_attr, "&"); + if (slices[0] == "count_filter") { + int threshold = std::stoi(slices[1]); + entry_func_ = std::bind(entry, std::placeholders::_1, threshold); + } else if (slices[0] == "probability") { + float 
threshold = std::stof(slices[1]); + entry_func_ = + std::bind(entry, std::placeholders::_1, threshold); + } + } + } + } + + ~ValueBlock() {} + + void Init(const uint64_t &id, std::vector> *values, + int count) { + if (Has(id)) { + PADDLE_THROW(platform::errors::AlreadyExists("id already exist, error")); + } + + if (values->size() != value_names_.size()) { + PADDLE_THROW( + platform::errors::AlreadyExists("values can not match, error")); + } + + auto value = new VALUE(value_names_); + value->set(values); + value->seen_after_last_save_ = true; + value->count_ = count; + values_[id] = value; + } + + std::vector *> Get( + const uint64_t &id, const std::vector &value_names) { + auto ret_values = values_.at(id)->get(value_names); + return ret_values; + } + + std::vector *> Get(const uint64_t &id) { + auto ret_values = values_.at(id)->get(value_names_); + return ret_values; + } + + void InitFromInitializer(const uint64_t &id, + const std::vector &value_names) { + if (Has(id)) { + Update(id); + return; + } + + auto rets = std::vector>(); + rets.resize(value_names_.size()); + + for (int i = 0; i < static_cast(value_names_.size()); i++) { + auto name = value_names_[i]; + auto *init = initializers_->at(name); + + auto dim = value_dims_[i]; + rets[i].resize(dim); + + for (int j = 0; j < static_cast(dim); j++) { + rets[i][j] = init->GetValue(); + } + } + + Init(id, &rets, 0); + Update(id); + } + + bool GetEntry(const uint64_t &id) { + auto value = values_.at(id); + auto entry = value->get_entry(); + return entry; + } + + void Set(const uint64_t &id, const std::vector &value_names, + const std::vector> &values) { + auto value = values_.at(id); + value->set(value_names, values); + } + + void Update(const uint64_t id) { + auto *value = values_.at(id); + value->reset_unseen_days(); + auto count = value->fetch_count(); + + if (!value->get_entry()) { + value->set_entry(entry_func_(count)); + } + } + + private: + bool Has(const uint64_t id) { + auto got = values_.find(id); + if (got == values_.end()) { + return false; + } else { + return true; + } + } + + public: + std::unordered_map values_; + + private: + std::vector value_names_; + std::vector value_dims_; + std::function entry_func_; + std::unordered_map *initializers_; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/depends/sparse.h b/paddle/fluid/distributed/table/depends/sparse.h new file mode 100644 index 0000000000000000000000000000000000000000..5d992a4c4f0f41b6e7d3ba9e22458294bd6e1e73 --- /dev/null +++ b/paddle/fluid/distributed/table/depends/sparse.h @@ -0,0 +1,210 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
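// --- Sketch (illustrative, without std::bind): how the three "entry" admission
// --- policies parsed by the ValueBlock constructor above behave. entry_func_
// --- maps a key's push count to a bool deciding whether the key is marked as
// --- entered. The exact probability semantics are an approximation of entry<float>.
#include <functional>
#include <random>
#include <string>

std::function<bool(int)> make_entry_func(const std::string& attr) {
  if (attr == "none") {
    return [](int /*count*/) { return true; };  // always admit
  }
  if (attr.rfind("count_filter&", 0) == 0) {
    const int threshold = std::stoi(attr.substr(attr.find('&') + 1));
    return [threshold](int count) { return count >= threshold; };
  }
  if (attr.rfind("probability&", 0) == 0) {
    const float threshold = std::stof(attr.substr(attr.find('&') + 1));
    // admit with probability roughly (1 - threshold)
    return [threshold](int /*count*/) {
      static std::mt19937 gen(0);
      std::uniform_real_distribution<float> dist(0.f, 1.f);
      return dist(gen) >= threshold;
    };
  }
  return [](int /*count*/) { return true; };
}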
+ +#pragma once + +#include +#include // for sqrt in CPU and CUDA +#include +#include +#include +#include +#include + +#include "paddle/fluid/distributed/common/utils.h" +#include "paddle/fluid/distributed/table/depends/large_scale_kv.h" + +namespace paddle { +namespace distributed { + +class SparseOptimizer { + public: + SparseOptimizer() {} + explicit SparseOptimizer(const CommonAccessorParameter& common) {} + virtual void update(const uint64_t* keys, const float* update_values, + size_t num, const std::vector& offsets, + ValueBlock* block) = 0; +}; + +// sum calc for sparse tensor +class SSUM : public SparseOptimizer { + public: + SSUM(){}; + explicit SSUM(const CommonAccessorParameter& common) { + auto& names = common.params(); + for (int x = 0; x < static_cast(names.size()); ++x) { + if (names[x] == "Param") { + param_idx = x; + update_numel = common.dims()[x]; + } + } + } + + void update(const uint64_t* keys, const float* update_values, size_t num, + const std::vector& offsets, + ValueBlock* block) override { + auto blas = GetBlas(); + for (auto x : offsets) { + auto id = keys[x]; + auto values = block->Get(id); + float* param = values[param_idx]->data(); + + std::vector delta; + delta.resize(update_numel); + blas.VCOPY(update_numel, update_values + x * update_numel, delta.data()); + blas.VADD(update_numel, delta.data(), param, param); + } + } + + int param_idx; + int update_numel; +}; + +// sgd optimzer for sparse tensor +class SSGD : public SparseOptimizer { + public: + SSGD(){}; + explicit SSGD(const CommonAccessorParameter& common) { + auto& names = common.params(); + for (int x = 0; x < static_cast(names.size()); ++x) { + if (names[x] == "LearningRate") { + learning_rate_idx = x; + } + if (names[x] == "Param") { + param_idx = x; + update_numel = common.dims()[x]; + } + } + } + + void update(const uint64_t* keys, const float* update_values, size_t num, + const std::vector& offsets, + ValueBlock* block) override { + auto blas = GetBlas(); + for (auto x : offsets) { + auto id = keys[x]; + auto values = block->Get(id); + float* learning_rate = values[learning_rate_idx]->data(); + float* param = values[param_idx]->data(); + + std::vector grads; + grads.resize(update_numel); + blas.VCOPY(update_numel, update_values + x * update_numel, grads.data()); + blas.SCAL(update_numel, learning_rate[0], grads.data()); + blas.VSUB(update_numel, param, grads.data(), param); + } + } + + int learning_rate_idx; + int param_idx; + int update_numel; +}; + +// adam optimzer for sparse tensor +class SAdam : public SparseOptimizer { + public: + SAdam() {} + explicit SAdam(const CommonAccessorParameter& common) { + auto& names = common.params(); + for (int x = 0; x < static_cast(names.size()); ++x) { + if (names[x] == "LearningRate") { + learning_rate_idx = x; + } + if (names[x] == "Param") { + param_idx = x; + update_numel = common.dims()[x]; + } + if (names[x] == "Moment1") { + moment1_idx = x; + } + if (names[x] == "Moment2") { + moment2_idx = x; + } + if (names[x] == "Beta1Pow") { + beta1_pow_idx = x; + } + if (names[x] == "Beta2Pow") { + beta2_pow_idx = x; + } + } + + // add attr later + beta1 = 0.9; + beta2 = 0.999; + epsilon = 1.0e-8; + } + + void update(const uint64_t* keys, const float* update_values, size_t num, + const std::vector& offsets, + ValueBlock* block) override { + auto blas = GetBlas(); + for (auto x : offsets) { + auto id = keys[x]; + auto values = block->Get(id); + float* learning_rate = values[learning_rate_idx]->data(); + float* param = values[param_idx]->data(); + float* moment1 = 
values[moment1_idx]->data(); + float* moment2 = values[moment2_idx]->data(); + float* beta1_pow = values[beta1_pow_idx]->data(); + float* beta2_pow = values[beta2_pow_idx]->data(); + + beta1_pow[0] = beta1_pow[0] * beta1; + beta2_pow[0] = beta2_pow[0] * beta2; + + float lr_ = learning_rate[0]; + lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); + + std::vector grad, grad2, tmp; + grad.resize(update_numel); + grad2.resize(update_numel); + tmp.resize(update_numel); + + blas.VCOPY(update_numel, update_values + x * update_numel, grad.data()); + blas.VCOPY(update_numel, update_values + x * update_numel, grad2.data()); + + blas.SCAL(update_numel, 1 - beta1, grad.data()); + blas.VSQUARE(update_numel, grad2.data(), grad2.data()); + blas.SCAL(update_numel, 1 - beta2, grad2.data()); + + blas.SCAL(update_numel, beta1, moment1); + blas.VADD(update_numel, moment1, grad.data(), moment1); + blas.SCAL(update_numel, beta2, moment2); + blas.VADD(update_numel, moment2, grad2.data(), moment2); + + float* tmp_ = tmp.data(); + float eps_ = epsilon * sqrt(1 - beta2_pow[0]); + + SQRT(update_numel, moment2, tmp_); + ADD(update_numel, tmp_, eps_, tmp_); + + blas.VDIV(update_numel, moment1, tmp_, tmp_); + blas.SCAL(update_numel, lr_, tmp_); + blas.VSUB(update_numel, param, tmp_, param); + } + } + + int learning_rate_idx; + int param_idx; + int moment1_idx; + int moment2_idx; + int beta1_pow_idx; + int beta2_pow_idx; + float beta1; + float beta2; + float epsilon; + int update_numel; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/sparse_geo_table.cc b/paddle/fluid/distributed/table/sparse_geo_table.cc new file mode 100644 index 0000000000000000000000000000000000000000..9b276e7de5c92d495f9d40535033b0a82186bc82 --- /dev/null +++ b/paddle/fluid/distributed/table/sparse_geo_table.cc @@ -0,0 +1,41 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
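// --- Reference sketch (illustrative): the per-key step the sparse optimizers
// --- above perform, shown for plain SGD with loops instead of the BLAS helpers.
// --- `offsets` selects which positions of keys/update_values belong to this
// --- shard, and each key owns `dim` consecutive floats of gradient. In the real
// --- SSGD the learning rate comes from the per-key "LearningRate" slot; here it
// --- is passed in, and an unordered_map stands in for block->Get(id).
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

void sparse_sgd_reference(
    const uint64_t* keys, const float* update_values,
    const std::vector<uint64_t>& offsets, size_t dim, float lr,
    std::unordered_map<uint64_t, std::vector<float>>* params_by_key) {
  for (auto x : offsets) {
    const float* grad = update_values + x * dim;        // gradient slice for keys[x]
    float* param = params_by_key->at(keys[x]).data();   // must hold dim floats
    for (size_t i = 0; i < dim; ++i) {
      param[i] -= lr * grad[i];
    }
  }
}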
+ +#include "paddle/fluid/distributed/table/sparse_geo_table.h" + +namespace paddle { +namespace distributed { + +int32_t SparseGeoTable::pull_geo_param(const uint32_t trainer_id, + std::vector* values, + std::vector* ids) { + geo_recorder->GetAndClear(trainer_id, ids); + auto dim = _config.common().dims()[0]; + values->resize(ids->size() * dim); + CommonSparseTable::pull_sparse(values->data(), ids->data(), ids->size()); + return 0; +} + +int32_t SparseGeoTable::push_sparse(const uint64_t* keys, const float* values, + size_t num) { + std::vector ids; + ids.resize(num); + std::copy_n(keys, num, ids.begin()); + geo_recorder->Update(ids); + CommonSparseTable::push_sparse(keys, values, num); + return 0; +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/sparse_geo_table.h b/paddle/fluid/distributed/table/sparse_geo_table.h new file mode 100644 index 0000000000000000000000000000000000000000..267d30a30fb7b939255d424434964c00b2af2f7b --- /dev/null +++ b/paddle/fluid/distributed/table/sparse_geo_table.h @@ -0,0 +1,62 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include // NOLINT +#include +#include +#include +#include "Eigen/Dense" +#include "paddle/fluid/distributed/table/accessor.h" +#include "paddle/fluid/distributed/table/common_sparse_table.h" +#include "paddle/fluid/distributed/table/common_table.h" +#include "paddle/fluid/distributed/table/depends/geo_recorder.h" +#include "paddle/fluid/distributed/table/depends/initializers.h" +#include "paddle/fluid/distributed/table/depends/large_scale_kv.h" +#include "paddle/fluid/distributed/table/depends/sparse.h" +#include "paddle/fluid/framework/rw_lock.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace distributed { + +class SparseGeoTable : public CommonSparseTable { + public: + explicit SparseGeoTable() : CommonSparseTable() { geo_recorder = nullptr; } + virtual ~SparseGeoTable() {} + + int32_t pull_geo_param(const uint32_t trainer_id, std::vector* values, + std::vector* keys); + + virtual int32_t push_sparse(const uint64_t* keys, const float* values, + size_t num) override; + + virtual int32_t initialize_recorder() { + if (!geo_recorder) { + auto trainers = _config.common().trainer_num(); + geo_recorder = std::make_shared(trainers); + } + return 0; + } + + private: + std::shared_ptr geo_recorder; +}; + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/table.cc b/paddle/fluid/distributed/table/table.cc new file mode 100644 index 0000000000000000000000000000000000000000..ff241ee1066483117ad02af88d91bdcfe9d4d38e --- /dev/null +++ b/paddle/fluid/distributed/table/table.cc @@ -0,0 +1,79 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/table/table.h" +#include +#include +#include "glog/logging.h" +#include "paddle/fluid/distributed/common/registerer.h" + +#include "paddle/fluid/distributed/table/common_dense_table.h" +#include "paddle/fluid/distributed/table/common_sparse_table.h" +#include "paddle/fluid/distributed/table/sparse_geo_table.h" +#include "paddle/fluid/distributed/table/tensor_accessor.h" +#include "paddle/fluid/distributed/table/tensor_table.h" + +namespace paddle { +namespace distributed { + +REGISTER_CLASS(Table, CommonDenseTable); +REGISTER_CLASS(Table, CommonSparseTable); +REGISTER_CLASS(Table, DenseTensorTable); +REGISTER_CLASS(Table, SparseGeoTable); +REGISTER_CLASS(Table, BarrierTable); + +REGISTER_CLASS(ValueAccessor, CommMergeAccessor); + +int32_t TableManager::initialize() { + static bool initialized = false; + if (initialized) { + return 0; + } + initialized = true; + return 0; +} + +int32_t Table::initialize(const TableParameter &config, + const FsClientParameter &fs_config) { + _config = config; + if (initialize_accessor() != 0) { + LOG(WARNING) << "Table accessor initialize failed"; + return -1; + } + return initialize(); +} + +int32_t Table::initialize_accessor() { + if (!_config.has_accessor() || !_config.accessor().has_accessor_class()) { + LOG(ERROR) << "missing accessor config in table, table_id:" + << _config.table_id(); + return -1; + } + auto *accessor = + CREATE_CLASS(ValueAccessor, + _config.accessor().accessor_class()) if (accessor == NULL) { + LOG(ERROR) << "accessor is unregisteg, table_id:" << _config.table_id() + << ", accessor_name:" << _config.accessor().accessor_class(); + return -1; + } + if (accessor->configure(_config.accessor()) || accessor->initialize() != 0) { + LOG(ERROR) << " accessor initialize failed, table_id:" << _config.table_id() + << ", accessor_name:" << _config.accessor().accessor_class(); + return -1; + } + _value_accesor.reset(accessor); + return 0; +} +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/table.h b/paddle/fluid/distributed/table/table.h new file mode 100644 index 0000000000000000000000000000000000000000..70d1211fe81c70c7e579f15e1445a6ba5acecf79 --- /dev/null +++ b/paddle/fluid/distributed/table/table.h @@ -0,0 +1,125 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
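// --- Usage sketch (illustrative; `make_table` is a hypothetical helper, not part
// --- of the patch): bringing a table up from its proto config with the registry
// --- wired in table.cc above. Assumes the includes already pulled in by table.cc.
namespace paddle {
namespace distributed {

Table* make_table(const TableParameter& config,
                  const FsClientParameter& fs_config) {
  Table* table = CREATE_CLASS(Table, config.table_class())  // macro supplies the ';'
  if (table == nullptr) {
    return nullptr;  // table_class was never registered via REGISTER_CLASS
  }
  if (table->initialize(config, fs_config) != 0) {  // builds the accessor, then initialize()
    delete table;
    return nullptr;
  }
  return table;
}

}  // namespace distributed
}  // namespace paddle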
+ +#pragma once + +#include +#include +#include // NOLINT +#include +#include +#include + +#include "paddle/fluid/distributed/table/accessor.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace distributed { +class Table { + public: + Table() {} + virtual ~Table() {} + virtual int32_t initialize(const TableParameter &config, + const FsClientParameter &fs_config) final; + + virtual int32_t pull_dense(float *values, size_t num) = 0; + virtual int32_t push_dense(const float *values, size_t num) = 0; + virtual int32_t push_dense_param(const float *values, size_t num) { + return 0; + } + + virtual int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) = 0; + virtual int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) = 0; + virtual int32_t push_sparse_param(const uint64_t *keys, const float *values, + size_t num) { + return 0; + } + + // only for sparse geo table + virtual int32_t pull_geo_param(const uint32_t trainer_id, + std::vector *values, + std::vector *keys) { + return 0; + } + + // only for barrier + virtual int32_t barrier(const uint32_t trainer_id, + const std::string barrier_type) { + return 0; + } + + // only for barrier table + virtual int32_t set_table_map( + std::unordered_map> *table_map) { + return 0; + } + + virtual int32_t pour() { return 0; } + + virtual void clear() = 0; + virtual int32_t flush() = 0; + virtual int32_t shrink() = 0; + + //指定加载路径 + virtual int32_t load(const std::string &path, + const std::string &converter) = 0; + //指定保存路径 + virtual int32_t save(const std::string &path, + const std::string &converter) = 0; + + virtual int32_t set_shard(size_t shard_idx, size_t shard_num) final { + _shard_idx = shard_idx; + _shard_num = shard_num; + return initialize_shard(); + } + + inline std::shared_ptr value_accesor() { + return _value_accesor; + } + + virtual void *get_shard(size_t shard_idx) = 0; + virtual std::pair print_table_stat() { return {0, 0}; } + + protected: + virtual int32_t initialize() = 0; + virtual int32_t initialize_accessor() final; + virtual int32_t initialize_shard() = 0; + virtual std::string table_dir(const std::string &model_dir) { + return paddle::string::format_string("%s/%03d/", model_dir.c_str(), + _config.table_id()); + } + + size_t _shard_idx; // table 分片编号 + size_t _shard_num; // table 分片总数 + TableParameter _config; + std::shared_ptr _value_accesor; +}; +REGISTER_REGISTERER(Table); + +class TableManager { + public: + static TableManager &instance() { + static TableManager manager; + return manager; + } + int32_t initialize(); + + private: + TableManager() {} + ~TableManager() {} +}; +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/tensor_accessor.cc b/paddle/fluid/distributed/table/tensor_accessor.cc new file mode 100644 index 0000000000000000000000000000000000000000..b1ece52c133a7169273d1a2f62da4d34a01cb029 --- /dev/null +++ b/paddle/fluid/distributed/table/tensor_accessor.cc @@ -0,0 +1,90 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/distributed/table/tensor_accessor.h" +#include "Eigen/Dense" + +namespace paddle { +namespace distributed { + +int CommMergeAccessor::initialize() { return 0; } + +// value 维度 +size_t CommMergeAccessor::dim() { return 0; } + +// value 各个维度的size +size_t CommMergeAccessor::dim_size(size_t dim) { return 0; } + +// value 各维度相加总size +size_t CommMergeAccessor::size() { return 0; } + +// pull value 维度 +size_t CommMergeAccessor::select_dim() { return _config.embedx_dim(); } + +// pull value 各个维度的size +size_t CommMergeAccessor::select_dim_size(size_t dim) { return sizeof(float); } + +// pull value 各维度相加总size +size_t CommMergeAccessor::select_size() { return select_dim() * sizeof(float); } + +// push value 维度 +size_t CommMergeAccessor::update_dim() { return _config.embedx_dim(); } + +// push value 各个维度的size +size_t CommMergeAccessor::update_dim_size(size_t dim) { return sizeof(float); } + +// push value 各维度相加总size +size_t CommMergeAccessor::update_size() { return update_dim() * sizeof(float); } + +// 判断该value 是否进行shrink +bool CommMergeAccessor::shrink(float * /*value*/) { return false; } + +// 判断该value 是否在save阶段dump, +// param作为参数用于标识save阶段,如downpour的xbox与batch_model +bool CommMergeAccessor::save(float * /*value*/, int /*param*/) { return true; } + +// keys不存在时,为values生成随机值 +int32_t CommMergeAccessor::create(float **value, size_t num) { return 0; } + +// 从values中选取到select_values中 +int32_t CommMergeAccessor::select(float **select_values, const float **values, + size_t num) { + return 0; +} + +// 将update_values聚合到一起 +int32_t CommMergeAccessor::merge(float **update_values, + const float **other_update_values, + size_t num) { + Eigen::Map u_mat(update_values[0], 1, num); + Eigen::Map o_mat(other_update_values[0], 1, num); + u_mat += o_mat; + return 0; +} + +// 将update_values聚合到一起,通过it.next判定是否进入下一个key +// int32_t merge(float** update_values, iterator it); +// 将update_values更新应用到values中 +int32_t CommMergeAccessor::update(float **values, const float **update_values, + size_t num) { + return 0; +} + +int CommMergeAccessor::set_weight(float **values, const float **update_values, + size_t num) { + return 0; +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/tensor_accessor.h b/paddle/fluid/distributed/table/tensor_accessor.h new file mode 100644 index 0000000000000000000000000000000000000000..12fb8a42d985981c602950645d7cdd1316b7a9cb --- /dev/null +++ b/paddle/fluid/distributed/table/tensor_accessor.h @@ -0,0 +1,78 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
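// --- Sketch (illustrative): what CommMergeAccessor::merge above amounts to.
// --- The Eigen maps express an element-wise accumulation of one trainer's
// --- update buffer into the merged buffer, equivalent to this loop.
#include <cstddef>

void merge_reference(float* merged, const float* other, size_t num) {
  for (size_t i = 0; i < num; ++i) merged[i] += other[i];
}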
+ +#pragma once +#include +#include +#include +#include +#include "paddle/fluid/distributed/common/registerer.h" +#include "paddle/fluid/distributed/ps.pb.h" +#include "paddle/fluid/distributed/table/accessor.h" + +namespace paddle { +namespace distributed { + +class CommMergeAccessor : public ValueAccessor { + public: + CommMergeAccessor() {} + virtual ~CommMergeAccessor() {} + virtual int initialize(); + // value维度 + virtual size_t dim(); + // value各个维度的size + virtual size_t dim_size(size_t dim); + // value各维度相加总size + virtual size_t size(); + // pull value维度 + virtual size_t select_dim(); + // pull value各个维度的size + virtual size_t select_dim_size(size_t dim); + // pull value各维度相加总size + virtual size_t select_size(); + // push value维度 + virtual size_t update_dim(); + // push value各个维度的size + virtual size_t update_dim_size(size_t dim); + // push value各维度相加总size + virtual size_t update_size(); + // 判断该value是否进行shrink + virtual bool shrink(float * /*value*/); + // 判断该value是否在save阶段dump, + // param作为参数用于标识save阶段,如downpour的xbox与batch_model + virtual bool save(float * /*value*/, int /*param*/); + + // keys不存在时,为values生成随机值 + virtual int32_t create(float **value, size_t num); + // 从values中选取到select_values中 + virtual int32_t select(float **select_values, const float **values, + size_t num); + // 将update_values聚合到一起 + virtual int32_t merge(float **update_values, + const float **other_update_values, size_t num); + // 将update_values聚合到一起,通过it.next判定是否进入下一个key + // virtual int32_t merge(float** update_values, iterator it); + // 将update_values更新应用到values中 + virtual int32_t update(float **values, const float **update_values, + size_t num); + + virtual int set_weight(float **values, const float **update_values, + size_t num); + virtual std::string parse_to_string(const float *value, int param) { + return ""; + } + virtual int parse_from_string(const std::string &str, float *v) { return 0; } +}; +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/tensor_table.cc b/paddle/fluid/distributed/table/tensor_table.cc new file mode 100644 index 0000000000000000000000000000000000000000..d8e1be7a9815c4aad21cd24733fd6747f3e0d56b --- /dev/null +++ b/paddle/fluid/distributed/table/tensor_table.cc @@ -0,0 +1,93 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "paddle/fluid/distributed/table/tensor_table.h" +#include "paddle/fluid/distributed/common/utils.h" + +namespace paddle { +namespace distributed { + +int32_t DenseTensorTable::initialize() { + _shards_task_pool.resize(10); + for (int i = 0; i < _shards_task_pool.size(); ++i) { + _shards_task_pool[i].reset(new ::ThreadPool(1)); + } + return 0; +} + +int32_t DenseTensorTable::initialize_tensor(framework::Scope *scope, + framework::ProgramDesc *program, + framework::Executor *executor) { + scope_ = scope; + program_ = program; + executor_ = executor; + + auto tensor_config = _config.tensor(); + if (tensor_config.has_common_block_map()) { + auto block_maps = + paddle::string::split_string(tensor_config.common_block_map(), "#"); + for (auto &block_map : block_maps) { + auto block = paddle::string::split_string(block_map, ":"); + auto block_id = std::stoi(block[0]); + std::vector block_ids{block_id}; + auto block_cmd = block[1]; + auto prepared = executor_->Prepare(*program_, block_ids); + (*prepared_ctx_)[block_cmd] = prepared[0]; + } + } +} + +int32_t DenseTensorTable::pull_dense(float *values, size_t numel) { + PADDLE_ENFORCE_EQ(numel, _data.numel(), + paddle::platform::errors::PreconditionNotMet( + "pull dense error, excepted numel %d, but actually %d.", + _data.numel(), numel)); + + GetBlas().VCOPY(numel, _data.data(), values); + return 0; +} + +int32_t DenseTensorTable::push_dense(const float *values, size_t numel) { + auto varname = _config.tensor().grad(); + auto local_scope = scope_->NewTmpScope(); + auto *var = local_scope->Var(varname); + auto *t = var->GetMutable(); + auto dims = paddle::framework::make_ddim({}); + + auto ctx = paddle::platform::CPUDeviceContext(); + t->mutable_data(_data.dims(), ctx.GetPlace()); + + GetBlas().VCOPY(numel, values, t->data()); + executor_->RunPreparedContext((*prepared_ctx_)["push"].get(), + local_scope.get()); +} + +int32_t DenseTensorTable::push_dense_param(const float *values, size_t numel) { + auto ctx = paddle::platform::CPUDeviceContext(); + if (_data.IsInitialized()) { + PADDLE_ENFORCE_EQ( + numel, _data.numel(), + paddle::platform::errors::PreconditionNotMet( + "pull dense error, excepted numel %d, but actually %d.", + _data.numel(), numel)); + } else { + _data.mutable_data( + framework::make_ddim({static_cast(numel), 1}), ctx.GetPlace()); + } + + GetBlas().VCOPY(numel, values, _data.data()); + return 0; +} +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/table/tensor_table.h b/paddle/fluid/distributed/table/tensor_table.h new file mode 100644 index 0000000000000000000000000000000000000000..9744c931c472053926ce1b772b050be08d6b46f0 --- /dev/null +++ b/paddle/fluid/distributed/table/tensor_table.h @@ -0,0 +1,179 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
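// --- Sketch (illustrative): the "common_block_map" string format that
// --- DenseTensorTable::initialize_tensor above expects: '#'-separated entries,
// --- each "block_id:command". The example string "1:push#2:decay" is made up;
// --- only the separators come from the code, which prepares one executor
// --- context per command.
#include <map>
#include <sstream>
#include <string>

std::map<std::string, int> parse_block_map(const std::string& block_map_str) {
  std::map<std::string, int> cmd_to_block;
  std::stringstream ss(block_map_str);
  std::string entry;
  while (std::getline(ss, entry, '#')) {        // assumes well-formed entries
    const auto pos = entry.find(':');
    const int block_id = std::stoi(entry.substr(0, pos));
    const std::string cmd = entry.substr(pos + 1);
    cmd_to_block[cmd] = block_id;
  }
  return cmd_to_block;
}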
+ +#pragma once + +#include +#include +#include +#include + +#include +#include "paddle/fluid/distributed/table/table.h" +#include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/tensor.h" +#include "paddle/fluid/operators/math/blas.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace distributed { + +class TensorTable : public Table { + public: + TensorTable() : Table() {} + + virtual ~TensorTable() {} + + virtual int32_t initialize() { return 0; } + + virtual int32_t pull_dense(float *values, size_t num) override { return 0; }; + + virtual int32_t push_dense(const float *values, size_t num) override { + return 0; + }; + + virtual void *get_shard(size_t shard_idx) override { return 0; } + + virtual int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) override { + return 0; + }; + + virtual int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) override { + return 0; + }; + + virtual int32_t push_dense_param(const float *values, size_t num) { + return 0; + } + + virtual int32_t shrink() { return 0; } + + virtual void clear() {} + + virtual int32_t flush() { return 0; } + + //指定加载路径 + virtual int32_t load(const std::string &path, const std::string &converter) { + return 0; + } + //指定保存路径 + virtual int32_t save(const std::string &path, const std::string &converter) { + return 0; + } + + protected: + virtual int32_t initialize_shard() { return 0; } + + virtual int32_t initialize_tensor(paddle::framework::Scope *scope, + paddle::framework::ProgramDesc *program, + paddle::framework::Executor *executor) { + return 0; + } + + std::vector> _shards_task_pool; + + framework::Executor *executor_; + framework::Scope *scope_; + framework::ProgramDesc *program_; + std::unordered_map> + *prepared_ctx_; +}; + +class DenseTensorTable : public TensorTable { + public: + DenseTensorTable() : TensorTable() {} + ~DenseTensorTable() {} + virtual int32_t initialize(); + + void *get_shard(size_t shard_idx) { return 0; } + + int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) { + return 0; + } + int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) { + return 0; + } + int32_t shrink() { return 0; } + + int32_t pull_dense(float *values, size_t num) override; + int32_t push_dense_param(const float *values, size_t num) override; + int32_t push_dense(const float *values, size_t num) override; + + virtual void clear() {} + virtual int32_t flush() { return 0; } + + //指定加载路径 + virtual int32_t load(const std::string &path, const std::string &converter) { + return 0; + } + //指定保存路径 + virtual int32_t save(const std::string &path, const std::string &converter) { + return 0; + } + + protected: + virtual int32_t initialize_shard() { return 0; } + + virtual int32_t initialize_tensor(paddle::framework::Scope *scope, + paddle::framework::ProgramDesc *program, + paddle::framework::Executor *executor); + + protected: + framework::Tensor _data; +}; +// +//// common sparse table [0, N) with out large scale +// class SparseTensorTable : public TensorTable { +// void *get_shard(size_t shard_idx) { return 0; } +// +// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) +// override; +// int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) +// override ; +// int32_t shrink() { return 0; } +// void *get_shard(size_t shard_idx) { return 
0; }; +// +// int32_t pull_dense(float *values, size_t num) { return 0; }; +// int32_t push_dense_param(const float *values, size_t num) { return 0; }; +// int32_t push_dense(const float *values, size_t num) { return 0; }; +// +// protected: +// framework::Tensor _data; +//}; + +//// for Large scale kv tensor [0, int64] do not use specific optimizer +// class KvTensorTable : public TensorTable { +// int32_t pull_dense(float *values, size_t num) { return 0; }; +// int32_t push_dense_param(const float *values, size_t num) { return 0; }; +// int32_t push_dense(const float *values, size_t num) { return 0; }; +// +// void *get_shard(size_t shard_idx) override; +// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) +// override; +// int32_t push_sparse(const uint64_t *keys, const float *values, +// size_t num) override; +// int32_t shrink() override; +// void *get_shard(size_t shard_idx) override; +//}; +// +//// for Geo sparse handle +// class GeoSparseTensorTable : public TensorTable {}; +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/test/CMakeLists.txt b/paddle/fluid/distributed/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e4cc93c9adf65c74c7df6b01a90a34e8a88f502d --- /dev/null +++ b/paddle/fluid/distributed/test/CMakeLists.txt @@ -0,0 +1,18 @@ +if(APPLE) + return() +endif() + +set_source_files_properties(table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(table_test SRCS table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS}) + +set_source_files_properties(dense_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(dense_table_test SRCS dense_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS}) + +set_source_files_properties(sparse_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(sparse_table_test SRCS sparse_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS}) + +set_source_files_properties(geo_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(geo_table_test SRCS geo_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS}) + +set_source_files_properties(barrier_table_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_test(barrier_table_test SRCS barrier_table_test.cc DEPS common_table table tensor_accessor ps_framework_proto ${COMMON_DEPS}) diff --git a/paddle/fluid/distributed/test/barrier_table_test.cc b/paddle/fluid/distributed/test/barrier_table_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..12f6062c41c48f43459289c5fcad7e05acf458b7 --- /dev/null +++ b/paddle/fluid/distributed/test/barrier_table_test.cc @@ -0,0 +1,70 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include + +#include +#include +#include // NOLINT + +#include "google/protobuf/text_format.h" +#include "gtest/gtest.h" +#include "paddle/fluid/distributed/ps.pb.h" +#include "paddle/fluid/distributed/table/common_table.h" +#include "paddle/fluid/distributed/table/table.h" + +namespace paddle { +namespace distributed { + +TEST(BarrierTable, Barrier) { + int emb_dim = 10; + int trainers = 2; + bool sync = true; + + TableParameter table_config; + table_config.set_table_class("BarrierTable"); + FsClientParameter fs_config; + Table *table = new BarrierTable(); + TableAccessorParameter *accessor_config = table_config.mutable_accessor(); + accessor_config->set_accessor_class("CommMergeAccessor"); + CommonAccessorParameter *common_config = table_config.mutable_common(); + common_config->set_table_name("barrier_table"); + common_config->set_trainer_num(trainers); + common_config->set_sync(sync); + + auto ret = table->initialize(table_config, fs_config); + + std::unordered_map> maps = + std::unordered_map>(); + + table->set_table_map(&maps); + + std::shared_ptr<::ThreadPool> pool_ = + std::make_shared<::ThreadPool>(trainers); + std::vector> task_status; + + for (auto x = 0; x < trainers; x++) { + auto task = [table, x] { table->barrier(x, 0); }; + task_status.push_back(pool_->enqueue(std::move(task))); + } + + for (auto &status : task_status) { + status.wait(); + } + + ASSERT_EQ(ret, 0); +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/test/dense_table_test.cc b/paddle/fluid/distributed/test/dense_table_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..75f9df168961fa337d6f405575aeb078a4c8ee6b --- /dev/null +++ b/paddle/fluid/distributed/test/dense_table_test.cc @@ -0,0 +1,195 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include + +#include +#include +#include // NOLINT + +#include "google/protobuf/text_format.h" +#include "gtest/gtest.h" +#include "paddle/fluid/distributed/ps.pb.h" +#include "paddle/fluid/distributed/table/common_dense_table.h" +#include "paddle/fluid/distributed/table/common_sparse_table.h" +#include "paddle/fluid/distributed/table/sparse_geo_table.h" +#include "paddle/fluid/distributed/table/table.h" + +namespace paddle { +namespace distributed { + +// CommonDenseTable + Adam +TEST(CommonDenseTable, Adam) { + int fea_dim = 10; + int trainers = 2; + float beta1 = 0.9; + float beta2 = 0.999; + float epsilon = 1.0e-8; + + TableParameter table_config; + table_config.set_table_class("CommonDenseTable"); + FsClientParameter fs_config; + Table *table = new CommonDenseTable(); + TableAccessorParameter *accessor_config = table_config.mutable_accessor(); + accessor_config->set_accessor_class("CommMergeAccessor"); + CommonAccessorParameter *common_config = table_config.mutable_common(); + // set adam optimize config + common_config->set_name("adam"); + common_config->set_table_name("adam_test_table"); + common_config->set_trainer_num(trainers); + common_config->add_params("Param"); + common_config->add_dims(fea_dim); + common_config->add_initializers("gaussian_random&0&0.0&1.0"); + common_config->add_params("LearningRate"); + common_config->add_dims(1); + common_config->add_initializers("fill_constant&1.0"); + common_config->add_params("Moment1"); + common_config->add_dims(fea_dim); + common_config->add_initializers("fill_constant&0.0"); + common_config->add_params("Moment2"); + common_config->add_dims(fea_dim); + common_config->add_initializers("fill_constant&0.0"); + common_config->add_params("Beta1Pow"); + common_config->add_dims(1); + common_config->add_initializers("fill_constant&1.0"); + common_config->add_params("Beta2Pow"); + common_config->add_dims(1); + common_config->add_initializers("fill_constant&1.0"); + auto ret = table->initialize(table_config, fs_config); + ASSERT_EQ(ret, 0); + + // pull parameters for create and check + std::vector init_values; + init_values.resize(fea_dim); + table->pull_dense(init_values.data(), fea_dim); + + // push gradient + std::vector> trainer_gradient_values; + trainer_gradient_values.resize(trainers); + float start = 10.0; + for (int i = 0; i < trainers; i++) { + for (int k = 0; k < fea_dim; k++) { + trainer_gradient_values[i].push_back(start); + start += 0.1; + } + } + + // for adam + for (int i = 0; i < trainers; i++) { + auto &push_values = trainer_gradient_values[i]; + table->push_dense(push_values.data(), push_values.size()); + } + + std::vector pull_values; + pull_values.resize(fea_dim); + table->pull_dense(pull_values.data(), fea_dim); + + std::vector beta1_pow, beta2_pow, lr, mom1, mom2, param; + beta1_pow.push_back(beta1); + beta2_pow.push_back(beta2); + lr.push_back(1.0); + for (int i = 0; i < fea_dim; i++) { + mom1.push_back(0.0); + mom2.push_back(0.0); + param.push_back(init_values[i]); + } + + for (int i = 0; i < trainers; i++) { + auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); + for (int j = 0; j < fea_dim; j++) { + mom1[j] = beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][j]; + mom2[j] = beta2 * mom2[j] + + (1 - beta2) * trainer_gradient_values[i][j] * + trainer_gradient_values[i][j]; + param[j] = + param[j] - + lr_ * (mom1[j] / (sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0]))); + } + beta1_pow[0] *= beta1; + beta2_pow[0] *= beta2; + } + for (int j = 0; j < fea_dim; j++) { + ASSERT_TRUE(abs(param[j] - 
pull_values[j]) < 1e-6); + } +} + +// CommonDenseTable + Adam +TEST(CommonDenseTable, SGD) { + int fea_dim = 10; + int trainers = 2; + + TableParameter table_config; + table_config.set_table_class("CommonDenseTable"); + FsClientParameter fs_config; + Table *table = new CommonDenseTable(); + TableAccessorParameter *accessor_config = table_config.mutable_accessor(); + accessor_config->set_accessor_class("CommMergeAccessor"); + CommonAccessorParameter *common_config = table_config.mutable_common(); + common_config->set_name("sgd"); + common_config->set_table_name("sgd_test_table"); + common_config->set_trainer_num(trainers); + common_config->add_params("Param"); + common_config->add_dims(fea_dim); + common_config->add_initializers("gaussian_random&0&0.0&1.0"); + common_config->add_params("LearningRate"); + common_config->add_dims(1); + common_config->add_initializers("fill_constant&1.0"); + auto ret = table->initialize(table_config, fs_config); + ASSERT_EQ(ret, 0); + + // pull parameters for create and check + std::vector init_values; + init_values.resize(fea_dim); + table->pull_dense(init_values.data(), fea_dim); + + std::vector total_gradients; + total_gradients.resize(fea_dim); + memset(total_gradients.data(), 0, sizeof(float) * total_gradients.size()); + // push gradient + std::vector> trainer_gradient_values; + trainer_gradient_values.resize(trainers); + float start = 10.0; + for (int i = 0; i < trainers; i++) { + for (int k = 0; k < fea_dim; k++) { + trainer_gradient_values[i].push_back(start); + total_gradients[k] += start; + start += 0.1; + } + } + + std::shared_ptr<::ThreadPool> pool_ = + std::make_shared<::ThreadPool>(trainers); + std::vector> task_status; + for (int i = 0; i < trainers; i++) { + auto &push_values = trainer_gradient_values[i]; + auto task = [table, &push_values] { + table->push_dense(push_values.data(), push_values.size()); + }; + task_status.push_back(pool_->enqueue(std::move(task))); + } + for (auto &status : task_status) { + status.wait(); + } + + std::vector pull_values; + pull_values.resize(fea_dim); + table->pull_dense(pull_values.data(), fea_dim); + for (int j = 0; j < fea_dim; j++) { + auto update_val = init_values[j] - 1.0 * total_gradients[j]; + ASSERT_TRUE(abs(update_val - pull_values[j]) < 1e-5); + } +} + +} // namespace distributed +} // namespace paddle diff --git a/paddle/fluid/distributed/test/geo_table_test.cc b/paddle/fluid/distributed/test/geo_table_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..fffecbe199e055126519526d87d034bea30b331a --- /dev/null +++ b/paddle/fluid/distributed/test/geo_table_test.cc @@ -0,0 +1,119 @@ +/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+
+#include <ThreadPool.h>
+
+#include <unistd.h>
+#include <string>
+#include <thread>  // NOLINT
+
+#include "google/protobuf/text_format.h"
+#include "gtest/gtest.h"
+#include "paddle/fluid/distributed/ps.pb.h"
+#include "paddle/fluid/distributed/table/common_dense_table.h"
+#include "paddle/fluid/distributed/table/common_sparse_table.h"
+#include "paddle/fluid/distributed/table/sparse_geo_table.h"
+#include "paddle/fluid/distributed/table/table.h"
+
+namespace paddle {
+namespace distributed {
+
+// SparseGeoTable + SSUM
+TEST(SparseGeoTable, SSUM) {
+  int emb_dim = 10;
+  int trainers = 2;
+
+  TableParameter table_config;
+  table_config.set_table_class("SparseGeoTable");
+  FsClientParameter fs_config;
+  Table *table = new SparseGeoTable();
+  TableAccessorParameter *accessor_config = table_config.mutable_accessor();
+  accessor_config->set_accessor_class("CommMergeAccessor");
+  CommonAccessorParameter *common_config = table_config.mutable_common();
+  common_config->set_name("sum");
+  common_config->set_table_name("ssum_test_table");
+  common_config->set_trainer_num(trainers);
+  common_config->add_params("Param");
+  common_config->add_dims(emb_dim);
+  common_config->add_initializers("fill_constant&1.0");
+
+  auto ret = table->initialize(table_config, fs_config);
+  ASSERT_EQ(ret, 0);
+
+  // test push_sparse_param, and create params
+  std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
+  std::vector<float> init_values;
+  for (size_t i = 0; i < init_keys.size() * emb_dim; i++) {
+    init_values.push_back(0.0);
+  }
+  table->push_sparse_param(init_keys.data(), init_values.data(),
+                           init_keys.size());
+  std::vector<float> pull_values(init_values.size());
+  table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size());
+  for (size_t i = 0; i < init_keys.size() * emb_dim; i++) {
+    ASSERT_TRUE(abs(pull_values[i] - init_values[i]) < 1e-6);
+  }
+
+  std::vector<std::vector<uint64_t>> trainer_keys;
+  std::vector<std::vector<float>> trainer_values;
+  trainer_keys.resize(trainers);
+  trainer_values.resize(trainers);
+  float start = 0.0;
+  for (int i = 0; i < trainers; i++) {
+    trainer_keys[i] = init_keys;
+    for (size_t j = 0; j < trainer_keys[i].size(); j++) {
+      auto id = trainer_keys[i][j];
+      for (int k = 0; k < emb_dim; k++) {
+        trainer_values[i].push_back(start);
+        pull_values[id * emb_dim + k] += start;
+        start += 0.1;
+      }
+    }
+  }
+
+  std::shared_ptr<::ThreadPool> pool_ =
+      std::make_shared<::ThreadPool>(trainers);
+  std::vector<std::future<void>> task_status;
+  for (int i = 0; i < trainers; i++) {
+    auto &push_keys = trainer_keys[i];
+    auto &push_values = trainer_values[i];
+    auto task = [table, &push_keys, &push_values] {
+      table->push_sparse(push_keys.data(), push_values.data(),
+                         push_keys.size());
+    };
+    task_status.push_back(pool_->enqueue(std::move(task)));
+  }
+  for (auto &status : task_status) {
+    status.wait();
+  }
+
+  std::vector<std::vector<uint64_t>> geo_pull_ids;
+  std::vector<std::vector<float>> geo_pull_values;
+  geo_pull_ids.resize(trainers);
+  geo_pull_values.resize(trainers);
+  for (int i = 0; i < trainers; i++) {
+    table->pull_geo_param(i, &geo_pull_values[i], &geo_pull_ids[i]);
+    ASSERT_EQ(geo_pull_values[i].size(), geo_pull_ids[i].size() * emb_dim);
+    for (size_t j = 0; j < geo_pull_ids[i].size(); ++j) {
+      auto id = geo_pull_ids[i][j];
+      for (int k = 0; k < emb_dim; k++) {
+        ASSERT_TRUE(abs(geo_pull_values[i][j * emb_dim + k] -
+                        pull_values[id * emb_dim + k]) < 1e-6);
+      }
+    }
+  }
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/test/heter_serde_test.cc b/paddle/fluid/distributed/test/heter_serde_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..21380921958dbbf3eb6d8cd0589316d5612fd290
--- /dev/null
+++ b/paddle/fluid/distributed/test/heter_serde_test.cc
@@ -0,0 +1,141 @@
+/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include <unistd.h>
+#include <string>
+#include <thread>  // NOLINT
+
+#include "google/protobuf/text_format.h"
+#include "gtest/gtest.h"
+#include "paddle/fluid/framework/lod_tensor.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/framework/tensor_util.h"
+#include "paddle/fluid/framework/variable.h"
+
+#include "paddle/fluid/distributed/service/heter_serde.h"
+#include "paddle/fluid/operators/math/math_function.h"
+#include "paddle/fluid/platform/place.h"
+#include "paddle/fluid/string/printf.h"
+
+namespace framework = paddle::framework;
+namespace platform = paddle::platform;
+namespace operators = paddle::operators;
+namespace math = paddle::operators::math;
+namespace memory = paddle::memory;
+namespace distributed = paddle::distributed;
+
+void CreateVarsOnScope(framework::Scope* scope, platform::Place* place,
+                       const platform::DeviceContext& ctx) {
+  // var 1: float LoDTensor
+  framework::Variable* var1 = scope->Var("x1");
+  auto* tensor1 = var1->GetMutable<framework::LoDTensor>();
+  tensor1->Resize(framework::make_ddim({512, 8, 4, 2}));
+  framework::LoD lod1;
+  lod1.push_back(framework::Vector<size_t>({1, 3, 8}));
+  tensor1->set_lod(lod1);
+  tensor1->mutable_data<float>(*place);
+  math::set_constant(ctx, tensor1, 31.9);
+
+  // var 2: LoDTensor (int element type assumed from the exact-equality
+  // check against 100 below)
+  framework::Variable* var2 = scope->Var("x2");
+  auto* tensor2 = var2->GetMutable<framework::LoDTensor>();
+  tensor2->Resize(framework::make_ddim({1000, 64}));
+  framework::LoD lod2;
+  lod2.push_back(framework::Vector<size_t>({1, 1}));
+  tensor2->set_lod(lod2);
+  tensor2->mutable_data<int>(*place);
+  math::set_constant(ctx, tensor2, 100);
+
+  // var 3: SelectedRows
+  framework::Variable* var3 = scope->Var("x3");
+  auto* slr = var3->GetMutable<framework::SelectedRows>();
+  slr->set_height(564);
+  auto* tensor3 = slr->mutable_value();
+  auto* rows = slr->mutable_rows();
+  tensor3->Resize(framework::make_ddim({564, 128}));
+  tensor3->mutable_data<float>(*place);
+  math::set_constant(ctx, tensor3, 32.7);
+  for (int i = 0; i < 564; ++i) rows->push_back(i);
+}
+
+void RunMultiVarMsg(platform::Place place) {
+  framework::Scope scope;
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  auto& ctx = *pool.Get(place);
+  CreateVarsOnScope(&scope, &place, ctx);
+
+  ::paddle::MultiVariableMessage multi_msg;
+  std::string message_name("se_de_test");
+  std::vector<std::string> send_var_name = {"x1", "x2", "x3"};
+  std::vector<std::string> recv_var_name = {};
+  LOG(INFO) << "begin SerializeToMultiVarMsg";
+
+  butil::IOBuf io_buf;
+  distributed::SerializeToMultiVarMsgAndIOBuf(message_name, send_var_name,
+                                              recv_var_name, ctx, &scope,
+                                              &multi_msg, &io_buf);
+  EXPECT_GT(multi_msg.ByteSizeLong(), static_cast<size_t>(0));
+
+  // deserialize
+  framework::Scope scope_recv;
+  LOG(INFO) << "begin DeserializeFromMultiVarMsg";
+  distributed::DeserializeFromMultiVarMsgAndIOBuf(multi_msg, &io_buf, ctx,
+                                                  &scope_recv);
+
+  // check var1
+  framework::Variable* var1 = scope_recv.FindVar("x1");
+  auto* tensor1 = var1->GetMutable<framework::LoDTensor>();
+  EXPECT_EQ(tensor1->dims(), framework::make_ddim({512, 8, 4, 2}));
+  // EXPECT_EQ(tensor1->lod(), framework::Vector<size_t>({1, 3, 8}));
+  auto* tensor_data1 = const_cast<float*>(tensor1->data<float>());
+  int tensor_numel1 = 512 * 8 * 4 * 2;
+  for (int i = 0; i < tensor_numel1; ++i)
+    EXPECT_FLOAT_EQ(tensor_data1[i], 31.9);
+
+  // check var2
+  framework::Variable* var2 = scope_recv.FindVar("x2");
+  auto* tensor2 = var2->GetMutable<framework::LoDTensor>();
+  EXPECT_EQ(tensor2->dims(), framework::make_ddim({1000, 64}));
+  // EXPECT_EQ(tensor2->lod(), framework::Vector<size_t>({1, 1}));
+  auto* tensor_data2 = const_cast<int*>(tensor2->data<int>());
+  int tensor_numel2 = 1000 * 64;
+  for (int i = 0; i < tensor_numel2; ++i) EXPECT_EQ(tensor_data2[i], 100);
+
+  // check var3
+  framework::Variable* var3 = scope_recv.FindVar("x3");
+  auto* slr = var3->GetMutable<framework::SelectedRows>();
+  EXPECT_EQ(slr->rows().size(), 564);
+  for (int i = 0; i < 564; ++i) {
+    EXPECT_EQ(slr->rows()[i], i);
+  }
+
+  auto* tensor3 = slr->mutable_value();
+  EXPECT_EQ(tensor3->dims(), framework::make_ddim({564, 128}));
+  auto* tensor_data3 = const_cast<float*>(tensor3->data<float>());
+  int tensor_numel3 = 564 * 128;
+  for (int i = 0; i < tensor_numel3; ++i)
+    EXPECT_FLOAT_EQ(tensor_data3[i], 32.7);
+}
+
+TEST(MultiVarMsgCPU, Run) {
+  platform::CPUPlace place;
+  RunMultiVarMsg(place);
+}
+
+// #ifdef PADDLE_WITH_CUDA
+// TEST(MultiVarMsgGPU, Run) {
+//   platform::CUDAPlace place;
+//   RunMultiVarMsg(place);
+// }
+// #endif
\ No newline at end of file
diff --git a/paddle/fluid/distributed/test/sparse_table_test.cc b/paddle/fluid/distributed/test/sparse_table_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..65439014e8f0e26e0e6c2e06d692f416235029cf
--- /dev/null
+++ b/paddle/fluid/distributed/test/sparse_table_test.cc
@@ -0,0 +1,213 @@
+/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include <ThreadPool.h>
+
+#include <unistd.h>
+#include <string>
+#include <thread>  // NOLINT
+
+#include "google/protobuf/text_format.h"
+#include "gtest/gtest.h"
+#include "paddle/fluid/distributed/ps.pb.h"
+#include "paddle/fluid/distributed/table/common_dense_table.h"
+#include "paddle/fluid/distributed/table/common_sparse_table.h"
+#include "paddle/fluid/distributed/table/sparse_geo_table.h"
+#include "paddle/fluid/distributed/table/table.h"
+
+namespace paddle {
+namespace distributed {
+
+// CommonSparseTable + SSGD
+TEST(CommonSparseTable, SGD) {
+  int emb_dim = 10;
+  int trainers = 2;
+
+  TableParameter table_config;
+  table_config.set_table_class("CommonSparseTable");
+  FsClientParameter fs_config;
+  Table *table = new CommonSparseTable();
+  TableAccessorParameter *accessor_config = table_config.mutable_accessor();
+  accessor_config->set_accessor_class("CommMergeAccessor");
+  CommonAccessorParameter *common_config = table_config.mutable_common();
+  common_config->set_name("sgd");
+  common_config->set_table_name("sgd_test_table");
+  common_config->set_trainer_num(trainers);
+  common_config->add_params("Param");
+  common_config->add_dims(emb_dim);
+  common_config->add_initializers("uniform_random&0&-1.0&1.0");  // param
+  common_config->add_params("LearningRate");
+  common_config->add_dims(1);
+  common_config->add_initializers("fill_constant&1.0");  // learning_rate
+  auto ret = table->initialize(table_config, fs_config);
+  ASSERT_EQ(ret, 0);
+
+  // pull parameters for create and check
+  std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
+  std::vector<float> init_values;
+  init_values.resize(init_keys.size() * emb_dim);
+  table->pull_sparse(init_values.data(), init_keys.data(), init_keys.size());
+
+  // for check
+  std::vector<float> total_gradients;
+  total_gradients.resize(init_keys.size() * emb_dim);
+  memset(total_gradients.data(), 0, sizeof(float) * total_gradients.size());
+
+  // push gradient
+  std::vector<std::vector<uint64_t>> trainer_keys;
+  std::vector<std::vector<float>> trainer_gradient_values;
+  trainer_keys.resize(trainers);
+  trainer_gradient_values.resize(trainers);
+  float start = 0.0;
+  for (int i = 0; i < trainers; i++) {
+    trainer_keys[i] = init_keys;
+    for (size_t j = 0; j < trainer_keys[i].size(); j++) {
+      auto id = trainer_keys[i][j];
+      for (int k = 0; k < emb_dim; k++) {
+        trainer_gradient_values[i].push_back(start);
+        total_gradients[id * emb_dim + k] += start;
+        start += 0.1;
+      }
+    }
+  }
+
+  std::shared_ptr<::ThreadPool> pool_ =
+      std::make_shared<::ThreadPool>(trainers);
+  std::vector<std::future<void>> task_status;
+  for (int i = 0; i < trainers; i++) {
+    auto &push_keys = trainer_keys[i];
+    auto &push_values = trainer_gradient_values[i];
+    auto task = [table, &push_keys, &push_values] {
+      table->push_sparse(push_keys.data(), push_values.data(),
+                         push_keys.size());
+    };
+    task_status.push_back(pool_->enqueue(std::move(task)));
+  }
+  for (auto &status : task_status) {
+    status.wait();
+  }
+
+  std::vector<float> pull_values;
+  pull_values.resize(init_keys.size() * emb_dim);
+  table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size());
+  for (size_t i = 0; i < init_values.size(); ++i) {
+    auto update_val = init_values[i] - 1.0 * total_gradients[i];
+    ASSERT_TRUE(abs(update_val - pull_values[i]) < 1e-6);
+  }
+}
+
+// CommonSparseTable + Adam
+TEST(CommonSparseTable, Adam) {
+  int emb_dim = 10;
+  int trainers = 2;
+  float beta1 = 0.9;
+  float beta2 = 0.999;
+  float epsilon = 1.0e-8;
+
+  TableParameter table_config;
+  table_config.set_table_class("CommonSparseTable");
+  FsClientParameter fs_config;
+  Table *table = new CommonSparseTable();
+  TableAccessorParameter *accessor_config = table_config.mutable_accessor();
+  accessor_config->set_accessor_class("CommMergeAccessor");
+  CommonAccessorParameter *common_config = table_config.mutable_common();
+  common_config->set_name("adam");
+  common_config->set_table_name("adam_test_table");
+  common_config->set_trainer_num(trainers);
+  common_config->add_params("Param");
+  common_config->add_dims(emb_dim);
+  common_config->add_initializers("uniform_random&0&-1.0&1.0");
+  common_config->add_params("LearningRate");
+  common_config->add_dims(1);
+  common_config->add_initializers("fill_constant&1.0");
+  common_config->add_params("Moment1");
+  common_config->add_dims(emb_dim);
+  common_config->add_initializers("fill_constant&0.0");
+  common_config->add_params("Moment2");
+  common_config->add_dims(emb_dim);
+  common_config->add_initializers("fill_constant&0.0");
+  common_config->add_params("Beta1Pow");
+  common_config->add_dims(1);
+  common_config->add_initializers("fill_constant&1.0");
+  common_config->add_params("Beta2Pow");
+  common_config->add_dims(1);
+  common_config->add_initializers("fill_constant&1.0");
+  auto ret = table->initialize(table_config, fs_config);
+  ASSERT_EQ(ret, 0);
+
+  // pull parameters for create and check
+  std::vector<uint64_t> init_keys = {0, 1, 2, 3, 4};
+  std::vector<float> init_values;
+  init_values.resize(init_keys.size() * emb_dim);
+  table->pull_sparse(init_values.data(), init_keys.data(), init_keys.size());
+
+  // push gradient
+  std::vector<std::vector<uint64_t>> trainer_keys;
+  std::vector<std::vector<float>> trainer_gradient_values;
+  trainer_keys.resize(trainers);
+  trainer_gradient_values.resize(trainers);
+  float start = 0.0;
+  for (int i = 0; i < trainers; i++) {
+    trainer_keys[i] = init_keys;
+    for (size_t j = 0; j < trainer_keys[i].size(); j++) {
+      for (int k = 0; k < emb_dim; k++) {
+        trainer_gradient_values[i].push_back(start);
+        start += 0.1;
+      }
+    }
+  }
+
+  for (int i = 0; i < trainers; i++) {
+    auto &push_keys = trainer_keys[i];
+    auto &push_values = trainer_gradient_values[i];
+    table->push_sparse(push_keys.data(), push_values.data(), push_keys.size());
+  }
+
+  std::vector<float> pull_values;
+  pull_values.resize(init_keys.size() * emb_dim);
+  table->pull_sparse(pull_values.data(), init_keys.data(), init_keys.size());
+
+  // recompute the expected Adam result on the trainer side
+  for (size_t idx = 0; idx < init_keys.size(); idx += emb_dim) {
+    std::vector<float> beta1_pow, beta2_pow, lr, mom1, mom2, param;
+    beta1_pow.push_back(beta1);
+    beta2_pow.push_back(beta2);
+    lr.push_back(1.0);
+    for (int i = 0; i < emb_dim; i++) {
+      mom1.push_back(0.0);
+      mom2.push_back(0.0);
+      param.push_back(init_values[idx + i]);
+    }
+    for (int i = 0; i < trainers; i++) {
+      auto lr_ = lr[0] * sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
+      for (int j = 0; j < emb_dim; j++) {
+        mom1[j] =
+            beta1 * mom1[j] + (1 - beta1) * trainer_gradient_values[i][idx + j];
+        mom2[j] = beta2 * mom2[j] +
+                  (1 - beta2) * trainer_gradient_values[i][idx + j] *
+                      trainer_gradient_values[i][idx + j];
+        param[j] = param[j] -
+                   lr_ * (mom1[j] /
+                          (sqrt(mom2[j]) + epsilon * sqrt(1 - beta2_pow[0])));
+      }
+      beta1_pow[0] *= beta1;
+      beta2_pow[0] *= beta2;
+    }
+    for (int i = 0; i < emb_dim; i++) {
+      ASSERT_TRUE(abs(param[i] - pull_values[idx + i]) < 1e-5);
+    }
+  }
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/test/table_test.cc b/paddle/fluid/distributed/test/table_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..98d52c268d77be179bf68349a2b8db702b124416
--- /dev/null
+++ b/paddle/fluid/distributed/test/table_test.cc
@@ -0,0 +1,42 @@
+/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include <ThreadPool.h>
+
+#include <unistd.h>
+#include <string>
+#include <thread>  // NOLINT
+
+#include "google/protobuf/text_format.h"
+#include "gtest/gtest.h"
+#include "paddle/fluid/distributed/ps.pb.h"
+#include "paddle/fluid/distributed/table/common_dense_table.h"
+#include "paddle/fluid/distributed/table/common_sparse_table.h"
+#include "paddle/fluid/distributed/table/sparse_geo_table.h"
+#include "paddle/fluid/distributed/table/table.h"
+
+namespace paddle {
+namespace distributed {
+
+TEST(Table, Initialize) {
+  TableParameter table_config;
+  table_config.set_table_class("SparseGeoTable");
+  FsClientParameter fs_config;
+  // case 1. no accessor
+  Table *table = new SparseGeoTable();
+  auto ret = table->initialize(table_config, fs_config);
+  ASSERT_EQ(ret, -1);
+}
+}  // namespace distributed
+}  // namespace paddle
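For readability, the per-trainer Adam reference update that both Adam tests above recompute inline can be read as the following standalone sketch. This is a hypothetical helper written only for illustration, not part of the patch; the function name and signature are invented, while the formulas and default constants (lr = 1.0, beta1 = 0.9, beta2 = 0.999, epsilon = 1.0e-8) mirror the tests.

#include <cmath>
#include <vector>

// Apply one Adam step to `param` for a single gradient vector `grad`,
// exactly as the tests do when they verify pull_dense/pull_sparse results.
void ReferenceAdamStep(const std::vector<float>& grad, std::vector<float>* mom1,
                       std::vector<float>* mom2, float* beta1_pow,
                       float* beta2_pow, std::vector<float>* param,
                       float lr = 1.0f, float beta1 = 0.9f,
                       float beta2 = 0.999f, float epsilon = 1.0e-8f) {
  // Bias-corrected learning rate, identical to `lr_` in the tests.
  float lr_ = lr * std::sqrt(1 - *beta2_pow) / (1 - *beta1_pow);
  for (size_t j = 0; j < grad.size(); ++j) {
    (*mom1)[j] = beta1 * (*mom1)[j] + (1 - beta1) * grad[j];
    (*mom2)[j] = beta2 * (*mom2)[j] + (1 - beta2) * grad[j] * grad[j];
    (*param)[j] -=
        lr_ * ((*mom1)[j] /
               (std::sqrt((*mom2)[j]) + epsilon * std::sqrt(1 - *beta2_pow)));
  }
  // The power terms decay once per applied gradient, i.e. per trainer push.
  *beta1_pow *= beta1;
  *beta2_pow *= beta2;
}

Calling this helper once per trainer, with the same gradient values the tests generate, reproduces the `param` vectors that the ASSERT_TRUE checks compare against the pulled table values.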