提交 1509ae3a 编写于 作者: X Xin Pan

Make status update thread-safe

The status is updated in the Process() thread
and can be checked in another HandleRequest() thread.
上级 9087c668
......@@ -41,11 +41,22 @@ class RequestBase {
virtual ~RequestBase() {}
virtual void Process() = 0;
CallStatus Status() { return status_; }
void SetStatus(CallStatus status) { status_ = status; }
CallStatus Status() const {
std::lock_guard<std::mutex> l(status_mu_);
return status_;
}
template <typename T>
void Finish(const T& reply, ServerAsyncResponseWriter<T>* responder) {
std::lock_guard<std::mutex> l(status_mu_);
status_ = FINISH;
responder->Finish(reply, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
}
virtual std::string GetReqName() = 0;
protected:
mutable std::mutex status_mu_;
::grpc::ServerContext ctx_;
GrpcService::AsyncService* service_;
::grpc::ServerCompletionQueue* cq_;
......@@ -80,9 +91,7 @@ class RequestSend final : public RequestBase {
framework::Variable* outvar = nullptr;
request_handler_->Handle(varname, scope, invar, &outvar);
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
Finish(reply_, &responder_);
}
protected:
......@@ -122,9 +131,7 @@ class RequestGet final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
}
status_ = FINISH;
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
Finish(reply_, &responder_);
}
protected:
......@@ -166,9 +173,7 @@ class RequestPrefetch final : public RequestBase {
SerializeToByteBuffer(varname, outvar, *request_handler_->dev_ctx(),
&reply_);
responder_.Finish(reply_, ::grpc::Status::OK,
reinterpret_cast<void*>(static_cast<intptr_t>(req_id_)));
status_ = FINISH;
Finish(reply_, &responder_);
}
protected:
......
......@@ -53,6 +53,7 @@ class AsyncGRPCServer final : public RPCServer {
void StartServer() override;
private:
// HandleRequest needs to be thread-safe.
void HandleRequest(
::grpc::ServerCompletionQueue* cq, const std::string& rpc_name,
std::function<void(const std::string&, int)> TryToRegisterNewOne);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册