Unverified Commit 39231804 authored by yuyang18

Merge branch 'feature/dctor_all_readers' into feature/combine_open_files_and_double_buffer

@@ -67,7 +67,8 @@ void ReaderBase::Start() {
   }
 }
 
-ReaderBase::~ReaderBase() { Shutdown(); }
+ReaderBase::~ReaderBase() {}
+
+DecoratedReader::~DecoratedReader() { reader_->Shutdown(); }
 
 }  // namespace framework
 }  // namespace paddle
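Note: a plausible reason for this change (my reading of the diff; the commit message does not say): `Shutdown()` dispatches to the virtual `ShutdownImpl()`, and a virtual call made from a base-class destructor can never reach a derived override, because the derived part of the object has already been destroyed. Moving the call into `~DecoratedReader()`, which shuts down a different, still-alive object (`reader_`), sidesteps that. A self-contained sketch of the pitfall:

```cpp
// Minimal repro of virtual dispatch inside a base destructor.
#include <iostream>

class Base {
 public:
  virtual ~Base() { Shutdown(); }      // runs after ~Derived has finished
  void Shutdown() { ShutdownImpl(); }  // virtual dispatch happens here

 protected:
  virtual void ShutdownImpl() { std::cout << "Base::ShutdownImpl\n"; }
};

class Derived : public Base {
 protected:
  void ShutdownImpl() override { std::cout << "Derived::ShutdownImpl\n"; }
};

int main() {
  Base* r = new Derived;
  delete r;  // prints "Base::ShutdownImpl" -- the override is never called
  return 0;
}
```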
@@ -25,8 +25,6 @@
 namespace paddle {
 namespace framework {
 
-enum ReaderStatus { kRunning, kStopped };
-
 class ReaderBase {
  public:
   virtual void ReadNext(std::vector<LoDTensor>* out);
@@ -48,6 +46,8 @@ class ReaderBase {
   virtual void StartImpl() {}
 
+  enum ReaderStatus { kRunning, kStopped };
+
   ReaderStatus status_{kRunning};
 
   mutable std::mutex mu_;
@@ -74,6 +74,8 @@ class DecoratedReader : public ReaderBase,
     reader_->InsertDecoratedReader(shared_from_this());
   }
 
+  ~DecoratedReader();
+
  protected:
   void ShutdownImpl() override { reader_->Shutdown(); }
......
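Note: the header changes match that fix: `ReaderStatus` moves from namespace scope into `ReaderBase` (it only describes the reader's internal state), and `~DecoratedReader()` is declared here and defined in reader.cc. A trimmed-down sketch of the teardown this enables, where destroying a decorator shuts down the reader it wraps (names mirror the header; the bodies are illustrative, not Paddle's):

```cpp
#include <iostream>
#include <memory>

class ReaderBase {
 public:
  enum ReaderStatus { kRunning, kStopped };  // class-scoped, as in the diff
  void Shutdown() {
    if (status_ == kRunning) {
      status_ = kStopped;
      ShutdownImpl();
    }
  }
  virtual ~ReaderBase() = default;

 protected:
  virtual void ShutdownImpl() { std::cout << "root reader stopped\n"; }
  ReaderStatus status_{kRunning};
};

class DecoratedReader : public ReaderBase {
 public:
  explicit DecoratedReader(std::shared_ptr<ReaderBase> reader)
      : reader_(std::move(reader)) {}
  ~DecoratedReader() override { reader_->Shutdown(); }  // as added in reader.cc

 protected:
  void ShutdownImpl() override { reader_->Shutdown(); }
  std::shared_ptr<ReaderBase> reader_;
};

int main() {
  auto root = std::make_shared<ReaderBase>();
  { DecoratedReader wrapper(root); }  // wrapper dies -> "root reader stopped"
  return 0;
}
```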
@@ -14,79 +14,79 @@
 #include <thread>  // NOLINT
 
+#include "ThreadPool.h"
+
 #include "paddle/fluid/operators/reader/blocking_queue.h"
 #include "paddle/fluid/operators/reader/reader_op_registry.h"
 
 namespace paddle {
 namespace operators {
 namespace reader {
 
-// 'Double buffer' means we shall maintain two batches of input data at the
-// same time. So the kCacheSize should be at least 2.
-static constexpr size_t kCacheSize = 3;
-// There will be two batches out of the channel during training:
-// 1. the one waiting to be sent to the channel
-// 2. the one just received from the channel, which is also being used by
-//    subsequent operators.
-// So the channel size should be kCacheSize - 2.
-static constexpr size_t kChannelSize = 1;  // kCacheSize - 2
-
-class DoubleBufferReader : public framework::DecoratedReader {
+class BufferedReader : public framework::DecoratedReader {
+  using TensorVec = std::vector<framework::LoDTensor>;
+  using VecFuture = std::future<TensorVec>;
+
  public:
-  explicit DoubleBufferReader(
-      const std::shared_ptr<ReaderBase>& reader,
-      platform::Place target_place = platform::CPUPlace())
-      : DecoratedReader(reader), place_(target_place) {
-    cpu_tensor_cache_.resize(kCacheSize);
-    gpu_tensor_cache_.resize(kCacheSize);
-#ifdef PADDLE_WITH_CUDA
-    if (platform::is_gpu_place(place_)) {
-      for (size_t i = 0; i < kCacheSize; ++i) {
-        ctxs_.emplace_back(new platform::CUDADeviceContext(
-            boost::get<platform::CUDAPlace>(place_)));
-      }
-    }
-#endif
-    StartPrefetcher();
-  }
-
-  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
-
-  ~DoubleBufferReader() { EndPrefetcher(); }
-
- private:
-  void ShutdownImpl() override {
-    EndPrefetcher();
-    reader_->Shutdown();
-  }
-
-  void StartImpl() override {
-    reader_->Start();
-    StartPrefetcher();
-  }
-
-  void StartPrefetcher() {
-    channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
-    prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
-  }
-
-  void EndPrefetcher() {
-    channel_->Close();
-    if (prefetcher_.joinable()) {
-      prefetcher_.join();
-    }
-    delete channel_;
-    channel_ = nullptr;
-  }
-
-  void PrefetchThreadFunc();
-
-  std::thread prefetcher_;
-  reader::BlockingQueue<size_t>* channel_;
-  platform::Place place_;
-  std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
-  std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
-  std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
+  BufferedReader(const std::shared_ptr<framework::ReaderBase>& reader,
+                 const platform::Place& place, size_t buffer_size)
+      : framework::DecoratedReader(reader),
+        thread_pool_(1),
+        place_(place),
+        buffer_size_(buffer_size) {
+    AppendFutureToBatchSize();
+  }
+
+  ~BufferedReader() override {
+    reader_->Shutdown();
+    buffer_.clear();
+  }
+
+ private:
+  void AppendFutureToBatchSize() {
+    while (buffer_.size() < buffer_size_) {
+      AppendFuture();
+    }
+  }
+
+  void AppendFuture() {
+    buffer_.emplace_back(thread_pool_.enqueue([this] {
+      TensorVec cpu_buffer;
+      reader_->ReadNext(&cpu_buffer);
+      if (platform::is_gpu_place(place_)) {
+        TensorVec gpu_buffer;
+        for (size_t i = 0; i < cpu_buffer.size(); ++i) {
+          gpu_buffer.emplace_back();
+          framework::TensorCopySync(cpu_buffer[i], place_, &gpu_buffer.back());
+        }
+        cpu_buffer = gpu_buffer;
+      }
+      return cpu_buffer;
+    }));
+  }
+
+ protected:
+  void ShutdownImpl() override {
+    reader_->Shutdown();
+    buffer_.clear();
+  }
+
+  void StartImpl() override {
+    reader_->Start();
+    AppendFutureToBatchSize();
+  }
+
+  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
+    PADDLE_ENFORCE_EQ(buffer_.size(), buffer_size_);
+    *out = buffer_.front().get();
+    buffer_.pop_front();
+    AppendFuture();
+  }
+
+ private:
+  ThreadPool thread_pool_;
+  platform::Place place_;
+  const size_t buffer_size_;
+  std::list<VecFuture> buffer_;
 };
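Note: the rewrite replaces the hand-rolled prefetch thread and `BlockingQueue` channel with a one-worker `ThreadPool` plus a list of `std::future`s: the constructor enqueues `buffer_size` read tasks, and every `ReadNextImpl` blocks on the oldest future, then immediately enqueues a replacement. Because the pool has exactly one worker, reads on the wrapped reader are serialized and batches stay in order. A self-contained sketch of the pattern in standard C++ (`std::async` stands in for Paddle's `ThreadPool`; `BufferedSource` and its members are my names, not the diff's):

```cpp
#include <cstddef>
#include <deque>
#include <future>
#include <iostream>
#include <vector>

class BufferedSource {
 public:
  explicit BufferedSource(std::size_t buffer_size) {
    while (buffer_.size() < buffer_size) Append();  // cf. AppendFutureToBatchSize
  }

  // cf. ReadNextImpl: take the oldest future, block on it, refill the buffer.
  std::vector<int> ReadNext() {
    std::vector<int> batch = buffer_.front().get();
    buffer_.pop_front();
    Append();
    return batch;
  }

 private:
  void Append() {
    int id = next_++;  // drawn on the calling thread, so batches stay ordered
    buffer_.emplace_back(std::async(std::launch::async, [id] {
      // Stands in for reader_->ReadNext(&cpu_buffer) plus the optional
      // TensorCopySync to device memory done in the real AppendFuture.
      return std::vector<int>{id, id, id};
    }));
  }

  int next_{0};
  std::deque<std::future<std::vector<int>>> buffer_;
};

int main() {
  BufferedSource src(2);  // buffer_size = 2, i.e. classic double buffering
  for (int i = 0; i < 4; ++i) std::cout << src.ReadNext().front() << "\n";
  return 0;
}
```

In this sketch the batch id is generated before the task is enqueued, so the futures in the deque are ordered even though `std::async` may run producers concurrently; the real class gets the same guarantee from its single pool thread.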
 class CreateDoubleBufferReaderOp : public framework::OperatorBase {
@@ -118,8 +118,8 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
       place = platform::CUDAPlace(static_cast<int>(num));
     }
 
-    out->Reset(framework::MakeDecoratedReader<DoubleBufferReader>(
-        underlying_reader, place));
+    out->Reset(framework::MakeDecoratedReader<BufferedReader>(underlying_reader,
+                                                              place, 2));
   }
 };
@@ -146,51 +146,6 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
   }
 };
 
-void DoubleBufferReader::ReadNextImpl(std::vector<framework::LoDTensor>* out) {
-  size_t cached_tensor_id;
-  if (channel_->Receive(&cached_tensor_id)) {
-    if (platform::is_gpu_place(place_)) {
-      *out = gpu_tensor_cache_[cached_tensor_id];
-    } else {
-      // CPU place
-      *out = cpu_tensor_cache_[cached_tensor_id];
-    }
-  } else {
-    out->clear();
-  }
-}
-
-void DoubleBufferReader::PrefetchThreadFunc() {
-  VLOG(5) << "A new prefetch thread starts.";
-  size_t cached_tensor_id = 0;
-  while (true) {
-    auto& cpu_batch = cpu_tensor_cache_[cached_tensor_id];
-    reader_->ReadNext(&cpu_batch);
-    if (cpu_batch.empty()) {
-      // The underlying reader has no next data.
-      break;
-    }
-    if (platform::is_gpu_place(place_)) {
-      auto& gpu_batch = gpu_tensor_cache_[cached_tensor_id];
-      gpu_batch.resize(cpu_batch.size());
-      for (size_t i = 0; i < cpu_batch.size(); ++i) {
-        // TODO(fengjiayi): Use asynchronous TensorCopy instead
-        framework::TensorCopySync(cpu_batch[i], place_, &gpu_batch[i]);
-        gpu_batch[i].set_lod(cpu_batch[i].lod());
-      }
-    }
-    if (!channel_->Send(cached_tensor_id)) {
-      VLOG(5) << "WARNING: The double buffer channel has been closed. The "
-                 "prefetch thread will terminate.";
-      break;
-    }
-    ++cached_tensor_id;
-    cached_tensor_id %= kCacheSize;
-  }
-  channel_->Close();
-  VLOG(5) << "Prefetch thread terminates.";
-}
-
 }  // namespace reader
 }  // namespace operators
 }  // namespace paddle
......
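Note: for contrast, the deleted implementation above used one long-lived prefetch thread cycling through `kCacheSize` tensor slots plus a bounded channel of slot indices, with `Close()` doubling as the end-of-data signal. A compressed, standard-C++-only sketch of that scheme (the queue here is my minimal stand-in, not Paddle's `BlockingQueue`):

```cpp
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class BoundedQueue {  // stands in for reader::BlockingQueue<size_t>
 public:
  explicit BoundedQueue(std::size_t cap) : cap_(cap) {}

  bool Send(std::size_t v) {
    std::unique_lock<std::mutex> lk(mu_);
    send_cv_.wait(lk, [this] { return q_.size() < cap_ || closed_; });
    if (closed_) return false;  // like the failed Send in PrefetchThreadFunc
    q_.push(v);
    recv_cv_.notify_one();
    return true;
  }

  bool Receive(std::size_t* v) {
    std::unique_lock<std::mutex> lk(mu_);
    recv_cv_.wait(lk, [this] { return !q_.empty() || closed_; });
    if (q_.empty()) return false;  // closed and drained: end of data
    *v = q_.front();
    q_.pop();
    send_cv_.notify_one();
    return true;
  }

  void Close() {
    std::lock_guard<std::mutex> lk(mu_);
    closed_ = true;
    send_cv_.notify_all();
    recv_cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable send_cv_, recv_cv_;
  std::queue<std::size_t> q_;
  std::size_t cap_;
  bool closed_{false};
};

int main() {
  BoundedQueue channel(1);      // kChannelSize = 1, as in the deleted code
  std::thread prefetcher([&] {  // cf. PrefetchThreadFunc
    for (std::size_t id = 0; id < 4; ++id)
      if (!channel.Send(id % 3)) return;  // ids cycle over kCacheSize slots
    channel.Close();
  });
  std::size_t slot;
  while (channel.Receive(&slot)) std::cout << "consume slot " << slot << "\n";
  prefetcher.join();
  return 0;
}
```

The commit essentially trades this explicit thread-and-channel lifecycle (StartPrefetcher/EndPrefetcher, joinable checks, manual delete) for futures whose teardown is just `buffer_.clear()`.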
@@ -190,3 +190,7 @@ class TestDataBalance(unittest.TestCase):
     def test_all(self):
         self.main()
         self.main_lod()
+
+
+if __name__ == '__main__':
+    unittest.main()