提交 ef48f3c7 编写于 作者: T typhoonzero

wip

上级 a338c7d8
...@@ -241,6 +241,12 @@ void AsyncGRPCServer::RunSyncUpdate() { ...@@ -241,6 +241,12 @@ void AsyncGRPCServer::RunSyncUpdate() {
t_prefetch_.reset(new std::thread( t_prefetch_.reset(new std::thread(
std::bind(&AsyncGRPCServer::HandleRequest, this, cq_prefetch_.get(), std::bind(&AsyncGRPCServer::HandleRequest, this, cq_prefetch_.get(),
"cq_prefetch", prefetch_register))); "cq_prefetch", prefetch_register)));
{
std::lock_guard<std::mutex> lock(this->mutex_ready_);
ready_ = 1;
}
condition_ready_.notify_all();
// wait server // wait server
server_->Wait(); server_->Wait();
t_send_->join(); t_send_->join();
......
...@@ -45,8 +45,9 @@ class RequestBase; ...@@ -45,8 +45,9 @@ class RequestBase;
class AsyncGRPCServer final { class AsyncGRPCServer final {
public: public:
explicit AsyncGRPCServer(const std::string &address, bool sync_mode) explicit AsyncGRPCServer(const std::string &address, bool sync_mode)
: address_(address), sync_mode_(sync_mode) {} : address_(address), sync_mode_(sync_mode), ready_(0) {}
bool WaitServerReady();
void RunSyncUpdate(); void RunSyncUpdate();
// functions to sync server barrier status. // functions to sync server barrier status.
...@@ -118,6 +119,10 @@ class AsyncGRPCServer final { ...@@ -118,6 +119,10 @@ class AsyncGRPCServer final {
framework::ProgramDesc *program_; framework::ProgramDesc *program_;
framework::Executor *executor_; framework::Executor *executor_;
int selected_port_; int selected_port_;
std::mutext mutex_ready_;
std::condition_variable condition_ready_;
int ready_;
}; };
}; // namespace detail }; // namespace detail
......
...@@ -265,6 +265,23 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, ...@@ -265,6 +265,23 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
} // while(true) } // while(true)
} }
void ListenAndServOp::StartServerThread() {
server_thread_.reset(new std::thread(
std::bind(&ListenAndServOp::ServerThreadEntry, this, rpc_service_)));
}
void ListenAndServOp::ServerThreadEntry(
std::shared_ptr<detail::AsyncGRPCServer> service) {
service->RunSyncUpdate();
VLOG(4) << "RunServer thread end";
{
std::lock_guard<std::mutex> lock(this->barrier_mutex_);
barrier_cond_step_ = cond;
}
barrier_condition_.notify_all();
}
void ListenAndServOp::RunImpl(const framework::Scope &scope, void ListenAndServOp::RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const { const platform::Place &dev_place) const {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
...@@ -298,7 +315,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -298,7 +315,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
// start the server listening after all member initialized. // start the server listening after all member initialized.
server_thread_.reset(new std::thread(RunServer, rpc_service_)); server_thread_.reset(new std::thread(RunServer, rpc_service_));
VLOG(3) << "wait server thread to become ready..."; VLOG(3) << "wait server thread to become ready...";
sleep(5);
// Write to a file of server selected port for python use. // Write to a file of server selected port for python use.
SavePort(rpc_service_); SavePort(rpc_service_);
if (sync_mode) { if (sync_mode) {
......
...@@ -51,6 +51,10 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -51,6 +51,10 @@ class ListenAndServOp : public framework::OperatorBase {
framework::Scope* recv_scope, framework::Scope* recv_scope,
framework::BlockDesc* prefetch_block) const; framework::BlockDesc* prefetch_block) const;
void StartServerThread();
void ServerThreadEntry();
void Stop() override; void Stop() override;
void RunImpl(const framework::Scope& scope, void RunImpl(const framework::Scope& scope,
...@@ -59,6 +63,8 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -59,6 +63,8 @@ class ListenAndServOp : public framework::OperatorBase {
protected: protected:
mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_; mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
mutable std::shared_ptr<std::thread> server_thread_; mutable std::shared_ptr<std::thread> server_thread_;
std::mutext server_ready_mutex_;
std::condition_variable server_ready_;
}; };
} // namespace operators } // namespace operators
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册