未验证 提交 63cd5fb0 编写于 作者: Q Qiao Longfei 提交者: GitHub

Merge pull request #9523 from jacquesqiao/fix-test_send_recv

fix send_recv_op_test
...@@ -174,13 +174,13 @@ void AsyncGRPCServer::ShutdownQueue() { ...@@ -174,13 +174,13 @@ void AsyncGRPCServer::ShutdownQueue() {
std::unique_lock<std::mutex> lock(cq_mutex_); std::unique_lock<std::mutex> lock(cq_mutex_);
cq_send_->Shutdown(); cq_send_->Shutdown();
cq_get_->Shutdown(); cq_get_->Shutdown();
is_shut_down_ = true;
} }
// This URL explains why shutdown is complicate: // This URL explains why shutdown is complicate:
void AsyncGRPCServer::ShutDown() { void AsyncGRPCServer::ShutDown() {
server_->Shutdown(); is_shut_down_ = true;
ShutdownQueue(); ShutdownQueue();
server_->Shutdown();
} }
void AsyncGRPCServer::TryToRegisterNewSendOne() { void AsyncGRPCServer::TryToRegisterNewSendOne() {
...@@ -213,14 +213,14 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq, ...@@ -213,14 +213,14 @@ void AsyncGRPCServer::HandleRequest(::grpc::ServerCompletionQueue* cq,
bool ok = false; bool ok = false;
while (true) { while (true) {
if (!cq->Next(&tag, &ok)) { if (!cq->Next(&tag, &ok)) {
LOG(INFO) << cq_name << " get CompletionQueue shutdown!"; LOG(INFO) << cq_name << " CompletionQueue shutdown!";
break; break;
} }
PADDLE_ENFORCE(tag); PADDLE_ENFORCE(tag);
// FIXME(typhoonzero): de-couple the barriers with recv_op // FIXME(typhoonzero): de-couple the barriers with recv_op
if (cq_name == "cq_get") WaitCond(1); if (!is_shut_down_ && cq_name == "cq_get") WaitCond(1);
if (cq_name == "cq_send") WaitCond(0); if (!is_shut_down_ && cq_name == "cq_send") WaitCond(0);
RequestBase* base = (RequestBase*)tag; RequestBase* base = (RequestBase*)tag;
// reference: // reference:
......
...@@ -88,7 +88,6 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -88,7 +88,6 @@ class ListenAndServOp : public framework::OperatorBase {
void Stop() override { void Stop() override {
rpc_service_->Push(LISTEN_TERMINATE_MESSAGE); rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
rpc_service_->ShutDown();
server_thread_->join(); server_thread_->join();
} }
......
...@@ -122,7 +122,8 @@ void StartServerNet(bool is_sparse) { ...@@ -122,7 +122,8 @@ void StartServerNet(bool is_sparse) {
// sub program run in listen_and_serv_op, for simple test we use sum // sub program run in listen_and_serv_op, for simple test we use sum
f::ProgramDesc program; f::ProgramDesc program;
f::BlockDesc *optimize_block = program.MutableBlock(0); const auto &root_block = program.Block(0);
auto *optimize_block = program.AppendBlock(root_block);
// X for server side tensors, RX for received tensers, must be of same shape. // X for server side tensors, RX for received tensers, must be of same shape.
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block); AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册