diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index a34e22ff8765fccbd5ac3a284b7c6820f0055ec3..c425c71160a8fa3830a5fbdae1baaed850710877 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -104,7 +104,7 @@ cc_test(init_test SRCS init_test.cc DEPS init) cc_test(op_kernel_type_test SRCS op_kernel_type_test.cc DEPS place device_context framework_proto) cc_test(cow_ptr_tests SRCS details/cow_ptr_test.cc) -# cc_test(channel_test SRCS channel_test.cc) +cc_test(channel_test SRCS channel_test.cc) cc_test(tuple_test SRCS tuple_test.cc ) cc_test(concurrency_test SRCS concurrency_test.cc DEPS go_op channel_close_op channel_create_op channel_send_op channel_recv_op sum_op select_op elementwise_add_op compare_op diff --git a/paddle/fluid/framework/channel_impl.h b/paddle/fluid/framework/channel_impl.h index c47d629289af2c3d1f7c30d711d338745bf5234c..e056779ea0dd0a31191b628f82724298efaf50ff 100644 --- a/paddle/fluid/framework/channel_impl.h +++ b/paddle/fluid/framework/channel_impl.h @@ -138,8 +138,8 @@ void ChannelImpl::Send(T *item) { // If channel is closed, throw exception if (closed_) { - lock.unlock(); send_return(); + lock.unlock(); PADDLE_THROW("Cannot send on closed channel"); } @@ -152,11 +152,9 @@ void ChannelImpl::Send(T *item) { if (m != nullptr) { *(m->data) = std::move(*item); m->Notify(); - lock.unlock(); send_return(); return; } else { - lock.unlock(); Send(item); send_return(); return; @@ -169,8 +167,6 @@ void ChannelImpl::Send(T *item) { if (buf_.size() < cap_) { // Copy to buffer buf_.push_back(std::move(*item)); - // Release lock and return true - lock.unlock(); send_return(); return; } @@ -181,8 +177,8 @@ void ChannelImpl::Send(T *item) { sendq.push_back(m); m->Wait(lock); if (m->chan_closed) { - lock.unlock(); send_return(); + lock.unlock(); PADDLE_THROW("Cannot send on closed channel"); } send_return(); @@ -195,10 +191,7 @@ bool ChannelImpl::Receive(T *item) { // If channel is closed and buffer is empty or // channel is unbuffered - if (closed_ && buf_.empty()) { - lock.unlock(); - return recv_return(false); - } + if (closed_ && buf_.empty()) return recv_return(false); // If there is a sender, directly receive the value we want // from the sender. In case of a buffered channel, read from @@ -229,7 +222,6 @@ bool ChannelImpl::Receive(T *item) { } else return recv_return(Receive(item)); } - lock.unlock(); return recv_return(true); } @@ -238,8 +230,7 @@ bool ChannelImpl::Receive(T *item) { // Directly read from buffer *item = std::move(buf_.front()); buf_.pop_front(); - // Release lock and return true - lock.unlock(); + // return true return recv_return(true); }