diff --git a/paddle/framework/channel_test.cc b/paddle/framework/channel_test.cc index 1510fb8abf54f05804bd404d9bd00ecc42fbef63..2efa086f0090a130c84f6607a52d6d4d23e48673 100644 --- a/paddle/framework/channel_test.cc +++ b/paddle/framework/channel_test.cc @@ -78,3 +78,24 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) { t.join(); delete ch; } + +TEST(Channel, SimpleUnbufferedChannelTest) { + auto ch = MakeChannel(0); + unsigned sum_send = 0; + std::thread t([&]() { + for (int i = 0; i < 5; i++) { + ch->Send(&i); + sum_send += i; + } + }); + for (int i = 0; i < 5; i++) { + int recv; + ch->Receive(&recv); + EXPECT_EQ(recv, i); + } + + CloseChannel(ch); + t.join(); + EXPECT_EQ(sum_send, 10U); + delete ch; +} diff --git a/paddle/framework/details/unbuffered_channel.h b/paddle/framework/details/unbuffered_channel.h index cc2d2e587eca981307d4e522bd569fbffa450207..0dc5afd7e57c1f59dfc1b86093eea231d46966f1 100644 --- a/paddle/framework/details/unbuffered_channel.h +++ b/paddle/framework/details/unbuffered_channel.h @@ -1,4 +1,4 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -13,8 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include #include -#include #include #include "paddle/framework/channel.h" @@ -36,20 +36,104 @@ class UnBuffered : public paddle::framework::Channel { virtual ~UnBuffered(); private: - UnBuffered() {} + std::mutex mu_ch_; + // Mutex for readers and writers who are waiting for other reader + // and writer to complete execution + std::recursive_mutex mu_read_, mu_write_; + // reader_found_ is set true when a reader is ready to accept data + // writer_found_ is set true when a writer is ready to send data + // A transaction occurs only when both are true + std::atomic reader_found_{false}, writer_found_{false}; + std::condition_variable cv_channel_; + std::condition_variable_any cv_reader_, cv_writer_; + T* item{nullptr}; + std::atomic closed_{false}; + + UnBuffered() : closed_(false) {} + + void NotifyAllParticipants(std::unique_lock*); }; +// This function implements the concept of how data should +// be sent from a writer to a reader. +template +void UnBuffered::Send(T* data) { + // Prevent other writers from entering + std::unique_lock writer_lock(mu_write_); + writer_found_ = true; + std::unique_lock cv_lock(mu_write_); + // If writer comes first, it should wait till a reader arrives + cv_writer_.wait(cv_lock, + [this]() { return reader_found_ == true || closed_; }); + cv_reader_.notify_one(); + if (!closed_) { + std::unique_lock channel_lock(mu_ch_); + item = data; + channel_lock.unlock(); + cv_channel_.notify_one(); + channel_lock.lock(); + cv_channel_.wait(channel_lock, + [this]() { return item == nullptr || closed_; }); + } + writer_found_ = false; +} + +// This function implements the concept of how +// data that was sent by a writer is read from a reader. template -void UnBuffered::Send(T* channel_element) {} +void UnBuffered::Receive(T* data) { + // Prevent other readers from entering + std::unique_lock read_lock{mu_read_}; + reader_found_ = true; + std::unique_lock cv_lock{mu_read_}; + // If reader comes first, it should wait till a writer arrives + cv_reader_.wait(cv_lock, + [this]() { return writer_found_ == true || closed_; }); + cv_writer_.notify_one(); + if (!closed_) { + std::unique_lock lock_ch{mu_ch_}; + // Reader should wait for the writer to first write its data + cv_channel_.wait(lock_ch, [this]() { return item != nullptr || closed_; }); + if (!closed_) { + *data = std::move(*item); + item = nullptr; + lock_ch.unlock(); + } + cv_channel_.notify_one(); + } + reader_found_ = false; +} +// This function implements the sequence of events +// that take place once the channel is closed. template -void UnBuffered::Receive(T*) {} +void UnBuffered::Close() { + std::unique_lock lock(mu_ch_); + item = nullptr; + closed_ = true; + NotifyAllParticipants(&lock); +} +// This function implements the sequence of events +// that are executed once the object of an UnBuffered +// channel is destroyed. template -void UnBuffered::Close() {} +UnBuffered::~UnBuffered() { + std::unique_lock lock(mu_ch_); + item = nullptr; + closed_ = true; + NotifyAllParticipants(&lock); +} +// This function notifies all the readers, writers and +// the channel condition variables. template -UnBuffered::~UnBuffered() {} +void UnBuffered::NotifyAllParticipants(std::unique_lock* lock) { + lock->unlock(); + cv_writer_.notify_all(); + cv_channel_.notify_all(); + cv_reader_.notify_all(); +} } // namespace details } // namespace framework