diff --git a/paddle/fluid/distributed/communicator_common.h b/paddle/fluid/distributed/communicator_common.h index 6a8ce552370bf72d95dd0d52a8e521afb0b0dfd0..66784c53c0026afa988119a506ef065181b0cb4d 100644 --- a/paddle/fluid/distributed/communicator_common.h +++ b/paddle/fluid/distributed/communicator_common.h @@ -30,7 +30,8 @@ struct CommContext { const std::vector §ions, const std::vector &origin_names, int id, bool merge_add_ = true, bool is_sparse_ = true, - bool is_distributed_ = false, int table_id_ = -1) + bool is_distributed_ = false, int table_id_ = -1, + bool is_tensor_table_ = false) : var_name(name), splited_varnames(names), epmap(emap), @@ -40,7 +41,8 @@ struct CommContext { merge_add(merge_add_), is_sparse(is_sparse_), is_distributed(is_distributed_), - table_id(table_id_) {} + table_id(table_id_), + is_tensor_table(is_tensor_table_) {} CommContext(const CommContext &ctx) { var_name = ctx.var_name; @@ -53,6 +55,7 @@ struct CommContext { origin_varnames = ctx.origin_varnames; is_distributed = ctx.is_distributed; table_id = ctx.table_id; + is_tensor_table = ctx.is_tensor_table; } std::string print() const { @@ -75,6 +78,7 @@ struct CommContext { ss << " is_sparse: " << is_sparse; ss << " is_distributed: " << is_distributed << "\n"; ss << " table_id: " << table_id << "\n"; + ss << " is_tensor_table: " << is_tensor_table << "\n"; return ss.str(); } @@ -89,6 +93,7 @@ struct CommContext { bool is_sparse; bool is_distributed; int table_id; + bool is_tensor_table; }; } // namespace distributed diff --git a/paddle/fluid/distributed/fleet.cc b/paddle/fluid/distributed/fleet.cc index 92211a72e748eb3ca7555a5a68707ad5a52dc4bf..7268bcbd23411994a3c6bdd6572e8f826a5bd9de 100644 --- a/paddle/fluid/distributed/fleet.cc +++ b/paddle/fluid/distributed/fleet.cc @@ -53,15 +53,16 @@ void FleetWrapper::LoadSparseOnServer(const std::string& path, pserver_ptr_->_server_ptr->table(table_id)->load(path, meta); } -void FleetWrapper::InitServer(const std::string& dist_desc, - const std::vector& host_sign_list, - int index) { +void FleetWrapper::InitServer( + const std::string& dist_desc, + const std::vector& host_sign_list, int index, + const std::vector& server_sub_program) { if (!is_initialized_) { VLOG(3) << "Going to init server"; pserver_ptr_ = std::shared_ptr( new paddle::distributed::PSCore()); pserver_ptr_->init_server(dist_desc, &host_sign_list, host_sign_list.size(), - index); + index, server_sub_program); is_initialized_ = true; } else { VLOG(3) << "Server can be initialized only once"; diff --git a/paddle/fluid/distributed/fleet.h b/paddle/fluid/distributed/fleet.h index 7f106fafbf2e2e3cb8fd4e7769d97314ee2f31e5..28ecedebf2c1e1bc4da5676abd83ec4f9f7a11ca 100644 --- a/paddle/fluid/distributed/fleet.h +++ b/paddle/fluid/distributed/fleet.h @@ -154,8 +154,10 @@ class FleetWrapper { // init server // void InitServer(const std::string& dist_desc, // const std::vector& host_sign_list, int index); - void InitServer(const std::string& dist_desc, - const std::vector& host_sign_list, int index); + void InitServer( + const std::string& dist_desc, + const std::vector& host_sign_list, int index, + const std::vector& server_sub_program = {}); // init trainer void InitWorker(const std::string& dist_desc, const std::vector& host_sign_list, Scope* scope, diff --git a/paddle/fluid/distributed/ps.proto b/paddle/fluid/distributed/ps.proto index 383ff73690bfdbb35ad87fa91c0f511c7b1a3b85..88ea04667f7018a19bd786bfffacd101a03084e0 100644 --- a/paddle/fluid/distributed/ps.proto +++ b/paddle/fluid/distributed/ps.proto @@ 
-126,12 +126,11 @@ message TableAccessorParameter { } message TensorAccessorParameter { - optional string tensor_class = 1; - optional uint32 fea_dim = 2; - optional uint32 emb_dim = 3; - optional string param = 4; - optional string grad = 5; - optional string common_block_map = 6; + optional string feed_var_name = 1; + optional string fetch_var_name = 2; + optional int64 startup_program_id = 3; + optional int64 main_program_id = 4; + optional string tensor_table_class = 6; } message CommonAccessorParameter { diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index 66b2329b8bc29891126d2eb4a06dcef878a42308..f4e11818561fcdbf3538754fd01a56a2c9e0cc1a 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -719,6 +719,34 @@ std::future BrpcPsClient::push_dense_raw_gradient( return fut; } +std::future BrpcPsClient::push_global_step(int table_id, + int64_t *total_send_data, + void *done) { + size_t request_call_num = _server_channels.size(); + DownpourBrpcClosure *closure = reinterpret_cast(done); + auto promise = std::make_shared>(); + closure->add_promise(promise); + std::future fut = promise->get_future(); + for (size_t i = 0; i < request_call_num; ++i) { + closure->request(i)->set_cmd_id(PS_PUSH_GLOBAL_STEP); + closure->request(i)->set_table_id(table_id); + closure->request(i)->set_client_id(_client_id); + auto *push_data = closure->request(i)->mutable_data(); + push_data->clear(); + int32_t num_per_shard = 1; + push_data->resize(sizeof(uint32_t) + num_per_shard * sizeof(int64_t)); + char *push_data_ptr = const_cast(push_data->data()); + memcpy(push_data_ptr, &num_per_shard, sizeof(uint32_t)); + memcpy(push_data_ptr + sizeof(uint32_t), total_send_data, + num_per_shard * sizeof(int64_t)); + + PsService_Stub rpc_stub(get_dense_channel(i)); + rpc_stub.service(closure->cntl(i), closure->request(i), + closure->response(i), closure); + } + return fut; +} + std::future BrpcPsClient::pull_sparse(float **select_values, size_t table_id, const uint64_t *keys, diff --git a/paddle/fluid/distributed/service/brpc_ps_client.h b/paddle/fluid/distributed/service/brpc_ps_client.h index c07165151507951a6c9023906ac3f3666e1209b3..ed4310f016441d18a0a95bfb29fad85072d7530d 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.h +++ b/paddle/fluid/distributed/service/brpc_ps_client.h @@ -140,7 +140,9 @@ class BrpcPsClient : public PSClient { std::vector *values, std::vector *keys, int pserver_idx); - + virtual std::future push_global_step(int table_id, + int64_t *total_send_data, + void *done); virtual std::future flush(); virtual std::future send_client2client_msg( diff --git a/paddle/fluid/distributed/service/brpc_ps_server.cc b/paddle/fluid/distributed/service/brpc_ps_server.cc index 1386e83447567f9e3acfefe8b992a3dbaa045d39..914b9971cbf948da2403452f0e4b04a0779dbd6b 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.cc +++ b/paddle/fluid/distributed/service/brpc_ps_server.cc @@ -100,6 +100,7 @@ int32_t PsService::initialize() { _service_handler_map[PS_BARRIER] = &PsService::barrier; _service_handler_map[PS_START_PROFILER] = &PsService::start_profiler; _service_handler_map[PS_STOP_PROFILER] = &PsService::stop_profiler; + _service_handler_map[PS_PUSH_GLOBAL_STEP] = &PsService::push_global_step; // shard初始化,server启动后才可从env获取到server_list的shard信息 initialize_shard_info(); @@ -526,5 +527,26 @@ int32_t PsService::start_profiler(Table *table, const PsRequestMessage &request, 
return 0; } +int32_t PsService::push_global_step(Table *table, + const PsRequestMessage &request, + PsResponseMessage &response, + brpc::Controller *cntl) { + CHECK_TABLE_EXIST(table, request, response); + auto req_buffer_size = request.data().size(); + if (req_buffer_size < 1) { + set_response_code(response, 0, "run_program data is empty"); + return 0; + } + uint32_t num = *(const uint32_t *)(request.data().data()); + const int64_t *values = + (const int64_t *)(request.data().data() + sizeof(uint32_t)); + auto trainer_id = request.client_id(); + if (table->push_dense(values, trainer_id) != 0) { + set_response_code(response, -1, "run_program failed"); + } + + return 0; +} + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/service/brpc_ps_server.h b/paddle/fluid/distributed/service/brpc_ps_server.h index 0a053848e1eb3c915b6405fcff33b5710c776943..e9eeb5d49c71705c139439afadfe2e18c680930b 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.h +++ b/paddle/fluid/distributed/service/brpc_ps_server.h @@ -110,6 +110,9 @@ class PsService : public PsBaseService { int32_t print_table_stat(Table *table, const PsRequestMessage &request, PsResponseMessage &response, brpc::Controller *cntl); + int32_t push_global_step(Table *table, const PsRequestMessage &request, + PsResponseMessage &response, brpc::Controller *cntl); + bool _is_initialize_shard_info; std::mutex _initialize_shard_mutex; std::unordered_map _service_handler_map; diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc index 19b1c015e985b43c53864aea349d383e9a6dddd1..f0322a0cbe8f52bcb2c663a7dbf470e862c579d3 100644 --- a/paddle/fluid/distributed/service/communicator.cc +++ b/paddle/fluid/distributed/service/communicator.cc @@ -34,6 +34,9 @@ limitations under the License. 
*/ #include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/split.h" +#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@" +#define STEP_COUNTER "@PS_STEP_COUNTER@" + namespace paddle { namespace distributed { @@ -377,6 +380,37 @@ void Communicator::RpcProfilerControl() { } } +void Communicator::SendGlobalStep(const CommContext &ctx, int batches, + Scope *send_scope) { + if (batches == 0) { + return; + } + auto &table_id = ctx.table_id; + size_t request_call_num = _worker_ptr->get_server_nums(); + + auto &var_name = STEP_COUNTER; + auto *out_var = send_scope->Var(var_name); + auto *out_t = out_var->GetMutable(); + auto *data = out_t->mutable_data({1}, platform::CPUPlace()); + data[0] = static_cast(batches); + VLOG(3) << "Communicator::SendGlobalStep send: " << batches; + DownpourBrpcClosure *closure = new DownpourBrpcClosure( + request_call_num, [this, request_call_num](void *done) { + int ret = 0; + auto *closure = (DownpourBrpcClosure *)done; + for (size_t i = 0; i < request_call_num; ++i) { + if (closure->check_response(i, PS_PUSH_GLOBAL_STEP) != 0) { + ret = -1; + break; + } + } + closure->set_promise_value(ret); + }); + auto status = _worker_ptr->push_global_step(table_id, data, closure); + status.wait(); + return; +} + void AsyncCommunicator::RecvThread() { if (!independent_recv_) return; VLOG(3) << "Independent RecvThread Start and Wait"; @@ -465,10 +499,16 @@ void AsyncCommunicator::SendByCommunicator() { for (size_t i = 0; i < var_nums; i++) { auto &var_name = varnames[i]; - MergeVars(var_name, vars[i], send_scope_.get(), 1); + if (var_name == STEP_COUNTER) { + MergeVars(var_name, vars[i], send_scope_.get(), 1); + } else { + MergeVars(var_name, vars[i], send_scope_.get(), 1); + } } - if (ctx.is_sparse) { + if (ctx.is_tensor_table) { + SendGlobalStep(ctx, merged_var_num, send_scope_.get()); + } else if (ctx.is_sparse) { PADDLE_ENFORCE_EQ( varnames.size(), 1, platform::errors::InvalidArgument( @@ -599,8 +639,18 @@ bool AsyncCommunicator::Check(const std::vector &var_tables) { platform::errors::InvalidArgument("var_tables.size() == 1 is permitted")); auto table_name = var_tables[0]; - if (send_varname_to_ctx_.find(table_name) == send_varname_to_ctx_.end()) + if (send_varname_to_ctx_.find(table_name) == send_varname_to_ctx_.end()) { return false; + } + if (table_name == STEP_COUNTER) { + VLOG(3) << "send step_counter into queue"; + auto tmp_var = std::make_shared(); + auto *tensor = tmp_var->GetMutable(); + tensor->Resize(framework::make_ddim({1})); + auto *out_d = tensor->mutable_data(platform::CPUPlace()); + out_d[0] = 1; + send_varname_to_queue_[table_name]->Push(tmp_var); + } return true; } diff --git a/paddle/fluid/distributed/service/communicator.h b/paddle/fluid/distributed/service/communicator.h index a22b006013461c9ca4c10710339ac60550dabec9..6544ede73cca12012ebb095e93811be00fdae8da 100644 --- a/paddle/fluid/distributed/service/communicator.h +++ b/paddle/fluid/distributed/service/communicator.h @@ -223,6 +223,9 @@ class Communicator { // 6. recv sparse param virtual void RpcRecvSparse(const std::string &varname, int table_id, Scope *scope); + // 7. 
send global step + virtual void SendGlobalStep(const CommContext &ctx, int batches, + Scope *send_scope); virtual ~Communicator() {} virtual void RpcProfilerControl(); @@ -376,8 +379,6 @@ class AsyncCommunicator : public Communicator { virtual void SendByCommunicator(); - virtual void SendGlobalStep(int batches) {} - virtual void RecvByCommunicator(); virtual void RecvNoBarrier(); @@ -527,8 +528,6 @@ class GeoCommunicator : public AsyncCommunicator { void SendByCommunicator() { return; } - void SendGlobalStep(int batches) override { return; } - void RecvByCommunicator() override { return; } inline std::string GradToParam(const std::string var_name) { diff --git a/paddle/fluid/distributed/service/ps_client.h b/paddle/fluid/distributed/service/ps_client.h index 23b00b3c816088e26c8d05f090a8bc815038b0d9..d549d09778c580b45586a930a5f3d372e9fdfc42 100644 --- a/paddle/fluid/distributed/service/ps_client.h +++ b/paddle/fluid/distributed/service/ps_client.h @@ -131,6 +131,9 @@ class PSClient { std::vector *keys, int pserver_idx) = 0; + virtual std::future push_global_step(int table_id, + int64_t *total_send_data, + void *done) = 0; virtual void finalize_worker() = 0; // client to client, message sending virtual std::future send_client2client_msg(int msg_type, diff --git a/paddle/fluid/distributed/service/sendrecv.proto b/paddle/fluid/distributed/service/sendrecv.proto index 8f5c8baa2f82427bf62f4429eb622986d761af9e..0cd849ced51db4b1a882f3b5c61cc76855e8e043 100644 --- a/paddle/fluid/distributed/service/sendrecv.proto +++ b/paddle/fluid/distributed/service/sendrecv.proto @@ -47,6 +47,7 @@ enum PsCmdID { PS_PUSH_SPARSE_PARAM = 26; PS_START_PROFILER = 27; PS_STOP_PROFILER = 28; + PS_PUSH_GLOBAL_STEP = 29; } message PsRequestMessage { diff --git a/paddle/fluid/distributed/service/server.cc b/paddle/fluid/distributed/service/server.cc index 6718098fd0bec7ba3d0f84c44c9dbe19f21d946d..fe5ee120dd1ecab0671826be8f0e98bbc6549d42 100644 --- a/paddle/fluid/distributed/service/server.cc +++ b/paddle/fluid/distributed/service/server.cc @@ -53,8 +53,10 @@ PSServer *PSServerFactory::create(const PSParameter &ps_config) { return server; } -int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, - size_t server_rank) { +int32_t PSServer::configure( + const PSParameter &config, PSEnvironment &env, size_t server_rank, + const std::vector &server_sub_program) { + scope_.reset(new framework::Scope()); _config = config.server_param(); _rank = server_rank; _environment = &env; @@ -65,6 +67,7 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, const auto &downpour_param = _config.downpour_server_param(); uint32_t barrier_table = UINT32_MAX; + uint32_t global_step_table = UINT32_MAX; for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) { auto *table = CREATE_CLASS( @@ -74,6 +77,12 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, "BarrierTable") { barrier_table = downpour_param.downpour_table_param(i).table_id(); } + if (downpour_param.downpour_table_param(i).table_class() == + "GlobalStepTable") { + global_step_table = downpour_param.downpour_table_param(i).table_id(); + } + + table->set_program_env(scope_.get(), place_, &server_sub_program); table->set_shard(_rank, shard_num); table->initialize(downpour_param.downpour_table_param(i), config.fs_client_param()); @@ -83,6 +92,9 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, if (barrier_table != UINT32_MAX) { _table_map[barrier_table]->set_table_map(&_table_map); } + if
(global_step_table != UINT32_MAX) { + _table_map[global_step_table]->set_table_map(&_table_map); + } return initialize(); } diff --git a/paddle/fluid/distributed/service/server.h b/paddle/fluid/distributed/service/server.h index 4faa0f9db2c4c510f7e010616811f1a5fd10af43..532f458e436d2860b53287b274ee849ac9c5538f 100644 --- a/paddle/fluid/distributed/service/server.h +++ b/paddle/fluid/distributed/service/server.h @@ -27,6 +27,20 @@ #include "paddle/fluid/distributed/service/env.h" #include "paddle/fluid/distributed/service/sendrecv.pb.h" #include "paddle/fluid/framework/channel.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/place.h" + +namespace paddle { +namespace framework { +class Executor; +class ProgramDesc; +class Scope; +} // namespace framework +namespace platform { +class DeviceContext; +} // namespace platform +} // namespace paddle namespace paddle { namespace distributed { @@ -40,8 +54,9 @@ class PSServer { PSServer(PSServer &&) = delete; PSServer(const PSServer &) = delete; - virtual int32_t configure(const PSParameter &config, PSEnvironment &env, - size_t server_rank) final; + virtual int32_t configure( + const PSParameter &config, PSEnvironment &env, size_t server_rank, + const std::vector &server_sub_program = {}) final; // return server_ip virtual std::string ip() { return butil::my_ip_cstr(); } @@ -86,6 +101,10 @@ class PSServer { PSEnvironment *_environment; std::unordered_map> _table_map; std::unordered_map _msg_handler_map; + + protected: + std::shared_ptr scope_; + platform::Place place_ = platform::CPUPlace(); }; REGISTER_REGISTERER(PSServer); diff --git a/paddle/fluid/distributed/service/service.cc b/paddle/fluid/distributed/service/service.cc index 40a6d2e122718790d1970e7697c05ff862e6f738..47b840cffd0808dae0f2ddb67a16b792cee3d57c 100644 --- a/paddle/fluid/distributed/service/service.cc +++ b/paddle/fluid/distributed/service/service.cc @@ -66,9 +66,10 @@ void PSCore::init_gflag(const std::string& gflags) { ::google::ParseCommandLineFlags(¶ms_cnt, ¶ms_ptr, true); } -int PSCore::init_server(const std::string& dist_desc, - const std::vector* host_sign_list, - int node_num, int index) { +int PSCore::init_server( + const std::string& dist_desc, + const std::vector* host_sign_list, int node_num, int index, + const std::vector& server_sub_program) { google::protobuf::TextFormat::ParseFromString(dist_desc, &_ps_param); init_gflag(_ps_param.init_gflags()); _ps_env = paddle::distributed::PaddlePSEnvironment(); @@ -76,7 +77,7 @@ int PSCore::init_server(const std::string& dist_desc, int ret = 0; _server_ptr = std::shared_ptr( paddle::distributed::PSServerFactory::create(_ps_param)); - ret = _server_ptr->configure(_ps_param, _ps_env, index); + ret = _server_ptr->configure(_ps_param, _ps_env, index, server_sub_program); CHECK(ret == 0) << "failed to configure server"; return ret; } diff --git a/paddle/fluid/distributed/service/service.h b/paddle/fluid/distributed/service/service.h index 97cb864e344bf8a152a494a5365ce7a22f5eb4c8..539638c803f2cf1db181880268505465da3df836 100644 --- a/paddle/fluid/distributed/service/service.h +++ b/paddle/fluid/distributed/service/service.h @@ -33,9 +33,10 @@ class PSCore { explicit PSCore() {} virtual ~PSCore() {} - virtual int init_server(const std::string& dist_desc, - const std::vector* host_sign_list, - int node_num, int index); + virtual int init_server( + const std::string& dist_desc, + const std::vector* host_sign_list, int node_num, int index, + const 
std::vector& server_sub_program = {}); virtual int init_worker( const std::string& dist_desc, const std::map>& diff --git a/paddle/fluid/distributed/table/CMakeLists.txt b/paddle/fluid/distributed/table/CMakeLists.txt index f3e329237cbf9f252c04ac535b7304e760b14a23..1e98e193d54ae6dc6e8a2d9981071283354e7f98 100644 --- a/paddle/fluid/distributed/table/CMakeLists.txt +++ b/paddle/fluid/distributed/table/CMakeLists.txt @@ -11,8 +11,9 @@ cc_library(common_table SRCS common_sparse_table.cc common_dense_table.cc sparse set_source_files_properties(tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(tensor_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context) +cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context) +cc_library(tensor_table SRCS tensor_table.cc DEPS eigen3 ps_framework_proto executor scope device_context tensor ${TABLE_DEPS}) set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) -cc_library(table SRCS table.cc DEPS common_table tensor_accessor ps_framework_proto string_helper device_context gflags glog boost) +cc_library(table SRCS table.cc DEPS common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost) diff --git a/paddle/fluid/distributed/table/common_dense_table.cc b/paddle/fluid/distributed/table/common_dense_table.cc index e3d481f32eb8881505514281544ddd92b0d8f921..96e1ef0ee04ed0621de3350bbad528c03b05ca24 100644 --- a/paddle/fluid/distributed/table/common_dense_table.cc +++ b/paddle/fluid/distributed/table/common_dense_table.cc @@ -42,6 +42,7 @@ int32_t CommonDenseTable::initialize() { sync = _config.common().sync(); VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; + _global_lr = new float(1.0); initialize_value(); initialize_optimizer(); @@ -81,8 +82,10 @@ int32_t CommonDenseTable::initialize_optimizer() { if (name == "sgd") { optimizer_ = std::make_shared(common, &values_); + optimizer_->set_global_lr(_global_lr); } else if (name == "adam") { optimizer_ = std::make_shared(common, &values_); + optimizer_->set_global_lr(_global_lr); } else if (name == "sum") { optimizer_ = std::make_shared(common, &values_); } else { @@ -92,6 +95,12 @@ int32_t CommonDenseTable::initialize_optimizer() { return 0; } +int32_t CommonDenseTable::set_global_lr(float* lr) { + _global_lr = lr; + optimizer_->set_global_lr(_global_lr); + return 0; +} + int32_t CommonDenseTable::pull_dense(float* pull_values, size_t num) { std::copy(values_[param_idx_].begin(), values_[param_idx_].end(), pull_values); diff --git a/paddle/fluid/distributed/table/common_dense_table.h b/paddle/fluid/distributed/table/common_dense_table.h index eb97f3f26416a905020bcf722aee182dc2510de0..c32e6e194deea30304cd5bdad3b89af70ab23d20 100644 --- a/paddle/fluid/distributed/table/common_dense_table.h +++ b/paddle/fluid/distributed/table/common_dense_table.h @@ -42,6 +42,7 @@ class CommonDenseTable : public DenseTable { virtual int32_t push_dense_param(const float* values, size_t num) override; virtual int32_t push_dense(const float* values, size_t num) override; virtual int32_t pour() override; + virtual int32_t set_global_lr(float* lr) override; int32_t load(const std::string& path, const std::string& param) override { VLOG(0) << "Dense table may load by " diff --git 
a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc index 4f8afd3d256847043ffec92c5231dbbb31fc23db..5c03b3f501880a021942d70066bc64836c31bbc9 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.cc +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -175,6 +175,8 @@ int32_t CommonSparseTable::initialize() { sync = _config.common().sync(); VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; + _global_lr = new float(1.0); + auto common = _config.common(); int size = static_cast(common.params().size()); @@ -249,9 +251,11 @@ int32_t CommonSparseTable::initialize_optimizer() { if (name == "sgd") { optimizer_ = std::make_shared(value_names_, value_dims_, value_offsets_, value_idx_); + optimizer_->set_global_lr(_global_lr); } else if (name == "adam") { optimizer_ = std::make_shared(value_names_, value_dims_, value_offsets_, value_idx_); + optimizer_->set_global_lr(_global_lr); } else if (name == "sum") { optimizer_ = std::make_shared(value_names_, value_dims_, value_offsets_, value_idx_); @@ -263,6 +267,12 @@ int32_t CommonSparseTable::initialize_optimizer() { return 0; } +int32_t CommonSparseTable::set_global_lr(float* lr) { + _global_lr = lr; + optimizer_->set_global_lr(_global_lr); + return 0; +} + int32_t CommonSparseTable::load(const std::string& path, const std::string& param) { rwlock_->WRLock(); diff --git a/paddle/fluid/distributed/table/common_sparse_table.h b/paddle/fluid/distributed/table/common_sparse_table.h index 19199b682ac2968242de1d2129119cc4f32b3dbb..e74a6bac44ef56ddafc2422cb38c21979710f823 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.h +++ b/paddle/fluid/distributed/table/common_sparse_table.h @@ -69,6 +69,8 @@ class CommonSparseTable : public SparseTable { virtual int32_t push_sparse_param(const uint64_t* keys, const float* values, size_t num); + virtual int32_t set_global_lr(float* lr) override; + virtual int32_t pour(); virtual int32_t flush(); virtual int32_t shrink(); diff --git a/paddle/fluid/distributed/table/depends/dense.h b/paddle/fluid/distributed/table/depends/dense.h index 8a71d9b5a8b651853333d8f4ce346471407dc901..209595de7e636dc25667e0a606e254d4251cb2ab 100644 --- a/paddle/fluid/distributed/table/depends/dense.h +++ b/paddle/fluid/distributed/table/depends/dense.h @@ -36,6 +36,10 @@ class DenseOptimizer { std::vector>* values) {} virtual void update(const float* update_values, size_t num, int begin, int end) = 0; + virtual void set_global_lr(float* lr) { global_learning_rate_ = lr; } + + protected: + float* global_learning_rate_; }; // sum calc for dense tensor @@ -84,8 +88,10 @@ class DSGD : public DenseOptimizer { grads.resize(update_numel); auto blas = GetBlas(); + float lr = *(global_learning_rate_) * (*learning_rate); + VLOG(4) << "DSGD LearningRate: " << lr; blas.VCOPY(update_numel, update_values + begin, grads.data()); - blas.SCAL(update_numel, *learning_rate, grads.data()); + blas.SCAL(update_numel, lr, grads.data()); blas.VSUB(update_numel, param + begin, grads.data(), param + begin); } @@ -150,7 +156,8 @@ class DAdam : public DenseOptimizer { beta1_pow[0] = beta1_pow[0] * beta1; beta2_pow[0] = beta2_pow[0] * beta2; - float lr_ = learning_rate[0]; + float lr_ = *(global_learning_rate_)*learning_rate[0]; + VLOG(4) << "DAdam LearningRate: " << lr_; lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); float* tmp_ = tmp.data(); diff --git a/paddle/fluid/distributed/table/depends/sparse.h 
b/paddle/fluid/distributed/table/depends/sparse.h index f98057f9867011af236c2bfae7631e899594787f..1900da32155cd7d573029fd4b97ab97dbf9e30d5 100644 --- a/paddle/fluid/distributed/table/depends/sparse.h +++ b/paddle/fluid/distributed/table/depends/sparse.h @@ -44,12 +44,17 @@ class SparseOptimizer { size_t num, const std::vector& offsets, ValueBlock* block) = 0; + virtual void set_global_lr(float* lr) { global_learning_rate_ = lr; } + const std::vector& value_names_; const std::vector& value_dims_; const std::vector& value_offsets_; const std::unordered_map& value_idx_; int param_offset = 0; int update_numel = 0; + + protected: + float* global_learning_rate_; }; // sum calc for sparse tensor @@ -102,13 +107,14 @@ class SSGD : public SparseOptimizer { auto id = keys[x]; auto* value = block->Get(id); - float* learning_rate = value + lr_offset; + float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0]; + VLOG(4) << "SSGD LearningRate: " << learning_rate; float* param = value + param_offset; std::vector grads; grads.resize(update_numel); blas.VCOPY(update_numel, update_values + x * update_numel, grads.data()); - blas.SCAL(update_numel, learning_rate[0], grads.data()); + blas.SCAL(update_numel, learning_rate, grads.data()); blas.VSUB(update_numel, param, grads.data(), param); } } @@ -156,7 +162,8 @@ class SAdam : public SparseOptimizer { for (auto x : offsets) { auto id = keys[x]; auto* values = block->Get(id); - float* learning_rate = values + lr_offset; + float lr_ = *(global_learning_rate_) * (values + lr_offset)[0]; + VLOG(4) << "SAdam LearningRate: " << lr_; float* param = values + param_offset; float* moment1 = values + m1_offset; float* moment2 = values + m2_offset; @@ -166,7 +173,6 @@ class SAdam : public SparseOptimizer { beta1_pow[0] = beta1_pow[0] * beta1; beta2_pow[0] = beta2_pow[0] * beta2; - float lr_ = learning_rate[0]; lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); std::vector grad, grad2, tmp; diff --git a/paddle/fluid/distributed/table/table.cc b/paddle/fluid/distributed/table/table.cc index 892de0785f1d4ad84346b8e9c5ff76d8050897f3..ec08dc58da22e0773663de7c46e926b2205ece0a 100644 --- a/paddle/fluid/distributed/table/table.cc +++ b/paddle/fluid/distributed/table/table.cc @@ -22,6 +22,7 @@ #include "paddle/fluid/distributed/table/common_sparse_table.h" #include "paddle/fluid/distributed/table/sparse_geo_table.h" #include "paddle/fluid/distributed/table/tensor_accessor.h" +#include "paddle/fluid/distributed/table/tensor_table.h" namespace paddle { namespace distributed { @@ -30,7 +31,9 @@ REGISTER_CLASS(Table, CommonDenseTable); REGISTER_CLASS(Table, CommonSparseTable); REGISTER_CLASS(Table, SparseGeoTable); REGISTER_CLASS(Table, BarrierTable); - +REGISTER_CLASS(Table, TensorTable); +REGISTER_CLASS(Table, DenseTensorTable); +REGISTER_CLASS(Table, GlobalStepTable); REGISTER_CLASS(ValueAccessor, CommMergeAccessor); int32_t TableManager::initialize() { diff --git a/paddle/fluid/distributed/table/table.h b/paddle/fluid/distributed/table/table.h index 70d1211fe81c70c7e579f15e1445a6ba5acecf79..376d4a525b20decb43f2b2aa48e456716a603470 100644 --- a/paddle/fluid/distributed/table/table.h +++ b/paddle/fluid/distributed/table/table.h @@ -20,8 +20,11 @@ #include #include #include - #include "paddle/fluid/distributed/table/accessor.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/place.h" #include "paddle/fluid/string/string_helper.h" namespace 
paddle { @@ -35,6 +38,10 @@ class Table { virtual int32_t pull_dense(float *values, size_t num) = 0; virtual int32_t push_dense(const float *values, size_t num) = 0; + // for push global_step + virtual int32_t push_dense(const int64_t *values, const int32_t trainer_id) { + return 0; + } virtual int32_t push_dense_param(const float *values, size_t num) { return 0; } @@ -67,6 +74,18 @@ class Table { return 0; } + // only for tensor table + virtual int32_t set_program_env( + framework::Scope *scope, platform::Place place, + const std::vector *sub_program) { + return 0; + } + + virtual int32_t set_global_lr(float *lr) { + _global_lr = lr; + return 0; + } + virtual int32_t pour() { return 0; } virtual void clear() = 0; @@ -105,6 +124,7 @@ class Table { size_t _shard_idx; // table 分片编号 size_t _shard_num; // table 分片总数 TableParameter _config; + float *_global_lr = nullptr; std::shared_ptr _value_accesor; }; REGISTER_REGISTERER(Table); diff --git a/paddle/fluid/distributed/table/tensor_table.cc b/paddle/fluid/distributed/table/tensor_table.cc index d8e1be7a9815c4aad21cd24733fd6747f3e0d56b..708566345adcbcc3d9021ef62570a3a08bae3e47 100644 --- a/paddle/fluid/distributed/table/tensor_table.cc +++ b/paddle/fluid/distributed/table/tensor_table.cc @@ -13,81 +13,120 @@ // limitations under the License. #include "paddle/fluid/distributed/table/tensor_table.h" +#include // NOLINT +#include +#include +#include +#include +#include +#include #include "paddle/fluid/distributed/common/utils.h" - +DECLARE_double(eager_delete_tensor_gb); namespace paddle { namespace distributed { -int32_t DenseTensorTable::initialize() { - _shards_task_pool.resize(10); - for (int i = 0; i < _shards_task_pool.size(); ++i) { - _shards_task_pool[i].reset(new ::ThreadPool(1)); - } +int32_t TensorTable::set_program_env( + framework::Scope *scope, platform::Place place, + const std::vector *sub_program) { + scope_ = scope; + place_ = place; + executor_ = new framework::Executor(place_); + sub_program_ = sub_program; return 0; } -int32_t DenseTensorTable::initialize_tensor(framework::Scope *scope, - framework::ProgramDesc *program, - framework::Executor *executor) { - scope_ = scope; - program_ = program; - executor_ = executor; +int32_t GlobalStepTable::initialize() { + auto _program_config = _config.tensor(); + auto trainers_ = _config.common().trainer_num(); + FLAGS_eager_delete_tensor_gb = -1; + // Get Config + if (_program_config.has_startup_program_id()) { + startup_program_id_ = _program_config.startup_program_id(); + } + if (_program_config.has_main_program_id()) { + main_program_id_ = _program_config.main_program_id(); + } + if (_program_config.has_feed_var_name()) { + feed_var_name_ = _program_config.feed_var_name(); + } + if (_program_config.has_fetch_var_name()) { + fetch_var_name_ = _program_config.fetch_var_name(); + } + + // Run startup program + if (startup_program_id_ != -1) { + std::map fake_feed; + std::map fake_fetch; + auto startup_program_desc = sub_program_->at(startup_program_id_); + auto ctx = executor_->Prepare(startup_program_desc, 0); + executor_->RunPreparedContext(ctx.get(), scope_, false); + } - auto tensor_config = _config.tensor(); - if (tensor_config.has_common_block_map()) { - auto block_maps = - paddle::string::split_string(tensor_config.common_block_map(), "#"); - for (auto &block_map : block_maps) { - auto block = paddle::string::split_string(block_map, ":"); - auto block_id = std::stoi(block[0]); - std::vector block_ids{block_id}; - auto block_cmd = block[1]; - auto prepared = 
executor_->Prepare(*program_, block_ids); - (*prepared_ctx_)[block_cmd] = prepared[0]; + if (main_program_id_ != -1) { + // Run main porgram, if program is used for learning decay + auto main_program_desc = sub_program_->at(main_program_id_); + auto main_ctx = executor_->Prepare(main_program_desc, 0); + exec_context_ = std::move(main_ctx); + executor_->RunPreparedContext(exec_context_.get(), scope_, false); + // init decay_counters + decay_counters_.reserve(trainers_); + for (int32_t i = 0; i < trainers_; ++i) { + decay_counters_[i] = 0; } } + + return 0; } -int32_t DenseTensorTable::pull_dense(float *values, size_t numel) { - PADDLE_ENFORCE_EQ(numel, _data.numel(), - paddle::platform::errors::PreconditionNotMet( - "pull dense error, excepted numel %d, but actually %d.", - _data.numel(), numel)); +int32_t GlobalStepTable::set_table_map( + std::unordered_map> *table_map) { + auto *lr_var = scope_->FindVar(fetch_var_name_); + auto *lr_tensor = lr_var->GetMutable(); + auto *lr_value = lr_tensor->mutable_data(platform::CPUPlace()); + VLOG(3) << "GlobalStepTable::set_table_map set global lr: " << *lr_value; - GetBlas().VCOPY(numel, _data.data(), values); + for (auto iter = table_map->begin(); iter != table_map->end(); iter++) { + auto table_id = iter->first; + if (table_id == _config.table_id()) { + continue; + } + iter->second->set_global_lr(lr_value); + } return 0; } -int32_t DenseTensorTable::push_dense(const float *values, size_t numel) { - auto varname = _config.tensor().grad(); - auto local_scope = scope_->NewTmpScope(); - auto *var = local_scope->Var(varname); - auto *t = var->GetMutable(); - auto dims = paddle::framework::make_ddim({}); +int32_t GlobalStepTable::push_dense(const int64_t *values, + const int32_t trainer_id) { + return _run_program(values, trainer_id); +} - auto ctx = paddle::platform::CPUDeviceContext(); - t->mutable_data(_data.dims(), ctx.GetPlace()); +int32_t GlobalStepTable::_run_program(const int64_t *values, + const uint32_t trainer_id) { + FLAGS_eager_delete_tensor_gb = -1; + auto counter = decay_counters_.at(trainer_id); + counter += int(values[0]); + decay_counters_.at(trainer_id) = counter; - GetBlas().VCOPY(numel, values, t->data()); - executor_->RunPreparedContext((*prepared_ctx_)["push"].get(), - local_scope.get()); -} + auto *global_step_var = scope_->FindVar(feed_var_name_); + auto *tensor = global_step_var->GetMutable(); + auto *value = tensor->mutable_data(platform::CPUPlace()); -int32_t DenseTensorTable::push_dense_param(const float *values, size_t numel) { - auto ctx = paddle::platform::CPUDeviceContext(); - if (_data.IsInitialized()) { - PADDLE_ENFORCE_EQ( - numel, _data.numel(), - paddle::platform::errors::PreconditionNotMet( - "pull dense error, excepted numel %d, but actually %d.", - _data.numel(), numel)); - } else { - _data.mutable_data( - framework::make_ddim({static_cast(numel), 1}), ctx.GetPlace()); + auto global_counter = 0; + for (auto &trainer_counter : decay_counters_) { + global_counter += trainer_counter.second; } - GetBlas().VCOPY(numel, values, _data.data()); + // Todo: hard code for increment op + value[0] = global_counter - 1; + VLOG(3) << "GlobalStepTable::_run_program global_counter " << value[0]; + + executor_->RunPreparedContext(exec_context_.get(), scope_, false, false); + auto *lr_var = scope_->FindVar(fetch_var_name_); + auto *lr_tensor = lr_var->GetMutable(); + auto *lr_value = lr_tensor->mutable_data(platform::CPUPlace()); + VLOG(3) << "GlobalStepTable::LR value: " << lr_value[0]; return 0; } + } // namespace distributed } // 
namespace paddle diff --git a/paddle/fluid/distributed/table/tensor_table.h b/paddle/fluid/distributed/table/tensor_table.h index 9744c931c472053926ce1b772b050be08d6b46f0..58680145a43f63f60df2861b55ed91fd125e8d0a 100644 --- a/paddle/fluid/distributed/table/tensor_table.h +++ b/paddle/fluid/distributed/table/tensor_table.h @@ -14,166 +14,187 @@ #pragma once +#include +#include // NOLINT #include +#include // NOLINT +#include #include #include #include - -#include +#include "paddle/fluid/distributed/common/utils.h" #include "paddle/fluid/distributed/table/table.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/program_desc.h" -#include "paddle/fluid/framework/scope.h" -#include "paddle/fluid/framework/tensor.h" -#include "paddle/fluid/operators/math/blas.h" #include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/enforce.h" namespace paddle { namespace distributed { +#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@" +#define STEP_COUNTER "@PS_STEP_COUNTER@" + class TensorTable : public Table { public: - TensorTable() : Table() {} - + TensorTable() {} virtual ~TensorTable() {} - virtual int32_t initialize() { return 0; } + int32_t pull_dense(float *values, size_t num) override { return 0; } - virtual int32_t pull_dense(float *values, size_t num) override { return 0; }; + int32_t push_dense(const float *values, size_t num) override { return 0; } - virtual int32_t push_dense(const float *values, size_t num) override { + int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) override { return 0; - }; + } + int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) override { + return 0; + } + int32_t shrink() override { return 0; } + + virtual void *get_shard(size_t shard_idx) { return 0; } - virtual void *get_shard(size_t shard_idx) override { return 0; } + virtual int32_t initialize_shard() { return 0; }; - virtual int32_t pull_sparse(float *values, const uint64_t *keys, - size_t num) override { + virtual int32_t flush() { return 0; }; + + virtual int32_t load(const std::string &path, const std::string ¶m) { return 0; - }; + } + virtual int32_t save(const std::string &path, const std::string ¶m) { + return 0; + } + + virtual void clear(){}; - virtual int32_t push_sparse(const uint64_t *keys, const float *values, - size_t num) override { + virtual int32_t initialize() override { return 0; }; + + virtual int32_t push_dense(const int64_t *values, + const int32_t trainer_id) override { return 0; }; - virtual int32_t push_dense_param(const float *values, size_t num) { + virtual int32_t set_program_env( + framework::Scope *scope, platform::Place place, + const std::vector *sub_program) override; + + protected: + framework::Executor *executor_; + framework::Scope *scope_; + platform::Place place_ = platform::CPUPlace(); + const std::vector *sub_program_; + paddle::distributed::TensorAccessorParameter program_config_; + std::shared_ptr exec_context_ = nullptr; +}; + +class DenseTensorTable : public TensorTable { + public: + DenseTensorTable() {} + virtual ~DenseTensorTable() {} + + int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) override { return 0; } + int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) override { + return 0; + } + int32_t shrink() override { return 0; } - virtual int32_t shrink() { return 0; } + virtual void *get_shard(size_t shard_idx) { return 0; } - virtual void clear() {} + virtual int32_t initialize_shard() { return 0; } virtual 
int32_t flush() { return 0; } - //指定加载路径 - virtual int32_t load(const std::string &path, const std::string &converter) { + virtual void clear() {} + + // Todo: Support program Load & Save + virtual int32_t load(const std::string &path, const std::string ¶m) { return 0; } - //指定保存路径 - virtual int32_t save(const std::string &path, const std::string &converter) { + virtual int32_t save(const std::string &path, const std::string ¶m) { return 0; } - protected: - virtual int32_t initialize_shard() { return 0; } + // Todo: Support pull dense + int32_t pull_dense(float *values, size_t num) override { return 0; } + + /*----------------------------------------------------------------------*/ + + virtual int32_t initialize() override { return 0; } + + int32_t push_dense(const float *values, size_t num) override { return 0; } - virtual int32_t initialize_tensor(paddle::framework::Scope *scope, - paddle::framework::ProgramDesc *program, - paddle::framework::Executor *executor) { + int32_t push_dense(const int64_t *values, const int32_t trainer_id) { return 0; } - std::vector> _shards_task_pool; + protected: + virtual int32_t _run_program(const float *values, size_t num, + const uint32_t trainer_id) { + return 0; + } - framework::Executor *executor_; - framework::Scope *scope_; - framework::ProgramDesc *program_; - std::unordered_map> - *prepared_ctx_; + int startup_program_id_ = -1; + int main_program_id_ = -1; + std::string feed_var_name_ = ""; + std::string fetch_var_name_ = ""; }; -class DenseTensorTable : public TensorTable { +class GlobalStepTable : public DenseTensorTable { public: - DenseTensorTable() : TensorTable() {} - ~DenseTensorTable() {} - virtual int32_t initialize(); + GlobalStepTable() {} + virtual ~GlobalStepTable() {} - void *get_shard(size_t shard_idx) { return 0; } - - int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) { + int32_t pull_sparse(float *values, const uint64_t *keys, + size_t num) override { return 0; } - int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) { + int32_t push_sparse(const uint64_t *keys, const float *values, + size_t num) override { return 0; } - int32_t shrink() { return 0; } + int32_t shrink() override { return 0; } - int32_t pull_dense(float *values, size_t num) override; - int32_t push_dense_param(const float *values, size_t num) override; - int32_t push_dense(const float *values, size_t num) override; + virtual void *get_shard(size_t shard_idx) { return 0; } + + virtual int32_t initialize_shard() { return 0; } - virtual void clear() {} virtual int32_t flush() { return 0; } - //指定加载路径 - virtual int32_t load(const std::string &path, const std::string &converter) { + virtual void clear() {} + + virtual int32_t load(const std::string &path, const std::string ¶m) { return 0; } - //指定保存路径 - virtual int32_t save(const std::string &path, const std::string &converter) { + virtual int32_t save(const std::string &path, const std::string ¶m) { return 0; } - protected: - virtual int32_t initialize_shard() { return 0; } + int32_t pull_dense(float *values, size_t num) override { return 0; } - virtual int32_t initialize_tensor(paddle::framework::Scope *scope, - paddle::framework::ProgramDesc *program, - paddle::framework::Executor *executor); + /*----------------------------------------------------------------------*/ - protected: - framework::Tensor _data; + int32_t initialize() override; + + int32_t push_dense(const float *values, size_t num) override { return 0; } + + int32_t push_dense(const int64_t *values, const int32_t 
trainer_id); + + int32_t set_table_map( + std::unordered_map> *table_map) override; + + private: + virtual int32_t _run_program(const int64_t *values, + const uint32_t trainer_id); + + private: + std::unordered_map decay_counters_; + int32_t trainers_; }; -// -//// common sparse table [0, N) with out large scale -// class SparseTensorTable : public TensorTable { -// void *get_shard(size_t shard_idx) { return 0; } -// -// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) -// override; -// int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) -// override ; -// int32_t shrink() { return 0; } -// void *get_shard(size_t shard_idx) { return 0; }; -// -// int32_t pull_dense(float *values, size_t num) { return 0; }; -// int32_t push_dense_param(const float *values, size_t num) { return 0; }; -// int32_t push_dense(const float *values, size_t num) { return 0; }; -// -// protected: -// framework::Tensor _data; -//}; - -//// for Large scale kv tensor [0, int64] do not use specific optimizer -// class KvTensorTable : public TensorTable { -// int32_t pull_dense(float *values, size_t num) { return 0; }; -// int32_t push_dense_param(const float *values, size_t num) { return 0; }; -// int32_t push_dense(const float *values, size_t num) { return 0; }; -// -// void *get_shard(size_t shard_idx) override; -// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) -// override; -// int32_t push_sparse(const uint64_t *keys, const float *values, -// size_t num) override; -// int32_t shrink() override; -// void *get_shard(size_t shard_idx) override; -//}; -// -//// for Geo sparse handle -// class GeoSparseTensorTable : public TensorTable {}; + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc b/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc index 3b2f808a2a82d558a6dabc85b57139f99d8ea389..a7af4c82897f1c97e4389f47936086fa8e145703 100644 --- a/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc +++ b/paddle/fluid/distributed/test/brpc_service_dense_sgd_test.cc @@ -20,10 +20,10 @@ limitations under the License. */ #include "google/protobuf/text_format.h" #include "gtest/gtest.h" #include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/variable.h" - #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/place.h" #include "paddle/fluid/string/printf.h" @@ -157,7 +157,10 @@ void RunServer() { pserver_ptr_ = std::shared_ptr( paddle::distributed::PSServerFactory::create(server_proto)); LOG(INFO) << "RUN configure"; - pserver_ptr_->configure(server_proto, _ps_env, 0); + std::vector empty_vec; + framework::ProgramDesc empty_prog; + empty_vec.push_back(empty_prog); + pserver_ptr_->configure(server_proto, _ps_env, 0, empty_vec); LOG(INFO) << "RUN start"; pserver_ptr_->start(ip_, port_); LOG(INFO) << "End start"; diff --git a/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc b/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc index 224b9ba2fc780a217bbe4a007d624d0f7afcedf0..8cee608d5f76eb3daa88e93dbb5187171053d3f0 100644 --- a/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc +++ b/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc @@ -24,10 +24,6 @@ limitations under the License. 
*/ #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/variable.h" -#include "paddle/fluid/operators/math/math_function.h" -#include "paddle/fluid/platform/place.h" -#include "paddle/fluid/string/printf.h" - #include "paddle/fluid/distributed/ps.pb.h" #include "paddle/fluid/distributed/service/brpc_ps_client.h" #include "paddle/fluid/distributed/service/brpc_ps_server.h" @@ -35,6 +31,10 @@ limitations under the License. */ #include "paddle/fluid/distributed/service/ps_client.h" #include "paddle/fluid/distributed/service/sendrecv.pb.h" #include "paddle/fluid/distributed/service/service.h" +#include "paddle/fluid/framework/program_desc.h" +#include "paddle/fluid/operators/math/math_function.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/fluid/string/printf.h" namespace framework = paddle::framework; namespace platform = paddle::platform; @@ -155,7 +155,10 @@ void RunServer() { _ps_env.set_ps_servers(&host_sign_list_, 1); pserver_ptr_ = std::shared_ptr( paddle::distributed::PSServerFactory::create(server_proto)); - pserver_ptr_->configure(server_proto, _ps_env, 0); + std::vector empty_vec; + framework::ProgramDesc empty_prog; + empty_vec.push_back(empty_prog); + pserver_ptr_->configure(server_proto, _ps_env, 0, empty_vec); pserver_ptr_->start(ip_, port_); } diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index aa2867debe3cc55634cd84db3be292fef7a70074..2eaf08153e8ecc9beda4edd28b5dadf11c86cb2b 100644 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -108,6 +108,7 @@ message AsyncConfig { optional bool runtime_split_send_recv = 8 [ default = false ]; optional bool launch_barrier = 9 [ default = true ]; optional string heter_worker_device_guard = 10 [ default = 'cpu' ]; + optional int32 lr_decay_steps = 11 [ default = 10 ]; } message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; } diff --git a/paddle/fluid/operators/pscore/send_op.cc b/paddle/fluid/operators/pscore/send_op.cc index 2ede86e223e400ffb4b951e20572ead50e07e9bd..4e9f8a9a3606b90e4346cb52113ff5a237e2a565 100644 --- a/paddle/fluid/operators/pscore/send_op.cc +++ b/paddle/fluid/operators/pscore/send_op.cc @@ -52,8 +52,9 @@ class SendOp : public framework::OperatorBase { auto send_varnames = Attr>("send_varnames"); auto* communicator = paddle::distributed::Communicator::GetInstance(); - communicator->Check(send_varnames); - communicator->Send(ins, scope); + if (communicator->Check(send_varnames)) { + communicator->Send(ins, scope); + } // auto fleet = paddle::distributed::FleetWrapper::GetInstance(); // if (is_sparse == 0) { diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index 428deee17bd631d5559f88f0629e49954a013a98..4dd43175a1162189fea2a143a547ae4c696b4a93 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -62,7 +62,7 @@ void BindDistFleetWrapper(py::module* m) { .def("stop_server", &FleetWrapper::StopServer) .def("stop_worker", &FleetWrapper::FinalizeWorker) .def("barrier", &FleetWrapper::BarrierWithTable); -} // end BindDistFleetWrapper +} void BindPSHost(py::module* m) { py::class_(*m, "PSHost") @@ -79,8 +79,8 @@ void BindCommunicatorContext(py::module* m) { .def( py::init&, const std::vector&, const std::vector&, - const std::vector&, int, bool, bool, bool, - int>()) + const std::vector&, int, bool, bool, bool, int, + bool>()) .def("var_name", [](const CommContext& self) { 
return self.var_name; }) .def("trainer_id", [](const CommContext& self) { return self.trainer_id; }) @@ -97,6 +97,8 @@ void BindCommunicatorContext(py::module* m) { [](const CommContext& self) { return self.is_distributed; }) .def("origin_varnames", [](const CommContext& self) { return self.origin_varnames; }) + .def("is_tensor_table", + [](const CommContext& self) { return self.is_tensor_table; }) .def("__str__", [](const CommContext& self) { return self.print(); }); } diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py index 3be2d320d494e9be67c9f510cfbb3908b661d845..8fd172b5227492fc3496d279a42ba99d93b814ec 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py @@ -64,6 +64,11 @@ class ParameterServerOptimizer(MetaOptimizerBase): _main = compiled_config.origin_main_program.clone() _startup = compiled_config.origin_startup_program.clone() + from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass + _add_lr_decay_table_pass( + _main, compiled_config, + self.user_defined_strategy.a_sync_configs["lr_decay_steps"]) + if not compiled_config.is_geo_mode(): # for main program _main = worker.delete_optimizer_pass(_main, compiled_config) @@ -128,6 +133,12 @@ class ParameterServerOptimizer(MetaOptimizerBase): if len(ops) == 0: return _main, _startup + from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass + lr_decay_steps = self.user_defined_strategy.a_sync_configs[ + "lr_decay_steps"] + _add_lr_decay_table_pass(main_program, compiled_config, + lr_decay_steps) + for op in ops: if op.type in ["sgd", "adam"]: is_sgd_adam = True diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index 4b932a8832429f17a49626a4a1bc3118d6eac81a..3b17be1aa075871d23d48a4d3746028667eb8bb6 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -206,6 +206,28 @@ class CommonAccessor: conv_indent(indent), attrs, conv_indent(indent)) +class Tensor: + def __init__(self): + self.main_program_id = None + self.startup_program_id = None + self.feed_var_name = None + self.fetch_var_name = None + self.tensor_table_class = False + + def to_string(self, indent): + program_str = "{}tensor {{{}\n{}}}" + attrs = "" + attrs += "feed_var_name: \"{}\" ".format(str(self.feed_var_name)) + attrs += "fetch_var_name: \"{}\" ".format(str(self.fetch_var_name)) + attrs += "startup_program_id: {} ".format(str(self.startup_program_id)) + attrs += "main_program_id: {} ".format(str(self.main_program_id)) + attrs += "tensor_table_class: \"{}\" ".format( + str(self.tensor_table_class)) + attrs += "\n" + return program_str.format( + conv_indent(indent), attrs, conv_indent(indent)) + + class Table: def __init__(self): self.id = -1 @@ -214,6 +236,7 @@ class Table: self.type = None self.accessor = None self.common = None + self.tensor = None def to_string(self, indent): table_str = "{}downpour_table_param {{{}\n{}}}" @@ -230,6 +253,10 @@ class Table: attrs += self.accessor.to_string(indent) attrs += "\n" + if self.tensor is not None: + attrs += self.tensor.to_string(indent) + attrs += "\n" + if self.common is not None: attrs += self.common.to_string(indent) attrs += "\n" @@ -355,6 +382,7 @@ class 
TheOnePSRuntime(RuntimeBase): self._communicator = None self._server = None self._worker = fluid.core.DistFleetWrapper() + self._server_sub_program = [] self._heter_client = None def _set_basic_info(self, context): @@ -569,17 +597,73 @@ class TheOnePSRuntime(RuntimeBase): table.common = common return table + def _build_tensor_table(idx, tensor_dict): + table = Table() + table.id = idx + table.type = "PS_OTHER_TABLE" + table.table_class = tensor_dict["tensor_table_class"] + table.shard_num = 256 + + accessor = Accessor() + accessor.accessor_class = "CommMergeAccessor" + accessor.optimizer = None + accessor.feature_dim = 0 + accessor.embedding_dim = 0 + table.accessor = accessor + + common = CommonAccessor() + common.table_name = tensor_dict["feed_var_name"] + common.trainer_num = self.compiled_strategy.get_trainers() + common.attrs = "" + common.dims = [] + common.params = [] + table.common = common + + tensor = Tensor() + tensor.main_program_id = tensor_dict["main_program_id"] + tensor.startup_program_id = tensor_dict["startup_program_id"] + tensor.feed_var_name = tensor_dict["feed_var_name"] + tensor.fetch_var_name = tensor_dict["fetch_var_name"] + tensor.tensor_table_class = tensor_dict["tensor_table_class"] + table.tensor = tensor + + return table + + def _add_tensor_table(tables): + tensor_table_dict = self.compiled_strategy.get_tensor_table_dict() + program_idx = 0 + for table_name in tensor_table_dict: + if tensor_table_dict[table_name]["startup_program"] != None: + tensor_table_dict[table_name][ + "startup_program_id"] = program_idx + self._server_sub_program.append(tensor_table_dict[ + table_name]["startup_program"].desc) + program_idx += 1 + if tensor_table_dict[table_name]["main_program"] != None: + tensor_table_dict[table_name][ + "main_program_id"] = program_idx + self._server_sub_program.append(tensor_table_dict[ + table_name]["main_program"].desc) + program_idx += 1 + # Todo: Hard code for lr_decay table apply table id + new_table = _build_tensor_table( + len(tables), tensor_table_dict[table_name]) + tables.append(new_table) + return tables + def _get_tables(): send_ctx = self.compiled_strategy.get_the_one_send_context( use_origin_program=True, split_dense_table=self.role_maker. 
_is_heter_parameter_server_mode) - tables = [i for i in range(len(send_ctx) + 1)] - + tables = [] for idx, (name, ctx) in enumerate(send_ctx.items()): table = Table() table.id = ctx.table_id() + if ctx.is_tensor_table(): + continue + if ctx.is_sparse(): if len(ctx.origin_varnames()) < 1: continue @@ -619,10 +703,17 @@ class TheOnePSRuntime(RuntimeBase): accessor = _build_merge_accessor(ctx) table.accessor = accessor - tables[table.id] = table + tables.append(table) + + tensor_table_dict = self.compiled_strategy.get_tensor_table_dict() + if len(tensor_table_dict) > 0: + tables = _add_tensor_table(tables) + else: + empty_porgram = Program() + self._server_sub_program.append(empty_porgram.desc) - barrier_table = _build_barrier_table(len(send_ctx)) - tables[-1] = barrier_table + barrier_table = _build_barrier_table(len(tables)) + tables.append(barrier_table) return tables if is_server: @@ -667,7 +758,8 @@ class TheOnePSRuntime(RuntimeBase): string_hosts.append(pshost.serialize_to_string()) self._server = fluid.core.DistFleetWrapper() - self._server.init_server(proto_txt, string_hosts, role_id) + self._server.init_server(proto_txt, string_hosts, role_id, + self._server_sub_program) from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py index 20eed71e06b2141e6aad71e6c914e45b6a422e68..b987e01bba46ec1dff516381bc534d384b928c9e 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py @@ -19,7 +19,7 @@ import collections import math import os import warnings - +import logging import six import paddle.fluid as fluid from paddle.fluid import core @@ -162,6 +162,8 @@ class CompileTimeStrategy(object): self._build_var_distributed() + self.tensor_table_dict = {} + # for heter-ps save variables self.origin_merged_variables_pairs = list(self.merged_variables_pairs) self.origin_merged_dense_pairs = list(self.merged_dense_pairs) @@ -240,6 +242,24 @@ class CompileTimeStrategy(object): def get_origin_ps_startup_program(self): return self.origin_ps_startup_program + def add_tensor_table(self, + feed_var_name, + fetch_var_name="", + startup_program=None, + main_program=None, + tensor_table_class=""): + self.tensor_table_dict[feed_var_name] = {} + self.tensor_table_dict[feed_var_name]["feed_var_name"] = feed_var_name + self.tensor_table_dict[feed_var_name]["fetch_var_name"] = fetch_var_name + self.tensor_table_dict[feed_var_name][ + "startup_program"] = startup_program + self.tensor_table_dict[feed_var_name]["main_program"] = main_program + self.tensor_table_dict[feed_var_name][ + "tensor_table_class"] = tensor_table_class + + def get_tensor_table_dict(self): + return self.tensor_table_dict + def get_sparse_varname_on_ps(self, is_distributed, endpoint=None): if not endpoint: endpoint = self.get_ps_endpoint() @@ -523,9 +543,10 @@ class CompileTimeStrategy(object): grad.merged_var.name] var_numel = reduce(lambda x, y: x * y, var.shape[1:]) - sparse_ctx = CommContext( - grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], - [grad_name], trainer_id, True, True, is_distributed, idx) + sparse_ctx = CommContext(grad_name, [grad_name], + ["127.0.0.1:6071"], [var_numel], + [grad_name], trainer_id, True, True, + is_distributed, idx, False) idx += 1 send_ctx[sparse_ctx.var_name()] = sparse_ctx @@ -533,6 +554,10 @@ class CompileTimeStrategy(object): raise 
                     "GeoSGD require sparse parameters in your net.")
 
+            if len(self.tensor_table_dict) > 0 and self.role_maker._is_worker():
+                name, ctx = self._step_ctx(idx)
+                send_ctx[name] = ctx
+
             return send_ctx
         else:
             return self.get_the_one_send_context(split_dense_table)
@@ -559,7 +584,7 @@ class CompileTimeStrategy(object):
                 aggregate = True
                 dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
                                         [var_numel], origin_varnames, trainer_id,
-                                        aggregate, False, False, idx)
+                                        aggregate, False, False, idx, False)
                 send_ctx[grad_name] = dense_ctx
                 idx += 1
             else:
@@ -571,9 +596,10 @@ class CompileTimeStrategy(object):
                 var_numel = reduce(lambda x, y: x * y, var.shape)
                 grad_name = origin_varname
                 aggregate = True
-                dense_ctx = CommContext(
-                    grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel],
-                    [origin_varname], trainer_id, aggregate, False, False, idx)
+                dense_ctx = CommContext(grad_name, [grad_name],
+                                        ["127.0.0.1:6071"], [var_numel],
+                                        [origin_varname], trainer_id, aggregate,
+                                        False, False, idx, False)
                 send_ctx[grad_name] = dense_ctx
                 idx += 1
         return idx
@@ -615,10 +641,15 @@ class CompileTimeStrategy(object):
             sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape,
                                      [grad_name], trainer_id, True, True,
-                                     is_distributed, idx)
+                                     is_distributed, idx, False)
 
             idx += 1
             send_ctx[sparse_ctx.var_name()] = sparse_ctx
+
+        if len(self.tensor_table_dict) > 0 and self.role_maker._is_worker():
+            name, ctx = self._step_ctx(idx)
+            send_ctx[name] = ctx
+
         return send_ctx
 
     def get_the_one_recv_context(self,
@@ -633,6 +664,8 @@ class CompileTimeStrategy(object):
             for idx, (name, ctx) in enumerate(send_ctx.items()):
                 if ctx.is_sparse():
                     continue
+                if ctx.is_tensor_table():
+                    continue
 
                 origin_grad_varnames = ctx.origin_varnames()
 
@@ -679,14 +712,14 @@ class CompileTimeStrategy(object):
             var_distributed.append((g.name, ep, g.shape[0]))
         return var_distributed
 
-    def _step_ctx(self):
+    def _step_ctx(self, idx):
         name = STEP_COUNTER
         trainer_id = self.get_role_id()
         endpoints = self.get_ps_endpoints()
         sections = [1] * len(endpoints)
         names = [name] * len(endpoints)
         ctx = CommContext(name, names, endpoints, sections, [name], trainer_id,
-                          True, False, False)
+                          True, False, False, idx, True)
         return name, ctx
 
     def _create_vars_from_blocklist(self, block_list):
@@ -1118,6 +1151,89 @@ def _get_optimize_ops(_program):
     return opt_ops
 
 
+def _add_lr_decay_table_pass(main_program, compiled_config, lr_decay_steps):
+    if hasattr(compiled_config.origin_main_program, 'lr_sheduler'):
+        from paddle.optimizer.lr import LRScheduler
+        assert isinstance(compiled_config.origin_main_program.lr_sheduler,
+                          LRScheduler), "must be LRScheduler"
+        ops = _get_optimize_ops(compiled_config.origin_main_program)
+        lr_param_dict = _get_lr_param_dict(ops)
+        lr_decay_main_program, lr_decay_startup_program, lr_name = _get_lr_sheduler_program(
+            compiled_config.origin_main_program.lr_sheduler, lr_param_dict,
+            lr_decay_steps)
+        compiled_config.add_tensor_table(
+            "@LR_DECAY_COUNTER@", lr_name, lr_decay_startup_program,
+            lr_decay_main_program, "GlobalStepTable")
+
+
+def _get_lr_param_dict(opt_ops):
+    lr_param_dict = {}
+    for op in opt_ops:
+        lr_name = op.input("LearningRate")[0]
+        param_name = op.input("Param")[0]
+        if lr_name not in lr_param_dict:
+            lr_param_dict[lr_name] = []
+        lr_param_dict[lr_name].append(param_name)
+    return lr_param_dict
+
+
+def _get_lr_sheduler_program(lr_sheduler, lr_param_dict, lr_decay_steps):
+    scheduler_decay = [
+        'NoamDecay', 'NaturalExpDecay', 'InverseTimeDecay', 'ExponentialDecay'
+    ]
+
+    from paddle.optimizer.lr import ExponentialDecay, NoamDecay, PiecewiseDecay, NaturalExpDecay, InverseTimeDecay
+    from paddle.fluid.layers.learning_rate_scheduler import exponential_decay, noam_decay, piecewise_decay, natural_exp_decay, inverse_time_decay
+
+    decay_main_program = fluid.framework.Program()
+    decay_startup_program = fluid.framework.Program()
+    lr_name = ""
+
+    if isinstance(lr_sheduler, ExponentialDecay):
+        with fluid.program_guard(decay_main_program, decay_startup_program):
+            lr = exponential_decay(1.0, lr_decay_steps, lr_sheduler.gamma, True)
+            lr_name = lr.name
+            logging.warn(
+                "ExponentialDecay is set, staircase = True, global learning rate decay step is [ %d ]. Change the decay steps as follows: \n"
+                "\t strategy = paddle.distributed.fleet.DistributedStrategy() \n"
+                "\t strategy.a_sync = True \n"
+                "\t strategy.a_sync_configs = { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
+                % lr_decay_steps)
+    elif isinstance(lr_sheduler, NoamDecay):
+        with fluid.program_guard(decay_main_program, decay_startup_program):
+            lr = noam_decay(lr_sheduler.d_model, lr_sheduler.warmup_steps, 1.0)
+            lr_name = lr.name
+            logging.warn("NoamDecay is set, warmup steps is [ %d ]" %
+                         lr_sheduler.warmup_steps)
+    elif isinstance(lr_sheduler, NaturalExpDecay):
+        with fluid.program_guard(decay_main_program, decay_startup_program):
+            lr = natural_exp_decay(1.0, lr_decay_steps, lr_sheduler.gamma, True)
+            lr_name = lr.name
+            logging.warn(
+                "NaturalExpDecay is set, staircase = True, global learning rate decay step is [ %d ]. Change the decay steps as follows: \n"
+                "\t strategy = paddle.distributed.fleet.DistributedStrategy() \n"
+                "\t strategy.a_sync = True \n"
+                "\t strategy.a_sync_configs = { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
+                % lr_decay_steps)
+    elif isinstance(lr_sheduler, InverseTimeDecay):
+        with fluid.program_guard(decay_main_program, decay_startup_program):
+            lr = inverse_time_decay(1.0, lr_decay_steps, lr_sheduler.gamma,
+                                    True)
+            lr_name = lr.name
+            logging.warn(
+                "InverseTimeDecay is set, staircase = True, global learning rate decay step is [ %d ]. Change the decay steps as follows: \n"
+                "\t strategy = paddle.distributed.fleet.DistributedStrategy() \n"
+                "\t strategy.a_sync = True \n"
+                "\t strategy.a_sync_configs = { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
+                % lr_decay_steps)
+    else:
+        raise ValueError(
+            "Unsupported LearningRate strategy, please use one of the following decay strategies: {}".
+            format(scheduler_decay))
+
+    return decay_main_program, decay_startup_program, lr_name
+
+
 def _get_varname_parts(varname):
     # returns origin, blockid, trainerid
     orig_var_name = ""
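
Taken together, the `public.py` changes above route learning-rate decay through a server-side tensor table: `_add_lr_decay_table_pass` builds a small decay program from the program's `lr_sheduler`, registers it on the `CompileTimeStrategy` under the `@LR_DECAY_COUNTER@` feed variable with table class `GlobalStepTable`, and `_step_ctx(idx)` gives trainers a `CommContext` flagged as a tensor table so their step counters are pushed to that table. The sketch below is editorial illustration, not part of the patch; `compiled_config` and the decay step are assumed placeholders.

```python
# Minimal sketch, assuming `compiled_config` is a CompileTimeStrategy whose
# origin_main_program carries an `lr_sheduler` (e.g. an ExponentialDecay).
_add_lr_decay_table_pass(
    compiled_config.origin_main_program, compiled_config, 2000)

# The strategy now holds one tensor-table entry keyed by the feed variable:
entry = compiled_config.get_tensor_table_dict()["@LR_DECAY_COUNTER@"]
# entry["feed_var_name"]      == "@LR_DECAY_COUNTER@"
# entry["fetch_var_name"]     == name of the decayed learning-rate variable
# entry["startup_program"]    == startup Program built in _get_lr_sheduler_program
# entry["main_program"]       == Program that recomputes the LR per global step
# entry["tensor_table_class"] == "GlobalStepTable"
```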
""" @@ -37,6 +42,7 @@ import paddle.distributed.fleet.base.role_maker as role_maker import paddle.distributed.fleet as fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory from paddle.distributed.fleet.utils.ps_util import Distributed +paddle.enable_static() __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] @@ -120,14 +126,20 @@ class FleetDistRunnerBase(object): fluid.clip.set_gradient_clip( clip=fluid.clip.GradientClipByGlobalNorm(2.0)) - use_decay = int(os.getenv("DECAY", "0")) + use_decay = int(os.getenv("USE_DECAY", "0")) if use_decay: + scheduler = paddle.optimizer.lr.ExponentialDecay( + learning_rate=LEARNING_RATE, gamma=0.999, verbose=True) + optimizer = fluid.optimizer.SGD(scheduler) + """ + # learning rate decay method before 2.0 optimizer = fluid.optimizer.SGD( learning_rate=fluid.layers.exponential_decay( learning_rate=LEARNING_RATE, decay_steps=500, decay_rate=0.969, - staircase=True)) + staircase=True)) + """ else: optimizer = fluid.optimizer.SGD(LEARNING_RATE) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_decay.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_decay.py new file mode 100644 index 0000000000000000000000000000000000000000..f52cace4cf3bdf64400288631cacc5f362da719b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_decay.py @@ -0,0 +1,80 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from __future__ import print_function
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid as fluid
+import os
+import unittest
+import paddle
+paddle.enable_static()
+
+# For Net
+base_lr = 0.2
+emb_lr = base_lr * 3
+dict_dim = 1500
+emb_dim = 128
+hid_dim = 128
+margin = 0.1
+sample_rate = 1
+batch_size = 4
+
+
+class TestNoamDecay(unittest.TestCase):
+    def net(self):
+        input_data = paddle.static.data(
+            name="sparse_input", shape=[None, 1], dtype="int64")
+        input_label = paddle.static.data(
+            name="label", shape=[None, 1], dtype="int64")
+        label = paddle.cast(input_label, dtype="float32")
+        embedding = paddle.static.nn.embedding(
+            input_data, is_sparse=True, size=[1000, 128])
+
+        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
+        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
+        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
+        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
+        label = paddle.cast(label, dtype="int64")
+        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
+        paddle.static.Print(cost, message="heter_cost")
+        return cost
+
+    def test(self):
+        endpoints = [
+            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
+            "127.0.0.1:36007"
+        ]
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.WORKER,
+            worker_num=2,
+            server_endpoints=endpoints)
+
+        fleet.init(role)
+        loss = self.net()
+        scheduler = paddle.optimizer.lr.NoamDecay(
+            d_model=0.01, warmup_steps=100, verbose=True)
+        optimizer = fluid.optimizer.Adam(scheduler)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True
+        strategy.a_sync_configs = {"launch_barrier": False}
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(loss)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps10.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps10.py
new file mode 100644
index 0000000000000000000000000000000000000000..f46934f3a4c606328d235fa8627067e5ccc3bd60
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps10.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import paddle.fluid as fluid
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.distributed.fleet as fleet
+import unittest
+import paddle
+
+import os
+
+paddle.enable_static()
+
+# For Net
+base_lr = 0.2
+emb_lr = base_lr * 3
+dict_dim = 1500
+emb_dim = 128
+hid_dim = 128
+margin = 0.1
+sample_rate = 1
+batch_size = 4
+
+
+class TestInverseTimeDecay(unittest.TestCase):
+    def net(self):
+        input_data = paddle.static.data(
+            name="sparse_input", shape=[None, 1], dtype="int64")
+        input_label = paddle.static.data(
+            name="label", shape=[None, 1], dtype="int64")
+        label = paddle.cast(input_label, dtype="float32")
+        embedding = paddle.static.nn.embedding(
+            input_data, is_sparse=True, size=[1000, 128])
+
+        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
+        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
+        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
+        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
+        label = paddle.cast(label, dtype="int64")
+        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
+        return cost
+
+    def test(self):
+        endpoints = [
+            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
+            "127.0.0.1:36007"
+        ]
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=endpoints)
+
+        fleet.init(role)
+        loss = self.net()
+        scheduler = paddle.optimizer.lr.InverseTimeDecay(
+            learning_rate=base_lr, gamma=0.999, verbose=True)
+        optimizer = fluid.optimizer.Adam(scheduler)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(loss)
+        fleet.init_server()
+
+
+if __name__ == '__main__':
+    os.environ["GLOG_v"] = "4"
+    os.environ["GLOG_logtostderr"] = "1"
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps7.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps7.py
new file mode 100644
index 0000000000000000000000000000000000000000..c6453d81520c55552bf57c44bff934d7be5f5886
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps7.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid as fluid
+import os
+import unittest
+import paddle
+paddle.enable_static()
+
+# For Net
+base_lr = 0.2
+emb_lr = base_lr * 3
+dict_dim = 1500
+emb_dim = 128
+hid_dim = 128
+margin = 0.1
+sample_rate = 1
+batch_size = 4
+
+
+class TestNaturalExpDecay(unittest.TestCase):
+    def net(self):
+        input_data = paddle.static.data(
+            name="sparse_input", shape=[None, 1], dtype="int64")
+        input_label = paddle.static.data(
+            name="label", shape=[None, 1], dtype="int64")
+        label = paddle.cast(input_label, dtype="float32")
+        embedding = paddle.static.nn.embedding(
+            input_data, is_sparse=True, size=[1000, 128])
+
+        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
+        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
+        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
+        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
+        label = paddle.cast(label, dtype="int64")
+        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
+        paddle.static.Print(cost, message="heter_cost")
+        return cost
+
+    def test(self):
+        endpoints = [
+            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
+            "127.0.0.1:36007"
+        ]
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=endpoints)
+
+        fleet.init(role)
+        loss = self.net()
+        scheduler = paddle.optimizer.lr.NaturalExpDecay(
+            learning_rate=base_lr, gamma=0.999, verbose=True)
+        optimizer = fluid.optimizer.Adam(scheduler)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(loss)
+        fleet.init_server()
+
+
+if __name__ == '__main__':
+    os.environ["GLOG_v"] = "4"
+    os.environ["GLOG_logtostderr"] = "1"
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps8.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps8.py
new file mode 100644
index 0000000000000000000000000000000000000000..32b2959531b26bd59c3aedc6cd0e454eca557a23
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps8.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid as fluid
+import os
+import unittest
+import paddle
+paddle.enable_static()
+
+# For Net
+base_lr = 0.2
+emb_lr = base_lr * 3
+dict_dim = 1500
+emb_dim = 128
+hid_dim = 128
+margin = 0.1
+sample_rate = 1
+batch_size = 4
+
+
+class TestNoamDecay(unittest.TestCase):
+    def net(self):
+        input_data = paddle.static.data(
+            name="sparse_input", shape=[None, 1], dtype="int64")
+        input_label = paddle.static.data(
+            name="label", shape=[None, 1], dtype="int64")
+        label = paddle.cast(input_label, dtype="float32")
+        embedding = paddle.static.nn.embedding(
+            input_data, is_sparse=True, size=[1000, 128])
+
+        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
+        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
+        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
+        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
+        label = paddle.cast(label, dtype="int64")
+        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
+        paddle.static.Print(cost, message="heter_cost")
+        return cost
+
+    def test(self):
+        endpoints = [
+            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
+            "127.0.0.1:36007"
+        ]
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=endpoints)
+
+        fleet.init(role)
+        loss = self.net()
+        scheduler = paddle.optimizer.lr.NoamDecay(
+            d_model=0.01, warmup_steps=100, verbose=True)
+        optimizer = fluid.optimizer.Adam(scheduler)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(loss)
+        fleet.init_server()
+
+
+if __name__ == '__main__':
+    os.environ["GLOG_v"] = "4"
+    os.environ["GLOG_logtostderr"] = "1"
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps9.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps9.py
new file mode 100644
index 0000000000000000000000000000000000000000..4cd49041b8aa9c24e4d674f9b932d0e7cbc63c4b
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps9.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid as fluid
+import os
+import unittest
+import paddle
+paddle.enable_static()
+
+# For Net
+base_lr = 0.2
+emb_lr = base_lr * 3
+dict_dim = 1500
+emb_dim = 128
+hid_dim = 128
+margin = 0.1
+sample_rate = 1
+batch_size = 4
+
+
+class TestExponentialDecay(unittest.TestCase):
+    def net(self):
+        input_data = paddle.static.data(
+            name="sparse_input", shape=[None, 1], dtype="int64")
+        input_label = paddle.static.data(
+            name="label", shape=[None, 1], dtype="int64")
+        label = paddle.cast(input_label, dtype="float32")
+        embedding = paddle.static.nn.embedding(
+            input_data, is_sparse=True, size=[1000, 128])
+
+        fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
+        fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
+        fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
+        predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
+        label = paddle.cast(label, dtype="int64")
+        cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
+        paddle.static.Print(cost, message="heter_cost")
+        return cost
+
+    def test(self):
+        endpoints = [
+            "127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
+            "127.0.0.1:36007"
+        ]
+
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=endpoints)
+
+        fleet.init(role)
+        loss = self.net()
+        scheduler = paddle.optimizer.lr.ExponentialDecay(
+            learning_rate=base_lr, gamma=0.999, verbose=True)
+        optimizer = fluid.optimizer.Adam(scheduler)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = True
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(loss)
+        fleet.init_server()
+
+
+if __name__ == '__main__':
+    os.environ["GLOG_v"] = "4"
+    os.environ["GLOG_logtostderr"] = "1"
+    unittest.main()
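
As an end-to-end illustration (editorial, not part of the patch), the sketch below shows how a user would drive the new server-side decay path; it mirrors the new `test_dist_fleet_*` tests above. `build_net()` is a hypothetical helper returning a scalar loss, the role maker is assumed to be configured through the usual PaddleCloud environment variables, and the `lr_decay_steps` value is a placeholder, as suggested by the warnings emitted in `_get_lr_sheduler_program`.

```python
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid

paddle.enable_static()
fleet.init(role_maker.PaddleCloudRoleMaker())

loss = build_net()  # hypothetical: builds the static-graph model, returns loss

# Any scheduler handled by _get_lr_sheduler_program works here:
# NoamDecay, NaturalExpDecay, InverseTimeDecay, ExponentialDecay.
scheduler = paddle.optimizer.lr.ExponentialDecay(
    learning_rate=0.2, gamma=0.999, verbose=True)
optimizer = fluid.optimizer.Adam(scheduler)

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
# lr_decay_steps sets the global-step granularity of the server-side decay.
strategy.a_sync_configs = {"lr_decay_steps": 2000}

optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)

if fleet.is_server():
    # The server receives the decay programs via server_sub_program and
    # advances the GlobalStepTable as trainers push their step counters.
    fleet.init_server()
    fleet.run_server()
else:
    fleet.init_worker()
    # ... run training, then:
    fleet.stop_worker()
```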