From ef48f3c7665b1142fc04dbfeb6aee04ebf4c2c45 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Sat, 28 Apr 2018 17:43:30 +0800 Subject: [PATCH] wip --- paddle/fluid/operators/detail/grpc_server.cc | 6 ++++++ paddle/fluid/operators/detail/grpc_server.h | 7 ++++++- paddle/fluid/operators/listen_and_serv_op.cc | 19 ++++++++++++++++++- paddle/fluid/operators/listen_and_serv_op.h | 6 ++++++ 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index 95f4738b4ff..92819ff9581 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -241,6 +241,12 @@ void AsyncGRPCServer::RunSyncUpdate() { t_prefetch_.reset(new std::thread( std::bind(&AsyncGRPCServer::HandleRequest, this, cq_prefetch_.get(), "cq_prefetch", prefetch_register))); + + { + std::lock_guard lock(this->mutex_ready_); + ready_ = 1; + } + condition_ready_.notify_all(); // wait server server_->Wait(); t_send_->join(); diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 99b87b8c6cb..d7c06fc1814 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -45,8 +45,9 @@ class RequestBase; class AsyncGRPCServer final { public: explicit AsyncGRPCServer(const std::string &address, bool sync_mode) - : address_(address), sync_mode_(sync_mode) {} + : address_(address), sync_mode_(sync_mode), ready_(0) {} + bool WaitServerReady(); void RunSyncUpdate(); // functions to sync server barrier status. @@ -118,6 +119,10 @@ class AsyncGRPCServer final { framework::ProgramDesc *program_; framework::Executor *executor_; int selected_port_; + + std::mutext mutex_ready_; + std::condition_variable condition_ready_; + int ready_; }; }; // namespace detail diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 57cff680ab8..0a4b6a08e58 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -265,6 +265,23 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, } // while(true) } +void ListenAndServOp::StartServerThread() { + server_thread_.reset(new std::thread( + std::bind(&ListenAndServOp::ServerThreadEntry, this, rpc_service_))); +} + +void ListenAndServOp::ServerThreadEntry( + std::shared_ptr service) { + service->RunSyncUpdate(); + VLOG(4) << "RunServer thread end"; + + { + std::lock_guard lock(this->barrier_mutex_); + barrier_cond_step_ = cond; + } + barrier_condition_.notify_all(); +} + void ListenAndServOp::RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); @@ -298,7 +315,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, // start the server listening after all member initialized. server_thread_.reset(new std::thread(RunServer, rpc_service_)); VLOG(3) << "wait server thread to become ready..."; - sleep(5); + // Write to a file of server selected port for python use. SavePort(rpc_service_); if (sync_mode) { diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 3cc0f304773..c85569acdcd 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -51,6 +51,10 @@ class ListenAndServOp : public framework::OperatorBase { framework::Scope* recv_scope, framework::BlockDesc* prefetch_block) const; + void StartServerThread(); + + void ServerThreadEntry(); + void Stop() override; void RunImpl(const framework::Scope& scope, @@ -59,6 +63,8 @@ class ListenAndServOp : public framework::OperatorBase { protected: mutable std::shared_ptr rpc_service_; mutable std::shared_ptr server_thread_; + std::mutext server_ready_mutex_; + std::condition_variable server_ready_; }; } // namespace operators -- GitLab