diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 25c6bb9ee2e1b1b651ffdb9efe30240334d760fc..8abcfbb4388aeb3dc263c1e6403f540561f7e007 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -82,7 +82,9 @@ class RequestSend final : public RequestBase { virtual std::string GetReqName() { return request_->Varname(); } virtual void Process() { - queue_->Push(std::make_pair(request_->Varname(), request_)); + std::string var_name = GetReqName(); + VLOG(3) << "RequestSend " << var_name; + queue_->Push(std::make_pair(var_name, request_)); sendrecv::VoidMessage reply; responder_.Finish(reply, ::grpc::Status::OK, this); @@ -106,7 +108,7 @@ class RequestGet final : public RequestBase { responder_(&ctx_), scope_(scope), queue_(queue) { - int method_id = static_cast(detail::GrpcMethod::kGetVariable); + auto method_id = static_cast(detail::GrpcMethod::kGetVariable); service_->RequestAsyncUnary(method_id, &ctx_, &request_, &responder_, cq_, cq_, this); } @@ -118,6 +120,7 @@ class RequestGet final : public RequestBase { virtual void Process() { // proc request. std::string var_name = request_.varname(); + VLOG(3) << "RequestGet " << var_name; auto* var = scope_->FindVar(var_name); ::grpc::ByteBuffer reply; @@ -176,7 +179,7 @@ class RequestPrefetch final : public RequestBase { ::grpc::ByteBuffer reply; std::string var_name = request_->OutVarname(); - VLOG(3) << "prefetch var " << var_name; + VLOG(3) << "RequestPrefetch " << var_name; auto var_desc = program_->Block(0).FindVar(var_name); framework::Scope* local_scope = &scope_->NewScope(); auto* var = local_scope->FindVar(var_name); @@ -307,18 +310,21 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq, bool ok = false; while (true) { - VLOG(3) << "HandleRequest for " << cq_name << " while in"; + VLOG(3) << "HandleRequest for " << cq_name << " wait Next"; if (!cq->Next(&tag, &ok)) { LOG(INFO) << cq_name << " CompletionQueue shutdown!"; break; } - VLOG(3) << "HandleRequest for " << cq_name << " while after Next"; + VLOG(3) << "HandleRequest for " << cq_name << " get Next"; PADDLE_ENFORCE(tag); + if (sync_mode_) { // FIXME(typhoonzero): de-couple the barriers with recv_op + VLOG(3) << "HandleRequest for " << cq_name << " before WaitCond"; if (!is_shut_down_ && cq_name == "cq_get") WaitCond(1); if (!is_shut_down_ && cq_name == "cq_send") WaitCond(0); + VLOG(3) << "HandleRequest for " << cq_name << " after WaitCond"; } RequestBase* base = reinterpret_cast(tag); @@ -353,8 +359,10 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq, void AsyncGRPCServer::WaitCond(int cond) { std::unique_lock lock(this->barrier_mutex_); + VLOG(3) << "WaitCond " << cond << " in"; barrier_condition_.wait(lock, [=] { return this->barrier_cond_step_ == cond; }); + VLOG(3) << "WaitCond " << cond << " out"; } void AsyncGRPCServer::SetCond(int cond) { diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 36bd1d399342133d43621c3103225d30794217fe..e0b828548fea0bdce1e04bec492a40ab23fdf092 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -255,6 +255,7 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, std::vector> fs; for (auto iter = grad_to_queue.begin(); iter != grad_to_queue.end(); iter++) { std::string grad_name = iter->first; + VLOG(3) << "create async update thread for " << grad_name; fs.push_back(framework::Async([grad_name, &exit_flag, &executor, &grad_to_queue, &grad_to_prepared_ctx]() { AsyncUpdateThread(grad_name, exit_flag, grad_to_queue[grad_name],