提交 c750be6d 编写于 作者: Q Qiao Longfei

add some log

上级 d243e555
...@@ -39,10 +39,11 @@ void RPCServer::SavePort() const { ...@@ -39,10 +39,11 @@ void RPCServer::SavePort() const {
port_file.open(file_path); port_file.open(file_path);
port_file << selected_port_; port_file << selected_port_;
port_file.close(); port_file.close();
VLOG(4) << "selected port written to " << file_path; VLOG(3) << "selected port written to " << file_path;
} }
void RPCServer::WaitBarrier(const std::string& rpc_name) { void RPCServer::WaitBarrier(const std::string& rpc_name) {
VLOG(3) << "WaitBarrier: " << rpc_name;
std::unique_lock<std::mutex> lock(this->mutex_); std::unique_lock<std::mutex> lock(this->mutex_);
barrier_cond_.wait(lock, [this, &rpc_name] { barrier_cond_.wait(lock, [this, &rpc_name] {
return ((barrier_counter_[rpc_name] == client_num_ && client_num_ != 0) || return ((barrier_counter_[rpc_name] == client_num_ && client_num_ != 0) ||
...@@ -54,7 +55,7 @@ void RPCServer::WaitBarrier(const std::string& rpc_name) { ...@@ -54,7 +55,7 @@ void RPCServer::WaitBarrier(const std::string& rpc_name) {
} }
void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) { void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) {
VLOG(4) << "RPCServer begin IncreaseBatchBarrier " << rpc_name; VLOG(3) << "RPCServer begin IncreaseBatchBarrier " << rpc_name;
int b = 0; int b = 0;
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
b = ++barrier_counter_[rpc_name]; b = ++barrier_counter_[rpc_name];
...@@ -71,7 +72,7 @@ void RPCServer::Complete() { ...@@ -71,7 +72,7 @@ void RPCServer::Complete() {
client_num_--; client_num_--;
need_reset_all_vars_ = true; need_reset_all_vars_ = true;
VLOG(4) << "decrease client_num to: " << client_num_; VLOG(3) << "decrease client_num to: " << client_num_;
if (cur_cond_.load() == rpc_cond_map_[kRequestGet]) { if (cur_cond_.load() == rpc_cond_map_[kRequestGet]) {
barrier_counter_[kRequestGet]--; barrier_counter_[kRequestGet]--;
} }
...@@ -105,7 +106,7 @@ void RPCServer::RegisterRPC(const std::string& rpc_name, ...@@ -105,7 +106,7 @@ void RPCServer::RegisterRPC(const std::string& rpc_name,
static int cond = -1; static int cond = -1;
rpc_cond_map_[rpc_name] = ++cond; rpc_cond_map_[rpc_name] = ++cond;
VLOG(4) << "RegisterRPC rpc_name:" << rpc_name << ", handler:" << handler VLOG(3) << "RegisterRPC rpc_name:" << rpc_name << ", handler:" << handler
<< ", cond:" << rpc_cond_map_[rpc_name]; << ", cond:" << rpc_cond_map_[rpc_name];
} }
...@@ -120,7 +121,7 @@ void RPCServer::SetCond(const std::string& rpc_name) { ...@@ -120,7 +121,7 @@ void RPCServer::SetCond(const std::string& rpc_name) {
} }
void RPCServer::WaitCond(const std::string& rpc_name) { void RPCServer::WaitCond(const std::string& rpc_name) {
VLOG(4) << "RPCServer WaitCond " << rpc_name; VLOG(3) << "RPCServer WaitCond " << rpc_name;
int cond = 0; int cond = 0;
{ {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
...@@ -151,7 +152,7 @@ void RPCServer::RegisterVar(const std::string& var_name, ...@@ -151,7 +152,7 @@ void RPCServer::RegisterVar(const std::string& var_name,
} }
rpc_cond_.notify_all(); rpc_cond_.notify_all();
VLOG(4) << "RegisterVar context:" << h.String(); VLOG(3) << "RegisterVar context:" << h.String();
} }
void RPCServer::IncreaseVarBarrier(const std::string& var_name) { void RPCServer::IncreaseVarBarrier(const std::string& var_name) {
...@@ -167,11 +168,11 @@ void RPCServer::IncreaseVarBarrier(const std::string& var_name) { ...@@ -167,11 +168,11 @@ void RPCServer::IncreaseVarBarrier(const std::string& var_name) {
barrier_cond_.notify_all(); barrier_cond_.notify_all();
} }
VLOG(4) << "IncreaseVarBarrier context:" << h.String(); VLOG(3) << "IncreaseVarBarrier context:" << h.String();
} }
void RPCServer::WaitVarBarrier(const std::string& var_name) { void RPCServer::WaitVarBarrier(const std::string& var_name) {
VLOG(4) << "WaitBarrier var_name:" << var_name; VLOG(3) << "WaitVarBarrier var_name:" << var_name;
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
barrier_cond_.wait(lock, [&]() { barrier_cond_.wait(lock, [&]() {
...@@ -179,11 +180,11 @@ void RPCServer::WaitVarBarrier(const std::string& var_name) { ...@@ -179,11 +180,11 @@ void RPCServer::WaitVarBarrier(const std::string& var_name) {
exit_flag_.load()); exit_flag_.load());
}); });
VLOG(4) << "WaitBarrier context: " << var_map_[var_name].String(); VLOG(3) << "WaitVarBarrier context: " << var_map_[var_name].String();
} }
void RPCServer::SetVarCond(const std::string& var_name) { void RPCServer::SetVarCond(const std::string& var_name) {
VLOG(4) << "SetVarCond var_name:" << var_name; VLOG(3) << "SetVarCond var_name:" << var_name;
{ {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
if (var_map_.find(var_name) != var_map_.end()) { if (var_map_.find(var_name) != var_map_.end()) {
...@@ -193,14 +194,14 @@ void RPCServer::SetVarCond(const std::string& var_name) { ...@@ -193,14 +194,14 @@ void RPCServer::SetVarCond(const std::string& var_name) {
} }
void RPCServer::WaitVarCond(const std::string& var_name) { void RPCServer::WaitVarCond(const std::string& var_name) {
VLOG(4) << "WaitVarCond var_name:" << var_name; VLOG(3) << "WaitVarCond var_name:" << var_name;
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
rpc_cond_.wait(lock, [=] { rpc_cond_.wait(lock, [=] {
return (var_map_.find(var_name) != var_map_.end() || exit_flag_.load()); return (var_map_.find(var_name) != var_map_.end() || exit_flag_.load());
}); });
VLOG(4) << "WaitVarCond var_name:" << var_name << " end"; VLOG(3) << "WaitVarCond var_name:" << var_name << " end";
} }
MonomerHandle RPCServer::GetMonomer(const std::string& var_name) { MonomerHandle RPCServer::GetMonomer(const std::string& var_name) {
......
...@@ -137,7 +137,9 @@ void ListenAndServOp::RunSyncLoop( ...@@ -137,7 +137,9 @@ void ListenAndServOp::RunSyncLoop(
while (true) { while (true) {
// Get from multiple trainers, we don't care about the order in which // Get from multiple trainers, we don't care about the order in which
// the gradients arrives, just add suffix 0~n and merge the gradient. // the gradients arrives, just add suffix 0~n and merge the gradient.
VLOG(3) << "wait all clients to send gradient";
rpc_service_->SetCond(distributed::kRequestSend); rpc_service_->SetCond(distributed::kRequestSend);
VLOG(3) << "wait all clients to send send_barrier";
rpc_service_->WaitBarrier(distributed::kRequestSend); rpc_service_->WaitBarrier(distributed::kRequestSend);
if (rpc_service_->IsExit()) { if (rpc_service_->IsExit()) {
...@@ -168,12 +170,16 @@ void ListenAndServOp::RunSyncLoop( ...@@ -168,12 +170,16 @@ void ListenAndServOp::RunSyncLoop(
} }
ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program, ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program,
recv_scope); recv_scope);
VLOG(2) << "run all blocks spent " << GetTimestamp() - ts << "(ms)"; VLOG(3) << "run all blocks spent " << GetTimestamp() - ts << "(ms)";
VLOG(3) << "ResetReceivedVars";
ResetReceivedVars(recv_scope, dev_ctx, rpc_service_->NeedResetAllVars()); ResetReceivedVars(recv_scope, dev_ctx, rpc_service_->NeedResetAllVars());
VLOG(3) << "wait all clients to get parameters back";
rpc_service_->SetCond(distributed::kRequestGet); rpc_service_->SetCond(distributed::kRequestGet);
VLOG(3) << "wait all clients to send fetch_barrier";
rpc_service_->WaitBarrier(distributed::kRequestGet); rpc_service_->WaitBarrier(distributed::kRequestGet);
VLOG(3) << "ResetBarrierCounter";
rpc_service_->ResetBarrierCounter(); rpc_service_->ResetBarrierCounter();
} // while(true) } // while(true)
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册