diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 95f4738b4ff50852d9591719133ca650533bf848..92819ff9581a402e4b02182de514b55170fd9b04 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -241,6 +241,12 @@ void AsyncGRPCServer::RunSyncUpdate() { t_prefetch_.reset(new std::thread( std::bind(&AsyncGRPCServer::HandleRequest, this, cq_prefetch_.get(), "cq_prefetch", prefetch_register))); + + { + std::lock_guard lock(this->mutex_ready_); + ready_ = 1; + } + condition_ready_.notify_all(); // wait server server_->Wait(); t_send_->join(); diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 99b87b8c6cb3e597778b88c395e4abf400d82c39..d7c06fc1814380c2afc2cc3f7886199d2868c723 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -45,8 +45,9 @@ class RequestBase; class AsyncGRPCServer final { public: explicit AsyncGRPCServer(const std::string &address, bool sync_mode) - : address_(address), sync_mode_(sync_mode) {} + : address_(address), sync_mode_(sync_mode), ready_(0) {} + bool WaitServerReady(); void RunSyncUpdate(); // functions to sync server barrier status. @@ -118,6 +119,10 @@ class AsyncGRPCServer final { framework::ProgramDesc *program_; framework::Executor *executor_; int selected_port_; + + std::mutext mutex_ready_; + std::condition_variable condition_ready_; + int ready_; }; }; // namespace detail diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 57cff680ab89f2df7e71af4056ee06cdf330bbab..0a4b6a08e58f368346ff8b7087882f395bce0fef 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -265,6 +265,23 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, } // while(true) } +void ListenAndServOp::StartServerThread() { + server_thread_.reset(new std::thread( + std::bind(&ListenAndServOp::ServerThreadEntry, this, rpc_service_))); +} + +void ListenAndServOp::ServerThreadEntry( + std::shared_ptr service) { + service->RunSyncUpdate(); + VLOG(4) << "RunServer thread end"; + + { + std::lock_guard lock(this->barrier_mutex_); + barrier_cond_step_ = cond; + } + barrier_condition_.notify_all(); +} + void ListenAndServOp::RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); @@ -298,7 +315,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, // start the server listening after all member initialized. server_thread_.reset(new std::thread(RunServer, rpc_service_)); VLOG(3) << "wait server thread to become ready..."; - sleep(5); + // Write to a file of server selected port for python use. SavePort(rpc_service_); if (sync_mode) { diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 3cc0f3047733bea94daa310cd39cb0a4f44bef85..c85569acdcd82d8e4e3c418d4c0b98e811adcf4c 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -51,6 +51,10 @@ class ListenAndServOp : public framework::OperatorBase { framework::Scope* recv_scope, framework::BlockDesc* prefetch_block) const; + void StartServerThread(); + + void ServerThreadEntry(); + void Stop() override; void RunImpl(const framework::Scope& scope, @@ -59,6 +63,8 @@ class ListenAndServOp : public framework::OperatorBase { protected: mutable std::shared_ptr rpc_service_; mutable std::shared_ptr server_thread_; + std::mutext server_ready_mutex_; + std::condition_variable server_ready_; }; } // namespace operators