未验证 提交 dbca7f16 编写于 作者: G gongweibao 提交者: GitHub

tune logs (#11649)

上级 e45a5552
...@@ -18,6 +18,7 @@ limitations under the License. */ ...@@ -18,6 +18,7 @@ limitations under the License. */
#include <limits> #include <limits>
#include "glog/logging.h" // For VLOG
#include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/operators/distributed/request_handler.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep, ...@@ -75,6 +76,9 @@ bool GRPCClient::AsyncSendVar(const std::string& ep,
var_h.scope = p_scope; var_h.scope = p_scope;
var_h.name = var_name_val; var_h.name = var_name_val;
var_h.ctx = p_ctx; var_h.ctx = p_ctx;
var_h.method = "Send";
VLOG(3) << var_h.String() << " begin";
// stub context // stub context
SendProcessor* s = new SendProcessor(ch); SendProcessor* s = new SendProcessor(ch);
...@@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep, ...@@ -129,6 +133,9 @@ bool GRPCClient::AsyncGetVar(const std::string& ep,
var_h.scope = p_scope; var_h.scope = p_scope;
var_h.name = var_name_val; var_h.name = var_name_val;
var_h.ctx = p_ctx; var_h.ctx = p_ctx;
var_h.method = "Get";
VLOG(3) << var_h.String() << " begin";
// stub context // stub context
GetProcessor* s = new GetProcessor(ch); GetProcessor* s = new GetProcessor(ch);
...@@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep, ...@@ -172,6 +179,9 @@ bool GRPCClient::AsyncPrefetchVar(const std::string& ep,
var_h.scope = p_scope; var_h.scope = p_scope;
var_h.name = out_var_name_val; var_h.name = out_var_name_val;
var_h.ctx = p_ctx; var_h.ctx = p_ctx;
var_h.method = "Prefetch";
VLOG(3) << var_h.String() << " begin";
// stub context // stub context
GetProcessor* s = new GetProcessor(ch); GetProcessor* s = new GetProcessor(ch);
...@@ -243,10 +253,11 @@ void GRPCClient::Proceed() { ...@@ -243,10 +253,11 @@ void GRPCClient::Proceed() {
GPR_ASSERT(ok); GPR_ASSERT(ok);
PADDLE_ENFORCE(c); PADDLE_ENFORCE(c);
if (c->status_.ok()) { if (c->status_.ok()) {
VLOG(3) << c->var_h_.String() << " process";
c->Process(); c->Process();
} else { } else {
LOG(FATAL) << "var: " << c->var_h_.String() LOG(FATAL) << c->var_h_.String()
<< " grpc error:" << c->status_.error_message(); << " meets grpc error:" << c->status_.error_message();
} }
delete c; delete c;
{ {
......
...@@ -47,14 +47,18 @@ namespace operators { ...@@ -47,14 +47,18 @@ namespace operators {
namespace distributed { namespace distributed {
struct VarHandle { struct VarHandle {
// RPC endpoint.
std::string ep; std::string ep;
const platform::DeviceContext* ctx; const platform::DeviceContext* ctx;
const framework::Scope* scope; const framework::Scope* scope;
// Variable name.
std::string name; std::string name;
// RPC method name.
std::string method;
std::string String() const { std::string String() const {
std::ostringstream s; std::ostringstream s;
s << "name:[" << name << "] ep:[" << ep << "]"; s << method << " name:[" << name << "], ep:[" << ep << "]";
return s.str(); return s.str();
} }
}; };
......
...@@ -41,6 +41,19 @@ class RequestBase { ...@@ -41,6 +41,19 @@ class RequestBase {
virtual ~RequestBase() {} virtual ~RequestBase() {}
virtual void Process() = 0; virtual void Process() = 0;
std::string Status2String(const std::string& method) {
std::string status = "Process";
if (status_ == FINISH) {
status = "Finish";
}
std::ostringstream s;
s << method << " name:[" << GetReqName() << "]"
<< ", ep:[" << ctx_.peer() << "]"
<< " " << status << " using req_id:" << req_id_;
return s.str();
}
CallStatus Status() const { CallStatus Status() const {
std::lock_guard<std::mutex> l(status_mu_); std::lock_guard<std::mutex> l(status_mu_);
return status_; return status_;
...@@ -272,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name, ...@@ -272,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
int req_id) { int req_id) {
std::unique_lock<std::mutex> lock(cq_mutex_); std::unique_lock<std::mutex> lock(cq_mutex_);
if (is_shut_down_) { if (is_shut_down_) {
VLOG(3) << "shutdown, do not TryToRegisterNewSendOne"; LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne";
return; return;
} }
...@@ -306,14 +319,14 @@ void AsyncGRPCServer::HandleRequest( ...@@ -306,14 +319,14 @@ void AsyncGRPCServer::HandleRequest(
bool ok = false; bool ok = false;
while (true) { while (true) {
VLOG(3) << "HandleRequest " << rpc_name << " wait next"; VLOG(4) << "HandleRequest " << rpc_name << " wait next";
if (!cq->Next(&tag, &ok)) { if (!cq->Next(&tag, &ok)) {
LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!"; LOG(INFO) << "CompletionQueue " << rpc_name << " shutdown!";
break; break;
} }
int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag)); int req_id = static_cast<int>(reinterpret_cast<intptr_t>(tag));
VLOG(3) << "HandleRequest " << rpc_name << ", req_id:" << req_id VLOG(4) << "HandleRequest " << rpc_name << ", req_id:" << req_id
<< " get next"; << " get next";
auto& reqs = rpc_reqs_[rpc_name]; auto& reqs = rpc_reqs_[rpc_name];
...@@ -324,22 +337,21 @@ void AsyncGRPCServer::HandleRequest( ...@@ -324,22 +337,21 @@ void AsyncGRPCServer::HandleRequest(
base = reqs[req_id]; base = reqs[req_id];
} }
VLOG(3) << base->Status2String(rpc_name);
// reference: // reference:
// https://github.com/tensorflow/tensorflow/issues/5596 // https://github.com/tensorflow/tensorflow/issues/5596
// https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM // https://groups.google.com/forum/#!topic/grpc-io/xftlRy-IQwM
// https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I // https://groups.google.com/forum/#!topic/grpc-io/ywATt88Ef_I
if (!ok) { if (!ok) {
LOG(WARNING) << "completion queue:" << rpc_name LOG(WARNING) << "completion queue:" << rpc_name
<< " recv no regular event:argument name[" << " recv no regular event"
<< base->GetReqName() << "]"; << " context:" << base->Status2String(rpc_name);
TryToRegisterNewOne(rpc_name, req_id); TryToRegisterNewOne(rpc_name, req_id);
delete base; delete base;
continue; continue;
} }
VLOG(3) << "queue id:" << rpc_name << ", req_id:" << req_id
<< ", status:" << base->Status();
switch (base->Status()) { switch (base->Status()) {
case PROCESS: { case PROCESS: {
base->Process(); base->Process();
......
...@@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -76,6 +76,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
if (total_written + size_to_write > length) { if (total_written + size_to_write > length) {
size_to_write = length - total_written; size_to_write = length - total_written;
} }
// This log is useful to see how long a internal block size is of rpc.
VLOG(7) << "copy " << size_to_write << " data to CUDAPlace";
memory::Copy(boost::get<platform::CUDAPlace>(place), memory::Copy(boost::get<platform::CUDAPlace>(place),
reinterpret_cast<void*>(p), cpu, data, size_to_write, reinterpret_cast<void*>(p), cpu, data, size_to_write,
gpu_dev_ctx.stream()); gpu_dev_ctx.stream());
...@@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, ...@@ -103,6 +105,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input,
} }
// TODO(gongwb): can we avoid copy? // TODO(gongwb): can we avoid copy?
platform::CPUPlace cpu; platform::CPUPlace cpu;
// This log is useful to see how long a internal block size is of rpc.
VLOG(7) << "copy " << size_to_write << " data to CPUPlace";
memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write); memory::Copy(cpu, reinterpret_cast<void*>(p), cpu, data, size_to_write);
p += size_to_write; p += size_to_write;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册