未验证 提交 f7fd711e 编写于 作者: Y Yancey 提交者: GitHub

Merge pull request #11868 from Yancey1989/dist_pass_barrier

add dist pass barrier
......@@ -46,9 +46,16 @@ ExecutorPrepareContext::~ExecutorPrepareContext() {
Executor::Executor(const platform::Place& place) : place_(place) {}
#ifdef PADDLE_WITH_DISTRIBUTE
void Executor::Complete() {
::paddle::operators::distributed::RPCClient::GetInstance<RPCCLIENT_T>()
->SendComplete();
void Executor::BeginPass() {
::paddle::operators::distributed::RPCClient::GetInstance<
::paddle::operators::distributed::GRPCClient>()
->SendBeginPass();
}
void Executor::EndPass() {
::paddle::operators::distributed::RPCClient::GetInstance<
::paddle::operators::distributed::GRPCClient>()
->SendEndPass();
}
#endif
......
......@@ -46,9 +46,14 @@ class Executor {
#ifdef PADDLE_WITH_DISTRIBUTE
/*
* Sending signal to pserver to mark current trainer stop.
* Sending signal to pserver to mark current pass started.
*/
void Complete();
void BeginPass();
/*
* Sending signal to pserver to mark current pass finished.
*/
void EndPass();
#endif
/* @Brief
......
......@@ -35,10 +35,20 @@ void GRPCClient::InitEventLoop() {
client_thread_.reset(new std::thread(std::bind(&GRPCClient::Proceed, this)));
}
void GRPCClient::SendComplete() {
void GRPCClient::SendBeginPass() {
for (auto& it : channels_) {
this->AsyncSendComplete(it.first);
VLOG(3) << "send begin pass to: " << it.first;
this->AsyncSendBeginPass(it.first);
}
this->Wait();
}
void GRPCClient::SendEndPass() {
for (auto& it : channels_) {
VLOG(3) << "send end pass to " << it.first;
this->AsyncSendEndPass(it.first);
}
this->Wait();
}
GRPCClient::~GRPCClient() {
......@@ -226,19 +236,32 @@ void GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
req_count_++;
}
void GRPCClient::AsyncSendComplete(const std::string& ep, int64_t time_out) {
void GRPCClient::AsyncSendBeginPass(const std::string& ep, int64_t time_out) {
const auto ch = GetChannel(ep);
BatchBarrierProcessor* s = new BatchBarrierProcessor(ch);
s->Prepare(time_out);
sendrecv::VariableMessage req;
req.set_varname(COMPLETE_MESSAGE);
req.set_varname(BEGIN_PASS_MESSAGE);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
req_count_++;
}
void GRPCClient::AsyncSendEndPass(const std::string& ep, int64_t time_out) {
const auto ch = GetChannel(ep);
FetchBarrierProcessor* s = new FetchBarrierProcessor(ch);
s->Prepare(time_out);
sendrecv::VariableMessage req;
req.set_varname(END_PASS_MESSAGE);
auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
req_count_++;
}
void GRPCClient::AsyncCheckpointNotify(const std::string& ep,
const std::string& dir,
int64_t time_out) {
......
......@@ -77,12 +77,13 @@ class BaseProcessor {
context_.reset(new grpc::ClientContext());
var_h_ = var_info;
context_->set_wait_for_ready(true);
if (time_out) {
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(time_out);
std::chrono::system_clock::now() +
std::chrono::milliseconds(time_out);
context_->set_deadline(deadline);
}
}
virtual void Prepare(int64_t time_out) {
context_.reset(new grpc::ClientContext());
......@@ -214,9 +215,17 @@ class GRPCClient : public RPCClient {
void AsyncCheckpointNotify(const std::string& ep, const std::string& dir,
int64_t time_out = FLAGS_rpc_deadline) override;
void AsyncSendBeginPass(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline) override;
void AsyncSendEndPass(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline) override;
void Wait() override;
void SendComplete() override;
void SendBeginPass() override;
void SendEndPass() override;
protected:
void InitImpl() override;
......@@ -227,9 +236,6 @@ class GRPCClient : public RPCClient {
void Proceed();
void AsyncSendComplete(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline);
std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);
private:
......
......@@ -37,11 +37,14 @@ constexpr char kRequestSend[] = "RequestSend";
constexpr char kRequestGet[] = "RequestGet";
constexpr char kRequestPrefetch[] = "RequestPrefetch";
constexpr char kRequestCheckpoint[] = "RequestCheckpoint";
constexpr char kRequestPassBarrier[] = "RequestPassBarrier";
#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
#define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV"
#define FETCH_BARRIER_MESSAGE "FETCH_BARRIER@RECV"
#define COMPLETE_MESSAGE "COMPLETE@RECV"
#define BEGIN_PASS_MESSAGE "BEGIN_PASS@RECV"
#define END_PASS_MESSAGE "END_PASS@RECV"
#define CHECKPOINT_SAVE_MESSAGE "SAVE@CHECKPOINTNOTIFY"
#define CHECKPOINT_LOAD_MESSAGE "LOAD@CHECKPOINTNOTIFY"
......
......@@ -55,14 +55,14 @@ bool RequestSendHandler::Handle(const std::string& varname,
if (varname == BATCH_BARRIER_MESSAGE) {
VLOG(3) << "sync: recv batch barrier message";
rpc_server_->IncreaseBatchBarrier(kRequestSend);
} else if (varname == COMPLETE_MESSAGE) {
VLOG(3) << "sync: recv complete message";
rpc_server_->DecreaseClientNum();
} else if (varname == BEGIN_PASS_MESSAGE) {
VLOG(3) << "sync: recv begin pass message";
rpc_server_->WaitCond(kRequestSend);
rpc_server_->BeginPass();
} else {
VLOG(3) << "sync: received var_name: " << varname;
if (sync_mode_) {
rpc_server_->WaitCond(kRequestSend);
}
VLOG(3) << "sync: processing received var: " << varname;
if (invar == nullptr) {
LOG(ERROR) << "sync: Can not find server side var: " << varname;
......@@ -91,21 +91,21 @@ bool RequestGetHandler::Handle(const std::string& varname,
framework::Variable** outvar,
const std::string& out_var_name) {
VLOG(4) << "RequestGetHandler:" << varname;
if (varname != FETCH_BARRIER_MESSAGE) {
if (sync_mode_) {
if (varname == FETCH_BARRIER_MESSAGE) {
VLOG(3) << "sync: recv fetch barrier message";
rpc_server_->IncreaseBatchBarrier(kRequestGet);
} else if (varname == END_PASS_MESSAGE) {
rpc_server_->EndPass();
} else {
rpc_server_->WaitCond(kRequestGet);
*outvar = scope_->FindVar(varname);
}
} else {
if (varname != FETCH_BARRIER_MESSAGE && varname != END_PASS_MESSAGE) {
*outvar = scope_->FindVar(varname);
return true;
}
// FETCH_BARRIER_MESSAGE
if (sync_mode_) {
VLOG(3) << "sync: recv fetch barrier message";
rpc_server_->IncreaseBatchBarrier(kRequestGet);
}
return true;
}
......
......@@ -60,10 +60,17 @@ class RPCClient {
const std::string& dir,
int64_t time_out = FLAGS_rpc_deadline) = 0;
// SendComplete tells all the server that current trainer have no more data
// to train, so that the pserver can reduce it's barrier count, and continue
// to train with other trainers.
virtual void SendComplete() = 0;
virtual void AsyncSendBeginPass(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline) = 0;
virtual void AsyncSendEndPass(const std::string& ep,
int64_t time_out = FLAGS_rpc_deadline) = 0;
// BeginePass/EndPass tells all the pserver that start/end a pass, so that
// the pserver can increase/reduce it's barrier count, and continue to train
// with other trainers.
virtual void SendBeginPass() = 0;
virtual void SendEndPass() = 0;
virtual void Wait() = 0;
......
......@@ -44,7 +44,8 @@ void RPCServer::SavePort() const {
void RPCServer::WaitBarrier(const std::string& rpc_name) {
std::unique_lock<std::mutex> lock(this->mutex_);
barrier_cond_.wait(lock, [this, &rpc_name] {
return (barrier_counter_[rpc_name] >= client_num_ || exit_flag_.load());
return ((barrier_counter_[rpc_name] == client_num_ && client_num_ != 0) ||
exit_flag_.load());
});
VLOG(3) << "batch_barrier_: " << rpc_name << " "
......@@ -63,10 +64,25 @@ void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) {
}
}
void RPCServer::DecreaseClientNum() {
void RPCServer::BeginPass() {
VLOG(4) << "RPCServer begin increase pass barrier";
{
std::unique_lock<std::mutex> lock(mutex_);
client_num_++;
VLOG(4) << "increase client_num to: " << client_num_;
}
barrier_cond_.notify_all();
}
void RPCServer::EndPass() {
VLOG(4) << "RPCServer begin increase pass barrier";
{
std::unique_lock<std::mutex> lock(mutex_);
client_num_--;
VLOG(4) << "decrease client_num to: " << client_num_;
if (cur_cond_.load() == rpc_cond_map_[kRequestGet]) {
barrier_counter_[kRequestGet]--;
}
}
barrier_cond_.notify_all();
}
......
......@@ -43,6 +43,9 @@ class RPCServer {
bool IsExit() { return exit_flag_.load(); }
int GetSelectedPort() const { return selected_port_; }
int GetClientNum() const;
void SavePort() const;
// RegisterRPC, register the rpc method name to a handler
......@@ -60,7 +63,10 @@ class RPCServer {
void SetCond(const std::string& rpc_name);
void WaitCond(const std::string& rpc_name);
void IncreaseBatchBarrier(const std::string rpc_name);
void DecreaseClientNum();
void BeginPass();
void EndPass();
void ResetBarrierCounter();
protected:
......
......@@ -493,7 +493,8 @@ All parameter, weight, gradient are variables in Paddle.
py::class_<framework::Executor>(m, "Executor")
.def(py::init<const platform::Place &>())
#ifdef PADDLE_WITH_DISTRIBUTE
.def("complete", &Executor::Complete)
.def("begin_pass", &Executor::BeginPass)
.def("end_pass", &Executor::EndPass)
#endif
.def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope,
int block_id, bool create_local_scope, bool create_vars) {
......
......@@ -348,6 +348,12 @@ class Executor(object):
]
return outs
def begin_pass(self):
self.executor.begin_pass()
def end_pass(self):
self.executor.end_pass()
def run(self,
program=None,
feed=None,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册