diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc index 65d63784c1486f3304c053a2bd3c58b8b30eda2f..52f931188dc790682626b14da83d0835cad4f1a6 100644 --- a/paddle/fluid/operators/distributed/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -18,6 +18,7 @@ limitations under the License. */ #include +#include "glog/logging.h" // For VLOG #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/platform/profiler.h" @@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep, var_h.scope = p_scope; var_h.name = var_name_val; var_h.ctx = p_ctx; + var_h.method = "Send"; + + VLOG(3) << var_h.String() << " begin"; // stub context SendProcessor* s = new SendProcessor(ch); @@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep, var_h.scope = p_scope; var_h.name = var_name_val; var_h.ctx = p_ctx; + var_h.method = "Get"; + + VLOG(3) << var_h.String() << " begin"; // stub context GetProcessor* s = new GetProcessor(ch); @@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep, var_h.scope = p_scope; var_h.name = out_var_name_val; var_h.ctx = p_ctx; + var_h.method = "Prefetch"; + + VLOG(3) << var_h.String() << " begin"; // stub context GetProcessor* s = new GetProcessor(ch); @@ -243,10 +253,11 @@ void GRPCClient::Proceed() { GPR_ASSERT(ok); PADDLE_ENFORCE(c); if (c->status_.ok()) { + VLOG(3) << c->var_h_.String() << " process"; c->Process(); } else { - LOG(FATAL) << "var: " << c->var_h_.String() - << " grpc error:" << c->status_.error_message(); + LOG(FATAL) << c->var_h_.String() + << " meets grpc error:" << c->status_.error_message(); } delete c; { diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h index a6efa7dfd11e87caafa6109391f133b0233d58dd..7875939ff510e7e41a2a11ca965b52eedff3d05c 100644 --- a/paddle/fluid/operators/distributed/grpc_client.h +++ b/paddle/fluid/operators/distributed/grpc_client.h @@ -47,14 +47,18 @@ namespace operators { namespace distributed { struct VarHandle { + // RPC endpoint. std::string ep; const platform::DeviceContext* ctx; const framework::Scope* scope; + // Variable name. std::string name; + // RPC method name. + std::string method; std::string String() const { std::ostringstream s; - s << "name:[" << name << "] ep:[" << ep << "]"; + s << method << " name:[" << name << "], ep:[" << ep << "]"; return s.str(); } }; diff --git a/paddle/fluid/operators/distributed/grpc_server.cc b/paddle/fluid/operators/distributed/grpc_server.cc index 707f665a29d83d7cdf6e4e80624f2402a7b0a2e7..b9a9b12cecdada570da5af173e394999554e9cb8 100644 --- a/paddle/fluid/operators/distributed/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc_server.cc @@ -41,6 +41,19 @@ class RequestBase { virtual ~RequestBase() {} virtual void Process() = 0; + std::string Status2String(const std::string& method) { + std::string status = "Process"; + if (status_ == FINISH) { + status = "Finish"; + } + + std::ostringstream s; + s << method << " name:[" << GetReqName() << "]" + << ", ep:[" << ctx_.peer() << "]" + << " " << status << " using req_id:" << req_id_; + return s.str(); + } + CallStatus Status() const { std::lock_guard l(status_mu_); return status_; @@ -272,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name, int req_id) { std::unique_lock lock(cq_mutex_); if (is_shut_down_) { - VLOG(3) << "shutdown, do not TryToRegisterNewSendOne"; + LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne"; return; } @@ -306,14 +319,14 @@ void AsyncGRPCServer::HandleRequest( bool ok = false; while (true) { - VLOG(3) << "HandleRequest " << rpc_name << " wait next"; + VLOG(4) << "HandleRequest " << rpc_name << " wait next"; if (!cq->Next(&tag, &ok)) { LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!"; break; } int req_id = static_cast(reinterpret_cast(tag)); - VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id + VLOG(4) << "HandleRequest " << rpc_name << ", req_id:" << req_id << " get next"; auto& reqs = rpc_reqs_[rpc_name]; @@ -324,22 +337,21 @@ void AsyncGRPCServer::HandleRequest( base = reqs[req_id]; } + VLOG(3) << base->Status2String(rpc_name); + // reference: // https://github.com/tensorflow/tensorflow/issues/5596 // https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM // https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I if (!ok) { LOG(WARNING) << "completion queue:" << rpc_name - << " recv no regular event:argument name[" - << base->GetReqName() << "]"; + << " recv no regular event" + << " context:" << base->Status2String(rpc_name); TryToRegisterNewOne(rpc_name, req_id); delete base; continue; } - VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id - << ", status:" << base->Status(); - switch (base->Status()) { case PROCESS: { base->Process(); diff --git a/paddle/fluid/operators/distributed/variable_response.cc b/paddle/fluid/operators/distributed/variable_response.cc index 619890b1939be8777b89b94e415a3c2d63376658..45832c60bf9172497afabac927ba39a7cbfb9a52 100644 --- a/paddle/fluid/operators/distributed/variable_response.cc +++ b/paddle/fluid/operators/distributed/variable_response.cc @@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, if (total_written + size_to_write > length) { size_to_write = length - total_written; } + // This log is useful to see how long a internal block size is of rpc. + VLOG(7) << "copy " << size_to_write << " data to CUDAPlace"; memory::Copy(boost::get(place), reinterpret_cast(p), cpu, data, size_to_write, gpu_dev_ctx.stream()); @@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, } // TODO(gongwb): can we avoid copy? platform::CPUPlace cpu; + // This log is useful to see how long a internal block size is of rpc. + VLOG(7) << "copy " << size_to_write << " data to CPUPlace"; memory::Copy(cpu, reinterpret_cast(p), cpu, data, size_to_write); p += size_to_write;