未验证 提交 528e03fc 编写于 作者: C Chengmo 提交者: GitHub

【Paddle.Fleet】Fix tensor table (#30075)

* add tensor table
上级 1bdf9242
...@@ -30,7 +30,8 @@ struct CommContext { ...@@ -30,7 +30,8 @@ struct CommContext {
const std::vector<int64_t> &sections, const std::vector<int64_t> &sections,
const std::vector<std::string> &origin_names, int id, const std::vector<std::string> &origin_names, int id,
bool merge_add_ = true, bool is_sparse_ = true, bool merge_add_ = true, bool is_sparse_ = true,
bool is_distributed_ = false, int table_id_ = -1) bool is_distributed_ = false, int table_id_ = -1,
bool is_tensor_table_ = false)
: var_name(name), : var_name(name),
splited_varnames(names), splited_varnames(names),
epmap(emap), epmap(emap),
...@@ -40,7 +41,8 @@ struct CommContext { ...@@ -40,7 +41,8 @@ struct CommContext {
merge_add(merge_add_), merge_add(merge_add_),
is_sparse(is_sparse_), is_sparse(is_sparse_),
is_distributed(is_distributed_), is_distributed(is_distributed_),
table_id(table_id_) {} table_id(table_id_),
is_tensor_table(is_tensor_table_) {}
CommContext(const CommContext &ctx) { CommContext(const CommContext &ctx) {
var_name = ctx.var_name; var_name = ctx.var_name;
...@@ -53,6 +55,7 @@ struct CommContext { ...@@ -53,6 +55,7 @@ struct CommContext {
origin_varnames = ctx.origin_varnames; origin_varnames = ctx.origin_varnames;
is_distributed = ctx.is_distributed; is_distributed = ctx.is_distributed;
table_id = ctx.table_id; table_id = ctx.table_id;
is_tensor_table = ctx.is_tensor_table;
} }
std::string print() const { std::string print() const {
...@@ -75,6 +78,7 @@ struct CommContext { ...@@ -75,6 +78,7 @@ struct CommContext {
ss << " is_sparse: " << is_sparse; ss << " is_sparse: " << is_sparse;
ss << " is_distributed: " << is_distributed << "\n"; ss << " is_distributed: " << is_distributed << "\n";
ss << " table_id: " << table_id << "\n"; ss << " table_id: " << table_id << "\n";
ss << " is_tensor_table: " << is_tensor_table << "\n";
return ss.str(); return ss.str();
} }
...@@ -89,6 +93,7 @@ struct CommContext { ...@@ -89,6 +93,7 @@ struct CommContext {
bool is_sparse; bool is_sparse;
bool is_distributed; bool is_distributed;
int table_id; int table_id;
bool is_tensor_table;
}; };
} // namespace distributed } // namespace distributed
......
...@@ -53,15 +53,16 @@ void FleetWrapper::LoadSparseOnServer(const std::string& path, ...@@ -53,15 +53,16 @@ void FleetWrapper::LoadSparseOnServer(const std::string& path,
pserver_ptr_->_server_ptr->table(table_id)->load(path, meta); pserver_ptr_->_server_ptr->table(table_id)->load(path, meta);
} }
void FleetWrapper::InitServer(const std::string& dist_desc, void FleetWrapper::InitServer(
const std::vector<std::string>& host_sign_list, const std::string& dist_desc,
int index) { const std::vector<std::string>& host_sign_list, int index,
const std::vector<framework::ProgramDesc>& server_sub_program) {
if (!is_initialized_) { if (!is_initialized_) {
VLOG(3) << "Going to init server"; VLOG(3) << "Going to init server";
pserver_ptr_ = std::shared_ptr<paddle::distributed::PSCore>( pserver_ptr_ = std::shared_ptr<paddle::distributed::PSCore>(
new paddle::distributed::PSCore()); new paddle::distributed::PSCore());
pserver_ptr_->init_server(dist_desc, &host_sign_list, host_sign_list.size(), pserver_ptr_->init_server(dist_desc, &host_sign_list, host_sign_list.size(),
index); index, server_sub_program);
is_initialized_ = true; is_initialized_ = true;
} else { } else {
VLOG(3) << "Server can be initialized only once"; VLOG(3) << "Server can be initialized only once";
......
...@@ -154,8 +154,10 @@ class FleetWrapper { ...@@ -154,8 +154,10 @@ class FleetWrapper {
// init server // init server
// void InitServer(const std::string& dist_desc, // void InitServer(const std::string& dist_desc,
// const std::vector<uint64_t>& host_sign_list, int index); // const std::vector<uint64_t>& host_sign_list, int index);
void InitServer(const std::string& dist_desc, void InitServer(
const std::vector<std::string>& host_sign_list, int index); const std::string& dist_desc,
const std::vector<std::string>& host_sign_list, int index,
const std::vector<framework::ProgramDesc>& server_sub_program = {});
// init trainer // init trainer
void InitWorker(const std::string& dist_desc, void InitWorker(const std::string& dist_desc,
const std::vector<std::string>& host_sign_list, Scope* scope, const std::vector<std::string>& host_sign_list, Scope* scope,
......
...@@ -126,12 +126,11 @@ message TableAccessorParameter { ...@@ -126,12 +126,11 @@ message TableAccessorParameter {
} }
message TensorAccessorParameter { message TensorAccessorParameter {
optional string tensor_class = 1; optional string feed_var_name = 1;
optional uint32 fea_dim = 2; optional string fetch_var_name = 2;
optional uint32 emb_dim = 3; optional int64 startup_program_id = 3;
optional string param = 4; optional int64 main_program_id = 4;
optional string grad = 5; optional string tensor_table_class = 6;
optional string common_block_map = 6;
} }
message CommonAccessorParameter { message CommonAccessorParameter {
......
...@@ -719,6 +719,34 @@ std::future<int32_t> BrpcPsClient::push_dense_raw_gradient( ...@@ -719,6 +719,34 @@ std::future<int32_t> BrpcPsClient::push_dense_raw_gradient(
return fut; return fut;
} }
std::future<int32_t> BrpcPsClient::push_global_step(int table_id,
int64_t *total_send_data,
void *done) {
size_t request_call_num = _server_channels.size();
DownpourBrpcClosure *closure = reinterpret_cast<DownpourBrpcClosure *>(done);
auto promise = std::make_shared<std::promise<int32_t>>();
closure->add_promise(promise);
std::future<int> fut = promise->get_future();
for (size_t i = 0; i < request_call_num; ++i) {
closure->request(i)->set_cmd_id(PS_PUSH_GLOBAL_STEP);
closure->request(i)->set_table_id(table_id);
closure->request(i)->set_client_id(_client_id);
auto *push_data = closure->request(i)->mutable_data();
push_data->clear();
int32_t num_per_shard = 1;
push_data->resize(sizeof(uint32_t) + num_per_shard * sizeof(int64_t));
char *push_data_ptr = const_cast<char *>(push_data->data());
memcpy(push_data_ptr, &num_per_shard, sizeof(uint32_t));
memcpy(push_data_ptr + sizeof(uint32_t), total_send_data,
num_per_shard * sizeof(int64_t));
PsService_Stub rpc_stub(get_dense_channel(i));
rpc_stub.service(closure->cntl(i), closure->request(i),
closure->response(i), closure);
}
return fut;
}
std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values, std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
size_t table_id, size_t table_id,
const uint64_t *keys, const uint64_t *keys,
......
...@@ -140,7 +140,9 @@ class BrpcPsClient : public PSClient { ...@@ -140,7 +140,9 @@ class BrpcPsClient : public PSClient {
std::vector<float> *values, std::vector<float> *values,
std::vector<uint64_t> *keys, std::vector<uint64_t> *keys,
int pserver_idx); int pserver_idx);
virtual std::future<int32_t> push_global_step(int table_id,
int64_t *total_send_data,
void *done);
virtual std::future<int32_t> flush(); virtual std::future<int32_t> flush();
virtual std::future<int32_t> send_client2client_msg( virtual std::future<int32_t> send_client2client_msg(
......
...@@ -100,6 +100,7 @@ int32_t PsService::initialize() { ...@@ -100,6 +100,7 @@ int32_t PsService::initialize() {
_service_handler_map[PS_BARRIER] = &PsService::barrier; _service_handler_map[PS_BARRIER] = &PsService::barrier;
_service_handler_map[PS_START_PROFILER] = &PsService::start_profiler; _service_handler_map[PS_START_PROFILER] = &PsService::start_profiler;
_service_handler_map[PS_STOP_PROFILER] = &PsService::stop_profiler; _service_handler_map[PS_STOP_PROFILER] = &PsService::stop_profiler;
_service_handler_map[PS_PUSH_GLOBAL_STEP] = &PsService::push_global_step;
// shard初始化,server启动后才可从env获取到server_list的shard信息 // shard初始化,server启动后才可从env获取到server_list的shard信息
initialize_shard_info(); initialize_shard_info();
...@@ -526,5 +527,26 @@ int32_t PsService::start_profiler(Table *table, const PsRequestMessage &request, ...@@ -526,5 +527,26 @@ int32_t PsService::start_profiler(Table *table, const PsRequestMessage &request,
return 0; return 0;
} }
int32_t PsService::push_global_step(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
CHECK_TABLE_EXIST(table, request, response);
auto req_buffer_size = request.data().size();
if (req_buffer_size < 1) {
set_response_code(response, 0, "run_program data is empty");
return 0;
}
uint32_t num = *(const uint32_t *)(request.data().data());
const int64_t *values =
(const int64_t *)(request.data().data() + sizeof(uint32_t));
auto trainer_id = request.client_id();
if (table->push_dense(values, trainer_id) != 0) {
set_response_code(response, -1, "run_program failed");
}
return 0;
}
} // namespace distributed } // namespace distributed
} // namespace paddle } // namespace paddle
...@@ -110,6 +110,9 @@ class PsService : public PsBaseService { ...@@ -110,6 +110,9 @@ class PsService : public PsBaseService {
int32_t print_table_stat(Table *table, const PsRequestMessage &request, int32_t print_table_stat(Table *table, const PsRequestMessage &request,
PsResponseMessage &response, brpc::Controller *cntl); PsResponseMessage &response, brpc::Controller *cntl);
int32_t push_global_step(Table *table, const PsRequestMessage &request,
PsResponseMessage &response, brpc::Controller *cntl);
bool _is_initialize_shard_info; bool _is_initialize_shard_info;
std::mutex _initialize_shard_mutex; std::mutex _initialize_shard_mutex;
std::unordered_map<int32_t, serviceHandlerFunc> _service_handler_map; std::unordered_map<int32_t, serviceHandlerFunc> _service_handler_map;
......
...@@ -34,6 +34,9 @@ limitations under the License. */ ...@@ -34,6 +34,9 @@ limitations under the License. */
#include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/printf.h"
#include "paddle/fluid/string/split.h" #include "paddle/fluid/string/split.h"
#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@"
#define STEP_COUNTER "@PS_STEP_COUNTER@"
namespace paddle { namespace paddle {
namespace distributed { namespace distributed {
...@@ -377,6 +380,37 @@ void Communicator::RpcProfilerControl() { ...@@ -377,6 +380,37 @@ void Communicator::RpcProfilerControl() {
} }
} }
void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
Scope *send_scope) {
if (batches == 0) {
return;
}
auto &table_id = ctx.table_id;
size_t request_call_num = _worker_ptr->get_server_nums();
auto &var_name = STEP_COUNTER;
auto *out_var = send_scope->Var(var_name);
auto *out_t = out_var->GetMutable<framework::LoDTensor>();
auto *data = out_t->mutable_data<int64_t>({1}, platform::CPUPlace());
data[0] = static_cast<int64_t>(batches);
VLOG(3) << "Communicator::SendGlobalStep send: " << batches;
DownpourBrpcClosure *closure = new DownpourBrpcClosure(
request_call_num, [this, request_call_num](void *done) {
int ret = 0;
auto *closure = (DownpourBrpcClosure *)done;
for (size_t i = 0; i < request_call_num; ++i) {
if (closure->check_response(i, PS_PUSH_GLOBAL_STEP) != 0) {
ret = -1;
break;
}
}
closure->set_promise_value(ret);
});
auto status = _worker_ptr->push_global_step(table_id, data, closure);
status.wait();
return;
}
void AsyncCommunicator::RecvThread() { void AsyncCommunicator::RecvThread() {
if (!independent_recv_) return; if (!independent_recv_) return;
VLOG(3) << "Independent RecvThread Start and Wait"; VLOG(3) << "Independent RecvThread Start and Wait";
...@@ -465,10 +499,16 @@ void AsyncCommunicator::SendByCommunicator() { ...@@ -465,10 +499,16 @@ void AsyncCommunicator::SendByCommunicator() {
for (size_t i = 0; i < var_nums; i++) { for (size_t i = 0; i < var_nums; i++) {
auto &var_name = varnames[i]; auto &var_name = varnames[i];
MergeVars<float>(var_name, vars[i], send_scope_.get(), 1); if (var_name == STEP_COUNTER) {
MergeVars<int64_t>(var_name, vars[i], send_scope_.get(), 1);
} else {
MergeVars<float>(var_name, vars[i], send_scope_.get(), 1);
}
} }
if (ctx.is_sparse) { if (ctx.is_tensor_table) {
SendGlobalStep(ctx, merged_var_num, send_scope_.get());
} else if (ctx.is_sparse) {
PADDLE_ENFORCE_EQ( PADDLE_ENFORCE_EQ(
varnames.size(), 1, varnames.size(), 1,
platform::errors::InvalidArgument( platform::errors::InvalidArgument(
...@@ -599,8 +639,18 @@ bool AsyncCommunicator::Check(const std::vector<std::string> &var_tables) { ...@@ -599,8 +639,18 @@ bool AsyncCommunicator::Check(const std::vector<std::string> &var_tables) {
platform::errors::InvalidArgument("var_tables.size() == 1 is permitted")); platform::errors::InvalidArgument("var_tables.size() == 1 is permitted"));
auto table_name = var_tables[0]; auto table_name = var_tables[0];
if (send_varname_to_ctx_.find(table_name) == send_varname_to_ctx_.end()) if (send_varname_to_ctx_.find(table_name) == send_varname_to_ctx_.end()) {
return false; return false;
}
if (table_name == STEP_COUNTER) {
VLOG(3) << "send step_counter into queue";
auto tmp_var = std::make_shared<Variable>();
auto *tensor = tmp_var->GetMutable<framework::LoDTensor>();
tensor->Resize(framework::make_ddim({1}));
auto *out_d = tensor->mutable_data<int64_t>(platform::CPUPlace());
out_d[0] = 1;
send_varname_to_queue_[table_name]->Push(tmp_var);
}
return true; return true;
} }
......
...@@ -223,6 +223,9 @@ class Communicator { ...@@ -223,6 +223,9 @@ class Communicator {
// 6. recv sparse param // 6. recv sparse param
virtual void RpcRecvSparse(const std::string &varname, int table_id, virtual void RpcRecvSparse(const std::string &varname, int table_id,
Scope *scope); Scope *scope);
// 7. send gloabl step
virtual void SendGlobalStep(const CommContext &ctx, int batches,
Scope *send_scope);
virtual ~Communicator() {} virtual ~Communicator() {}
virtual void RpcProfilerControl(); virtual void RpcProfilerControl();
...@@ -376,8 +379,6 @@ class AsyncCommunicator : public Communicator { ...@@ -376,8 +379,6 @@ class AsyncCommunicator : public Communicator {
virtual void SendByCommunicator(); virtual void SendByCommunicator();
virtual void SendGlobalStep(int batches) {}
virtual void RecvByCommunicator(); virtual void RecvByCommunicator();
virtual void RecvNoBarrier(); virtual void RecvNoBarrier();
...@@ -527,8 +528,6 @@ class GeoCommunicator : public AsyncCommunicator { ...@@ -527,8 +528,6 @@ class GeoCommunicator : public AsyncCommunicator {
void SendByCommunicator() { return; } void SendByCommunicator() { return; }
void SendGlobalStep(int batches) override { return; }
void RecvByCommunicator() override { return; } void RecvByCommunicator() override { return; }
inline std::string GradToParam(const std::string var_name) { inline std::string GradToParam(const std::string var_name) {
......
...@@ -131,6 +131,9 @@ class PSClient { ...@@ -131,6 +131,9 @@ class PSClient {
std::vector<uint64_t> *keys, std::vector<uint64_t> *keys,
int pserver_idx) = 0; int pserver_idx) = 0;
virtual std::future<int32_t> push_global_step(int table_id,
int64_t *total_send_data,
void *done) = 0;
virtual void finalize_worker() = 0; virtual void finalize_worker() = 0;
// client to client, 消息发送 // client to client, 消息发送
virtual std::future<int32_t> send_client2client_msg(int msg_type, virtual std::future<int32_t> send_client2client_msg(int msg_type,
......
...@@ -47,6 +47,7 @@ enum PsCmdID { ...@@ -47,6 +47,7 @@ enum PsCmdID {
PS_PUSH_SPARSE_PARAM = 26; PS_PUSH_SPARSE_PARAM = 26;
PS_START_PROFILER = 27; PS_START_PROFILER = 27;
PS_STOP_PROFILER = 28; PS_STOP_PROFILER = 28;
PS_PUSH_GLOBAL_STEP = 29;
} }
message PsRequestMessage { message PsRequestMessage {
......
...@@ -53,8 +53,10 @@ PSServer *PSServerFactory::create(const PSParameter &ps_config) { ...@@ -53,8 +53,10 @@ PSServer *PSServerFactory::create(const PSParameter &ps_config) {
return server; return server;
} }
int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, int32_t PSServer::configure(
size_t server_rank) { const PSParameter &config, PSEnvironment &env, size_t server_rank,
const std::vector<framework::ProgramDesc> &server_sub_program) {
scope_.reset(new framework::Scope());
_config = config.server_param(); _config = config.server_param();
_rank = server_rank; _rank = server_rank;
_environment = &env; _environment = &env;
...@@ -65,6 +67,7 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, ...@@ -65,6 +67,7 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
const auto &downpour_param = _config.downpour_server_param(); const auto &downpour_param = _config.downpour_server_param();
uint32_t barrier_table = UINT32_MAX; uint32_t barrier_table = UINT32_MAX;
uint32_t global_step_table = UINT32_MAX;
for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) { for (size_t i = 0; i < downpour_param.downpour_table_param_size(); ++i) {
auto *table = CREATE_CLASS( auto *table = CREATE_CLASS(
...@@ -74,6 +77,12 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, ...@@ -74,6 +77,12 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
"BarrierTable") { "BarrierTable") {
barrier_table = downpour_param.downpour_table_param(i).table_id(); barrier_table = downpour_param.downpour_table_param(i).table_id();
} }
if (downpour_param.downpour_table_param(i).table_class() ==
"GlobalStepTable") {
global_step_table = downpour_param.downpour_table_param(i).table_id();
}
table->set_program_env(scope_.get(), place_, &server_sub_program);
table->set_shard(_rank, shard_num); table->set_shard(_rank, shard_num);
table->initialize(downpour_param.downpour_table_param(i), table->initialize(downpour_param.downpour_table_param(i),
config.fs_client_param()); config.fs_client_param());
...@@ -83,6 +92,9 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env, ...@@ -83,6 +92,9 @@ int32_t PSServer::configure(const PSParameter &config, PSEnvironment &env,
if (barrier_table != UINT32_MAX) { if (barrier_table != UINT32_MAX) {
_table_map[barrier_table]->set_table_map(&_table_map); _table_map[barrier_table]->set_table_map(&_table_map);
} }
if (global_step_table != UINT32_MAX) {
_table_map[global_step_table]->set_table_map(&_table_map);
}
return initialize(); return initialize();
} }
......
...@@ -27,6 +27,20 @@ ...@@ -27,6 +27,20 @@
#include "paddle/fluid/distributed/service/env.h" #include "paddle/fluid/distributed/service/env.h"
#include "paddle/fluid/distributed/service/sendrecv.pb.h" #include "paddle/fluid/distributed/service/sendrecv.pb.h"
#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/channel.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
namespace paddle {
namespace framework {
class Executor;
class ProgramDesc;
class Scope;
} // namespace framework
namespace platform {
class DeviceContext;
} // namespace platform
} // namespace paddle
namespace paddle { namespace paddle {
namespace distributed { namespace distributed {
...@@ -40,8 +54,9 @@ class PSServer { ...@@ -40,8 +54,9 @@ class PSServer {
PSServer(PSServer &&) = delete; PSServer(PSServer &&) = delete;
PSServer(const PSServer &) = delete; PSServer(const PSServer &) = delete;
virtual int32_t configure(const PSParameter &config, PSEnvironment &env, virtual int32_t configure(
size_t server_rank) final; const PSParameter &config, PSEnvironment &env, size_t server_rank,
const std::vector<framework::ProgramDesc> &server_sub_program = {}) final;
// return server_ip // return server_ip
virtual std::string ip() { return butil::my_ip_cstr(); } virtual std::string ip() { return butil::my_ip_cstr(); }
...@@ -86,6 +101,10 @@ class PSServer { ...@@ -86,6 +101,10 @@ class PSServer {
PSEnvironment *_environment; PSEnvironment *_environment;
std::unordered_map<uint32_t, std::shared_ptr<Table>> _table_map; std::unordered_map<uint32_t, std::shared_ptr<Table>> _table_map;
std::unordered_map<int32_t, MsgHandlerFunc> _msg_handler_map; std::unordered_map<int32_t, MsgHandlerFunc> _msg_handler_map;
protected:
std::shared_ptr<framework::Scope> scope_;
platform::Place place_ = platform::CPUPlace();
}; };
REGISTER_REGISTERER(PSServer); REGISTER_REGISTERER(PSServer);
......
...@@ -66,9 +66,10 @@ void PSCore::init_gflag(const std::string& gflags) { ...@@ -66,9 +66,10 @@ void PSCore::init_gflag(const std::string& gflags) {
::google::ParseCommandLineFlags(&params_cnt, &params_ptr, true); ::google::ParseCommandLineFlags(&params_cnt, &params_ptr, true);
} }
int PSCore::init_server(const std::string& dist_desc, int PSCore::init_server(
const std::vector<std::string>* host_sign_list, const std::string& dist_desc,
int node_num, int index) { const std::vector<std::string>* host_sign_list, int node_num, int index,
const std::vector<framework::ProgramDesc>& server_sub_program) {
google::protobuf::TextFormat::ParseFromString(dist_desc, &_ps_param); google::protobuf::TextFormat::ParseFromString(dist_desc, &_ps_param);
init_gflag(_ps_param.init_gflags()); init_gflag(_ps_param.init_gflags());
_ps_env = paddle::distributed::PaddlePSEnvironment(); _ps_env = paddle::distributed::PaddlePSEnvironment();
...@@ -76,7 +77,7 @@ int PSCore::init_server(const std::string& dist_desc, ...@@ -76,7 +77,7 @@ int PSCore::init_server(const std::string& dist_desc,
int ret = 0; int ret = 0;
_server_ptr = std::shared_ptr<paddle::distributed::PSServer>( _server_ptr = std::shared_ptr<paddle::distributed::PSServer>(
paddle::distributed::PSServerFactory::create(_ps_param)); paddle::distributed::PSServerFactory::create(_ps_param));
ret = _server_ptr->configure(_ps_param, _ps_env, index); ret = _server_ptr->configure(_ps_param, _ps_env, index, server_sub_program);
CHECK(ret == 0) << "failed to configure server"; CHECK(ret == 0) << "failed to configure server";
return ret; return ret;
} }
......
...@@ -33,9 +33,10 @@ class PSCore { ...@@ -33,9 +33,10 @@ class PSCore {
explicit PSCore() {} explicit PSCore() {}
virtual ~PSCore() {} virtual ~PSCore() {}
virtual int init_server(const std::string& dist_desc, virtual int init_server(
const std::vector<std::string>* host_sign_list, const std::string& dist_desc,
int node_num, int index); const std::vector<std::string>* host_sign_list, int node_num, int index,
const std::vector<framework::ProgramDesc>& server_sub_program = {});
virtual int init_worker( virtual int init_worker(
const std::string& dist_desc, const std::string& dist_desc,
const std::map<uint64_t, std::vector<paddle::distributed::Region>>& const std::map<uint64_t, std::vector<paddle::distributed::Region>>&
......
...@@ -11,8 +11,9 @@ cc_library(common_table SRCS common_sparse_table.cc common_dense_table.cc sparse ...@@ -11,8 +11,9 @@ cc_library(common_table SRCS common_sparse_table.cc common_dense_table.cc sparse
set_source_files_properties(tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(tensor_accessor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(tensor_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(tensor_table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context)
cc_library(tensor_accessor SRCS tensor_accessor.cc DEPS ${TABLE_DEPS} eigen3 ps_framework_proto device_context)
cc_library(tensor_table SRCS tensor_table.cc DEPS eigen3 ps_framework_proto executor scope device_context tensor ${TABLE_DEPS})
set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(table.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(table SRCS table.cc DEPS common_table tensor_accessor ps_framework_proto string_helper device_context gflags glog boost) cc_library(table SRCS table.cc DEPS common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost)
...@@ -42,6 +42,7 @@ int32_t CommonDenseTable::initialize() { ...@@ -42,6 +42,7 @@ int32_t CommonDenseTable::initialize() {
sync = _config.common().sync(); sync = _config.common().sync();
VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
_global_lr = new float(1.0);
initialize_value(); initialize_value();
initialize_optimizer(); initialize_optimizer();
...@@ -81,8 +82,10 @@ int32_t CommonDenseTable::initialize_optimizer() { ...@@ -81,8 +82,10 @@ int32_t CommonDenseTable::initialize_optimizer() {
if (name == "sgd") { if (name == "sgd") {
optimizer_ = std::make_shared<DSGD>(common, &values_); optimizer_ = std::make_shared<DSGD>(common, &values_);
optimizer_->set_global_lr(_global_lr);
} else if (name == "adam") { } else if (name == "adam") {
optimizer_ = std::make_shared<DAdam>(common, &values_); optimizer_ = std::make_shared<DAdam>(common, &values_);
optimizer_->set_global_lr(_global_lr);
} else if (name == "sum") { } else if (name == "sum") {
optimizer_ = std::make_shared<DSUM>(common, &values_); optimizer_ = std::make_shared<DSUM>(common, &values_);
} else { } else {
...@@ -92,6 +95,12 @@ int32_t CommonDenseTable::initialize_optimizer() { ...@@ -92,6 +95,12 @@ int32_t CommonDenseTable::initialize_optimizer() {
return 0; return 0;
} }
int32_t CommonDenseTable::set_global_lr(float* lr) {
_global_lr = lr;
optimizer_->set_global_lr(_global_lr);
return 0;
}
int32_t CommonDenseTable::pull_dense(float* pull_values, size_t num) { int32_t CommonDenseTable::pull_dense(float* pull_values, size_t num) {
std::copy(values_[param_idx_].begin(), values_[param_idx_].end(), std::copy(values_[param_idx_].begin(), values_[param_idx_].end(),
pull_values); pull_values);
......
...@@ -42,6 +42,7 @@ class CommonDenseTable : public DenseTable { ...@@ -42,6 +42,7 @@ class CommonDenseTable : public DenseTable {
virtual int32_t push_dense_param(const float* values, size_t num) override; virtual int32_t push_dense_param(const float* values, size_t num) override;
virtual int32_t push_dense(const float* values, size_t num) override; virtual int32_t push_dense(const float* values, size_t num) override;
virtual int32_t pour() override; virtual int32_t pour() override;
virtual int32_t set_global_lr(float* lr) override;
int32_t load(const std::string& path, const std::string& param) override { int32_t load(const std::string& path, const std::string& param) override {
VLOG(0) << "Dense table may load by " VLOG(0) << "Dense table may load by "
......
...@@ -175,6 +175,8 @@ int32_t CommonSparseTable::initialize() { ...@@ -175,6 +175,8 @@ int32_t CommonSparseTable::initialize() {
sync = _config.common().sync(); sync = _config.common().sync();
VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync; VLOG(1) << "table " << _config.common().table_name() << " is sync: " << sync;
_global_lr = new float(1.0);
auto common = _config.common(); auto common = _config.common();
int size = static_cast<int>(common.params().size()); int size = static_cast<int>(common.params().size());
...@@ -249,9 +251,11 @@ int32_t CommonSparseTable::initialize_optimizer() { ...@@ -249,9 +251,11 @@ int32_t CommonSparseTable::initialize_optimizer() {
if (name == "sgd") { if (name == "sgd") {
optimizer_ = std::make_shared<SSGD>(value_names_, value_dims_, optimizer_ = std::make_shared<SSGD>(value_names_, value_dims_,
value_offsets_, value_idx_); value_offsets_, value_idx_);
optimizer_->set_global_lr(_global_lr);
} else if (name == "adam") { } else if (name == "adam") {
optimizer_ = std::make_shared<SAdam>(value_names_, value_dims_, optimizer_ = std::make_shared<SAdam>(value_names_, value_dims_,
value_offsets_, value_idx_); value_offsets_, value_idx_);
optimizer_->set_global_lr(_global_lr);
} else if (name == "sum") { } else if (name == "sum") {
optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_, optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
value_offsets_, value_idx_); value_offsets_, value_idx_);
...@@ -263,6 +267,12 @@ int32_t CommonSparseTable::initialize_optimizer() { ...@@ -263,6 +267,12 @@ int32_t CommonSparseTable::initialize_optimizer() {
return 0; return 0;
} }
int32_t CommonSparseTable::set_global_lr(float* lr) {
_global_lr = lr;
optimizer_->set_global_lr(_global_lr);
return 0;
}
int32_t CommonSparseTable::load(const std::string& path, int32_t CommonSparseTable::load(const std::string& path,
const std::string& param) { const std::string& param) {
rwlock_->WRLock(); rwlock_->WRLock();
......
...@@ -69,6 +69,8 @@ class CommonSparseTable : public SparseTable { ...@@ -69,6 +69,8 @@ class CommonSparseTable : public SparseTable {
virtual int32_t push_sparse_param(const uint64_t* keys, const float* values, virtual int32_t push_sparse_param(const uint64_t* keys, const float* values,
size_t num); size_t num);
virtual int32_t set_global_lr(float* lr) override;
virtual int32_t pour(); virtual int32_t pour();
virtual int32_t flush(); virtual int32_t flush();
virtual int32_t shrink(); virtual int32_t shrink();
......
...@@ -36,6 +36,10 @@ class DenseOptimizer { ...@@ -36,6 +36,10 @@ class DenseOptimizer {
std::vector<std::vector<float>>* values) {} std::vector<std::vector<float>>* values) {}
virtual void update(const float* update_values, size_t num, int begin, virtual void update(const float* update_values, size_t num, int begin,
int end) = 0; int end) = 0;
virtual void set_global_lr(float* lr) { global_learning_rate_ = lr; }
protected:
float* global_learning_rate_;
}; };
// sum calc for dense tensor // sum calc for dense tensor
...@@ -84,8 +88,10 @@ class DSGD : public DenseOptimizer { ...@@ -84,8 +88,10 @@ class DSGD : public DenseOptimizer {
grads.resize(update_numel); grads.resize(update_numel);
auto blas = GetBlas<float>(); auto blas = GetBlas<float>();
float lr = *(global_learning_rate_) * (*learning_rate);
VLOG(4) << "DSGD LearningRate: " << lr;
blas.VCOPY(update_numel, update_values + begin, grads.data()); blas.VCOPY(update_numel, update_values + begin, grads.data());
blas.SCAL(update_numel, *learning_rate, grads.data()); blas.SCAL(update_numel, lr, grads.data());
blas.VSUB(update_numel, param + begin, grads.data(), param + begin); blas.VSUB(update_numel, param + begin, grads.data(), param + begin);
} }
...@@ -150,7 +156,8 @@ class DAdam : public DenseOptimizer { ...@@ -150,7 +156,8 @@ class DAdam : public DenseOptimizer {
beta1_pow[0] = beta1_pow[0] * beta1; beta1_pow[0] = beta1_pow[0] * beta1;
beta2_pow[0] = beta2_pow[0] * beta2; beta2_pow[0] = beta2_pow[0] * beta2;
float lr_ = learning_rate[0]; float lr_ = *(global_learning_rate_)*learning_rate[0];
VLOG(4) << "DAdam LearningRate: " << lr_;
lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
float* tmp_ = tmp.data(); float* tmp_ = tmp.data();
......
...@@ -44,12 +44,17 @@ class SparseOptimizer { ...@@ -44,12 +44,17 @@ class SparseOptimizer {
size_t num, const std::vector<uint64_t>& offsets, size_t num, const std::vector<uint64_t>& offsets,
ValueBlock* block) = 0; ValueBlock* block) = 0;
virtual void set_global_lr(float* lr) { global_learning_rate_ = lr; }
const std::vector<std::string>& value_names_; const std::vector<std::string>& value_names_;
const std::vector<int>& value_dims_; const std::vector<int>& value_dims_;
const std::vector<int>& value_offsets_; const std::vector<int>& value_offsets_;
const std::unordered_map<std::string, int>& value_idx_; const std::unordered_map<std::string, int>& value_idx_;
int param_offset = 0; int param_offset = 0;
int update_numel = 0; int update_numel = 0;
protected:
float* global_learning_rate_;
}; };
// sum calc for sparse tensor // sum calc for sparse tensor
...@@ -102,13 +107,14 @@ class SSGD : public SparseOptimizer { ...@@ -102,13 +107,14 @@ class SSGD : public SparseOptimizer {
auto id = keys[x]; auto id = keys[x];
auto* value = block->Get(id); auto* value = block->Get(id);
float* learning_rate = value + lr_offset; float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0];
VLOG(4) << "SSGD LearningRate: " << learning_rate;
float* param = value + param_offset; float* param = value + param_offset;
std::vector<float> grads; std::vector<float> grads;
grads.resize(update_numel); grads.resize(update_numel);
blas.VCOPY(update_numel, update_values + x * update_numel, grads.data()); blas.VCOPY(update_numel, update_values + x * update_numel, grads.data());
blas.SCAL(update_numel, learning_rate[0], grads.data()); blas.SCAL(update_numel, learning_rate, grads.data());
blas.VSUB(update_numel, param, grads.data(), param); blas.VSUB(update_numel, param, grads.data(), param);
} }
} }
...@@ -156,7 +162,8 @@ class SAdam : public SparseOptimizer { ...@@ -156,7 +162,8 @@ class SAdam : public SparseOptimizer {
for (auto x : offsets) { for (auto x : offsets) {
auto id = keys[x]; auto id = keys[x];
auto* values = block->Get(id); auto* values = block->Get(id);
float* learning_rate = values + lr_offset; float lr_ = *(global_learning_rate_) * (values + lr_offset)[0];
VLOG(4) << "SAdam LearningRate: " << lr_;
float* param = values + param_offset; float* param = values + param_offset;
float* moment1 = values + m1_offset; float* moment1 = values + m1_offset;
float* moment2 = values + m2_offset; float* moment2 = values + m2_offset;
...@@ -166,7 +173,6 @@ class SAdam : public SparseOptimizer { ...@@ -166,7 +173,6 @@ class SAdam : public SparseOptimizer {
beta1_pow[0] = beta1_pow[0] * beta1; beta1_pow[0] = beta1_pow[0] * beta1;
beta2_pow[0] = beta2_pow[0] * beta2; beta2_pow[0] = beta2_pow[0] * beta2;
float lr_ = learning_rate[0];
lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
std::vector<float> grad, grad2, tmp; std::vector<float> grad, grad2, tmp;
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "paddle/fluid/distributed/table/common_sparse_table.h" #include "paddle/fluid/distributed/table/common_sparse_table.h"
#include "paddle/fluid/distributed/table/sparse_geo_table.h" #include "paddle/fluid/distributed/table/sparse_geo_table.h"
#include "paddle/fluid/distributed/table/tensor_accessor.h" #include "paddle/fluid/distributed/table/tensor_accessor.h"
#include "paddle/fluid/distributed/table/tensor_table.h"
namespace paddle { namespace paddle {
namespace distributed { namespace distributed {
...@@ -30,7 +31,9 @@ REGISTER_CLASS(Table, CommonDenseTable); ...@@ -30,7 +31,9 @@ REGISTER_CLASS(Table, CommonDenseTable);
REGISTER_CLASS(Table, CommonSparseTable); REGISTER_CLASS(Table, CommonSparseTable);
REGISTER_CLASS(Table, SparseGeoTable); REGISTER_CLASS(Table, SparseGeoTable);
REGISTER_CLASS(Table, BarrierTable); REGISTER_CLASS(Table, BarrierTable);
REGISTER_CLASS(Table, TensorTable);
REGISTER_CLASS(Table, DenseTensorTable);
REGISTER_CLASS(Table, GlobalStepTable);
REGISTER_CLASS(ValueAccessor, CommMergeAccessor); REGISTER_CLASS(ValueAccessor, CommMergeAccessor);
int32_t TableManager::initialize() { int32_t TableManager::initialize() {
......
...@@ -20,8 +20,11 @@ ...@@ -20,8 +20,11 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include "paddle/fluid/distributed/table/accessor.h" #include "paddle/fluid/distributed/table/accessor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/string_helper.h" #include "paddle/fluid/string/string_helper.h"
namespace paddle { namespace paddle {
...@@ -35,6 +38,10 @@ class Table { ...@@ -35,6 +38,10 @@ class Table {
virtual int32_t pull_dense(float *values, size_t num) = 0; virtual int32_t pull_dense(float *values, size_t num) = 0;
virtual int32_t push_dense(const float *values, size_t num) = 0; virtual int32_t push_dense(const float *values, size_t num) = 0;
// for push global_step
virtual int32_t push_dense(const int64_t *values, const int32_t trainer_id) {
return 0;
}
virtual int32_t push_dense_param(const float *values, size_t num) { virtual int32_t push_dense_param(const float *values, size_t num) {
return 0; return 0;
} }
...@@ -67,6 +74,18 @@ class Table { ...@@ -67,6 +74,18 @@ class Table {
return 0; return 0;
} }
// only for tensor table
virtual int32_t set_program_env(
framework::Scope *scope, platform::Place place,
const std::vector<framework::ProgramDesc> *sub_program) {
return 0;
}
virtual int32_t set_global_lr(float *lr) {
_global_lr = lr;
return 0;
}
virtual int32_t pour() { return 0; } virtual int32_t pour() { return 0; }
virtual void clear() = 0; virtual void clear() = 0;
...@@ -105,6 +124,7 @@ class Table { ...@@ -105,6 +124,7 @@ class Table {
size_t _shard_idx; // table 分片编号 size_t _shard_idx; // table 分片编号
size_t _shard_num; // table 分片总数 size_t _shard_num; // table 分片总数
TableParameter _config; TableParameter _config;
float *_global_lr = nullptr;
std::shared_ptr<ValueAccessor> _value_accesor; std::shared_ptr<ValueAccessor> _value_accesor;
}; };
REGISTER_REGISTERER(Table); REGISTER_REGISTERER(Table);
......
...@@ -13,81 +13,120 @@ ...@@ -13,81 +13,120 @@
// limitations under the License. // limitations under the License.
#include "paddle/fluid/distributed/table/tensor_table.h" #include "paddle/fluid/distributed/table/tensor_table.h"
#include <chrono> // NOLINT
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "paddle/fluid/distributed/common/utils.h" #include "paddle/fluid/distributed/common/utils.h"
DECLARE_double(eager_delete_tensor_gb);
namespace paddle { namespace paddle {
namespace distributed { namespace distributed {
int32_t DenseTensorTable::initialize() { int32_t TensorTable::set_program_env(
_shards_task_pool.resize(10); framework::Scope *scope, platform::Place place,
for (int i = 0; i < _shards_task_pool.size(); ++i) { const std::vector<framework::ProgramDesc> *sub_program) {
_shards_task_pool[i].reset(new ::ThreadPool(1)); scope_ = scope;
} place_ = place;
executor_ = new framework::Executor(place_);
sub_program_ = sub_program;
return 0; return 0;
} }
int32_t DenseTensorTable::initialize_tensor(framework::Scope *scope, int32_t GlobalStepTable::initialize() {
framework::ProgramDesc *program, auto _program_config = _config.tensor();
framework::Executor *executor) { auto trainers_ = _config.common().trainer_num();
scope_ = scope; FLAGS_eager_delete_tensor_gb = -1;
program_ = program; // Get Config
executor_ = executor; if (_program_config.has_startup_program_id()) {
startup_program_id_ = _program_config.startup_program_id();
}
if (_program_config.has_main_program_id()) {
main_program_id_ = _program_config.main_program_id();
}
if (_program_config.has_feed_var_name()) {
feed_var_name_ = _program_config.feed_var_name();
}
if (_program_config.has_fetch_var_name()) {
fetch_var_name_ = _program_config.fetch_var_name();
}
// Run startup program
if (startup_program_id_ != -1) {
std::map<std::string, const framework::LoDTensor *> fake_feed;
std::map<std::string, framework::FetchType *> fake_fetch;
auto startup_program_desc = sub_program_->at(startup_program_id_);
auto ctx = executor_->Prepare(startup_program_desc, 0);
executor_->RunPreparedContext(ctx.get(), scope_, false);
}
auto tensor_config = _config.tensor(); if (main_program_id_ != -1) {
if (tensor_config.has_common_block_map()) { // Run main porgram, if program is used for learning decay
auto block_maps = auto main_program_desc = sub_program_->at(main_program_id_);
paddle::string::split_string(tensor_config.common_block_map(), "#"); auto main_ctx = executor_->Prepare(main_program_desc, 0);
for (auto &block_map : block_maps) { exec_context_ = std::move(main_ctx);
auto block = paddle::string::split_string(block_map, ":"); executor_->RunPreparedContext(exec_context_.get(), scope_, false);
auto block_id = std::stoi(block[0]); // init decay_counters
std::vector<int> block_ids{block_id}; decay_counters_.reserve(trainers_);
auto block_cmd = block[1]; for (int32_t i = 0; i < trainers_; ++i) {
auto prepared = executor_->Prepare(*program_, block_ids); decay_counters_[i] = 0;
(*prepared_ctx_)[block_cmd] = prepared[0];
} }
} }
return 0;
} }
int32_t DenseTensorTable::pull_dense(float *values, size_t numel) { int32_t GlobalStepTable::set_table_map(
PADDLE_ENFORCE_EQ(numel, _data.numel(), std::unordered_map<uint32_t, std::shared_ptr<Table>> *table_map) {
paddle::platform::errors::PreconditionNotMet( auto *lr_var = scope_->FindVar(fetch_var_name_);
"pull dense error, excepted numel %d, but actually %d.", auto *lr_tensor = lr_var->GetMutable<framework::LoDTensor>();
_data.numel(), numel)); auto *lr_value = lr_tensor->mutable_data<float>(platform::CPUPlace());
VLOG(3) << "GlobalStepTable::set_table_map set global lr: " << *lr_value;
GetBlas<float>().VCOPY(numel, _data.data<float>(), values); for (auto iter = table_map->begin(); iter != table_map->end(); iter++) {
auto table_id = iter->first;
if (table_id == _config.table_id()) {
continue;
}
iter->second->set_global_lr(lr_value);
}
return 0; return 0;
} }
int32_t DenseTensorTable::push_dense(const float *values, size_t numel) { int32_t GlobalStepTable::push_dense(const int64_t *values,
auto varname = _config.tensor().grad(); const int32_t trainer_id) {
auto local_scope = scope_->NewTmpScope(); return _run_program(values, trainer_id);
auto *var = local_scope->Var(varname); }
auto *t = var->GetMutable<framework::LoDTensor>();
auto dims = paddle::framework::make_ddim({});
auto ctx = paddle::platform::CPUDeviceContext(); int32_t GlobalStepTable::_run_program(const int64_t *values,
t->mutable_data<float>(_data.dims(), ctx.GetPlace()); const uint32_t trainer_id) {
FLAGS_eager_delete_tensor_gb = -1;
auto counter = decay_counters_.at(trainer_id);
counter += int(values[0]);
decay_counters_.at(trainer_id) = counter;
GetBlas<float>().VCOPY(numel, values, t->data<float>()); auto *global_step_var = scope_->FindVar(feed_var_name_);
executor_->RunPreparedContext((*prepared_ctx_)["push"].get(), auto *tensor = global_step_var->GetMutable<framework::LoDTensor>();
local_scope.get()); auto *value = tensor->mutable_data<int64_t>(platform::CPUPlace());
}
int32_t DenseTensorTable::push_dense_param(const float *values, size_t numel) { auto global_counter = 0;
auto ctx = paddle::platform::CPUDeviceContext(); for (auto &trainer_counter : decay_counters_) {
if (_data.IsInitialized()) { global_counter += trainer_counter.second;
PADDLE_ENFORCE_EQ(
numel, _data.numel(),
paddle::platform::errors::PreconditionNotMet(
"pull dense error, excepted numel %d, but actually %d.",
_data.numel(), numel));
} else {
_data.mutable_data<float>(
framework::make_ddim({static_cast<int64_t>(numel), 1}), ctx.GetPlace());
} }
GetBlas<float>().VCOPY(numel, values, _data.data<float>()); // Todo: hard code for increment op
value[0] = global_counter - 1;
VLOG(3) << "GlobalStepTable::_run_program global_counter " << value[0];
executor_->RunPreparedContext(exec_context_.get(), scope_, false, false);
auto *lr_var = scope_->FindVar(fetch_var_name_);
auto *lr_tensor = lr_var->GetMutable<framework::LoDTensor>();
auto *lr_value = lr_tensor->mutable_data<float>(platform::CPUPlace());
VLOG(3) << "GlobalStepTable::LR value: " << lr_value[0];
return 0; return 0;
} }
} // namespace distributed } // namespace distributed
} // namespace paddle } // namespace paddle
...@@ -14,166 +14,187 @@ ...@@ -14,166 +14,187 @@
#pragma once #pragma once
#include <algorithm>
#include <condition_variable> // NOLINT
#include <memory> #include <memory>
#include <mutex> // NOLINT
#include <set>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "paddle/fluid/distributed/common/utils.h"
#include <ThreadPool.h>
#include "paddle/fluid/distributed/table/table.h" #include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/operators/math/blas.h"
#include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle { namespace paddle {
namespace distributed { namespace distributed {
#define LEARNING_RATE_DECAY_COUNTER "@LR_DECAY_COUNTER@"
#define STEP_COUNTER "@PS_STEP_COUNTER@"
class TensorTable : public Table { class TensorTable : public Table {
public: public:
TensorTable() : Table() {} TensorTable() {}
virtual ~TensorTable() {} virtual ~TensorTable() {}
virtual int32_t initialize() { return 0; } int32_t pull_dense(float *values, size_t num) override { return 0; }
virtual int32_t pull_dense(float *values, size_t num) override { return 0; }; int32_t push_dense(const float *values, size_t num) override { return 0; }
virtual int32_t push_dense(const float *values, size_t num) override { int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
return 0; return 0;
}; }
int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) override {
return 0;
}
int32_t shrink() override { return 0; }
virtual void *get_shard(size_t shard_idx) { return 0; }
virtual void *get_shard(size_t shard_idx) override { return 0; } virtual int32_t initialize_shard() { return 0; };
virtual int32_t pull_sparse(float *values, const uint64_t *keys, virtual int32_t flush() { return 0; };
size_t num) override {
virtual int32_t load(const std::string &path, const std::string &param) {
return 0; return 0;
}; }
virtual int32_t save(const std::string &path, const std::string &param) {
return 0;
}
virtual void clear(){};
virtual int32_t push_sparse(const uint64_t *keys, const float *values, virtual int32_t initialize() override { return 0; };
size_t num) override {
virtual int32_t push_dense(const int64_t *values,
const int32_t trainer_id) override {
return 0; return 0;
}; };
virtual int32_t push_dense_param(const float *values, size_t num) { virtual int32_t set_program_env(
framework::Scope *scope, platform::Place place,
const std::vector<framework::ProgramDesc> *sub_program) override;
protected:
framework::Executor *executor_;
framework::Scope *scope_;
platform::Place place_ = platform::CPUPlace();
const std::vector<framework::ProgramDesc> *sub_program_;
paddle::distributed::TensorAccessorParameter program_config_;
std::shared_ptr<framework::ExecutorPrepareContext> exec_context_ = nullptr;
};
class DenseTensorTable : public TensorTable {
public:
DenseTensorTable() {}
virtual ~DenseTensorTable() {}
int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
return 0; return 0;
} }
int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) override {
return 0;
}
int32_t shrink() override { return 0; }
virtual int32_t shrink() { return 0; } virtual void *get_shard(size_t shard_idx) { return 0; }
virtual void clear() {} virtual int32_t initialize_shard() { return 0; }
virtual int32_t flush() { return 0; } virtual int32_t flush() { return 0; }
//指定加载路径 virtual void clear() {}
virtual int32_t load(const std::string &path, const std::string &converter) {
// Todo: Support program Load & Save
virtual int32_t load(const std::string &path, const std::string &param) {
return 0; return 0;
} }
//指定保存路径 virtual int32_t save(const std::string &path, const std::string &param) {
virtual int32_t save(const std::string &path, const std::string &converter) {
return 0; return 0;
} }
protected: // Todo: Support pull dense
virtual int32_t initialize_shard() { return 0; } int32_t pull_dense(float *values, size_t num) override { return 0; }
/*----------------------------------------------------------------------*/
virtual int32_t initialize() override { return 0; }
int32_t push_dense(const float *values, size_t num) override { return 0; }
virtual int32_t initialize_tensor(paddle::framework::Scope *scope, int32_t push_dense(const int64_t *values, const int32_t trainer_id) {
paddle::framework::ProgramDesc *program,
paddle::framework::Executor *executor) {
return 0; return 0;
} }
std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool; protected:
virtual int32_t _run_program(const float *values, size_t num,
const uint32_t trainer_id) {
return 0;
}
framework::Executor *executor_; int startup_program_id_ = -1;
framework::Scope *scope_; int main_program_id_ = -1;
framework::ProgramDesc *program_; std::string feed_var_name_ = "";
std::unordered_map<std::string, std::string fetch_var_name_ = "";
std::shared_ptr<framework::ExecutorPrepareContext>>
*prepared_ctx_;
}; };
class DenseTensorTable : public TensorTable { class GlobalStepTable : public DenseTensorTable {
public: public:
DenseTensorTable() : TensorTable() {} GlobalStepTable() {}
~DenseTensorTable() {} virtual ~GlobalStepTable() {}
virtual int32_t initialize();
void *get_shard(size_t shard_idx) { return 0; } int32_t pull_sparse(float *values, const uint64_t *keys,
size_t num) override {
int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) {
return 0; return 0;
} }
int32_t push_sparse(const uint64_t *keys, const float *values, size_t num) { int32_t push_sparse(const uint64_t *keys, const float *values,
size_t num) override {
return 0; return 0;
} }
int32_t shrink() { return 0; } int32_t shrink() override { return 0; }
int32_t pull_dense(float *values, size_t num) override; virtual void *get_shard(size_t shard_idx) { return 0; }
int32_t push_dense_param(const float *values, size_t num) override;
int32_t push_dense(const float *values, size_t num) override; virtual int32_t initialize_shard() { return 0; }
virtual void clear() {}
virtual int32_t flush() { return 0; } virtual int32_t flush() { return 0; }
//指定加载路径 virtual void clear() {}
virtual int32_t load(const std::string &path, const std::string &converter) {
virtual int32_t load(const std::string &path, const std::string &param) {
return 0; return 0;
} }
//指定保存路径 virtual int32_t save(const std::string &path, const std::string &param) {
virtual int32_t save(const std::string &path, const std::string &converter) {
return 0; return 0;
} }
protected: int32_t pull_dense(float *values, size_t num) override { return 0; }
virtual int32_t initialize_shard() { return 0; }
virtual int32_t initialize_tensor(paddle::framework::Scope *scope, /*----------------------------------------------------------------------*/
paddle::framework::ProgramDesc *program,
paddle::framework::Executor *executor);
protected: int32_t initialize() override;
framework::Tensor _data;
int32_t push_dense(const float *values, size_t num) override { return 0; }
int32_t push_dense(const int64_t *values, const int32_t trainer_id);
int32_t set_table_map(
std::unordered_map<uint32_t, std::shared_ptr<Table>> *table_map) override;
private:
virtual int32_t _run_program(const int64_t *values,
const uint32_t trainer_id);
private:
std::unordered_map<int, int64_t> decay_counters_;
int32_t trainers_;
}; };
//
//// common sparse table [0, N) with out large scale
// class SparseTensorTable : public TensorTable {
// void *get_shard(size_t shard_idx) { return 0; }
//
// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num)
// override;
// int32_t push_sparse(const uint64_t *keys, const float *values, size_t num)
// override ;
// int32_t shrink() { return 0; }
// void *get_shard(size_t shard_idx) { return 0; };
//
// int32_t pull_dense(float *values, size_t num) { return 0; };
// int32_t push_dense_param(const float *values, size_t num) { return 0; };
// int32_t push_dense(const float *values, size_t num) { return 0; };
//
// protected:
// framework::Tensor _data;
//};
//// for Large scale kv tensor [0, int64] do not use specific optimizer
// class KvTensorTable : public TensorTable {
// int32_t pull_dense(float *values, size_t num) { return 0; };
// int32_t push_dense_param(const float *values, size_t num) { return 0; };
// int32_t push_dense(const float *values, size_t num) { return 0; };
//
// void *get_shard(size_t shard_idx) override;
// int32_t pull_sparse(float *values, const uint64_t *keys, size_t num)
// override;
// int32_t push_sparse(const uint64_t *keys, const float *values,
// size_t num) override;
// int32_t shrink() override;
// void *get_shard(size_t shard_idx) override;
//};
//
//// for Geo sparse handle
// class GeoSparseTensorTable : public TensorTable {};
} // namespace distributed } // namespace distributed
} // namespace paddle } // namespace paddle
...@@ -20,10 +20,10 @@ limitations under the License. */ ...@@ -20,10 +20,10 @@ limitations under the License. */
#include "google/protobuf/text_format.h" #include "google/protobuf/text_format.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable.h" #include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h" #include "paddle/fluid/string/printf.h"
...@@ -157,7 +157,10 @@ void RunServer() { ...@@ -157,7 +157,10 @@ void RunServer() {
pserver_ptr_ = std::shared_ptr<paddle::distributed::PSServer>( pserver_ptr_ = std::shared_ptr<paddle::distributed::PSServer>(
paddle::distributed::PSServerFactory::create(server_proto)); paddle::distributed::PSServerFactory::create(server_proto));
LOG(INFO) << "RUN configure"; LOG(INFO) << "RUN configure";
pserver_ptr_->configure(server_proto, _ps_env, 0); std::vector<framework::ProgramDesc> empty_vec;
framework::ProgramDesc empty_prog;
empty_vec.push_back(empty_prog);
pserver_ptr_->configure(server_proto, _ps_env, 0, empty_vec);
LOG(INFO) << "RUN start"; LOG(INFO) << "RUN start";
pserver_ptr_->start(ip_, port_); pserver_ptr_->start(ip_, port_);
LOG(INFO) << "End start"; LOG(INFO) << "End start";
......
...@@ -24,10 +24,6 @@ limitations under the License. */ ...@@ -24,10 +24,6 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/framework/variable.h" #include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/fluid/distributed/ps.pb.h" #include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/service/brpc_ps_client.h" #include "paddle/fluid/distributed/service/brpc_ps_client.h"
#include "paddle/fluid/distributed/service/brpc_ps_server.h" #include "paddle/fluid/distributed/service/brpc_ps_server.h"
...@@ -35,6 +31,10 @@ limitations under the License. */ ...@@ -35,6 +31,10 @@ limitations under the License. */
#include "paddle/fluid/distributed/service/ps_client.h" #include "paddle/fluid/distributed/service/ps_client.h"
#include "paddle/fluid/distributed/service/sendrecv.pb.h" #include "paddle/fluid/distributed/service/sendrecv.pb.h"
#include "paddle/fluid/distributed/service/service.h" #include "paddle/fluid/distributed/service/service.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
namespace framework = paddle::framework; namespace framework = paddle::framework;
namespace platform = paddle::platform; namespace platform = paddle::platform;
...@@ -155,7 +155,10 @@ void RunServer() { ...@@ -155,7 +155,10 @@ void RunServer() {
_ps_env.set_ps_servers(&host_sign_list_, 1); _ps_env.set_ps_servers(&host_sign_list_, 1);
pserver_ptr_ = std::shared_ptr<paddle::distributed::PSServer>( pserver_ptr_ = std::shared_ptr<paddle::distributed::PSServer>(
paddle::distributed::PSServerFactory::create(server_proto)); paddle::distributed::PSServerFactory::create(server_proto));
pserver_ptr_->configure(server_proto, _ps_env, 0); std::vector<framework::ProgramDesc> empty_vec;
framework::ProgramDesc empty_prog;
empty_vec.push_back(empty_prog);
pserver_ptr_->configure(server_proto, _ps_env, 0, empty_vec);
pserver_ptr_->start(ip_, port_); pserver_ptr_->start(ip_, port_);
} }
......
...@@ -108,6 +108,7 @@ message AsyncConfig { ...@@ -108,6 +108,7 @@ message AsyncConfig {
optional bool runtime_split_send_recv = 8 [ default = false ]; optional bool runtime_split_send_recv = 8 [ default = false ];
optional bool launch_barrier = 9 [ default = true ]; optional bool launch_barrier = 9 [ default = true ];
optional string heter_worker_device_guard = 10 [ default = 'cpu' ]; optional string heter_worker_device_guard = 10 [ default = 'cpu' ];
optional int32 lr_decay_steps = 11 [ default = 10 ];
} }
message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; } message PipelineConfig { optional int32 micro_batch = 1 [ default = 1 ]; }
......
...@@ -52,8 +52,9 @@ class SendOp : public framework::OperatorBase { ...@@ -52,8 +52,9 @@ class SendOp : public framework::OperatorBase {
auto send_varnames = Attr<std::vector<std::string>>("send_varnames"); auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
auto* communicator = paddle::distributed::Communicator::GetInstance(); auto* communicator = paddle::distributed::Communicator::GetInstance();
communicator->Check(send_varnames); if (communicator->Check(send_varnames)) {
communicator->Send(ins, scope); communicator->Send(ins, scope);
}
// auto fleet = paddle::distributed::FleetWrapper::GetInstance(); // auto fleet = paddle::distributed::FleetWrapper::GetInstance();
// if (is_sparse == 0) { // if (is_sparse == 0) {
......
...@@ -62,7 +62,7 @@ void BindDistFleetWrapper(py::module* m) { ...@@ -62,7 +62,7 @@ void BindDistFleetWrapper(py::module* m) {
.def("stop_server", &FleetWrapper::StopServer) .def("stop_server", &FleetWrapper::StopServer)
.def("stop_worker", &FleetWrapper::FinalizeWorker) .def("stop_worker", &FleetWrapper::FinalizeWorker)
.def("barrier", &FleetWrapper::BarrierWithTable); .def("barrier", &FleetWrapper::BarrierWithTable);
} // end BindDistFleetWrapper }
void BindPSHost(py::module* m) { void BindPSHost(py::module* m) {
py::class_<distributed::PSHost>(*m, "PSHost") py::class_<distributed::PSHost>(*m, "PSHost")
...@@ -79,8 +79,8 @@ void BindCommunicatorContext(py::module* m) { ...@@ -79,8 +79,8 @@ void BindCommunicatorContext(py::module* m) {
.def( .def(
py::init<const std::string&, const std::vector<std::string>&, py::init<const std::string&, const std::vector<std::string>&,
const std::vector<std::string>&, const std::vector<int64_t>&, const std::vector<std::string>&, const std::vector<int64_t>&,
const std::vector<std::string>&, int, bool, bool, bool, const std::vector<std::string>&, int, bool, bool, bool, int,
int>()) bool>())
.def("var_name", [](const CommContext& self) { return self.var_name; }) .def("var_name", [](const CommContext& self) { return self.var_name; })
.def("trainer_id", .def("trainer_id",
[](const CommContext& self) { return self.trainer_id; }) [](const CommContext& self) { return self.trainer_id; })
...@@ -97,6 +97,8 @@ void BindCommunicatorContext(py::module* m) { ...@@ -97,6 +97,8 @@ void BindCommunicatorContext(py::module* m) {
[](const CommContext& self) { return self.is_distributed; }) [](const CommContext& self) { return self.is_distributed; })
.def("origin_varnames", .def("origin_varnames",
[](const CommContext& self) { return self.origin_varnames; }) [](const CommContext& self) { return self.origin_varnames; })
.def("is_tensor_table",
[](const CommContext& self) { return self.is_tensor_table; })
.def("__str__", [](const CommContext& self) { return self.print(); }); .def("__str__", [](const CommContext& self) { return self.print(); });
} }
......
...@@ -64,6 +64,11 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -64,6 +64,11 @@ class ParameterServerOptimizer(MetaOptimizerBase):
_main = compiled_config.origin_main_program.clone() _main = compiled_config.origin_main_program.clone()
_startup = compiled_config.origin_startup_program.clone() _startup = compiled_config.origin_startup_program.clone()
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass
_add_lr_decay_table_pass(
_main, compiled_config,
self.user_defined_strategy.a_sync_configs["lr_decay_steps"])
if not compiled_config.is_geo_mode(): if not compiled_config.is_geo_mode():
# for main program # for main program
_main = worker.delete_optimizer_pass(_main, compiled_config) _main = worker.delete_optimizer_pass(_main, compiled_config)
...@@ -128,6 +133,12 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -128,6 +133,12 @@ class ParameterServerOptimizer(MetaOptimizerBase):
if len(ops) == 0: if len(ops) == 0:
return _main, _startup return _main, _startup
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _add_lr_decay_table_pass
lr_decay_steps = self.user_defined_strategy.a_sync_configs[
"lr_decay_steps"]
_add_lr_decay_table_pass(main_program, compiled_config,
lr_decay_steps)
for op in ops: for op in ops:
if op.type in ["sgd", "adam"]: if op.type in ["sgd", "adam"]:
is_sgd_adam = True is_sgd_adam = True
......
...@@ -206,6 +206,28 @@ class CommonAccessor: ...@@ -206,6 +206,28 @@ class CommonAccessor:
conv_indent(indent), attrs, conv_indent(indent)) conv_indent(indent), attrs, conv_indent(indent))
class Tensor:
def __init__(self):
self.main_program_id = None
self.startup_program_id = None
self.feed_var_name = None
self.fetch_var_name = None
self.tensor_table_class = False
def to_string(self, indent):
program_str = "{}tensor {{{}\n{}}}"
attrs = ""
attrs += "feed_var_name: \"{}\" ".format(str(self.feed_var_name))
attrs += "fetch_var_name: \"{}\" ".format(str(self.fetch_var_name))
attrs += "startup_program_id: {} ".format(str(self.startup_program_id))
attrs += "main_program_id: {} ".format(str(self.main_program_id))
attrs += "tensor_table_class: \"{}\" ".format(
str(self.tensor_table_class))
attrs += "\n"
return program_str.format(
conv_indent(indent), attrs, conv_indent(indent))
class Table: class Table:
def __init__(self): def __init__(self):
self.id = -1 self.id = -1
...@@ -214,6 +236,7 @@ class Table: ...@@ -214,6 +236,7 @@ class Table:
self.type = None self.type = None
self.accessor = None self.accessor = None
self.common = None self.common = None
self.tensor = None
def to_string(self, indent): def to_string(self, indent):
table_str = "{}downpour_table_param {{{}\n{}}}" table_str = "{}downpour_table_param {{{}\n{}}}"
...@@ -230,6 +253,10 @@ class Table: ...@@ -230,6 +253,10 @@ class Table:
attrs += self.accessor.to_string(indent) attrs += self.accessor.to_string(indent)
attrs += "\n" attrs += "\n"
if self.tensor is not None:
attrs += self.tensor.to_string(indent)
attrs += "\n"
if self.common is not None: if self.common is not None:
attrs += self.common.to_string(indent) attrs += self.common.to_string(indent)
attrs += "\n" attrs += "\n"
...@@ -355,6 +382,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -355,6 +382,7 @@ class TheOnePSRuntime(RuntimeBase):
self._communicator = None self._communicator = None
self._server = None self._server = None
self._worker = fluid.core.DistFleetWrapper() self._worker = fluid.core.DistFleetWrapper()
self._server_sub_program = []
self._heter_client = None self._heter_client = None
def _set_basic_info(self, context): def _set_basic_info(self, context):
...@@ -569,17 +597,73 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -569,17 +597,73 @@ class TheOnePSRuntime(RuntimeBase):
table.common = common table.common = common
return table return table
def _build_tensor_table(idx, tensor_dict):
table = Table()
table.id = idx
table.type = "PS_OTHER_TABLE"
table.table_class = tensor_dict["tensor_table_class"]
table.shard_num = 256
accessor = Accessor()
accessor.accessor_class = "CommMergeAccessor"
accessor.optimizer = None
accessor.feature_dim = 0
accessor.embedding_dim = 0
table.accessor = accessor
common = CommonAccessor()
common.table_name = tensor_dict["feed_var_name"]
common.trainer_num = self.compiled_strategy.get_trainers()
common.attrs = ""
common.dims = []
common.params = []
table.common = common
tensor = Tensor()
tensor.main_program_id = tensor_dict["main_program_id"]
tensor.startup_program_id = tensor_dict["startup_program_id"]
tensor.feed_var_name = tensor_dict["feed_var_name"]
tensor.fetch_var_name = tensor_dict["fetch_var_name"]
tensor.tensor_table_class = tensor_dict["tensor_table_class"]
table.tensor = tensor
return table
def _add_tensor_table(tables):
tensor_table_dict = self.compiled_strategy.get_tensor_table_dict()
program_idx = 0
for table_name in tensor_table_dict:
if tensor_table_dict[table_name]["startup_program"] != None:
tensor_table_dict[table_name][
"startup_program_id"] = program_idx
self._server_sub_program.append(tensor_table_dict[
table_name]["startup_program"].desc)
program_idx += 1
if tensor_table_dict[table_name]["main_program"] != None:
tensor_table_dict[table_name][
"main_program_id"] = program_idx
self._server_sub_program.append(tensor_table_dict[
table_name]["main_program"].desc)
program_idx += 1
# Todo: Hard code for lr_decay table apply table id
new_table = _build_tensor_table(
len(tables), tensor_table_dict[table_name])
tables.append(new_table)
return tables
def _get_tables(): def _get_tables():
send_ctx = self.compiled_strategy.get_the_one_send_context( send_ctx = self.compiled_strategy.get_the_one_send_context(
use_origin_program=True, use_origin_program=True,
split_dense_table=self.role_maker. split_dense_table=self.role_maker.
_is_heter_parameter_server_mode) _is_heter_parameter_server_mode)
tables = [i for i in range(len(send_ctx) + 1)] tables = []
for idx, (name, ctx) in enumerate(send_ctx.items()): for idx, (name, ctx) in enumerate(send_ctx.items()):
table = Table() table = Table()
table.id = ctx.table_id() table.id = ctx.table_id()
if ctx.is_tensor_table():
continue
if ctx.is_sparse(): if ctx.is_sparse():
if len(ctx.origin_varnames()) < 1: if len(ctx.origin_varnames()) < 1:
continue continue
...@@ -619,10 +703,17 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -619,10 +703,17 @@ class TheOnePSRuntime(RuntimeBase):
accessor = _build_merge_accessor(ctx) accessor = _build_merge_accessor(ctx)
table.accessor = accessor table.accessor = accessor
tables[table.id] = table tables.append(table)
tensor_table_dict = self.compiled_strategy.get_tensor_table_dict()
if len(tensor_table_dict) > 0:
tables = _add_tensor_table(tables)
else:
empty_porgram = Program()
self._server_sub_program.append(empty_porgram.desc)
barrier_table = _build_barrier_table(len(send_ctx)) barrier_table = _build_barrier_table(len(tables))
tables[-1] = barrier_table tables.append(barrier_table)
return tables return tables
if is_server: if is_server:
...@@ -667,7 +758,8 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -667,7 +758,8 @@ class TheOnePSRuntime(RuntimeBase):
string_hosts.append(pshost.serialize_to_string()) string_hosts.append(pshost.serialize_to_string())
self._server = fluid.core.DistFleetWrapper() self._server = fluid.core.DistFleetWrapper()
self._server.init_server(proto_txt, string_hosts, role_id) self._server.init_server(proto_txt, string_hosts, role_id,
self._server_sub_program)
from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames
......
...@@ -19,7 +19,7 @@ import collections ...@@ -19,7 +19,7 @@ import collections
import math import math
import os import os
import warnings import warnings
import logging
import six import six
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.fluid import core from paddle.fluid import core
...@@ -162,6 +162,8 @@ class CompileTimeStrategy(object): ...@@ -162,6 +162,8 @@ class CompileTimeStrategy(object):
self._build_var_distributed() self._build_var_distributed()
self.tensor_table_dict = {}
# for heter-ps save variables # for heter-ps save variables
self.origin_merged_variables_pairs = list(self.merged_variables_pairs) self.origin_merged_variables_pairs = list(self.merged_variables_pairs)
self.origin_merged_dense_pairs = list(self.merged_dense_pairs) self.origin_merged_dense_pairs = list(self.merged_dense_pairs)
...@@ -240,6 +242,24 @@ class CompileTimeStrategy(object): ...@@ -240,6 +242,24 @@ class CompileTimeStrategy(object):
def get_origin_ps_startup_program(self): def get_origin_ps_startup_program(self):
return self.origin_ps_startup_program return self.origin_ps_startup_program
def add_tensor_table(self,
feed_var_name,
fetch_var_name="",
startup_program=None,
main_program=None,
tensor_table_class=""):
self.tensor_table_dict[feed_var_name] = {}
self.tensor_table_dict[feed_var_name]["feed_var_name"] = feed_var_name
self.tensor_table_dict[feed_var_name]["fetch_var_name"] = fetch_var_name
self.tensor_table_dict[feed_var_name][
"startup_program"] = startup_program
self.tensor_table_dict[feed_var_name]["main_program"] = main_program
self.tensor_table_dict[feed_var_name][
"tensor_table_class"] = tensor_table_class
def get_tensor_table_dict(self):
return self.tensor_table_dict
def get_sparse_varname_on_ps(self, is_distributed, endpoint=None): def get_sparse_varname_on_ps(self, is_distributed, endpoint=None):
if not endpoint: if not endpoint:
endpoint = self.get_ps_endpoint() endpoint = self.get_ps_endpoint()
...@@ -523,9 +543,10 @@ class CompileTimeStrategy(object): ...@@ -523,9 +543,10 @@ class CompileTimeStrategy(object):
grad.merged_var.name] grad.merged_var.name]
var_numel = reduce(lambda x, y: x * y, var.shape[1:]) var_numel = reduce(lambda x, y: x * y, var.shape[1:])
sparse_ctx = CommContext( sparse_ctx = CommContext(grad_name, [grad_name],
grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], ["127.0.0.1:6071"], [var_numel],
[grad_name], trainer_id, True, True, is_distributed, idx) [grad_name], trainer_id, True, True,
is_distributed, idx, False)
idx += 1 idx += 1
send_ctx[sparse_ctx.var_name()] = sparse_ctx send_ctx[sparse_ctx.var_name()] = sparse_ctx
...@@ -533,6 +554,10 @@ class CompileTimeStrategy(object): ...@@ -533,6 +554,10 @@ class CompileTimeStrategy(object):
raise ValueError( raise ValueError(
"GeoSGD require sparse parameters in your net.") "GeoSGD require sparse parameters in your net.")
if len(self.tensor_table_dict) > 0 and self.role_maker._is_worker():
name, ctx = self._step_ctx(idx)
send_ctx[name] = ctx
return send_ctx return send_ctx
else: else:
return self.get_the_one_send_context(split_dense_table) return self.get_the_one_send_context(split_dense_table)
...@@ -559,7 +584,7 @@ class CompileTimeStrategy(object): ...@@ -559,7 +584,7 @@ class CompileTimeStrategy(object):
aggregate = True aggregate = True
dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"], dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], origin_varnames, trainer_id, [var_numel], origin_varnames, trainer_id,
aggregate, False, False, idx) aggregate, False, False, idx, False)
send_ctx[grad_name] = dense_ctx send_ctx[grad_name] = dense_ctx
idx += 1 idx += 1
else: else:
...@@ -571,9 +596,10 @@ class CompileTimeStrategy(object): ...@@ -571,9 +596,10 @@ class CompileTimeStrategy(object):
var_numel = reduce(lambda x, y: x * y, var.shape) var_numel = reduce(lambda x, y: x * y, var.shape)
grad_name = origin_varname grad_name = origin_varname
aggregate = True aggregate = True
dense_ctx = CommContext( dense_ctx = CommContext(grad_name, [grad_name],
grad_name, [grad_name], ["127.0.0.1:6071"], [var_numel], ["127.0.0.1:6071"], [var_numel],
[origin_varname], trainer_id, aggregate, False, False, idx) [origin_varname], trainer_id, aggregate,
False, False, idx, False)
send_ctx[grad_name] = dense_ctx send_ctx[grad_name] = dense_ctx
idx += 1 idx += 1
return idx return idx
...@@ -615,10 +641,15 @@ class CompileTimeStrategy(object): ...@@ -615,10 +641,15 @@ class CompileTimeStrategy(object):
sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape, sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape,
[grad_name], trainer_id, True, True, [grad_name], trainer_id, True, True,
is_distributed, idx) is_distributed, idx, False)
idx += 1 idx += 1
send_ctx[sparse_ctx.var_name()] = sparse_ctx send_ctx[sparse_ctx.var_name()] = sparse_ctx
if len(self.tensor_table_dict) > 0 and self.role_maker._is_worker():
name, ctx = self._step_ctx(idx)
send_ctx[name] = ctx
return send_ctx return send_ctx
def get_the_one_recv_context(self, def get_the_one_recv_context(self,
...@@ -633,6 +664,8 @@ class CompileTimeStrategy(object): ...@@ -633,6 +664,8 @@ class CompileTimeStrategy(object):
for idx, (name, ctx) in enumerate(send_ctx.items()): for idx, (name, ctx) in enumerate(send_ctx.items()):
if ctx.is_sparse(): if ctx.is_sparse():
continue continue
if ctx.is_tensor_table():
continue
origin_grad_varnames = ctx.origin_varnames() origin_grad_varnames = ctx.origin_varnames()
...@@ -679,14 +712,14 @@ class CompileTimeStrategy(object): ...@@ -679,14 +712,14 @@ class CompileTimeStrategy(object):
var_distributed.append((g.name, ep, g.shape[0])) var_distributed.append((g.name, ep, g.shape[0]))
return var_distributed return var_distributed
def _step_ctx(self): def _step_ctx(self, idx):
name = STEP_COUNTER name = STEP_COUNTER
trainer_id = self.get_role_id() trainer_id = self.get_role_id()
endpoints = self.get_ps_endpoints() endpoints = self.get_ps_endpoints()
sections = [1] * len(endpoints) sections = [1] * len(endpoints)
names = [name] * len(endpoints) names = [name] * len(endpoints)
ctx = CommContext(name, names, endpoints, sections, [name], trainer_id, ctx = CommContext(name, names, endpoints, sections, [name], trainer_id,
True, False, False) True, False, False, idx, True)
return name, ctx return name, ctx
def _create_vars_from_blocklist(self, block_list): def _create_vars_from_blocklist(self, block_list):
...@@ -1118,6 +1151,89 @@ def _get_optimize_ops(_program): ...@@ -1118,6 +1151,89 @@ def _get_optimize_ops(_program):
return opt_ops return opt_ops
def _add_lr_decay_table_pass(main_program, compiled_config, lr_decay_steps):
if hasattr(compiled_config.origin_main_program, 'lr_sheduler'):
from paddle.optimizer.lr import LRScheduler
assert isinstance(compiled_config.origin_main_program.lr_sheduler,
LRScheduler), "must be LRScheduler"
ops = _get_optimize_ops(compiled_config.origin_main_program)
lr_param_dict = _get_lr_param_dict(ops)
lr_decay_main_program, lr_decay_startup_program, lr_name = _get_lr_sheduler_program(
compiled_config.origin_main_program.lr_sheduler, lr_param_dict,
lr_decay_steps)
compiled_config.add_tensor_table(
"@LR_DECAY_COUNTER@", lr_name, lr_decay_startup_program,
lr_decay_main_program, "GlobalStepTable")
def _get_lr_param_dict(opt_ops):
lr_param_dict = {}
for op in opt_ops:
lr_name = op.input("LearningRate")[0]
param_name = op.input("Param")[0]
if lr_name not in lr_param_dict:
lr_param_dict[lr_name] = []
lr_param_dict[lr_name].append(param_name)
return lr_param_dict
def _get_lr_sheduler_program(lr_sheduler, lr_param_dict, lr_decay_steps):
schedler_decay = [
'NoamDecay', 'NaturalExpDecay', 'InverseTimeDecay', 'ExponentialDecay'
]
from paddle.optimizer.lr import ExponentialDecay, NoamDecay, PiecewiseDecay, NaturalExpDecay, InverseTimeDecay
from paddle.fluid.layers.learning_rate_scheduler import exponential_decay, noam_decay, piecewise_decay, natural_exp_decay, inverse_time_decay
decay_main_program = fluid.framework.Program()
decay_startup_program = fluid.framework.Program()
lr_name = ""
if isinstance(lr_sheduler, ExponentialDecay):
with fluid.program_guard(decay_main_program, decay_startup_program):
lr = exponential_decay(1.0, lr_decay_steps, lr_sheduler.gamma, True)
lr_name = lr.name
logging.warn(
"ExponentialDecay is set, staircase = True, global learning rate decay step is [ %d ], Change decay steps as follow: \n"
"\t strategy = paddle.distributed.fleet.DistributedStrategy() \n "
"\t strategy.a_sync = True \n"
"\t strategy.a_sync_configs= { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
% lr_decay_steps)
elif isinstance(lr_sheduler, NoamDecay):
with fluid.program_guard(decay_main_program, decay_startup_program):
lr = noam_decay(lr_sheduler.d_model, lr_sheduler.warmup_steps, 1.0)
lr_name = lr.name
logging.warn("NoamDecay is set, warmup steps is [ %d ]" %
lr_sheduler.warmup_steps)
elif isinstance(lr_sheduler, NaturalExpDecay):
with fluid.program_guard(decay_main_program, decay_startup_program):
lr = natural_exp_decay(1.0, lr_decay_steps, lr_sheduler.gamma, True)
lr_name = lr.name
logging.warn(
"NaturalExpDecay is set, staircase = True, global learning rate decay step is [ %d ], Change decay steps as follow: \n"
"\t strategy = paddle.distributed.fleet.DistributedStrategy() \n "
"\t strategy.a_sync = True \n"
"\t strategy.a_sync_configs= { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
% lr_decay_steps)
elif isinstance(lr_sheduler, InverseTimeDecay):
with fluid.program_guard(decay_main_program, decay_startup_program):
lr = inverse_time_decay(1.0, lr_decay_steps, lr_sheduler.gamma,
True)
lr_name = lr.name
logging.warn(
"InverseTimeDecay is set, staircase = True, global learning rate decay step is [ %d ], Change decay steps as follow: \n"
"\t strategy = paddle.distributed.fleet.DistributedStrategy() \n "
"\t strategy.a_sync = True \n"
"\t strategy.a_sync_configs= { 'lr_decay_steps' : YOUR_DECAY_STEP } \n"
% lr_decay_steps)
else:
raise ValueError(
"Not supported current LearningRate strategy, please use follow decay strategy: {}".
format(schedler_decay))
return decay_main_program, decay_startup_program, lr_name
def _get_varname_parts(varname): def _get_varname_parts(varname):
# returns origin, blockid, trainerid # returns origin, blockid, trainerid
orig_var_name = "" orig_var_name = ""
......
...@@ -34,7 +34,6 @@ from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode ...@@ -34,7 +34,6 @@ from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
OP_NAME_SCOPE = "op_namescope" OP_NAME_SCOPE = "op_namescope"
CLIP_OP_NAME_SCOPE = "@CLIP" CLIP_OP_NAME_SCOPE = "@CLIP"
STEP_COUNTER = "@PS_STEP_COUNTER@" STEP_COUNTER = "@PS_STEP_COUNTER@"
OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName() OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName() RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC
...@@ -43,7 +42,6 @@ OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize ...@@ -43,7 +42,6 @@ OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"} SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
DEVICE_LIST = ["cpu", "gpu", "xpu"] DEVICE_LIST = ["cpu", "gpu", "xpu"]
COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"] COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"]
DEFAULT_DEVICE = 'cpu' DEFAULT_DEVICE = 'cpu'
...@@ -72,11 +70,26 @@ def delete_optimizer_pass(program, config): ...@@ -72,11 +70,26 @@ def delete_optimizer_pass(program, config):
if _program.global_block().has_var(var): if _program.global_block().has_var(var):
_program.global_block()._remove_var(var) _program.global_block()._remove_var(var)
def _add_lr_var(main_program, compiled_config):
# Todo: hard code for pe
lr_var = compiled_config.origin_main_program.global_block().vars[
"learning_rate_0"]
main_program.global_block().create_var(
name=lr_var.name,
shape=lr_var.shape,
dtype=lr_var.dtype,
type=lr_var.type,
lod_level=lr_var.lod_level,
persistable=True)
optimizer_ops = _get_optimize_ops(program) optimizer_ops = _get_optimize_ops(program)
lr_ops = _get_lr_ops(program) lr_ops = _get_lr_ops(program)
optimizer_ops.extend(lr_ops) optimizer_ops.extend(lr_ops)
_delete_optimizer_op_and_vars(program, optimizer_ops) _delete_optimizer_op_and_vars(program, optimizer_ops)
if hasattr(config.origin_main_program, 'lr_sheduler'):
_add_lr_var(program, config)
return program return program
......
...@@ -179,7 +179,7 @@ def gen_zero_line(dnn_data_num=7, lr_data_num=5): ...@@ -179,7 +179,7 @@ def gen_zero_line(dnn_data_num=7, lr_data_num=5):
return line return line
def prepare_fake_data(file_nums=6, file_lines=1000): def prepare_fake_data(file_nums=4, file_lines=500):
""" """
Create fake data with same type as avazu_ctr_data Create fake data with same type as avazu_ctr_data
""" """
......
...@@ -13,6 +13,11 @@ ...@@ -13,6 +13,11 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
from paddle.distributed.fleet.utils.ps_util import Distributed
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
""" """
high level unit test for distribute fleet. high level unit test for distribute fleet.
""" """
...@@ -34,12 +39,6 @@ import unittest ...@@ -34,12 +39,6 @@ import unittest
import paddle import paddle
paddle.enable_static() paddle.enable_static()
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.distributed.fleet.utils.ps_util import Distributed
__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']
RUN_STEP = 5 RUN_STEP = 5
...@@ -122,14 +121,20 @@ class FleetDistRunnerBase(object): ...@@ -122,14 +121,20 @@ class FleetDistRunnerBase(object):
fluid.clip.set_gradient_clip( fluid.clip.set_gradient_clip(
clip=fluid.clip.GradientClipByGlobalNorm(2.0)) clip=fluid.clip.GradientClipByGlobalNorm(2.0))
use_decay = int(os.getenv("DECAY", "0")) use_decay = int(os.getenv("USE_DECAY", "0"))
if use_decay: if use_decay:
scheduler = paddle.optimizer.lr.ExponentialDecay(
learning_rate=LEARNING_RATE, gamma=0.999, verbose=True)
optimizer = fluid.optimizer.SGD(scheduler)
"""
# learning rate decay method before 2.0
optimizer = fluid.optimizer.SGD( optimizer = fluid.optimizer.SGD(
learning_rate=fluid.layers.exponential_decay( learning_rate=fluid.layers.exponential_decay(
learning_rate=LEARNING_RATE, learning_rate=LEARNING_RATE,
decay_steps=500, decay_steps=500,
decay_rate=0.969, decay_rate=0.969,
staircase=True)) staircase=True))
"""
else: else:
optimizer = fluid.optimizer.SGD(LEARNING_RATE) optimizer = fluid.optimizer.SGD(LEARNING_RATE)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestNoamDecay(unittest.TestCase):
def net(self):
input_data = paddle.static.data(
name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.static.data(
name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="float32")
embedding = paddle.static.nn.embedding(
input_data, is_sparse=True, size=[1000, 128])
fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
label = paddle.cast(label, dtype="int64")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
paddle.static.Print(cost, message="heter_cost")
return cost
def test(self):
endpoints = [
"127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
"127.0.0.1:36007"
]
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.WORKER,
worker_num=2,
server_endpoints=endpoints)
fleet.init(role)
loss = self.net()
scheduler = paddle.optimizer.lr.NoamDecay(
d_model=0.01, warmup_steps=100, verbose=True)
optimizer = fluid.optimizer.Adam(scheduler)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"launch_barrier": False}
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
import unittest
import paddle
import os
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestExponentialDecay(unittest.TestCase):
def net(self):
input_data = paddle.static.data(
name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.static.data(
name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="float32")
embedding = paddle.static.nn.embedding(
input_data, is_sparse=True, size=[1000, 128])
fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
label = paddle.cast(label, dtype="int64")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
paddle.static.Print(cost, message="heter_cost")
return cost
def test(self):
endpoints = [
"127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
"127.0.0.1:36007"
]
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=endpoints)
fleet.init(role)
loss = self.net()
scheduler = paddle.optimizer.lr.InverseTimeDecay(
learning_rate=base_lr, gamma=0.999, verbose=True)
optimizer = fluid.optimizer.Adam(scheduler)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)
fleet.init_server()
if __name__ == '__main__':
os.environ["GLOG_v"] = "4"
os.environ["GLOG_logtostderr"] = "1"
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestNaturalExpDecay(unittest.TestCase):
def net(self):
input_data = paddle.static.data(
name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.static.data(
name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="float32")
embedding = paddle.static.nn.embedding(
input_data, is_sparse=True, size=[1000, 128])
fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
label = paddle.cast(label, dtype="int64")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
paddle.static.Print(cost, message="heter_cost")
return cost
def test(self):
endpoints = [
"127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
"127.0.0.1:36007"
]
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=endpoints)
fleet.init(role)
loss = self.net()
scheduler = paddle.optimizer.lr.NaturalExpDecay(
learning_rate=base_lr, gamma=0.999, verbose=True)
optimizer = fluid.optimizer.Adam(scheduler)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)
fleet.init_server()
if __name__ == '__main__':
os.environ["GLOG_v"] = "4"
os.environ["GLOG_logtostderr"] = "1"
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestNoamDecay(unittest.TestCase):
def net(self):
input_data = paddle.static.data(
name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.static.data(
name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="float32")
embedding = paddle.static.nn.embedding(
input_data, is_sparse=True, size=[1000, 128])
fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
label = paddle.cast(label, dtype="int64")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
paddle.static.Print(cost, message="heter_cost")
return cost
def test(self):
endpoints = [
"127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
"127.0.0.1:36007"
]
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=endpoints)
fleet.init(role)
loss = self.net()
scheduler = paddle.optimizer.lr.NoamDecay(
d_model=0.01, warmup_steps=100, verbose=True)
optimizer = fluid.optimizer.Adam(scheduler)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)
fleet.init_server()
if __name__ == '__main__':
os.environ["GLOG_v"] = "4"
os.environ["GLOG_logtostderr"] = "1"
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
import os
import unittest
import paddle
paddle.enable_static()
# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4
class TestExponentialDecay(unittest.TestCase):
def net(self):
input_data = paddle.static.data(
name="sparse_input", shape=[None, 1], dtype="int64")
input_label = paddle.static.data(
name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="float32")
embedding = paddle.static.nn.embedding(
input_data, is_sparse=True, size=[1000, 128])
fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
label = paddle.cast(label, dtype="int64")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)
paddle.static.Print(cost, message="heter_cost")
return cost
def test(self):
endpoints = [
"127.0.0.1:36004", "127.0.0.1:36005", "127.0.0.1:36006",
"127.0.0.1:36007"
]
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=endpoints)
fleet.init(role)
loss = self.net()
scheduler = paddle.optimizer.lr.ExponentialDecay(
learning_rate=base_lr, gamma=0.999, verbose=True)
optimizer = fluid.optimizer.Adam(scheduler)
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss)
fleet.init_server()
if __name__ == '__main__':
os.environ["GLOG_v"] = "4"
os.environ["GLOG_logtostderr"] = "1"
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册